"""Collect the peers of a Mastodon server and record which software they run.

The script asks the configured Mastodon server for its peer list, probes each
peer's nodeinfo (or a set of fallback API endpoints) and stores the detected
software, version and user count in a PostgreSQL database.
"""
import os
import sys
import time
from datetime import datetime
import requests
import urllib3
import json
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
#import ray
import pdb

#ray.init(num_cpus = 32) # Specify this system CPUs.

# HTTP settings shared by every outgoing request.
_USER_AGENT = {'User-agent': "fediverse's stats (fediverse@soc.catala.digital)"}
_REQUEST_TIMEOUT = 3

# Endpoints that need special treatment when interpreting their payload.
_INSTANCE_API = '/api/v1/instance?'
_XNODEINFO2_API = '/.well-known/x-nodeinfo2?'

# Visual separator printed between per-server reports.
_SEPARATOR = '*********************************************************************'


def is_json(text):
    """Return True when *text* can be parsed as JSON, False otherwise."""
    try:
        json.loads(text)
    except (TypeError, ValueError):
        return False
    return True


class Peers:
    """Fetch a Mastodon server's peers and store their software details.

    Configuration (Mastodon hostname, database name and database user) is read
    from ``config/config.txt``; when that file does not exist the values are
    asked interactively and written to it. When the database cannot be
    connected to, it is created together with its tables.
    """

    name = 'Mastodon server peers'

    def __init__(self, mastodon_hostname=None, peers_api=None):
        """Load (or interactively create) the configuration and the database.

        Args:
            mastodon_hostname: currently unused; the hostname always comes
                from the configuration file or the interactive setup.
            peers_api: currently unused; the peers endpoint is fixed.
        """
        self.peers_api = '/api/v1/instance/peers'
        self.config_path = "config/config.txt"
        # Fallback endpoints probed when '/.well-known/nodeinfo' is not served.
        self.apis = [
            '/api/v1/instance?',
            '/nodeinfo/2.0?',
            '/nodeinfo/2.0.json?',
            '/main/nodeinfo/2.0?',
            '/api/statusnet/config?',
            '/api/nodeinfo/2.0.json?',
            '/api/nodeinfo?',
            '/wp-json/nodeinfo/2.0?',
            '/api/v1/instance/nodeinfo/2.0?',
            '/.well-known/x-nodeinfo2?',
        ]

        if self.__check_setup():
            self.mastodon_hostname = self.__get_parameter("mastodon_hostname", self.config_path)
            self.blocker_db = self.__get_parameter("blocker_db", self.config_path)
            self.blocker_db_user = self.__get_parameter("blocker_db_user", self.config_path)
        else:
            self.mastodon_hostname, self.blocker_db, self.blocker_db_user = self.__setup()

        if not self.__check_dbsetup():
            self.__createdb()

    def get_peers(self):
        """Return the decoded JSON peer list of the configured Mastodon server."""
        res = requests.get(
            f'https://{self.mastodon_hostname}{self.peers_api}',
            headers=_USER_AGENT,
            timeout=_REQUEST_TIMEOUT,
        )
        return res.json()

    def getsoft(self, server):
        """Detect the software run by *server* and store it in the database.

        Peers that do not look like a plain hostname (no dot, or containing
        '@', '/' or ':') are skipped. Request failures on the first probe are
        ignored silently so one unreachable peer never stops the crawl.

        Args:
            server: hostname of the peer to probe.

        Returns:
            None. Results are written with ``write_api`` and reported on stdout.
        """
        if '.' not in server or any(char in server for char in '@/:'):
            return

        url = 'https://' + server

        try:
            response = requests.get(
                url + '/.well-known/nodeinfo',
                headers=_USER_AGENT,
                timeout=_REQUEST_TIMEOUT,
            )
        except requests.exceptions.ChunkedEncodingError:
            print(f'ChunkedEncodingError! {server}')
            return
        except (requests.exceptions.RequestException,
                urllib3.exceptions.LocationParseError):
            # RequestException is the base class of the SSL, HTTP, connection,
            # timeout, redirect and invalid-URL errors of requests.
            return

        if response.status_code == 200:
            found = self.__fetch_wellknown(server, url, response)
        else:
            found = self.__probe_apis(url)

        if found is None:
            print(f'Server {server} is dead')
            print(_SEPARATOR)
            return

        nodeinfo, nodeinfo_json, soft = found
        alive = True

        if nodeinfo != _INSTANCE_API:
            self.__record_nodeinfo(server, nodeinfo, nodeinfo_json)
            return

        if soft == '':
            soft = 'mastodon'

            try:
                users = nodeinfo_json['stats']['user_count']
                if users > 1000000:
                    # Implausibly large instances are not recorded.
                    return
            except Exception:
                users = 0

            try:
                soft_version = nodeinfo_json['version']
            except Exception:
                soft_version = 'unknown'

            self.write_api(server, soft, users, alive, nodeinfo, soft_version)

            print(_SEPARATOR)
            print(f"Server {server} ({soft}) is alive!")

        elif soft == 'zap':
            # NOTE(review): Zap servers are only reported, never written to the
            # database - confirm this is intended.
            try:
                users = nodeinfo_json['stats']['user_count']
            except (KeyError, TypeError):
                users = 0

            print(server, soft, users, alive, nodeinfo)
            print(_SEPARATOR)
            print(f"Server {server} ({soft}) is alive!")

    def __fetch_wellknown(self, server, url, response):
        """Follow the first link of a '/.well-known/nodeinfo' document.

        Returns:
            ``(nodeinfo_path, nodeinfo_json, '')`` when the linked document was
            fetched with status 200, otherwise None.
        """
        try:
            response_json = response.json()
            nodeinfo = response_json['links'][0]['href'].replace(f'https://{server}', '')
        except Exception:
            print(f'Server {server} not responding: error code {response.status_code}')
            print(_SEPARATOR)
            return None

        try:
            nodeinfo_data = requests.get(
                url + nodeinfo, headers=_USER_AGENT, timeout=_REQUEST_TIMEOUT
            )
            if nodeinfo_data.status_code == 200:
                return nodeinfo, nodeinfo_data.json(), ''
            print(f"Server {server}'s nodeinfo not responding: error code {nodeinfo_data.status_code}")
        except Exception:
            # Best effort: an unreachable or malformed nodeinfo document simply
            # means the server is reported as dead by the caller.
            pass

        return None

    def __probe_apis(self, url):
        """Try every fallback endpoint until one returns a recognisable payload.

        Returns:
            ``(api, payload, soft)`` for the first matching endpoint, where
            *soft* is 'zap' for Zap servers and '' otherwise; None when no
            endpoint matched.
        """
        for api in self.apis:
            try:
                response = requests.get(
                    url + api, headers=_USER_AGENT, timeout=_REQUEST_TIMEOUT
                )
                if not is_json(response.text):
                    continue

                payload = response.json()

                if 'software' in payload:
                    return api, payload, ''
                elif 'title' in payload:
                    # NOTE(review): a payload with a non-Zap 'title' never
                    # reaches the 'version' check below - confirm intended.
                    if payload['title'] == 'Zap':
                        return api, payload, 'zap'
                elif 'version' in payload:
                    return api, payload, ''
            except Exception:
                # Best effort: a failing endpoint just means trying the next.
                pass

        return None

    def __record_nodeinfo(self, server, nodeinfo, nodeinfo_json):
        """Store a server described by a nodeinfo or x-nodeinfo2 document.

        Documents missing any of the expected keys are ignored silently.
        """
        alive = True

        if nodeinfo != _XNODEINFO2_API:
            try:
                soft = nodeinfo_json['software']['name']
                soft = soft.lower()
                soft_version = nodeinfo_json['software']['version']
                users = nodeinfo_json['usage']['users']['total']
            except Exception:
                return

            self.write_api(server, soft, users, alive, nodeinfo, soft_version)

            print(f"Server {server} ({soft} {soft_version}) is alive!")
            print(_SEPARATOR)
            return

        try:
            soft = nodeinfo_json['server']['software']
            soft = soft.lower()
            soft_version = nodeinfo_json['server']['version']
            users = nodeinfo_json['usage']['users']['total']
        except Exception:
            return

        # Only Socialhome is recorded from x-nodeinfo2 documents.
        if soft == 'socialhome':
            self.write_api(server, soft, users, alive, nodeinfo, soft_version)

            print(_SEPARATOR)
            print(f"Server {server} ({soft} {soft_version}) is alive!")
            print(_SEPARATOR)

    def write_api(self, server, software, users, alive, api, soft_version):
        """Insert or refresh the ``blocker`` row of *server*.

        Database errors are printed, not raised.
        """
        insert_sql = "INSERT INTO blocker(server, updated_at, software, users, alive, users_api, version) VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING"
        update_sql = "UPDATE blocker SET updated_at=(%s), software=(%s), users=(%s), alive=(%s), users_api=(%s), version=(%s) where server=(%s)"

        # Taken here rather than from a module global so the method also works
        # when the class is imported instead of run as a script.
        now = datetime.now()

        conn = None

        try:
            conn = self.__connect()

            with conn.cursor() as cur:
                print(f'Writing {server} nodeinfo data...')

                cur.execute(insert_sql, (server, now, software, users, alive, api, soft_version))
                # The insert is a no-op for known servers, so always update too.
                cur.execute(update_sql, (now, software, users, alive, api, soft_version, server))

            conn.commit()

        except Exception as error:
            print(error)

        finally:
            if conn is not None:
                conn.close()

    def save_time(self, program, start, finish):
        """Insert or refresh the ``execution_time`` row of *program*.

        Database errors are printed, not raised.
        """
        insert_sql = "INSERT INTO execution_time(program, start, finish) VALUES(%s,%s,%s) ON CONFLICT DO NOTHING"
        update_sql = "UPDATE execution_time SET start=(%s), finish=(%s) where program=(%s)"

        conn = None

        try:
            conn = self.__connect()

            with conn.cursor() as cur:
                cur.execute(insert_sql, (program, start, finish,))
                cur.execute(update_sql, (start, finish, program))

            conn.commit()

        except Exception as error:
            print(error)

        finally:
            if conn is not None:
                conn.close()

    def __connect(self):
        """Open a connection to the blocker database over the local socket."""
        return psycopg2.connect(
            database=self.blocker_db,
            user=self.blocker_db_user,
            password="",
            host="/var/run/postgresql",
            port="5432",
        )

    def __check_setup(self):
        """Return True when the configuration file exists."""
        if os.path.isfile(self.config_path):
            return True

        print(f"File {self.config_path} not found, running setup.")
        return False

    def __setup(self):
        """Ask for the configuration values and append them to the config file.

        Returns:
            Tuple ``(mastodon_hostname, blocker_db, blocker_db_user)``.
        """
        if not os.path.exists('config'):
            os.makedirs('config')

        self.mastodon_hostname = input("Mastodon hostname: ")
        self.blocker_db = input("Blocker database name: ")
        self.blocker_db_user = input("Blocker database user: ")

        if not os.path.exists(self.config_path):
            with open(self.config_path, 'w'):
                pass
            print(f"{self.config_path} created!")

        with open(self.config_path, 'a') as the_file:
            print("Writing Blocker parameters to " + self.config_path)
            the_file.write(f'mastodon_hostname: {self.mastodon_hostname}\n')
            the_file.write(f'blocker_db: {self.blocker_db}\n')
            the_file.write(f'blocker_db_user: {self.blocker_db_user}\n')

        return (self.mastodon_hostname, self.blocker_db, self.blocker_db_user)

    def __check_dbsetup(self):
        """Return True when a connection to the blocker database succeeds."""
        try:
            conn = self.__connect()
        except Exception as error:
            print(error)
            return False

        # The connection was only a probe; release it immediately.
        conn.close()
        return True

    def __createdb(self):
        """Create the blocker database and its tables.

        Errors are printed, not raised.
        """
        conn = None

        try:
            conn = psycopg2.connect(dbname='postgres', user=self.blocker_db_user, host='', password='')

            # CREATE DATABASE cannot run inside a transaction block.
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

            cur = conn.cursor()

            print(f"Creating database {self.blocker_db}. Please wait...")

            cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(self.blocker_db)))

            print(f"Database {self.blocker_db} created!\n")

            self.__dbtables_schemes()

        except Exception as error:
            print(error)

        finally:
            if conn is not None:
                conn.close()

    def __dbtables_schemes(self):
        """Create the ``blocker`` and ``execution_time`` tables."""
        table = "blocker"
        create_sql = (
            f"create table {table} (server varchar(200) PRIMARY KEY, users INT, "
            "updated_at timestamptz, software varchar(50), alive boolean, "
            "users_api varchar(50), version varchar(100), "
            "first_checked_at timestamptz, last_checked_at timestamptz, downs int)"
        )
        self.__create_table(table, create_sql)

        table = "execution_time"
        create_sql = "create table " + table + " (program varchar(30) PRIMARY KEY, start timestamptz, finish timestamptz)"
        self.__create_table(table, create_sql)

    def __create_table(self, table, statement):
        """Run the CREATE TABLE *statement* for *table*; errors are printed."""
        conn = None

        try:
            conn = self.__connect()

            with conn.cursor() as cur:
                print(f"Creating table {table}")
                cur.execute(statement)

            conn.commit()

            print(f"Table {table} created!\n")

        except Exception as error:
            print(error)

        finally:
            if conn is not None:
                conn.close()

    @staticmethod
    def __get_parameter(parameter, file_path):
        """Return the value stored for *parameter* in the config file.

        Lines have the form ``name: value``. The colon is part of the match so
        that ``blocker_db`` never picks up the ``blocker_db_user`` line. The
        process exits when the parameter is missing.
        """
        prefix = parameter + ":"

        with open(file_path) as f:
            for line in f:
                if line.startswith(prefix):
                    return line[len(prefix):].strip()

        print(f'{file_path} Missing parameter {parameter}')
        sys.exit(0)


