import getpass import os import sys import psycopg2 from mastodon import Mastodon from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError import uuid import pdb def check_account(username): is_duplicate = False sql = 'select username from maccounts where username=(%s)' conn = None try: conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute(sql, (username,)) row = cur.fetchone() if row != None: is_duplicate = True except (Exception, psycopg2.DatabaseError) as error: check_error = error.pgcode sys.stdout.write(f'\n{str(error)}\n') finally: if conn is not None: conn.close() return is_duplicate def save_bot_account(hostname, username, client_id, client_secret, token, twitter_username): bot_uuid = str(uuid.uuid4()) save_error = None mastodon_sql = f'INSERT INTO maccounts (bot_id, hostname, username, client_id, client_secret, client_token) VALUES (%s,%s,%s,%s,%s,%s)' twitter_sql = f'INSERT INTO taccounts (bot_id, username) VALUES (%s,%s)' conn = None try: conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute(mastodon_sql, (bot_uuid, hostname, username, client_id, client_secret, token)) cur.execute(twitter_sql, (bot_uuid, twitter_username)) conn.commit() except (Exception, psycopg2.DatabaseError) as error: save_error = error.pgcode sys.stdout.write(f'\n{str(error)}\n') finally: if conn is not None: conn.close() return save_error def get_config(): # Load configuration from config file config_filepath = "config/db_config.txt" replicator_db = get_db_params("replicator_db", config_filepath) replicator_db_user = get_db_params("replicator_db_user", config_filepath) return (config_filepath, replicator_db, replicator_db_user) def check_db_conn(): try: conn = None conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432") except (Exception, psycopg2.DatabaseError) as error: sys.stdout.write(f'\n{str(error)}\n') sys.exit("Exiting. Run 'python db-setup' again") def ask_account(): print(f'Setting up Mastodon bot account...\n') hostname = input("Enter Mastodon hostname: ") user_name = input(f'User name, ex. user@{hostname}? ') user_password = getpass.getpass("User password? ") app_name = 'replicator' twitter_username = input(f'Twitter username (ex. jack)? ') return (hostname, user_name, user_password, app_name, twitter_username) def log_in(hostname, user_name, user_password, app_name): token = '' try: response = Mastodon.create_app( app_name, scopes=["read","write"], to_file=None, api_base_url=hostname ) client_id = response[0] client_secret = response[1] mastodon = Mastodon(client_id = client_id, client_secret = client_secret, api_base_url = hostname) token = mastodon.log_in( user_name, user_password, scopes = ["read", "write"], to_file = None ) except MastodonIllegalArgumentError as i_error: sys.stdout.write(f'\n{str(i_error)}\n') except MastodonNetworkError as n_error: sys.stdout.write(f'\n{str(n_error)}\n') except MastodonReadTimeout as r_error: sys.stdout.write(f'\n{str(r_error)}\n') except MastodonAPIError as a_error: sys.stdout.write(f'\n{str(a_error)}\n') finally: return (client_id, client_secret, token) def get_db_params( parameter, file_path ): if not os.path.isfile(file_path): print("File %s not found, exiting."%file_path) sys.exit(0) # Find parameter in file with open( file_path ) as f: for line in f: if line.startswith( parameter ): return line.replace(parameter + ":", "").strip() # Cannot find parameter, exit print(file_path + " Missing parameter %s "%parameter) sys.exit(0) ############################################################################### # main if __name__ == '__main__': config_filepath, replicator_db, replicator_db_user = get_config() check_db_conn() hostname, user_name, user_password, app_name, twitter_username = ask_account() is_duplicate = check_account(user_name) if is_duplicate: print(f'Mastodon account already exist!') sys.exit() else: client_id, client_secret, token = log_in(hostname, user_name, user_password, app_name) if len(token) > 0: save_error = save_bot_account(hostname, user_name, client_id, client_secret, token, twitter_username) if save_error == None: print(f'Done.') else: if save_error == '423505': sys.exit()