# Repository viewer metadata: 226 lines, 5.8 KiB, Python
import getpass
|
|
import os
|
|
import sys
|
|
import psycopg2
|
|
from mastodon import Mastodon
|
|
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
|
|
import uuid
|
|
import pdb
|
|
|
|
def get_config():
    """Read the database settings used by this script.

    Looks up ``replicator_db`` and then ``replicator_db_user`` in
    ``config/db_config.txt`` through ``get_db_params``.

    Returns:
        A ``(config_filepath, replicator_db, replicator_db_user)`` tuple.
    """
    path = "config/db_config.txt"

    # The generator is consumed left to right, so the lookups happen in the
    # same order as the keys are listed.
    db_name, db_user = (
        get_db_params(key, path)
        for key in ("replicator_db", "replicator_db_user")
    )

    return (path, db_name, db_user)
|
|
|
|
def check_db_conn():
    """Check that a connection to the replicator database can be opened.

    Reads the module-level ``replicator_db`` and ``replicator_db_user``
    names. On failure the error text is written to stdout and the process
    exits with the message "Exiting. Run 'python db-setup' again".
    On success the probe connection is closed again and None is returned.
    """
    try:
        conn = psycopg2.connect(
            database=replicator_db,
            user=replicator_db_user,
            password="",
            host="/var/run/postgresql",
            port="5432",
        )
    except Exception as error:
        # psycopg2.DatabaseError derives from Exception, so one clause
        # covers everything the original tuple caught.
        sys.stdout.write(f'\n{str(error)}\n')
        sys.exit("Exiting. Run 'python db-setup' again")
    else:
        # The connection was only a reachability probe; release it rather
        # than leaving it open until garbage collection.
        conn.close()
|
|
|
|
class Bot:
    """Credentials for a Mastodon/Pleroma account plus its Twitter username.

    The database methods read the module-level ``replicator_db`` and
    ``replicator_db_user`` names, which must be assigned before
    ``check_account`` or ``save_account`` is called.
    """

    name = "Bot"

    def __init__(self, hostname, software, username, password, app_name, twitter_username):
        """Store the account details exactly as given.

        Args:
            hostname: Server address, passed to Mastodon.py as ``api_base_url``.
            software: Server software label, stored in ``hostname_soft``.
            username: Login name of the bot account.
            password: Password of the bot account.
            app_name: Application name registered on the server.
            twitter_username: Twitter username stored alongside the bot.
        """
        self.hostname = hostname
        self.software = software
        self.username = username
        self.password = password
        self.app_name = app_name
        self.twitter_username = twitter_username

    @staticmethod
    def _connect():
        """Open a connection to the replicator database."""
        return psycopg2.connect(
            database=replicator_db,
            user=replicator_db_user,
            password="",
            host="/var/run/postgresql",
            port="5432",
        )

    def log_in(self):
        """Register the application on the server and log the bot user in.

        Returns:
            A ``(client_id, client_secret, token)`` tuple. Any value that
            could not be obtained is an empty string, so a failed login is
            signalled by an empty ``token``.

        Mastodon argument, network, timeout and API errors are printed to
        stdout and reported through the empty values. Any other exception
        propagates to the caller.
        """
        # Bind all three up front so that a failure in create_app() still
        # leaves something to return.
        client_id = client_secret = token = ''

        try:
            response = Mastodon.create_app(
                self.app_name,
                scopes=["read", "write"],
                to_file=None,
                api_base_url=self.hostname
            )
            client_id = response[0]
            client_secret = response[1]

            mastodon = Mastodon(client_id=client_id, client_secret=client_secret, api_base_url=self.hostname)

            token = mastodon.log_in(
                self.username,
                self.password,
                scopes=["read", "write"],
                to_file=None
            )

            print('Log in succesful!')

        except (MastodonIllegalArgumentError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError) as error:
            # All four were handled identically; one clause keeps the
            # handling in a single place.
            sys.stdout.write(f'\n{str(error)}\n')

        # Returning after the try (not from a finally clause) keeps
        # unexpected exceptions visible instead of discarding them.
        return (client_id, client_secret, token)

    def check_account(self):
        """Report whether this bot's username is already in ``maccounts``.

        Returns:
            True when a row with the same username exists. False otherwise,
            including when the lookup fails (the error is printed to stdout).
        """
        is_duplicate = False
        sql = 'select username from maccounts where username=(%s)'
        conn = None

        try:
            conn = self._connect()
            cur = conn.cursor()
            cur.execute(sql, (self.username,))
            is_duplicate = cur.fetchone() is not None

        except Exception as error:
            sys.stdout.write(f'\n{str(error)}\n')

        finally:
            if conn is not None:
                conn.close()

        return is_duplicate

    def save_account(self, client_id, client_secret, token):
        """Insert the bot into ``maccounts`` and ``taccounts``.

        Both rows share a freshly generated UUID as ``bot_id`` and are
        committed together.

        Args:
            client_id: Application client id returned by ``log_in``.
            client_secret: Application client secret returned by ``log_in``.
            token: Access token returned by ``log_in``.

        Returns:
            None on success. On failure, the PostgreSQL error code when the
            driver supplies one, otherwise the exception class name. The
            error text is also printed to stdout.
        """
        bot_uuid = str(uuid.uuid4())
        save_error = None

        mastodon_sql = 'INSERT INTO maccounts (bot_id, hostname, username, client_id, client_secret, client_token, hostname_soft) VALUES (%s,%s,%s,%s,%s,%s,%s)'
        twitter_sql = 'INSERT INTO taccounts (bot_id, username) VALUES (%s,%s)'

        conn = None

        try:
            conn = self._connect()
            cur = conn.cursor()
            cur.execute(mastodon_sql, (bot_uuid, self.hostname, self.username, client_id, client_secret, token, self.software))
            cur.execute(twitter_sql, (bot_uuid, self.twitter_username))
            conn.commit()

        except Exception as error:
            # pgcode is missing on non-driver exceptions and can be None on
            # driver errors without a backend code. Fall back to the class
            # name so a failure never returns None, which callers read as
            # success.
            save_error = getattr(error, 'pgcode', None) or type(error).__name__
            sys.stdout.write(f'\n{str(error)}\n')

        finally:
            if conn is not None:
                conn.close()

        return save_error
