"""Collect federation peers of fediverse servers and store them in PostgreSQL.

The script queries one Lemmy server and the configured Mastodon server for
their peer lists, then queries every peer of the Mastodon server in a pool of
worker processes.  Each (peer, queried server) pair is inserted into the
``world`` table.
"""
import time
start_time = time.time()
from six.moves import urllib
from datetime import datetime
from subprocess import call
from mastodon import Mastodon
import threading
import os
import json
import signal
import sys
import os.path
import requests
import operator
import calendar
import psycopg2
from itertools import product
from multiprocessing import Pool, Lock, Process, Queue, current_process
import queue
import multiprocessing
import aiohttp
import aiodns
import asyncio
from aiohttp import ClientError, ClientSession, ClientConnectionError, ClientConnectorError, ClientSSLError, ClientConnectorSSLError, ServerTimeoutError
from asyncio import TimeoutError
import socket
from socket import gaierror, gethostbyname

# Timestamp shared by every row written during this run.
updated_at = datetime.now()

peers_api = '/api/v1/instance/peers?'
lemmy_api = '/api/v2/site?'

_INSERT_PEER_SQL = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"


def is_json(myjson):
    """Return True if *myjson* parses as JSON, False if it raises ValueError."""
    try:
        json.loads(myjson)
    except ValueError:
        return False
    return True


def _is_plain_hostname(server):
    """Return True if *server* is a string with a dot and none of '@', '/', ':'."""
    if not isinstance(server, str):
        # Peer lists come from remote servers; skip anything that is not text.
        return False
    return "." in server and not any(char in server for char in "@/:")


def _crawl(fetch_peers, server):
    """Validate *server* and run the coroutine function *fetch_peers* on it.

    Failures are reported on stdout and never propagate, so one bad server
    cannot abort a whole pool run.
    """
    if not _is_plain_hostname(server):
        return
    try:
        # asyncio.run() creates and closes its own event loop, so nothing
        # depends on an implicit loop from asyncio.get_event_loop().
        asyncio.run(fetch_peers(server))
    except Exception as error:
        print("Server: %s, crawl failed: %r" % (server, error))


def get_lemmy_server(server):
    """Fetch and store the linked instances of the Lemmy server *server*.

    Names without a dot, or containing '@', '/' or ':', are ignored.
    """
    _crawl(get_lemmy_peers, server)


def getserver(server):
    """Fetch and store the peers of the Mastodon-API server *server*.

    Names without a dot, or containing '@', '/' or ':', are ignored.
    """
    _crawl(getpeers, server)


async def _fetch_json(server, api_path):
    """Return the decoded JSON body of ``https://<server><api_path>``.

    Returns None when the name does not resolve, the request fails or times
    out, the status is not 200, or the body cannot be decoded as JSON.
    """
    try:
        # Cheap pre-check so unresolvable names never reach the HTTP client.
        socket.gethostbyname(server)
    except (OSError, UnicodeError):
        return None

    timeout = aiohttp.ClientTimeout(total=3)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get('https://' + server + api_path) as resp:
                if resp.status != 200:
                    return None
                return await resp.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError):
        # Unreachable or misbehaving servers are expected while crawling.
        return None


def _save_peers(server, peers):
    """Insert one ``world`` row per entry of *peers*, all linked to *server*.

    A single connection is used for the whole list.  Every row is committed
    on its own, so a failing row is reported and rolled back without
    affecting the others.
    """
    try:
        conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432")
    except psycopg2.Error as error:
        print(error)
        return

    try:
        cur = conn.cursor()
        for peer in peers:
            saved_at = datetime.now()
            try:
                cur.execute(_INSERT_PEER_SQL, (peer, server, updated_at, saved_at,))
                conn.commit()
            except psycopg2.Error as error:
                print(error)
                # Leave the failed transaction so later rows can still go in.
                conn.rollback()
    except psycopg2.Error as error:
        # Raised when the connection itself became unusable.
        print(error)
    finally:
        conn.close()


def _report_and_save(server, peers):
    """Print the peer count of *server* and store *peers* in the database."""
    print("Server: " + server + ", " + "federated with " + str(len(peers)) + " servers")
    _save_peers(server, peers)


async def get_lemmy_peers(server):
    """Store the ``federated_instances.linked`` list reported by *server*.

    Does nothing if the server cannot be queried or the response does not
    contain such a list.
    """
    payload = await _fetch_json(server, lemmy_api)
    try:
        linked = payload['federated_instances']['linked']
    except (KeyError, TypeError):
        return
    if isinstance(linked, list):
        _report_and_save(server, linked)


async def getpeers(server):
    """Store the peer list returned by the peers endpoint of *server*.

    Does nothing if the server cannot be queried or the response is not a
    JSON list.
    """
    peers = await _fetch_json(server, peers_api)
    if isinstance(peers, list):
        _report_and_save(server, peers)


###############################################################################
# INITIALISATION
###############################################################################

# Returns the parameter from the specified file
def get_parameter( parameter, file_path ):
    """Return the value stored as ``<parameter>: <value>`` in *file_path*.

    Only lines that start with the exact key followed by a colon match, so a
    key that is a prefix of another key (``fediverse_db`` versus
    ``fediverse_db_user``) cannot pick up the wrong line.  Prints a message
    and calls ``sys.exit(0)`` if the file or the parameter is missing.
    """
    # Check if secrets file exists
    if not os.path.isfile(file_path):
        print("File %s not found, exiting."%file_path)
        sys.exit(0)

    # Find parameter in file
    key = parameter + ":"
    with open( file_path ) as f:
        for line in f:
            if line.startswith( key ):
                return line[len(key):].strip()

    # Cannot find parameter, exit
    print(file_path + "  Missing parameter %s "%parameter)
    sys.exit(0)

# Load secrets from secrets file
secrets_filepath = "secrets/secrets.txt"
uc_client_id     = get_parameter("uc_client_id",     secrets_filepath)
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
uc_access_token  = get_parameter("uc_access_token",  secrets_filepath)

# Load configuration from config file
config_filepath = "config/config.txt"
mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)

# Load database config from db_config file
db_config_filepath = "config/db_config.txt"
fediverse_db = get_parameter("fediverse_db", db_config_filepath)
fediverse_db_user = get_parameter("fediverse_db_user", db_config_filepath)

# Initialise Mastodon API
mastodon = Mastodon(
    client_id = uc_client_id,
    client_secret = uc_client_secret,
    access_token = uc_access_token,
    api_base_url = 'https://' + mastodon_hostname,
)

# Initialise access headers
headers={ 'Authorization': 'Bearer %s'%uc_access_token }

###############################################################################
# main

if __name__ == '__main__':

    lemmy_server = 'lemmy.ml'
    get_lemmy_server(lemmy_server)

    getserver(mastodon_hostname)

    self_peers = mastodon.instance_peers()

    ###########################################################################
    # Query every peer of our own server, one worker process per CPU.

    nprocs = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=nprocs) as pool:
        pool.map(getserver, self_peers)

    exec_time = str(round((time.time() - start_time), 2))
    print(exec_time)