"""Suspend remote servers on a Mastodon instance, grouped by server software.

Run as a script, this reads ``software.txt`` (one ``<software> <comment>``
entry per line), looks up every server recorded for that software in the
``blocker`` table and creates a ``suspend`` domain block for each one through
the Mastodon admin API.
"""

import fileinput
import getpass
import os
import re
import sys

import psycopg2
import requests
from mastodon import (
    Mastodon,
    MastodonBadGatewayError,
    MastodonGatewayTimeoutError,
    MastodonInternalServerError,
    MastodonServerError,
    MastodonServiceUnavailableError,
)
from mastodon.Mastodon import MastodonNotFoundError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonUnauthorizedError, MastodonIllegalArgumentError
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# HTTP status codes that map to a dedicated Mastodon exception class.
_STATUS_EXCEPTIONS = {
    401: MastodonUnauthorizedError,
    404: MastodonNotFoundError,
    500: MastodonInternalServerError,
    502: MastodonBadGatewayError,
    503: MastodonServiceUnavailableError,
    504: MastodonGatewayTimeoutError,
}

# OAuth scopes requested both when registering the app and when logging in.
_SCOPES = ["read", "write", "admin:read", "admin:write"]


class DomainBlocks():
    """Client that creates domain blocks on a Mastodon server.

    Credentials are read from ``secrets/secrets.txt`` and settings from
    ``config/config.txt``. When the secrets file does not exist yet, an
    interactive setup is run to create both files.
    """

    name = 'Domain blocks for Mastodon'

    def __init__(self, mastodon_hostname=None, domain_block_api=None, session=None):
        """Load (or interactively create) the configuration and prepare the DB.

        Args:
            mastodon_hostname: Accepted for compatibility; currently ignored,
                the hostname always comes from the config file or setup.
            domain_block_api: Accepted for compatibility; currently ignored.
            session: Optional ``requests``-style session used for API calls.
                A new ``requests.Session`` is created when omitted.
        """
        self.domain_blocks_api = '/api/v1/admin/domain_blocks'
        self.config_file = "config/config.txt"
        self.secrets_file = "secrets/secrets.txt"

        if self.__check_setup(self.secrets_file):
            self.__uc_client_id = self.__get_parameter("uc_client_id", self.secrets_file)
            self.__uc_client_secret = self.__get_parameter("uc_client_secret", self.secrets_file)
            self.__uc_access_token = self.__get_parameter("uc_access_token", self.secrets_file)
            self.blocker_db, self.blocker_db_user = self.db_config()
            self.mastodon, self.mastodon_hostname, self.headers = self.log_in()
        else:
            while True:
                logged_in, self.mastodon, self.mastodon_hostname = self.setup()
                if logged_in:
                    break
                print("\nLog in failed! Try again.\n")
            # setup() only returns the client and hostname; the request
            # headers must be built here or the first API call would fail
            # on the missing attribute.
            self.headers = self.__auth_headers(
                self.__get_parameter("uc_access_token", self.secrets_file)
            )

        if not self.__check_dbsetup():
            self.__createdb()

        self.session = session if session else requests.Session()

    def get_servers(self, software):
        """Return the servers stored in the ``blocker`` table for *software*.

        Exits the program (via ``sys.exit``) when the database query fails.
        """
        conn = None
        try:
            conn = self.__connect()
            cur = conn.cursor()
            cur.execute("select server from blocker where software=(%s)", (software,))
            servers_list = [row[0] for row in cur.fetchall()]
            cur.close()
        except psycopg2.Error as error:
            sys.exit(error)
        finally:
            if conn is not None:
                conn.close()
        return servers_list

    def log_in(self):
        """Build a Mastodon client and request headers from the stored files.

        Returns:
            Tuple ``(mastodon, mastodon_hostname, headers)``.
        """
        uc_client_id = self.__get_parameter("uc_client_id", self.secrets_file)
        uc_client_secret = self.__get_parameter("uc_client_secret", self.secrets_file)
        uc_access_token = self.__get_parameter("uc_access_token", self.secrets_file)
        self.mastodon_hostname = self.__get_parameter("mastodon_hostname", self.config_file)

        mastodon = Mastodon(
            client_id=uc_client_id,
            client_secret=uc_client_secret,
            access_token=uc_access_token,
            api_base_url='https://' + self.mastodon_hostname,
        )
        headers = self.__auth_headers(uc_access_token)
        return (mastodon, self.mastodon_hostname, headers)

    def domain_blocks_create(self, server, priv_comment, pub_comment):
        """Create a ``suspend`` domain block for *server*.

        Args:
            server: Domain to block.
            priv_comment: Value sent as ``private_comment``.
            pub_comment: Value sent as ``public_comment``.
        """
        data = {
            'domain': server,
            'severity': 'suspend',
            'reject_media': 'true',
            'reject_reports': 'true',
            'private_comment': priv_comment,
            'public_comment': pub_comment,
            'obfuscate': 'true',
        }
        # domain_blocks_api already starts with "/", so no separator is added.
        endpoint = f'https://{self.mastodon_hostname}{self.domain_blocks_api}'
        response = self.__api_request('POST', endpoint, data)
        if response.ok:
            print(f"Done, {server} is now suspended")
        else:
            # __api_request raises for every failure except HTTP 422, which
            # it hands back; report it instead of dropping it silently.
            print(f"Could not suspend {server}: HTTP {response.status_code} {response.reason}")

    @staticmethod
    def __check_setup(file_path):
        """Return True when *file_path* exists, else announce setup and return False."""
        if os.path.isfile(file_path):
            return True
        print(f"File {file_path} not found, running setup.")
        return False

    @staticmethod
    def __auth_headers(access_token):
        """Return the bearer-token headers sent with every API request."""
        return {'Authorization': 'Bearer %s' % access_token}

    def setup(self):
        """Interactively register the app, log in and write secrets/config files.

        Returns:
            Tuple ``(logged_in, mastodon, mastodon_hostname)``. ``mastodon``
            is ``None`` when a Mastodon error occurred before the client was
            created.
        """
        logged_in = False
        # Pre-bind so the return below works even when an exception is
        # raised before the client exists.
        mastodon = None
        try:
            self.mastodon_hostname = input("Enter Mastodon hostname (or 'q' to exit): ")
            if self.mastodon_hostname == 'q':
                sys.exit("Bye")
            user_name = input("Bot user name, ex. john? ")
            user_password = getpass.getpass("Bot password? ")
            app_name = input("Bot App name? ")
            self.blocker_db = input("Blocker's database name: ")
            self.blocker_db_user = input("blocker's database user: ")

            Mastodon.create_app(
                app_name,
                scopes=_SCOPES,
                to_file="app_clientcred.txt",
                api_base_url=self.mastodon_hostname
            )
            mastodon = Mastodon(client_id="app_clientcred.txt", api_base_url=self.mastodon_hostname)
            mastodon.log_in(
                user_name,
                user_password,
                scopes=_SCOPES,
                to_file="app_usercred.txt"
            )

            if os.path.isfile("app_usercred.txt"):
                print("Log in succesful!")
                logged_in = True

            self.__store_secrets()

            self.__create_config()
            self.__write_config()
            print("Secrets setup done!\n")

            with open(self.config_file, 'a') as the_file:
                print(f"Writing Mastodon hostname parameter to {self.config_file}")
                the_file.write(f'blocker_db: {self.blocker_db}\n')
                the_file.write(f'blocker_db_user: {self.blocker_db_user}\n')
            print("Blocker setup done!\n")
        except (MastodonIllegalArgumentError, MastodonNetworkError,
                MastodonReadTimeout, MastodonAPIError) as error:
            sys.stdout.write(f'\n{str(error)}\n')

        return (logged_in, mastodon, self.mastodon_hostname)

    def __store_secrets(self):
        """Copy the temporary credential files into the secrets file and remove them."""
        if not os.path.exists('secrets'):
            os.makedirs('secrets')

        if not os.path.exists(self.secrets_file):
            with open(self.secrets_file, 'w'):
                pass
            print(f"{self.secrets_file} created!")

        with open(self.secrets_file, 'a') as the_file:
            print(f"Writing secrets parameter names to {self.secrets_file}")
            the_file.write('uc_client_id: \n' + 'uc_client_secret: \n' + 'uc_access_token: \n')

        # Line 1 of the client credentials file is stored as the client id,
        # line 2 as the client secret; any further lines are not needed.
        with open('app_clientcred.txt') as fp:
            for cnt, line in enumerate(fp, start=1):
                if cnt == 1:
                    print(f"Writing client id to {self.secrets_file}")
                    self.__modify_file(self.secrets_file, "uc_client_id: ", value=line.rstrip())
                elif cnt == 2:
                    print(f"Writing client secret to {self.secrets_file}")
                    self.__modify_file(self.secrets_file, "uc_client_secret: ", value=line.rstrip())
                else:
                    break

        with open('app_usercred.txt') as fp:
            line = fp.readline()
            print(f"Writing access token to {self.secrets_file}")
            self.__modify_file(self.secrets_file, "uc_access_token: ", value=line.rstrip())

        if os.path.exists("app_clientcred.txt"):
            print("Removing app_clientcred.txt temp file..")
            os.remove("app_clientcred.txt")

        if os.path.exists("app_usercred.txt"):
            print("Removing app_usercred.txt temp file..")
            os.remove("app_usercred.txt")

    @staticmethod
    def __error_message(response):
        """Return the error text from a failed response's JSON body, or None."""
        try:
            payload = response.json()
        except ValueError:
            # Body is not valid JSON.
            return None
        if isinstance(payload, dict) and 'error' in payload:
            return payload['error']
        if isinstance(payload, str):
            return payload
        return None

    def __api_request(self, method, endpoint, data=None):
        """Send an authenticated request and translate HTTP errors to exceptions.

        Args:
            method: HTTP method name.
            endpoint: Full URL to call.
            data: Form data for the request; defaults to an empty dict.

        Returns:
            The response object when the request succeeded, or when the
            server answered with HTTP 422 (returned to the caller as is).

        Raises:
            MastodonNetworkError: The request could not be completed.
            MastodonIllegalArgumentError: No response object was produced.
            MastodonAPIError (or a subclass chosen by status code): The
                server answered with any other error status.
        """
        if data is None:
            data = {}

        try:
            response = self.session.request(method, url=endpoint, headers=self.headers, data=data)
        except Exception as e:
            # The session is injectable, so the concrete failure type is not
            # known here; keep the original cause attached.
            raise MastodonNetworkError(f"Could not complete request: {e}") from e

        if response is None:
            raise MastodonIllegalArgumentError("Illegal request.")

        if response.ok:
            return response

        status = response.status_code
        if status == 422:
            return response

        error_msg = self.__error_message(response)

        ex_type = _STATUS_EXCEPTIONS.get(status)
        if ex_type is None:
            ex_type = MastodonServerError if 500 <= status <= 511 else MastodonAPIError

        if status == 404 and not error_msg:
            # this is for compatibility with older versions
            # which raised MastodonAPIError('Endpoint not found.')
            # on any 404
            error_msg = 'Endpoint not found.'

        raise ex_type(
            'Mastodon API returned error',
            status,
            response.reason,
            error_msg)

    def db_config(self):
        """Read the database name and user from the config file.

        Returns:
            Tuple ``(blocker_db, blocker_db_user)``.
        """
        self.blocker_db = self.__get_parameter("blocker_db", self.config_file)
        self.blocker_db_user = self.__get_parameter("blocker_db_user", self.config_file)
        return (self.blocker_db, self.blocker_db_user)

    def __connect(self):
        """Open a connection to the blocker database."""
        return psycopg2.connect(
            database=self.blocker_db,
            user=self.blocker_db_user,
            password="",
            host="/var/run/postgresql",
            port="5432",
        )

    def __check_dbsetup(self):
        """Return True when a connection to the blocker database can be opened."""
        try:
            conn = self.__connect()
        except psycopg2.Error as error:
            print(error)
            return False
        # The connection is only a probe; close it so it is not leaked.
        conn.close()
        return True

    def __createdb(self):
        """Create the blocker database and its tables, printing any DB error."""
        conn = None
        try:
            conn = psycopg2.connect(dbname='postgres', user=self.blocker_db_user, host='', password='')
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cur = conn.cursor()
            print(f"Creating database {self.blocker_db}. Please wait...")
            cur.execute(sql.SQL("CREATE DATABASE {}").format(
                sql.Identifier(self.blocker_db))
            )
            print(f"Database {self.blocker_db} created!\n")
            self.__dbtables_schemes()
        except psycopg2.Error as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()

    def __dbtables_schemes(self):
        """Create the ``blocker`` and ``execution_time`` tables."""
        table = "blocker"
        statement = "create table "+table+" (server varchar(200), users int, updated_at timestamptz, software varchar(50), alive boolean, users_api varchar(50), version varchar(100), first_checked_at timestamptz, last_checked_at timestamptz, downs int)"
        self.__create_table(table, statement)

        table = "execution_time"
        statement = "create table "+table+" (program varchar(30), start timestamptz, finish timestamptz)"
        self.__create_table(table, statement)

    def __create_table(self, table, statement):
        """Run the ``create table`` *statement* for *table*, printing any DB error."""
        conn = None
        try:
            conn = self.__connect()
            cur = conn.cursor()
            print(f"Creating table {table}")
            cur.execute(statement)
            conn.commit()
            print(f"Table {table} created!\n")
        except psycopg2.Error as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()

    def __get_parameter(self, parameter, config_file):
        """Return the value stored as ``parameter: value`` in *config_file*.

        Exits the program when the file or the parameter is missing.
        """
        if not os.path.isfile(config_file):
            print(f"File {config_file} not found, exiting.")
            sys.exit(0)

        # Match the full "name:" prefix so that e.g. "blocker_db" cannot pick
        # up the "blocker_db_user:" line.
        prefix = parameter + ":"
        with open(config_file) as f:
            for line in f:
                if line.startswith(prefix):
                    return line[len(prefix):].strip()

        print(f"{config_file} Missing parameter {parameter}")
        sys.exit(0)

    def __modify_file(self, file_name, pattern, value=""):
        """Rewrite *file_name* in place, appending *value* after each *pattern* match."""
        replacement = pattern + value
        # While the fileinput context is active, stdout is redirected into
        # the file being rewritten.
        with fileinput.input(file_name, inplace=True) as fh:
            for line in fh:
                # A callable replacement keeps backslashes in the value literal.
                sys.stdout.write(re.sub(pattern, lambda _match: replacement, line))

    def __create_config(self):
        """Create the ``config`` directory and an empty config file if missing."""
        if not os.path.exists('config'):
            os.makedirs('config')

        if not os.path.exists(self.config_file):
            with open(self.config_file, 'w'):
                pass
            print(self.config_file + " created!")

    def __write_config(self):
        """Append the Mastodon hostname to the config file."""
        with open(self.config_file, 'a') as the_file:
            the_file.write(f'mastodon_hostname: {self.mastodon_hostname}\n')
            print(f"adding parameter 'mastodon_hostname' to {self.config_file}\n")


###############################################################################
# main

def main():
    """Suspend every known server of each software listed in ``software.txt``."""
    blocker = DomainBlocks()

    with open('software.txt', 'r') as soft_file:
        soft_lines = soft_file.readlines()

    for softline in soft_lines:
        # Drop the trailing newline so it ends up neither in the software
        # name nor in the comments; skip blank lines.
        softline = softline.strip()
        if not softline:
            continue

        software, _, comment = softline.partition(' ')
        comment = comment.strip()

        print(f'checking software {software}...')
        for server in blocker.get_servers(software):
            # The same text is used as private and public comment.
            blocker.domain_blocks_create(server, comment, comment)


if __name__ == '__main__':
    main()