From 93b3a4334c733812614f41c9f96e6d4b42e8fc23 Mon Sep 17 00:00:00 2001 From: spla Date: Sun, 13 Mar 2022 18:06:07 +0100 Subject: [PATCH] fetchseervers now uses Ray library & getpeers refactor --- db-setup.py | 4 +- fetchservers.py | 323 ++++++++++++++++++++++++++---------------------- getpeers.py | 98 +++++++++------ 3 files changed, 241 insertions(+), 184 deletions(-) diff --git a/db-setup.py b/db-setup.py index 833bc38..65bfdca 100644 --- a/db-setup.py +++ b/db-setup.py @@ -162,8 +162,8 @@ if __name__ == '__main__': sql = "create table "+table+" (datetime timestamptz PRIMARY KEY, servers INT, users INT)" create_table(db, db_user, table, sql) - table = "closed" - sql = "create table "+table+" (server varchar(200) PRIMARY KEY, software varchar(10), closed boolean)" + table = "execution_time" + sql = "create table "+table+" (program varchar(20) PRIMARY KEY, start timestamptz, finish timestamptz)" create_table(db, db_user, table, sql) ##################################### diff --git a/fetchservers.py b/fetchservers.py index 6a197b3..15d347f 100644 --- a/fetchservers.py +++ b/fetchservers.py @@ -1,6 +1,3 @@ -from multiprocessing import set_start_method -from multiprocessing import get_context -from itertools import product import time from datetime import datetime import os @@ -8,24 +5,35 @@ import json import sys import os.path import psycopg2 -import aiohttp -import asyncio +import requests +import urllib3 import socket -import pdb +import ray -client_exceptions = ( - aiohttp.ClientResponseError, - aiohttp.ClientConnectionError, - aiohttp.ClientConnectorError, - aiohttp.ClientError, - asyncio.TimeoutError, - socket.gaierror, -) +ray.init(num_cpus = 32) # Specify this system CPUs. + +apis = ['/api/v1/instance?', + '/nodeinfo/2.0?', + '/nodeinfo/2.0.json?', + '/main/nodeinfo/2.0?', + '/api/statusnet/config?', + '/api/nodeinfo/2.0.json?', + '/api/nodeinfo?', + '/wp-json/nodeinfo/2.0?', + '/api/v1/instance/nodeinfo/2.0?', + '/.well-known/x-nodeinfo2?' + ] + +def is_json(myjson): + + try: + json_object = json.loads(myjson) + except ValueError as e: + return False + return True def write_api(server, software, users, alive, api, soft_version): - now = datetime.now() - fediverse_db, fediverse_db_user = get_db_config() insert_sql = "INSERT INTO fediverse(server, updated_at, software, users, alive, users_api, version) VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING" @@ -63,80 +71,139 @@ def write_api(server, software, users, alive, api, soft_version): conn.close() -async def getsoft(server): +@ray.remote +def getsoft(server): - fediverse_db, fediverse_db_user = get_db_config() - - try: - - socket.gethostbyname(server) - - except socket.gaierror as g_error: - - print(f'Server {server} error: {g_error}') - pass + if server.find(".") == -1: + return + if server.find("@") != -1: + return + if server.find("/") != -1: + return + if server.find(":") != -1: return soft = '' + is_nodeinfo = False + url = 'https://' + server user_agent = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"} - timeout = aiohttp.ClientTimeout(total=3) + try: - async with aiohttp.ClientSession(timeout=timeout, headers=user_agent) as session: + response = requests.get(url + '/.well-known/nodeinfo', headers = user_agent, timeout=3) - try: + if response.status_code == 200: - async with session.get(url + '/.well-known/nodeinfo') as response: + try: - if response.status == 200: + response_json = response.json() - try: + nodeinfo = response_json['links'][0]['href'].replace(f'https://{server}','') - response_json = await response.json() + try: - nodeinfo = response_json['links'][0]['href'].replace(f'https://{server}','') + nodeinfo_data = requests.get(url + nodeinfo, headers = user_agent, timeout=3) - except: + if nodeinfo_data.status_code == 200: - pass - else: + nodeinfo_json = nodeinfo_data.json() - print(f'Server {server} not responding: {response.status}') - - pass - - async with session.get(url + nodeinfo) as nodeinfo_response: - - if nodeinfo_response.status == 200: - - try: - - nodeinfo_json = await nodeinfo_response.json() - - except: - - pass + is_nodeinfo = True else: - print(f"Server {server}'s nodeinfo not responding: {response.status}") + print(f"Server {server}'s nodeinfo not responding: error code {nodeinfo_data.status_code}") - pass + except: - except aiohttp.ClientConnectorError as cc_err: + pass - pass + except: - except aiohttp.client_exceptions.ClientConnectorSSLError as ccssl_as: - - pass + print(f'Server {server} not responding: error code {response.status_code}') + print('*********************************************************************') + pass else: - if nodeinfo_response.status == 200 and nodeinfo != '/api/v1/instance?': + for api in apis: + + try: + + response = requests.get(url + api, headers = user_agent, timeout=3) + + if is_json(response.text): + + nodeinfo_json = response.json() + + if 'software' in nodeinfo_json: + + nodeinfo = api + + is_nodeinfo = True + + break + + elif 'title' in nodeinfo_json: + + if nodeinfo_json['title'] == 'Zap': + + nodeinfo = api + + is_nodeinfo = True + + soft = 'zap' + + break + + elif 'version' in nodeinfo_json: + + nodeinfo = api + + is_nodeinfo = True + + break + + except: + + pass + + except requests.exceptions.SSLError as errssl: + + pass + + except requests.exceptions.HTTPError as errh: + + pass + + except requests.exceptions.ConnectionError as errc: + + pass + + except requests.exceptions.ReadTimeout as to_err: + + pass + + except requests.exceptions.TooManyRedirects as tmr_err: + + pass + + except urllib3.exceptions.LocationParseError as lp_err: + + pass + + except requests.exceptions.InvalidURL as iu_err: + + pass + + else: + + if is_nodeinfo: + + if nodeinfo != '/api/v1/instance?': if nodeinfo != '/.well-known/x-nodeinfo2?': @@ -152,8 +219,7 @@ async def getsoft(server): write_api(server, soft, users, alive, nodeinfo, soft_version) - print('*********************************************************************') - print("Server " + server + " (" + soft + " " + soft_version + ") is alive!") + print(f"Server {server} ({soft} {soft_version}) is alive!") print('*********************************************************************') return @@ -176,10 +242,10 @@ async def getsoft(server): if soft == 'socialhome': - write_api(server, soft, users, alive, api, soft_version) + write_api(server, soft, users, alive, nodeinfo, soft_version) print('*********************************************************************') - print("Server " + server + " (" + soft + " " + soft_version + ") is alive!") + print(f"Server {serve}r ({soft} {soft_version}) is alive!") print('*********************************************************************') return @@ -188,49 +254,53 @@ async def getsoft(server): pass - if nodeinfo_response.status == 200 and soft == '' and nodeinfo == "/api/v1/instance?": + if soft == '' and nodeinfo == "/api/v1/instance?": soft = 'mastodon' - users = nodeinfo_json['stats']['user_count'] - soft_version = nodeinfo_json['version'] - if users > 1000000: - return + + try: + + users = nodeinfo_json['stats']['user_count'] + + if users > 1000000: + + return + + except: + + users = 0 + + try: + + soft_version = nodeinfo_json['version'] + + except: + + soft_version = 'unknown' + alive = True - write_api(server, soft, users, alive, api) + write_api(server, soft, users, alive, nodeinfo, soft_version) print('*********************************************************************') - print("Server " + server + " (" + soft + ") is alive!") + print(f"Server {server} ({soft}) is alive!") + + elif soft == 'zap' and nodeinfo == "/api/v1/instance?": + + soft = 'zap' + users = nodeinfo_json['stats']['user_count'] + soft_version = nodeinfo_json['version'] + alive = True + + print(server, soft, users, alive, api) + print('*********************************************************************') + print(f"Server {server} ({soft}) is alive!") - else: + else: - print(f'Server {server} is dead') - -def getserver(server, *args): - - if len(args) != 0: - - server = server[0].rstrip('.').lower() - - if server.find(".") == -1: - return - if server.find("@") != -1: - return - if server.find("/") != -1: - return - if server.find(":") != -1: - return - - try: - - loop = asyncio.get_event_loop() - coroutines = [getsoft(server)] - soft = loop.run_until_complete(asyncio.gather(*coroutines, return_exceptions=True)) - - except: - - pass + print(f'Server {server} is dead') + print('*********************************************************************') def get_world_servers(): @@ -246,7 +316,7 @@ def get_world_servers(): # get world servers list - cur.execute("select server from world where checked='f'") + cur.execute("select server from world")# where checked='f'") rows = cur.fetchall() @@ -287,11 +357,6 @@ def get_parameter(parameter, file_path): print(file_path + " Missing parameter %s " % parameter) sys.exit(0) -def usage(): - - print('usage: python ' + sys.argv[0] + ' --multi' + ' (multiprocessing, fast)') - print('usage: python ' + sys.argv[0] + ' --mono' + ' (one process, slow)') - def get_config(): # Load configuration from config file @@ -312,50 +377,18 @@ def get_db_config(): if __name__ == '__main__': - # usage modes + ## name: fetchservers.py - if len(sys.argv) == 1: + now = datetime.now() - usage() + mastodon_hostname = get_config() - elif len(sys.argv) == 2: + fediverse_db, fediverse_db_user = get_db_config() - if sys.argv[1] == '--multi': + world_servers = get_world_servers() - now = datetime.now() + start = time.time() - start_time = time.time() + results = ray.get([getsoft.remote(server) for server in world_servers]) - mastodon_hostname = get_config() - - fediverse_db, fediverse_db_user = get_db_config() - - world_servers = get_world_servers() - - with get_context("spawn").Pool(processes=32) as pool: - - res = pool.starmap(getserver, product(world_servers)) - - pool.close() - - pool.join() - - print('Done.') - - elif sys.argv[1] == '--mono': - - now = datetime.now() - - start_time = time.time() - - mastodon_hostname = get_config() - - fediverse_db, fediverse_db_user = get_db_config() - - world_servers = get_world_servers() - - for server in world_servers: - - getserver(server) - - print('Done.') + print(f"duration = {time.time() - start}.\nprocessed servers: {len(results)}") diff --git a/getpeers.py b/getpeers.py index d6c655c..d034227 100644 --- a/getpeers.py +++ b/getpeers.py @@ -6,44 +6,39 @@ import requests import json import psycopg2 import ray +import pdb ray.init(num_cpus = 32) # Specify this system CPUs. -def write_servers(world_servers): +def write_server(server, federated_with): - i = 0 + insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING" - while i < len(world_servers): + conn = None - insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING" + try: - conn = None + conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432") - try: + cur = conn.cursor() - conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432") + cur.execute(insert_sql, (server, federated_with, now, now,)) - cur = conn.cursor() + print(f'writing {server} to world database') - cur.execute(insert_sql, (world_servers[i], peer, now, now,)) + conn.commit() - print(f'writing to database {str(i)} of {str(len(world_servers))}: {world_servers[i]}') + cur.close() - conn.commit() + except (Exception, psycopg2.DatabaseError) as error: - cur.close() + print(error) - except (Exception, psycopg2.DatabaseError) as error: + finally: - print(error) + if conn is not None: - finally: - - if conn is not None: - - conn.close() - - i += 1 + conn.close() @ray.remote def get_peers(peer): @@ -64,11 +59,7 @@ def get_peers(peer): for peer_peer in response_json: - exist_count = world_peers.count(peer_peer) - - if exist_count == 0: - - world_peers.append(peer_peer) + write_server(peer_peer, peer) except: @@ -77,6 +68,36 @@ def get_peers(peer): pass +def save_time(program, start, finish): + + insert_sql = "INSERT INTO execution_time(program, start, finish) VALUES(%s,%s,%s) ON CONFLICT DO NOTHING" + + conn = None + + try: + + conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432") + + cur = conn.cursor() + + cur.execute(insert_sql, (program, start, finish,)) + + cur.execute("UPDATE execution_time SET start=(%s), finish=(%s) where program=(%s)", (start, finish, program)) + + conn.commit() + + cur.close() + + except (Exception, psycopg2.DatabaseError) as error: + + print(error) + + finally: + + if conn is not None: + + conn.close() + def get_parameter( parameter, file_path ): # Check if secrets file exists @@ -112,27 +133,30 @@ if __name__ == '__main__': fediverse_db = get_parameter("fediverse_db", db_config_filepath) fediverse_db_user = get_parameter("fediverse_db_user", db_config_filepath) - world_peers = [] - user_agent = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"} res = requests.get('https://' + mastodon_hostname + peers_api, headers = user_agent, timeout=3) hostname_peers = res.json() + start = datetime.now() + + program = 'getpeers' + + finish = start + + save_time(program, start, finish) + for peer in hostname_peers: - exist_count = world_peers.count(peer) - - if exist_count == 0: - - world_peers.append(peer) - - start = time.time() + write_server(peer, mastodon_hostname) results = ray.get([get_peers.remote(server) for server in hostname_peers]) - print(f"duration = {time.time() - start}.\nprocessed servers: {len(results)}") + finish = datetime.now() + + print(f"duration = {finish - start}.\nprocessed servers: {len(results)}") + + save_time(program, start, finish) - write_servers(world_peers)