# replicator/bots-setup.py
#
# 566 lines
# 13 KiB
# Python

import getpass
import os
import time
import sys
import psycopg2
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
import uuid
import requests
from prettytable import PrettyTable
import pdb
# Menu entries keyed by the number the user types at the prompt (1-based,
# in display order).
menu_options = dict(enumerate(
    (
        'New Bot',
        'Delete Bot',
        'Activate Bot',
        'Deactivate Bot',
        'List Bots',
        'Exit',
    ),
    start=1,
))
def print_menu():
    """Print every entry of ``menu_options`` as ``<number> - <label>``."""
    for number, label in menu_options.items():
        print(number, '-', label)
def get_config():
    """Read the database settings from the config file.

    Returns:
        A ``(config_filepath, replicator_db, replicator_db_user)`` tuple,
        where the last two values are looked up with ``get_db_params``.
    """
    config_filepath = "config/db_config.txt"
    # Look the keys up in the same order as they appear in the result.
    replicator_db, replicator_db_user = (
        get_db_params(key, config_filepath)
        for key in ("replicator_db", "replicator_db_user")
    )
    return (config_filepath, replicator_db, replicator_db_user)
def check_db_conn():
    """Check that the replicator database accepts a connection.

    Reads the module-level ``replicator_db`` and ``replicator_db_user``
    globals, so those must be assigned before this is called.

    Raises:
        SystemExit: if the connection attempt fails; the underlying error
            is written to stdout first.
    """
    conn = None
    try:
        conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
    except Exception as error:
        sys.stdout.write(f'\n{str(error)}\n')
        sys.exit("Exiting. Run 'python db-setup' again")
    finally:
        # This is only a probe: release the connection instead of leaking
        # one on every menu action.
        if conn is not None:
            conn.close()
def check_host(url, timeout=10):
    """Report whether ``https://<url>`` answers an HTTP GET.

    Args:
        url: Host name without scheme; ``https://`` is prepended.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely on a silent host.

    Returns:
        True if the request completed (whatever the status code), False if
        ``requests`` raised; in that case the error is printed and the
        function pauses three seconds so the user can read it.
    """
    try:
        requests.get(f'https://{url}', timeout=timeout)
    except requests.exceptions.RequestException as error:
        print(f'\n{url}: {error}')
        time.sleep(3)
        return False
    return True
class Bot:
    """A bot being registered interactively.

    Constructing an instance prompts on the terminal for the fediverse
    host, server software, account credentials and the Twitter user name.
    The database methods read the module-level ``replicator_db`` and
    ``replicator_db_user`` globals.
    """

    name = "Bot"

    def __init__(self):
        self.hostname = input("\nEnter Mastodon/Pleroma hostname: ")
        self.software = input("Server software (mastodon or pleroma)? ")
        self.username = input(f'User name, ex. user@{self.hostname}? ')
        self.password = getpass.getpass("User password? ")
        self.app_name = 'replicator'
        self.twitter_username = input(f'Twitter username (ex. jack)? ')

    @staticmethod
    def _connect():
        """Open a new connection to the replicator database."""
        return psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")

    def log_in(self):
        """Register the app on the server and log the bot account in.

        Returns:
            ``(client_id, client_secret, token)``. Any value that could not
            be obtained is an empty string, so callers can test
            ``len(token) > 0`` to detect failure.
        """
        # Pre-bind all three results: if create_app() fails, the return
        # statement must not hit unbound names.
        client_id = client_secret = token = ''
        try:
            response = Mastodon.create_app(
                self.app_name,
                scopes=["read", "write"],
                to_file=None,
                api_base_url=self.hostname
            )
            client_id = response[0]
            client_secret = response[1]
            mastodon = Mastodon(client_id = client_id, client_secret = client_secret, api_base_url = self.hostname)
            token = mastodon.log_in(
                self.username,
                self.password,
                scopes = ["read", "write"],
                to_file = None
            )
            print('Log in succesful!')
        except (MastodonIllegalArgumentError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError) as error:
            sys.stdout.write(f'\n{str(error)}\n')
        # Returned after the try block, not from `finally`, so unexpected
        # exceptions (including Ctrl-C) are no longer silently discarded.
        return (client_id, client_secret, token)

    def check_account(self):
        """Return True if ``self.username`` is already stored in maccounts.

        Any error is written to stdout and reported as "not a duplicate"
        (False).
        """
        sql = 'select username from maccounts where username=(%s)'
        is_duplicate = False
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            cur.execute(sql, (self.username,))
            is_duplicate = cur.fetchone() is not None
        except Exception as error:
            sys.stdout.write(f'\n{str(error)}\n')
        finally:
            if conn is not None:
                conn.close()
        return is_duplicate

    def save_account(self, client_id, client_secret, token):
        """Insert the bot into maccounts and taccounts under a fresh UUID.

        Both inserts are committed together.

        Returns:
            None on success. On failure, the error's ``pgcode`` when it has
            one, otherwise the exception class name, so a failure is never
            reported as None.
        """
        bot_uuid = str(uuid.uuid4())
        save_error = None
        mastodon_sql = 'INSERT INTO maccounts (bot_id, hostname, username, client_id, client_secret, client_token, hostname_soft) VALUES (%s,%s,%s,%s,%s,%s,%s)'
        twitter_sql = 'INSERT INTO taccounts (bot_id, username) VALUES (%s,%s)'
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            cur.execute(mastodon_sql, (bot_uuid, self.hostname, self.username, client_id, client_secret, token, self.software))
            cur.execute(twitter_sql, (bot_uuid, self.twitter_username))
            conn.commit()
        except Exception as error:
            # Not every exception carries a pgcode, and pgcode itself may
            # be None; fall back to the class name in both cases.
            save_error = getattr(error, 'pgcode', None) or type(error).__name__
            sys.stdout.write(f'\n{str(error)}\n')
        finally:
            if conn is not None:
                conn.close()
        return save_error