|
|
|
|
def get_db_params(parameter, file_path):
    """Return the value of *parameter* from a ``name: value`` config file.

    Each line is split at its first colon and the name must equal
    *parameter* exactly, so ``replicator_db`` is never satisfied by a
    ``replicator_db_user`` line. The first matching line wins, and
    surrounding whitespace is stripped from the value.

    Args:
        parameter: Name of the setting to look up.
        file_path: Path of the config file.

    Returns:
        The setting's value as a string.

    Raises:
        SystemExit: With status 1, after printing a message, when the file
            does not exist or does not define *parameter*.
    """
    if not os.path.isfile(file_path):
        print("File %s not found, exiting."%file_path)
        sys.exit(1)

    # Find parameter in file
    with open(file_path) as f:
        for line in f:
            key, separator, value = line.partition(":")
            if separator and key.strip() == parameter:
                return value.strip()

    # Cannot find parameter, exit
    print(file_path + " Missing parameter %s "%parameter)
    sys.exit(1)
|
|
|
|
|
|
###############################################################################
|
|
# main
|
|
|
|
if __name__ == '__main__':

    # These stay module-level names: check_db_conn() and Bot's database
    # methods read replicator_db and replicator_db_user as globals.
    config_filepath, replicator_db, replicator_db_user = get_config()

    check_db_conn()

    bot_hostname = input("Enter Mastodon/Pleroma hostname: ")
    bot_software = input("Server software (mastodon or pleroma)? ")
    bot_username = input(f'User name, ex. user@{bot_hostname}? ')
    bot_password = getpass.getpass("User password? ")
    bot_app_name = 'replicator'
    bot_twitter_username = input('Twitter username (ex. jack)? ')

    myBot = Bot(bot_hostname, bot_software, bot_username, bot_password, bot_app_name, bot_twitter_username)

    is_duplicate = myBot.check_account()

    if is_duplicate:

        print('Bot account already exist!')
        sys.exit()

    else:

        client_id, client_secret, token = myBot.log_in()

        # An empty token means log_in() failed and already printed the error.
        if token:

            save_error = myBot.save_account(client_id, client_secret, token)

            if save_error is None:

                print('Done.')

            else:

                # save_account() has already printed the error text. The
                # previous comparison against '423505' could never match a
                # five-character SQLSTATE, so every failure ended with exit
                # status 0. Report the failure through the exit status.
                sys.exit(1)
|