From 2283983227a0f89fc7ffdcd1f0048d37a14eae55 Mon Sep 17 00:00:00 2001 From: spla Date: Fri, 5 Jun 2020 13:24:58 +0200 Subject: [PATCH] Added aiohttp support for asynchronous peers requests --- getworld.py | 149 +++++++++++++++++++++++++++------------------------- 1 file changed, 77 insertions(+), 72 deletions(-) diff --git a/getworld.py b/getworld.py index c1db9c5..d09cb7e 100644 --- a/getworld.py +++ b/getworld.py @@ -20,15 +20,19 @@ import psycopg2 from itertools import product from multiprocessing import Pool, Lock, Process, Queue, current_process -import queue # imported for using queue.Empty exception +import queue import multiprocessing -from decimal import * -getcontext().prec = 2 +import aiohttp +import aiodns +import asyncio +from aiohttp import ClientError, ClientSession, ClientConnectionError, ClientConnectorError, ClientSSLError, ClientConnectorSSLError, ServerTimeoutError +from asyncio import TimeoutError +import socket +from socket import gaierror, gethostbyname -############################################################################### -# INITIALISATION -############################################################################### +updated_at = datetime.now() +peers_api = '/api/v1/instance/peers?' def is_json(myjson): try: @@ -37,86 +41,88 @@ def is_json(myjson): return False return True -def getpeers(server): +def getserver(server): - global updated_at + if server.find(".") == -1: + return + if server.find("@") != -1: + return + if server.find("/") != -1: + return + if server.find(":") != -1: + return - try: + try: - res = requests.get('https://' + server + '/api/v1/instance/peers?',timeout=3) + loop = asyncio.get_event_loop() + coroutines = [getpeers(server)] + loop.run_until_complete(asyncio.gather(*coroutines, return_exceptions=True)) - if (res.ok): + except: - if server.find(".") != -1 and server.find("@") == -1: - server_peers = res.json() - print("Server: " + server + ", " + "federated with " + str(len(server_peers)) + " servers") - else: - print("Server " + str(server) + " is not a domain") + pass - i = 0 - while i < len(server_peers) and server.find(".") != -1 and server.find("@") == -1: +async def getpeers(server): - saved_at = datetime.now() - insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING" - conn = None + try: + + socket.gethostbyname(server) + + except socket.gaierror: + + return + + url = 'https://' + server + + timeout = aiohttp.ClientTimeout(total=3) + async with aiohttp.ClientSession(timeout=timeout) as session: try: + async with session.get(url+peers_api) as response: + if response.status == 200: + try: + response_json = await response.json() + print("Server: " + server + ", " + "federated with " + str(len(response_json)) + " servers") + i = 0 + while i < len(response_json): - conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432") + saved_at = datetime.now() + insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING" + conn = None + try: - cur = conn.cursor() + conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432") - cur.execute(insert_sql, (server_peers[i], server, updated_at, saved_at,)) + cur = conn.cursor() - conn.commit() + cur.execute(insert_sql, (response_json[i], server, updated_at, saved_at,)) - cur.close() + conn.commit() - except (Exception, psycopg2.DatabaseError) as error: + cur.close() - print(error) + except (Exception, psycopg2.DatabaseError) as error: - finally: + print(error) - if conn is not None: + finally: - conn.close() + if conn is not None: - i += 1 + conn.close() - except KeyError as e: + i += 1 - pass - return + except: - except ValueError as verr: + pass - pass - return + except aiohttp.ClientConnectorError as err: - except requests.exceptions.SSLError as errssl: + pass - pass - return - - except requests.exceptions.HTTPError as errh: - - pass - return - - except requests.exceptions.ConnectionError as errc: - - pass - return - - except requests.exceptions.Timeout as errt: - - pass - return - - except requests.exceptions.RequestException as err: - - pass - return +############################################################################### +# INITIALISATION +############################################################################### # Returns the parameter from the specified file def get_parameter( parameter, file_path ): @@ -162,19 +168,18 @@ mastodon = Mastodon( headers={ 'Authorization': 'Bearer %s'%uc_access_token } ############################################################################### -# get current datetime and Mastodon hostname peers -############################################################################### +# main -updated_at = datetime.now() +if __name__ == '__main__': -getpeers(mastodon_hostname) -self_peers = mastodon.instance_peers() + getserver(mastodon_hostname) + self_peers = mastodon.instance_peers() -########################################################################### + ########################################################################### -nprocs = multiprocessing.cpu_count() -with multiprocessing.Pool(processes=nprocs) as pool: - results = pool.starmap(getpeers, product(self_peers)) + nprocs = multiprocessing.cpu_count() + with multiprocessing.Pool(processes=nprocs) as pool: + results = pool.starmap(getserver, product(self_peers)) -exec_time = str(round((time.time() - start_time), 2)) -print(exec_time) + exec_time = str(round((time.time() - start_time), 2)) + print(exec_time)