import getpass
import os
import time
import sys
import psycopg2
from mastodon import Mastodon
from mastodon.Mastodon import (
    MastodonMalformedEventError,
    MastodonNetworkError,
    MastodonReadTimeout,
    MastodonAPIError,
    MastodonIllegalArgumentError,
)
import uuid
import requests
from prettytable import PrettyTable
import pdb

# Interactive menu: option number -> label shown to the operator.
menu_options = {
    1: 'New Bot',
    2: 'Delete Bot',
    3: 'Activate Bot',
    4: 'Deactivate Bot',
    5: 'List Bots',
    6: 'Exit',
}


def print_menu():
    """Print every menu option as ``<number> - <label>``."""
    for key, label in menu_options.items():
        print(key, '-', label)


def get_config():
    """Load the database settings from the config file.

    Returns:
        A tuple ``(config_filepath, replicator_db, replicator_db_user)``.
        ``get_db_params`` exits the process if the file or a parameter
        is missing.
    """
    # Load configuration from config file
    config_filepath = "config/db_config.txt"
    replicator_db = get_db_params("replicator_db", config_filepath)
    replicator_db_user = get_db_params("replicator_db_user", config_filepath)
    return (config_filepath, replicator_db, replicator_db_user)


def _connect():
    """Open a psycopg2 connection to the replicator database.

    Reads the module-level ``replicator_db`` and ``replicator_db_user``
    names, which the ``__main__`` section assigns from ``get_config()``
    before any database helper is called.
    """
    return psycopg2.connect(
        database=replicator_db,
        user=replicator_db_user,
        password="",
        host="/var/run/postgresql",
        port="5432",
    )


def _pgcode(error):
    """Return ``error.pgcode`` if the exception has one, else ``None``.

    Only psycopg2 errors carry ``pgcode``; reading the attribute directly
    on any other exception would raise ``AttributeError`` inside the
    ``except`` handler.
    """
    return getattr(error, 'pgcode', None)


def check_db_conn():
    """Verify that the database is reachable; exit the process if not."""
    try:
        conn = _connect()
    except Exception as error:  # includes psycopg2.DatabaseError
        sys.stdout.write(f'\n{error}\n')
        sys.exit("Exiting. Run 'python db-setup' again")
    # The connection was only a probe: close it instead of leaking it.
    conn.close()


def check_host(url):
    """Return True if ``https://<url>`` answers an HTTP GET request.

    On any ``requests`` failure the error is printed, the function pauses
    for three seconds so the operator can read it, and False is returned.
    """
    try:
        requests.get(f'https://{url}')
    except requests.exceptions.RequestException as error:
        print(f'\n{url}: {error}')
        time.sleep(3)
        return False
    return True


class Bot:
    """A new bot account, populated interactively from the console."""

    name = "Bot"

    def __init__(self):
        """Prompt the operator for the server, credentials and Twitter user."""
        self.hostname = input("\nEnter Mastodon/Pleroma hostname: ")
        self.software = input("Server software (mastodon or pleroma)? ")
        self.username = input(f'User name, ex. user@{self.hostname}? ')
        self.password = getpass.getpass("User password? ")
        self.app_name = 'replicator'
        self.twitter_username = input(f'Twitter username (ex. jack)? ')

    def log_in(self):
        """Register the app on the server and log the bot user in.

        Returns:
            A tuple ``(client_id, client_secret, token)``. Any element that
            could not be obtained is an empty string, so callers can test
            ``len(token) > 0`` to detect success.
        """
        # Pre-bind every returned name: if create_app() fails, the return
        # statement must not raise UnboundLocalError and hide the real error.
        client_id = ''
        client_secret = ''
        token = ''
        try:
            client_id, client_secret = Mastodon.create_app(
                self.app_name,
                scopes=["read", "write"],
                to_file=None,
                api_base_url=self.hostname,
            )[:2]
            mastodon = Mastodon(
                client_id=client_id,
                client_secret=client_secret,
                api_base_url=self.hostname,
            )
            token = mastodon.log_in(
                self.username,
                self.password,
                scopes=["read", "write"],
                to_file=None,
            )
            print('Log in succesful!')
        except (
            MastodonIllegalArgumentError,
            MastodonNetworkError,
            MastodonReadTimeout,
            MastodonAPIError,
        ) as error:
            sys.stdout.write(f'\n{error}\n')
        # Returning here (not from a ``finally``) lets unexpected exceptions
        # and Ctrl-C propagate instead of being silently discarded.
        return (client_id, client_secret, token)

    def check_account(self):
        """Return True if ``self.username`` already exists in ``maccounts``.

        A database error is printed and reported as "not a duplicate".
        """
        is_duplicate = False
        sql = 'select username from maccounts where username=(%s)'
        conn = None
        try:
            conn = _connect()
            cur = conn.cursor()
            cur.execute(sql, (self.username,))
            is_duplicate = cur.fetchone() is not None
        except Exception as error:  # includes psycopg2.DatabaseError
            sys.stdout.write(f'\n{error}\n')
        finally:
            if conn is not None:
                conn.close()
        return is_duplicate

    def save_account(self, client_id, client_secret, token):
        """Insert the bot into ``maccounts`` and ``taccounts``.

        Both rows share a freshly generated UUID and are committed together.

        Returns:
            ``None`` on success. On failure, the PostgreSQL error code
            (``pgcode``) when the exception carries one, otherwise the
            exception class name, so a failed save is never mistaken for
            a successful one.
        """
        bot_uuid = str(uuid.uuid4())
        save_error = None
        mastodon_sql = 'INSERT INTO maccounts (bot_id, hostname, username, client_id, client_secret, client_token, hostname_soft) VALUES (%s,%s,%s,%s,%s,%s,%s)'
        twitter_sql = 'INSERT INTO taccounts (bot_id, username) VALUES (%s,%s)'
        conn = None
        try:
            conn = _connect()
            cur = conn.cursor()
            cur.execute(
                mastodon_sql,
                (bot_uuid, self.hostname, self.username, client_id,
                 client_secret, token, self.software),
            )
            cur.execute(twitter_sql, (bot_uuid, self.twitter_username))
            conn.commit()
        except Exception as error:  # includes psycopg2.DatabaseError
            # Connection-level psycopg2 errors have pgcode None; fall back to
            # the class name so the caller still sees a non-None error.
            save_error = _pgcode(error) or type(error).__name__
            sys.stdout.write(f'\n{error}\n')
        finally:
            if conn is not None:
                conn.close()
        return save_error


