Added aiohttp support for asynchronous peers requests

This commit is contained in:
spla 2020-06-05 13:24:58 +02:00
pare 5288dde459
commit 2283983227
S'han modificat 1 arxius amb 77 adicions i 72 eliminacions

Veure arxiu

@ -20,15 +20,19 @@ import psycopg2
from itertools import product from itertools import product
from multiprocessing import Pool, Lock, Process, Queue, current_process from multiprocessing import Pool, Lock, Process, Queue, current_process
import queue # imported for using queue.Empty exception import queue
import multiprocessing import multiprocessing
from decimal import * import aiohttp
getcontext().prec = 2 import aiodns
import asyncio
from aiohttp import ClientError, ClientSession, ClientConnectionError, ClientConnectorError, ClientSSLError, ClientConnectorSSLError, ServerTimeoutError
from asyncio import TimeoutError
import socket
from socket import gaierror, gethostbyname
############################################################################### updated_at = datetime.now()
# INITIALISATION peers_api = '/api/v1/instance/peers?'
###############################################################################
def is_json(myjson): def is_json(myjson):
try: try:
@ -37,86 +41,88 @@ def is_json(myjson):
return False return False
return True return True
def getpeers(server): def getserver(server):
global updated_at if server.find(".") == -1:
return
if server.find("@") != -1:
return
if server.find("/") != -1:
return
if server.find(":") != -1:
return
try: try:
res = requests.get('https://' + server + '/api/v1/instance/peers?',timeout=3) loop = asyncio.get_event_loop()
coroutines = [getpeers(server)]
loop.run_until_complete(asyncio.gather(*coroutines, return_exceptions=True))
if (res.ok): except:
if server.find(".") != -1 and server.find("@") == -1: pass
server_peers = res.json()
print("Server: " + server + ", " + "federated with " + str(len(server_peers)) + " servers")
else:
print("Server " + str(server) + " is not a domain")
i = 0 async def getpeers(server):
while i < len(server_peers) and server.find(".") != -1 and server.find("@") == -1:
saved_at = datetime.now() try:
insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"
conn = None socket.gethostbyname(server)
except socket.gaierror:
return
url = 'https://' + server
timeout = aiohttp.ClientTimeout(total=3)
async with aiohttp.ClientSession(timeout=timeout) as session:
try: try:
async with session.get(url+peers_api) as response:
if response.status == 200:
try:
response_json = await response.json()
print("Server: " + server + ", " + "federated with " + str(len(response_json)) + " servers")
i = 0
while i < len(response_json):
conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432") saved_at = datetime.now()
insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"
conn = None
try:
cur = conn.cursor() conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur.execute(insert_sql, (server_peers[i], server, updated_at, saved_at,)) cur = conn.cursor()
conn.commit() cur.execute(insert_sql, (response_json[i], server, updated_at, saved_at,))
cur.close() conn.commit()
except (Exception, psycopg2.DatabaseError) as error: cur.close()
print(error) except (Exception, psycopg2.DatabaseError) as error:
finally: print(error)
if conn is not None: finally:
conn.close() if conn is not None:
i += 1 conn.close()
except KeyError as e: i += 1
pass except:
return
except ValueError as verr: pass
pass except aiohttp.ClientConnectorError as err:
return
except requests.exceptions.SSLError as errssl: pass
pass ###############################################################################
return # INITIALISATION
###############################################################################
except requests.exceptions.HTTPError as errh:
pass
return
except requests.exceptions.ConnectionError as errc:
pass
return
except requests.exceptions.Timeout as errt:
pass
return
except requests.exceptions.RequestException as err:
pass
return
# Returns the parameter from the specified file # Returns the parameter from the specified file
def get_parameter( parameter, file_path ): def get_parameter( parameter, file_path ):
@ -162,19 +168,18 @@ mastodon = Mastodon(
headers={ 'Authorization': 'Bearer %s'%uc_access_token } headers={ 'Authorization': 'Bearer %s'%uc_access_token }
############################################################################### ###############################################################################
# get current datetime and Mastodon hostname peers # main
###############################################################################
updated_at = datetime.now() if __name__ == '__main__':
getpeers(mastodon_hostname) getserver(mastodon_hostname)
self_peers = mastodon.instance_peers() self_peers = mastodon.instance_peers()
########################################################################### ###########################################################################
nprocs = multiprocessing.cpu_count() nprocs = multiprocessing.cpu_count()
with multiprocessing.Pool(processes=nprocs) as pool: with multiprocessing.Pool(processes=nprocs) as pool:
results = pool.starmap(getpeers, product(self_peers)) results = pool.starmap(getserver, product(self_peers))
exec_time = str(round((time.time() - start_time), 2)) exec_time = str(round((time.time() - start_time), 2))
print(exec_time) print(exec_time)