class GetBot:
    """An existing bot, selected interactively by its Twitter user name.

    Constructing an instance prompts on the terminal. The database methods
    read the module-level ``replicator_db`` and ``replicator_db_user``
    globals.
    """

    name = "GetBot"

    def __init__(self):
        self.twitter_username = input(f'Twitter username? ')

    @staticmethod
    def _connect():
        """Open a new connection to the replicator database."""
        return psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")

    @staticmethod
    def _report_error(error):
        """Write *error* to stdout and return a non-None code for it.

        The code is the error's ``pgcode`` when it has one, otherwise the
        exception class name (not every exception has a pgcode, and pgcode
        may itself be None).
        """
        sys.stdout.write(f'\n{str(error)}\n')
        return getattr(error, 'pgcode', None) or type(error).__name__

    def delete_account(self):
        """Delete the bot from taccounts and maccounts after confirmation.

        Returns:
            None if no error occurred (including when the bot was not
            found or the user declined), otherwise an error code as
            produced by ``_report_error``.
        """
        del_error = None
        twitter_sql = 'SELECT bot_id from taccounts where username=(%s)'
        twitter_del_sql = 'DELETE FROM taccounts WHERE username=(%s)'
        mastodon_del_sql = 'DELETE FROM maccounts WHERE bot_id=(%s)'
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            cur.execute(twitter_sql, (self.twitter_username,))
            row = cur.fetchone()
            if row is None:
                print(f'{self.twitter_username} does not exists!')
                time.sleep(2)
            elif confirm_deletion(self.twitter_username):
                bot_id = row[0]
                cur.execute(twitter_del_sql, (self.twitter_username,))
                cur.execute(mastodon_del_sql, (bot_id,))
                conn.commit()
                print(f'{self.twitter_username} had been deleted.')
                time.sleep(2)
        except Exception as error:
            del_error = self._report_error(error)
        finally:
            if conn is not None:
                conn.close()
        return (del_error)

    def _set_active(self, active):
        """Set the ``active`` flag of this bot's maccounts row to *active*."""
        twitter_sql = 'SELECT bot_id from taccounts where username=(%s)'
        if active:
            update_sql = 'update maccounts set active = True WHERE bot_id=(%s)'
            state = 'activated'
        else:
            update_sql = 'update maccounts set active = False WHERE bot_id=(%s)'
            state = 'deactivated'
        conn = None
        try:
            conn = self._connect()
            cur = conn.cursor()
            cur.execute(twitter_sql, (self.twitter_username,))
            row = cur.fetchone()
            if row is not None:
                cur.execute(update_sql, (row[0],))
                conn.commit()
                print(f'{self.twitter_username} had been {state}')
                time.sleep(2)
            else:
                print(f'{self.twitter_username} is not in the database')
        except Exception as error:
            # Show the error message itself, not just its (often empty)
            # pgcode.
            self._report_error(error)
        finally:
            if conn is not None:
                conn.close()

    def activate_account(self):
        """Mark the bot as active; errors are written to stdout."""
        self._set_active(True)

    def deactivate_account(self):
        """Mark the bot as inactive; errors are written to stdout."""
        self._set_active(False)

    def print(self):
        """Print the Twitter user name of this bot."""
        print(f'\nBot found: {self.twitter_username}\n')
def confirm_deletion(twitter_username):
    """Ask until the user types exactly ``Yes`` or ``No``.

    Returns:
        True for ``Yes``, False for ``No``.
    """
    while True:
        reply = input(f'Are you sure deleting {twitter_username} (Yes or No)?: ')
        if reply == 'Yes':
            print('Ok, deleting it...')
            return True
        if reply == 'No':
            print('Doing nothing.')
            return False