class GetBot:
    """An existing bot, identified by a Twitter username read from the console."""

    name = "GetBot"

    def __init__(self):
        """Prompt the operator for the Twitter username of the bot."""
        self.twitter_username = input(f'Twitter username? ')

    def delete_account(self):
        """Delete the bot from both tables after operator confirmation.

        Returns:
            ``None`` when no database error occurred (including when the bot
            does not exist or the operator declined); otherwise the
            exception's ``pgcode`` (which may itself be ``None`` for errors
            that carry no code).
        """
        del_error = None
        twitter_sql = 'SELECT bot_id from taccounts where username=(%s)'
        twitter_del_sql = 'DELETE FROM taccounts WHERE username=(%s)'
        mastodon_del_sql = 'DELETE FROM maccounts WHERE bot_id=(%s)'
        conn = None
        try:
            conn = _connect()
            cur = conn.cursor()
            cur.execute(twitter_sql, (self.twitter_username,))
            row = cur.fetchone()
            if row is None:
                print(f'{self.twitter_username} does not exists!')
                time.sleep(2)
            elif confirm_deletion(self.twitter_username):
                cur.execute(twitter_del_sql, (self.twitter_username,))
                cur.execute(mastodon_del_sql, (row[0],))
                conn.commit()
                print(f'{self.twitter_username} had been deleted.')
                time.sleep(2)
        except Exception as error:  # includes psycopg2.DatabaseError
            del_error = _pgcode(error)
            sys.stdout.write(f'\n{error}\n')
        finally:
            if conn is not None:
                conn.close()
        return del_error

    def _set_active(self, active):
        """Set the ``active`` flag of this bot's ``maccounts`` row.

        Shared implementation of ``activate_account`` and
        ``deactivate_account``; errors are printed, never raised.
        """
        twitter_sql = 'SELECT bot_id from taccounts where username=(%s)'
        if active:
            update_sql = 'update maccounts set active = True WHERE bot_id=(%s)'
            done = 'activated'
        else:
            update_sql = 'update maccounts set active = False WHERE bot_id=(%s)'
            done = 'deactivated'
        conn = None
        try:
            conn = _connect()
            cur = conn.cursor()
            cur.execute(twitter_sql, (self.twitter_username,))
            row = cur.fetchone()
            if row is not None:
                cur.execute(update_sql, (row[0],))
                conn.commit()
                print(f'{self.twitter_username} had been {done}')
                time.sleep(2)
            else:
                print(f'{self.twitter_username} is not in the database')
        except Exception as error:  # includes psycopg2.DatabaseError
            # Show the error message itself, as the other handlers do; the
            # bare pgcode is None for many failures and tells the operator
            # nothing.
            sys.stdout.write(f'\n{error}\n')
        finally:
            if conn is not None:
                conn.close()

    def activate_account(self):
        """Mark the bot as active in ``maccounts``."""
        self._set_active(True)

    def deactivate_account(self):
        """Mark the bot as inactive in ``maccounts``."""
        self._set_active(False)

    def print(self):
        """Print the Twitter username of this bot."""
        print(f'\nBot found: {self.twitter_username}\n')