def write_server(server, federated_with):
    """Insert *server* into the ``world`` table unless it is already there.

    NOTE(review): relies on module-level ``fediverse_db`` and
    ``fediverse_db_user``, which are not defined in this file; until they are,
    the resulting NameError is caught below and printed. Nothing in this file
    calls this function.
    """
    insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at, checked) VALUES(%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING"

    now = datetime.now()

    conn = None

    try:
        conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")

        with conn.cursor() as cur:
            cur.execute(insert_sql, (server, federated_with, now, now, 'f'))

        print(f'writing {server} to world database')

        conn.commit()

    except Exception as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()


def main():
    """Probe every peer of the configured server and record the run time."""
    obj = Peers()

    peers = obj.get_peers()

    print(f"{obj.mastodon_hostname}'s peers: {len(peers)}")

    start = datetime.now()
    program = obj.name

    # Register the run immediately; the finish time is corrected at the end.
    obj.save_time(program, start, start)

    for peer in peers:
        obj.getsoft(peer)

    #results = ray.get([obj.getsoft.remote(server) for server in peers])

    finish = datetime.now()

    print(f"duration = {finish - start}.\nprocessed servers: {len(peers)}")

    obj.save_time(program, start, finish)


###############################################################################
# main

if __name__ == '__main__':
    main()