import os import sys from mastodon import Mastodon from mastodon.Mastodon import MastodonNotFoundError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonUnauthorizedError, MastodonIllegalArgumentError import sqlite3 from sqlite3 import Error import getpass import fileinput,re import requests class DomainBlocks(): name = 'Domain blocks for Mastodon' def __init__(self, mastodon_hostname=None, domain_block_api=None, session=None): self.domain_blocks_api = '/api/v1/admin/domain_blocks' self.config_file = "config/config.txt" self.secrets_file = "secrets/secrets.txt" is_setup = self.__check_setup(self.secrets_file) if is_setup: self.__uc_client_id = self.__get_parameter("uc_client_id", self.secrets_file) self.__uc_client_secret = self.__get_parameter("uc_client_secret", self.secrets_file) self.__uc_access_token = self.__get_parameter("uc_access_token", self.secrets_file) self.sqlite_db = 'database/blocksoft.db' self.mastodon, self.mastodon_hostname, self.headers = self.log_in() else: while(True): logged_in, self.mastodon, self.mastodon_hostname = self.setup() if not logged_in: print("\nLog in failed! Try again.\n") else: db_setup = self.__check_dbsetup(self) break if not os.path.exists('database'): os.makedirs('database') self.__check_dbsetup(self) if session: self.session = session else: self.session = requests.Session() def get_servers(self, software): servers_list = [] try: conn = None conn = sqlite3.connect(self.sqlite_db) cur = conn.cursor() cur.execute("select server from blocker where software=?", (software,)) rows = cur.fetchall() for row in rows: servers_list.append(row[0]) cur.close() except sqlite3.DatabaseError as db_error: sys.exit(db_error) finally: if conn is not None: conn.close() return (servers_list) def log_in(self): uc_client_id = self.__get_parameter("uc_client_id", self.secrets_file) uc_client_secret = self.__get_parameter("uc_client_secret", self.secrets_file) uc_access_token = self.__get_parameter("uc_access_token", self.secrets_file) self.mastodon_hostname = self.__get_parameter("mastodon_hostname", self.config_file) mastodon = Mastodon( client_id = uc_client_id, client_secret = uc_client_secret, access_token = uc_access_token, api_base_url = 'https://' + self.mastodon_hostname, ) headers={ 'Authorization': 'Bearer %s'%uc_access_token } return (mastodon, self.mastodon_hostname, headers) def domain_blocks_create(self, server): data = { 'domain': server, 'severity': 'suspend', 'reject_media': 'true', 'reject_reports': 'true', 'obfuscate': 'true', } endpoint = f'https://{self.mastodon_hostname}/{self.domain_blocks_api}' response = self.__api_request('POST', endpoint, data) if response.ok: print(f"Done, {server} is now suspended") else: pass @staticmethod def __check_setup(file_path): is_setup = False if not os.path.isfile(file_path): print(f"File {file_path} not found, running setup.") return else: is_setup = True return is_setup def setup(self): logged_in = False try: self.mastodon_hostname = input("Enter Mastodon hostname (or 'q' to exit): ") if self.mastodon_hostname == 'q': sys.exit("Bye") user_name = input("Bot user name, ex. john? ") user_password = getpass.getpass("Bot password? ") app_name = input("Bot App name? ") client_id, client_secret = Mastodon.create_app( app_name, scopes = ["read", "write", "admin:read", "admin:write"], to_file="app_clientcred.txt", api_base_url=self.mastodon_hostname ) mastodon = Mastodon(client_id = "app_clientcred.txt", api_base_url = self.mastodon_hostname) mastodon.log_in( user_name, user_password, scopes = ["read", "write", "admin:read", "admin:write"], to_file = "app_usercred.txt" ) if os.path.isfile("app_usercred.txt"): print(f"Log in succesful!") logged_in = True if not os.path.exists('secrets'): os.makedirs('secrets') if not os.path.exists(self.secrets_file): with open(self.secrets_file, 'w'): pass print(f"{self.secrets_file} created!") with open(self.secrets_file, 'a') as the_file: print(f"Writing secrets parameter names to {self.secrets_file}") the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n') client_path = 'app_clientcred.txt' with open(client_path) as fp: line = fp.readline() cnt = 1 while line: if cnt == 1: print(f"Writing client id to {self.secrets_file}") self.__modify_file(self, self.secrets_file, "uc_client_id: ", value=line.rstrip()) elif cnt == 2: print(f"Writing client secret to {self.secrets_file}") self.__modify_file(self, self.secrets_file, "uc_client_secret: ", value=line.rstrip()) line = fp.readline() cnt += 1 token_path = 'app_usercred.txt' with open(token_path) as fp: line = fp.readline() print(f"Writing access token to {self.secrets_file}") self.__modify_file(self, self.secrets_file, "uc_access_token: ", value=line.rstrip()) if os.path.exists("app_clientcred.txt"): print("Removing app_clientcred.txt temp file..") os.remove("app_clientcred.txt") if os.path.exists("app_usercred.txt"): print("Removing app_usercred.txt temp file..") os.remove("app_usercred.txt") self.__create_config(self) self.__write_config(self) print("Secrets setup done!\n") print("Blocker setup done!\n") except MastodonIllegalArgumentError as i_error: sys.stdout.write(f'\n{str(i_error)}\n') except MastodonNetworkError as n_error: sys.stdout.write(f'\n{str(n_error)}\n') except MastodonReadTimeout as r_error: sys.stdout.write(f'\n{str(r_error)}\n') except MastodonAPIError as a_error: sys.stdout.write(f'\n{str(a_error)}\n') return (logged_in, mastodon, self.mastodon_hostname) def __api_request(self, method, endpoint, data={}): response = None try: kwargs = dict(data=data) response = self.session.request(method, url = endpoint, headers = self.headers, **kwargs) except Exception as e: raise MastodonNetworkError(f"Could not complete request: {e}") if response is None: raise MastodonIllegalArgumentError("Illegal request.") if not response.ok: try: if isinstance(response, dict) and 'error' in response: error_msg = response['error'] elif isinstance(response, str): error_msg = response else: error_msg = None except ValueError: error_msg = None if response.status_code == 404: ex_type = MastodonNotFoundError if not error_msg: error_msg = 'Endpoint not found.' # this is for compatibility with older versions # which raised MastodonAPIError('Endpoint not found.') # on any 404 elif response.status_code == 401: ex_type = MastodonUnauthorizedError elif response.status_code == 422: return response elif response.status_code == 500: ex_type = MastodonInternalServerError elif response.status_code == 502: ex_type = MastodonBadGatewayError elif response.status_code == 503: ex_type = MastodonServiceUnavailableError elif response.status_code == 504: ex_type = MastodonGatewayTimeoutError elif response.status_code >= 500 and \ response.status_code <= 511: ex_type = MastodonServerError else: ex_type = MastodonAPIError raise ex_type( 'Mastodon API returned error', response.status_code, response.reason, error_msg) else: return response @staticmethod def __check_dbsetup(self): if not os.path.exists('database'): os.makedirs('database') dbsetup = False try: conn = None conn = sqlite3.connect(self.sqlite_db) dbsetup = True self.__createdb(self) except sqlite3.DatabaseError as db_error: print(db_error) return dbsetup @staticmethod def __createdb(self): conn = None try: conn = sqlite3.connect(self.sqlite_db) print(f"Database {self.sqlite_db} created!\n") self.__dbtables_schemes(self) except sqlite3.DatabaseError as db_error: print(db_error) finally: if conn is not None: conn.close() @staticmethod def __dbtables_schemes(self): table = "blocker" sql = "create table "+table+" (server varchar(200), users int, updated_at timestamptz, software varchar(50), alive boolean, users_api varchar(50), version varchar(100), first_checked_at timestamptz, last_checked_at timestamptz, downs int)" self.__create_table(self, table, sql) table = "execution_time" sql = "create table "+table+" (program varchar(30), start timestamptz, finish timestamptz)" self.__create_table(self, table, sql) @staticmethod def __create_table(self, table, sql): conn = None try: conn = sqlite3.connect(self.sqlite_db) cur = conn.cursor() print(f"Creating table {table}") cur.execute(sql) conn.commit() print(f"Table {table} created!\n") except sqlite3.DatabaseError as db_error: print(db_error) finally: if conn is not None: conn.close() def __get_parameter(self, parameter, config_file): if not os.path.isfile(config_file): print(f"File {config_file} not found, exiting.") sys.exit(0) with open( config_file ) as f: for line in f: if line.startswith( parameter ): return line.replace(parameter + ":", "").strip() print(f"{config_file} Missing parameter {parameter}") sys.exit(0) @staticmethod def __modify_file(self, file_name, pattern,value=""): fh=fileinput.input(file_name,inplace=True) for line in fh: replacement=pattern + value line=re.sub(pattern,replacement,line) sys.stdout.write(line) fh.close() @staticmethod def __create_config(self): if not os.path.exists('config'): os.makedirs('config') if not os.path.exists(self.config_file): print(self.config_file + " created!") with open('config/config.txt', 'w'): pass @staticmethod def __write_config(self): with open(self.config_file, 'a') as the_file: the_file.write(f'mastodon_hostname: {self.mastodon_hostname}\n') print(f"adding parameter 'mastodon_hostname' to {self.config_file}\n") ############################################################################### # main if __name__ == '__main__': blocker = DomainBlocks() soft_list = 'software.txt' soft_file = open(soft_list, 'r') Lines = soft_file.readlines() for software in Lines: software = software.replace('\n', '') print(f'checking software {software}...') servers_list = blocker.get_servers(software) for server in servers_list: blocker.domain_blocks_create(server)