def confirm_deletion(twitter_username):
    """Ask until the operator types exactly ``Yes`` or ``No``.

    Returns:
        True for ``Yes``, False for ``No``.
    """
    while True:
        answer = input(f'Are you sure deleting {twitter_username} (Yes or No)?: ')
        if answer == 'Yes':
            print('Ok, deleting it...')
            return True
        if answer == 'No':
            print('Doing nothing.')
            return False


def list_bots():
    """Fetch every bot together with its Twitter username.

    Returns:
        A tuple ``(bots_list, t_usernames_lst)`` of two index-aligned lists:
        the ``maccounts`` rows ``(bot_id, hostname, username, hostname_soft,
        active)`` and the matching ``taccounts`` usernames. A bot without a
        ``taccounts`` row gets an empty string. On a database error the
        error is printed and whatever was collected so far is returned.
    """
    bots_list = []
    t_usernames_lst = []
    mastodon_sql = 'SELECT bot_id, hostname, username, hostname_soft, active FROM maccounts'
    twitter_sql = 'SELECT username from taccounts where bot_id=(%s)'
    conn = None
    try:
        conn = _connect()
        cur = conn.cursor()
        cur.execute(mastodon_sql)
        for row in cur.fetchall():
            cur.execute(twitter_sql, (row[0],))
            username_row = cur.fetchone()
            # A missing taccounts row must not abort the whole listing.
            t_usernames_lst.append(username_row[0] if username_row else '')
            bots_list.append(row)
    except Exception as error:  # includes psycopg2.DatabaseError
        sys.stdout.write(f'\n{error}\n')
    finally:
        if conn is not None:
            conn.close()
    return (bots_list, t_usernames_lst)


def get_db_params(parameter, file_path):
    """Return the value of ``parameter`` from a ``name: value`` config file.

    Exits the process (status 0, as before) when the file does not exist
    or the parameter is not present.
    """
    if not os.path.isfile(file_path):
        print(f"File {file_path} not found, exiting.")
        sys.exit(0)

    # Match on "name:" rather than the bare name, so that looking up
    # "replicator_db" cannot pick up a "replicator_db_user:" line.
    prefix = parameter + ":"
    with open(file_path) as f:
        for line in f:
            if line.startswith(prefix):
                return line[len(prefix):].strip()

    # Cannot find parameter, exit
    print(f"{file_path} Missing parameter {parameter} ")
    sys.exit(0)


###############################################################################
# main

if __name__ == '__main__':

    while True:

        print('\n')
        print_menu()

        try:
            option = int(input('\nEnter your choice: '))
        except ValueError:
            # Only a non-numeric entry is a "wrong input"; Ctrl-C and EOF
            # must still be able to end the program.
            print('\nWrong input. Please enter a number between 1 and 6.\n')
            continue

        if 1 <= option <= 5:
            # Every database action needs the settings and a reachable DB.
            # These are module-level names read by the helpers above.
            config_filepath, replicator_db, replicator_db_user = get_config()
            check_db_conn()

        if option == 1:
            myBot = Bot()
            if check_host(myBot.hostname):
                if myBot.check_account():
                    print(f'\nBot account already exist!\n')
                    time.sleep(3)
                else:
                    client_id, client_secret, token = myBot.log_in()
                    if len(token) > 0:
                        save_error = myBot.save_account(client_id, client_secret, token)
                        if save_error is None:
                            print(f'Bot added!')
                            time.sleep(3)
                        elif save_error == '23505':
                            # 23505 is PostgreSQL's unique_violation SQLSTATE.
                            print(f'error: {save_error}')
        elif option == 2:
            GetBot().delete_account()
        elif option == 3:
            GetBot().activate_account()
        elif option == 4:
            GetBot().deactivate_account()
        elif option == 5:
            bots_list, t_usernames_lst = list_bots()
            print_table = PrettyTable()
            print_table.field_names = ['No.', 'Twitter username', 'bot account', 'host', 'host software', 'active']
            for number, (bot, t_username) in enumerate(zip(bots_list, t_usernames_lst), start=1):
                print_table.add_row([str(number), t_username, bot[2], bot[1], bot[3], bot[4]])
            print(print_table)
            print(f'Total: {len(bots_list)}')
        elif option == 6:
            print('Bye!')
            sys.exit()