import json
import os
import pdb
import sys
import time
from datetime import datetime

import psycopg2
import ray
import requests

ray.init(num_cpus = 32) # Set num_cpus to the number of CPUs on this system.

# Header sent with every HTTP request made by this script.
USER_AGENT = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"}

# Timeout, in seconds, passed to every requests.get() call.
REQUEST_TIMEOUT = 3


def _connect():
    """Open a new psycopg2 connection to the fediverse database.

    Reads the module globals ``fediverse_db`` and ``fediverse_db_user``, which
    the ``__main__`` section loads from the database config file. ``host`` is
    a filesystem path rather than a hostname.
    """
    return psycopg2.connect(
        database=fediverse_db,
        user=fediverse_db_user,
        password="",
        host="/var/run/postgresql",
        port="5432",
    )


def write_server(server, federated_with):
    """Insert one row into the ``world`` table for ``server``.

    Parameters:
        server: value stored in the ``server`` column.
        federated_with: value stored in the ``federated_with`` column.

    ``updated_at`` and ``saved_at`` are both set to the module global ``now``
    and ``checked`` is set to ``'f'``. Rows that conflict with an existing one
    are skipped (``ON CONFLICT DO NOTHING``).

    Any exception is printed and swallowed; the function always returns None.
    A new database connection is opened and closed on every call.
    """
    insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at, checked) VALUES(%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING"

    conn = None
    try:
        conn = _connect()
        with conn.cursor() as cur:
            cur.execute(insert_sql, (server, federated_with, now, now, 'f'))
        print(f'writing {server} to world database')
        conn.commit()
    except Exception as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()


@ray.remote
def get_peers(peer):
    """Fetch the peer list of ``peer`` and store every entry via write_server.

    Steps:
      1. GET ``https://<peer>/api/v1/instance``. Stop unless the status is 200.
      2. Compare the returned ``uri`` (with ``https://`` removed) against
         ``peer``; if they differ, report the alias and stop.
      3. GET ``https://<peer><peers_api>`` (``peers_api`` is a module global
         set in the ``__main__`` section). Stop unless the status is 200.
      4. Call ``write_server(entry, peer)`` for each entry of the response.

    This is best-effort: any failure (network error, timeout, non-JSON body,
    unexpected JSON shape) is reported on stdout and the peer is skipped, so
    one bad server cannot abort the whole run. Always returns None.
    """
    try:
        domain_res = requests.get(
            'https://' + peer + '/api/v1/instance',
            headers=USER_AGENT,
            timeout=REQUEST_TIMEOUT,
        )
        if domain_res.status_code != 200:
            return

        domain_uri = domain_res.json()['uri'].replace('https://', '')
        if domain_uri != peer:
            print(f'{peer} is an aliased domain of {domain_uri}!')
            return

        response = requests.get(
            'https://' + peer + peers_api,
            headers=USER_AGENT,
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return

        response_json = response.json()
        print(f"Server: {peer}, federated with {str(len(response_json))} servers")
        for peer_peer in response_json:
            write_server(peer_peer, peer)
    except Exception as error:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt / SystemExit still propagate; only the exception
        # type is printed to keep the per-peer output short.
        print(f'{peer}: skipped ({type(error).__name__})')


def save_time(program, start, finish):
    """Record the start and finish timestamps of ``program`` in ``execution_time``.

    Parameters:
        program: value matched against / stored in the ``program`` column.
        start: value stored in the ``start`` column.
        finish: value stored in the ``finish`` column.

    An INSERT (skipped on conflict) is followed by an UPDATE of the rows whose
    ``program`` matches, so the stored times are refreshed whether or not the
    row already existed. Both statements are committed together.

    Any exception is printed and swallowed; the function always returns None.
    """
    insert_sql = "INSERT INTO execution_time(program, start, finish) VALUES(%s,%s,%s) ON CONFLICT DO NOTHING"
    update_sql = "UPDATE execution_time SET start=(%s), finish=(%s) where program=(%s)"

    conn = None
    try:
        conn = _connect()
        with conn.cursor() as cur:
            cur.execute(insert_sql, (program, start, finish,))
            cur.execute(update_sql, (start, finish, program))
        conn.commit()
    except Exception as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()


def get_parameter( parameter, file_path ):
    """Return the value of ``parameter`` from a ``key: value`` config file.

    Parameters:
        parameter: key to look up; a line matches only if it starts with
            ``parameter`` immediately followed by ``:``.
        file_path: path of the config file.

    Returns:
        The text after ``<parameter>:`` on the first matching line, with
        surrounding whitespace removed.

    If the file does not exist or no line matches, a message is printed and
    the process exits via ``sys.exit(0)``.
    """
    # Check if secrets file exists
    if not os.path.isfile(file_path):
        print("File %s not found, exiting."%file_path)
        # NOTE(review): exits with status 0 on an error path; kept unchanged
        # in case callers depend on it - confirm whether non-zero is wanted.
        sys.exit(0)

    # Match the key together with its ':' so that a longer key sharing the
    # same prefix (e.g. "fediverse_db_user" when looking up "fediverse_db")
    # is not mistaken for the requested one.
    prefix = parameter + ":"

    # Find parameter in file
    with open( file_path ) as f:
        for line in f:
            if line.startswith(prefix):
                return line[len(prefix):].strip()

    # Cannot find parameter, exit
    print(file_path + " Missing parameter %s "%parameter)
    sys.exit(0)


###############################################################################
# main

if __name__ == '__main__':

    # NOTE: the names assigned below are module globals. write_server and
    # save_time read `now`, `fediverse_db` and `fediverse_db_user` (the
    # database ones through _connect); get_peers reads `peers_api`.
    now = datetime.now()

    peers_api = '/api/v1/instance/peers?'

    # Load configuration from config file
    config_filepath = "config/config.txt"
    mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)

    # Load database config from db_config file
    db_config_filepath = "config/db_config.txt"
    fediverse_db = get_parameter("fediverse_db", db_config_filepath)
    fediverse_db_user = get_parameter("fediverse_db_user", db_config_filepath)

    user_agent = USER_AGENT

    res = requests.get('https://' + mastodon_hostname + peers_api, headers = user_agent, timeout=REQUEST_TIMEOUT)
    # Fail loudly on an HTTP error instead of iterating over the keys of a
    # JSON error object and storing them as if they were servers.
    res.raise_for_status()
    hostname_peers = res.json()

    start = datetime.now()

    program = 'getpeers'

    # Record the run as started; finish is overwritten once the crawl ends.
    finish = start
    save_time(program, start, finish)

    for peer in hostname_peers:
        write_server(peer, mastodon_hostname)

    # Fan out one ray task per seed peer and wait for all of them.
    results = ray.get([get_peers.remote(server) for server in hostname_peers])

    finish = datetime.now()

    print(f"duration = {finish - start}.\nprocessed servers: {len(results)}")

    save_time(program, start, finish)