from multiprocessing import set_start_method from multiprocessing import get_context from itertools import product import os import sys import time from datetime import datetime import requests import json import psycopg2 import ray import pdb ray.init(num_cpus = 25) # Specify this system CPUs. def get_totals(): conn = None try: conn = psycopg2.connect(database = cleanserver_db, user = cleanserver_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute('select count(*) from peers') row = cur.fetchone() total_peers = row[0] if row != None else 0 cur.execute('select count(*) from peers where not alive') row = cur.fetchone() not_alive_peers = row[0] if row != None else 0 cur.execute('select count(*) from peers where alive') row = cur.fetchone() alive_peers = row[0] if row != None else 0 cur.close() except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() return (alive_peers, not_alive_peers, total_peers) def dead(): dead_lst = [] conn = None try: conn = psycopg2.connect(database = cleanserver_db, user = cleanserver_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select server from peers where downs >=(%s)", (purge_days,)) rows = cur.fetchall() for row in rows: dead_lst.append(row) cur.close() except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() return dead_lst def delete_deads(): conn = None try: conn = psycopg2.connect(database = cleanserver_db, user = cleanserver_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("delete from peers where downs >=(%s)", (purge_days,)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def write_server(server, alive, checked): cleanserver_db, cleanserver_db_user = dbconfig() updated_at = datetime.now() saved_at = datetime.now() select_sql = 'SELECT downs from peers where server=(%s)' if alive: insert_sql = "INSERT INTO peers(server, updated_at, saved_at, alive, checked, downs) VALUES(%s,%s,%s,%s,%s,%s) ON CONFLICT (server) DO UPDATE SET (server, updated_at, saved_at, alive, checked, downs) = (EXCLUDED.server, EXCLUDED.updated_at, EXCLUDED.saved_at, EXCLUDED.alive, EXCLUDED.checked, EXCLUDED.downs)" else: insert_sql = "INSERT INTO peers(server, updated_at, alive, checked, downs) VALUES(%s,%s,%s,%s,%s) ON CONFLICT (server) DO UPDATE SET (server, updated_at, alive, checked, downs) = (EXCLUDED.server, EXCLUDED.updated_at, EXCLUDED.alive, EXCLUDED.checked, EXCLUDED.downs)" conn = None try: conn = psycopg2.connect(database = cleanserver_db, user = cleanserver_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute(select_sql, (server,)) row = cur.fetchone() downs_before = row[0] if row != None else 0 downs_now = downs_before + 1 if not alive else 0 if alive: cur.execute(insert_sql, (server, updated_at, saved_at, alive, checked, downs_now)) else: cur.execute(insert_sql, (server, updated_at, alive, checked, downs_now)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def write_totals(alive, not_alive, total): now = datetime.now() insert_sql = "INSERT INTO totals(datetime, alive, dead, total) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING" conn = None try: conn = psycopg2.connect(database = cleanserver_db, user = cleanserver_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute(insert_sql, (now, alive, not_alive, total)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() @ray.remote def check_peers(peer): cleanserver_db, cleanserver_db_user = dbconfig() user_agent = {'User-agent': 'Mozilla/5.0'} try: response = requests.get(f'https://{peer}', headers = user_agent, timeout=3) if response.status_code == 200: print(f'Server: {peer} is alive') alive = True checked = True write_server(peer, alive, checked) except requests.exceptions.ChunkedEncodingError as chunk_err: print(f'Server: {peer} is dead') alive = False checked = True write_server(peer, alive, checked) except requests.exceptions.InvalidSchema as invalid_err: print(f'Server: {peer} is dead') alive = False checked = True write_server(peer, alive, checked) except requests.exceptions.ConnectTimeout as err_ct: print(f'Server: {peer} is dead') alive = False checked = True write_server(peer, alive, checked) except requests.exceptions.ConnectionError as err_ce: print(f'Server: {peer} is dead') alive = False checked = True write_server(peer, alive, checked) except requests.exceptions.ReadTimeout as err_rt: print(f'Server: {peer} is dead') alive = False checked = True write_server(peer, alive, checked) except requests.exceptions.TooManyRedirects as err_mr: print(f'Server: {peer} is dead') alive = False checked = True write_server(peer, alive, checked) def purge(server): print(f'Purging server {server}') os.system(f'RAILS_ENV=production {rvm_ruby} {mastodon_full_path}/bin/tootctl domains purge {server}') def dbconfig(): # Load database config from db_config file db_config_filepath = "config/db_config.txt" cleanserver_db = get_parameter("cleanserver_db", db_config_filepath) cleanserver_db_user = get_parameter("cleanserver_db_user", db_config_filepath) return (cleanserver_db, cleanserver_db_user) def get_parameter( parameter, file_path ): # Check if secrets file exists if not os.path.isfile(file_path): print("File %s not found, exiting."%file_path) sys.exit(0) # Find parameter in file with open( file_path ) as f: for line in f: if line.startswith( parameter ): return line.replace(parameter + ":", "").strip() # Cannot find parameter, exit print(file_path + " Missing parameter %s "%parameter) sys.exit(0) ############################################################################### # main if __name__ == '__main__': rvm_ruby = os.environ['HOME'] + "/.rbenv/shims/ruby" start_time = time.time() updated_at = datetime.now() peers_api = '/api/v1/instance/peers?' # Load configuration from config file config_filepath = "config/config.txt" mastodon_hostname = get_parameter("mastodon_hostname", config_filepath) mastodon_full_path = get_parameter("mastodon_full_path", config_filepath) purge_days = get_parameter("purge_days", config_filepath) cleanserver_db, cleanserver_db_user = dbconfig() ########################################## # purge dead servers dead_lst = dead() if len(dead_lst) > 0: servers_count = 1 for server in dead_lst: print(f'Purging server {servers_count} of {len(dead_lst)}...') purge(server[0]) servers_count+=1 ########################################## # delete dead servers old than purge_days delete_deads() ########################################## # get current hostname peers user_agent = {'User-agent': 'Mozilla/5.0'} res = requests.get('https://' + mastodon_hostname + peers_api, headers = user_agent, timeout=3) hostname_peers = res.json() ray.get([check_peers.remote(peer) for peer in hostname_peers]) alive_peers, not_alive_peers, total_peers = get_totals() print(f"\nTotal {mastodon_hostname}'s federated servers: {total_peers}") print(f'Alive servers: {alive_peers}') print(f'Not Alive servers: {not_alive_peers}') write_totals(alive_peers, not_alive_peers, total_peers) exec_time = str(round((time.time() - start_time), 2)) print(f'Execution time: {exec_time} seconds')