def list_bots():
    """Fetch every bot together with its Twitter user name.

    Reads the module-level ``replicator_db`` and ``replicator_db_user``
    globals.

    Returns:
        ``(bots_list, t_usernames_lst)``: two parallel lists. Each entry of
        ``bots_list`` is a maccounts row ``(bot_id, hostname, username,
        hostname_soft, active)``; the entry at the same index of
        ``t_usernames_lst`` is the matching taccounts user name, or ``''``
        when the bot has no taccounts row. On error the message is written
        to stdout and whatever was collected so far is returned.
    """
    bots_list = []
    t_usernames_lst = []
    mastodon_sql = 'SELECT bot_id, hostname, username, hostname_soft, active FROM maccounts'
    twitter_sql = 'SELECT username from taccounts where bot_id=(%s)'
    conn = None
    try:
        conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()
        cur.execute(mastodon_sql)
        for row in cur.fetchall():
            cur.execute(twitter_sql, (row[0],))
            username_row = cur.fetchone()
            # A bot without a taccounts row must not abort the listing.
            t_usernames_lst.append(username_row[0] if username_row is not None else '')
            bots_list.append(row)
    except Exception as error:
        sys.stdout.write(f'\n{str(error)}\n')
    finally:
        if conn is not None:
            conn.close()
    return (bots_list, t_usernames_lst)
def get_db_params( parameter, file_path ):
    """Return the value of *parameter* from a ``key: value`` config file.

    Args:
        parameter: Exact key to look for.
        file_path: Path of the config file.

    Returns:
        The text after the first ``:`` on the first line whose key equals
        *parameter*, stripped of surrounding whitespace.

    Raises:
        SystemExit: if the file does not exist or the key is missing
            (a message is printed first).
    """
    if not os.path.isfile(file_path):
        print("File %s not found, exiting."%file_path)
        sys.exit(0)
    # Find parameter in file
    with open( file_path ) as f:
        for line in f:
            # Compare the whole key, not a prefix: "replicator_db" must not
            # match a "replicator_db_user: ..." line. Splitting on the first
            # colon only also keeps colons inside the value intact.
            key, separator, value = line.partition(":")
            if separator and key.strip() == parameter:
                return value.strip()
    # Cannot find parameter, exit
    print(file_path + " Missing parameter %s "%parameter)
    sys.exit(0)
###############################################################################
# main
#
# Interactive menu loop. This deliberately stays a flat script rather than a
# main() function: replicator_db and replicator_db_user are assigned here at
# module level because the database helpers read them as globals.
if __name__ == '__main__':
    while True:
        print('\n')
        print_menu()
        try:
            option = int(input('\nEnter your choice: '))
        except ValueError:
            # Only a non-numeric entry is a "wrong input"; Ctrl-C and EOF
            # must still be able to end the program.
            print('\nWrong input. Please enter a number between 1 and 6.\n')
            continue
        if option == 6:
            print('Bye!')
            sys.exit()
        if option not in (1, 2, 3, 4, 5):
            print('\nWrong input. Please enter a number between 1 and 6.\n')
            continue
        # Every remaining option works on the database: reload the settings
        # and make sure the database is reachable first.
        config_filepath, replicator_db, replicator_db_user = get_config()
        check_db_conn()
        if option == 1:
            myBot = Bot()
            host_exists = check_host(myBot.hostname)
            if host_exists:
                is_duplicate = myBot.check_account()
                if is_duplicate:
                    print(f'\nBot account already exist!\n')
                    time.sleep(3)
                else:
                    client_id, client_secret, token = myBot.log_in()
                    if len(token) > 0:
                        save_error = myBot.save_account(client_id, client_secret, token)
                        if save_error is None:
                            print(f'Bot added!')
                            time.sleep(3)
                        else:
                            # The old test against '423505' could never
                            # match a five-character SQLSTATE, so failures
                            # were never reported here.
                            print(f'error: {save_error}')
        elif option == 2:
            getbot = GetBot()
            getbot.delete_account()
        elif option == 3:
            getbot = GetBot()
            getbot.activate_account()
        elif option == 4:
            getbot = GetBot()
            getbot.deactivate_account()
        elif option == 5:
            bots_list, t_usernames_lst = list_bots()
            print_table = PrettyTable()
            print_table.field_names = ['No.','Twitter username', 'bot account', 'host', 'host software', 'active']
            for number, (bot, twitter_username) in enumerate(zip(bots_list, t_usernames_lst), start=1):
                print_table.add_row([str(number), twitter_username, bot[2], bot[1], bot[3], bot[4]])
            print(print_table)
            print(f'Total: {len(bots_list)}')