Replicator can manage several Mastodon, Pleroma o Soapbox bot accounts who will replicate (post to Mastodon, Pleroma or Soapbox servers) several Twitter accounts's tweets of your choice.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
replicator/replicator.py

472 lines
10 KiB

import os
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
import psycopg2
import sys
import time
import requests
import shutil
import tweepy
from tweepy.errors import (
BadRequest, Forbidden, HTTPException, TooManyRequests, TwitterServerError,
Unauthorized
)
from tweepy import TweepyException
import logging
import pdb
logger = logging.getLogger()
def load_bots():
bot_id_lst = []
hostname_lst = []
username_lst = []
client_id_lst = []
client_secret_lst = []
client_token_lst = []
hostname_soft_lst = []
sql = 'select bot_id, hostname, username, client_id, client_secret, client_token, hostname_soft from maccounts where active'
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql)
rows = cur.fetchall()
for row in rows:
bot_id_lst.append(row[0])
hostname_lst.append(row[1])
username_lst.append(row[2])
client_id_lst.append(row[3])
client_secret_lst.append(row[4])
client_token_lst.append(row[5])
hostname_soft_lst.append(row[6])
cur.close()
return(bot_id_lst, hostname_lst, username_lst, client_id_lst, client_secret_lst, client_token_lst, hostname_soft_lst)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_tusername(bot_id):
t_username = ''
sql = 'select username from taccounts where bot_id=(%s)'
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql, (bot_id,))
row = cur.fetchone()
if row != 0:
t_username = row[0]
cur.close()
return(t_username)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_software(bot_id):
software = ''
sql = 'select hostname_soft from maccounts where bot_id=(%s)'
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql, (bot_id,))
row = cur.fetchone()
if row != 0:
software = row[0]
cur.close()
return(software)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def parse_url(toot_text):
sub_str = 'http'
find_link = toot_text.find(sub_str)
if find_link != -1:
tuit_text = toot_text[:toot_text.index(sub_str)]
else:
tuit_text = toot_text
return tuit_text
def get_toot_id(tweet_id, software):
toot_id = 0
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
if software == 'mastodon':
cur.execute('select toot_id, tweet_id from id where tweet_id=(%s)', (tweet_id,))
elif software == 'pleroma':
cur.execute('select toot_id, tweet_id from pleroma_id where tweet_id=(%s)', (tweet_id,))
row = cur.fetchone()
if row != None:
toot_id = row[0]
cur.close()
return(toot_id)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def write_db(toot_id, tweet_id, softname):
if softname == 'mastodon':
sql_insert_ids = 'INSERT INTO id(toot_id, tweet_id) VALUES (%s,%s)'
elif softname == 'pleroma':
sql_insert_ids = 'INSERT INTO pleroma_id(toot_id, tweet_id) VALUES (%s,%s)'
conn = None
try:
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql_insert_ids, (toot_id, tweet_id))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def write_image(image_url):
if not os.path.exists('images'):
os.makedirs('images')
filename = image_url.split("/") [-1]
r = requests.get(image_url, stream = True)
r.raw.decode_content = True
with open('images/' + filename, 'wb') as f:
shutil.copyfileobj(r.raw, f)
return filename
def oauth2():
auth = tweepy.OAuthHandler(api_key, api_key_secret)
try:
apiv1 = tweepy.API(auth)
logged_in = True
print(f'\nTwitter API: authentication succesfull!\n')
except Exception as e:
logger.error("Error creating API", exc_info=True)
raise e
logger.info("API created")
return (apiv1, logged_in)
def mastodon_log_in(id, secret, token, hostname):
# Initialise Mastodon API
mastodon = Mastodon(
client_id=id,
client_secret=secret,
access_token=token,
api_base_url='https://' + hostname,
)
# Initialise access headers
headers = {'Authorization': 'Bearer %s'%token}
return (mastodon)
def db_config():
# Load db configuration from config file
db_config_filepath = "config/db_config.txt"
replicator_db = get_parameter("replicator_db", db_config_filepath)
replicator_db_user = get_parameter("replicator_db_user", db_config_filepath)
return (replicator_db, replicator_db_user)
def twitter_config():
sql = 'select api_key, api_key_secret from tsecrets'
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql)
row = cur.fetchone()
if row != None:
api_key = row[0]
api_key_secret = row[1]
cur.close()
return(api_key, api_key_secret)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_parameter( parameter, file_path ):
# Check if secrets file exists
if not os.path.isfile(file_path):
print("File %s not found, exiting."%file_path)
sys.exit(0)
# Find parameter in file
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
# main
if __name__ == '__main__':
replicator_db, replicator_db_user = db_config()
bot_id_lst, hostname_lst, username_lst, client_id_lst, client_secret_lst, client_token_lst, hostname_soft_lst = load_bots()
logged_in = False
api_key, api_key_secret = twitter_config()
i = 0
while i < len(bot_id_lst):
mastodon = mastodon_log_in(client_id_lst[i], client_secret_lst[i], client_token_lst[i], hostname_lst[i])
t_username = get_tusername(bot_id_lst[i])
software = get_software(bot_id_lst[i])
# check new tweets
if not logged_in:
apiv1, logged_in = oauth2()
try:
print(f'Checking {t_username} for new tweets')
user_id = apiv1.get_user(screen_name=t_username).id
user_tweets = apiv1.user_timeline(user_id=user_id, tweet_mode='extended')
except tweepy.errors.Unauthorized as non_auth_error:
sys.exit(f'\n{non_auth_error}\n\nCheck your Twitter API key & secret!')
except tweepy.errors.TweepyException as tweepy_exception:
sys.exit(f'\n{tweepy_exception}')
for tweet in reversed(user_tweets):
if not tweet.retweeted and tweet.entities['user_mentions'] == []:
tweet_id = tweet.id
toot_id = get_toot_id(tweet_id, software)
if toot_id == 0:
print(f'{t_username}, tweet id: {tweet.id}, {tweet.full_text}')
is_reply = False
is_media = False
toot_text = tweet.full_text
if tweet.in_reply_to_status_id != None and tweet.in_reply_to_user_id == user_id:
tweet_reply_id = tweet.in_reply_to_status_id
reply_toot_id = get_toot_id(tweet_reply_id, software)
if reply_toot_id != 0:
is_reply = True
else:
reply_toot_id = None
else:
reply_toot_id = None
if 'media' in tweet.entities:
is_media = True
toot_text = toot_text.replace(tweet.entities['media'][0]['url'],'')
images_id_lst = []
media_qty = len(tweet.entities['media'])
i = 0
while i < media_qty:
media_url = tweet.entities['media'][0]['media_url']
image_filename = write_image(media_url)
image_id = mastodon.media_post('images/'+ image_filename, "image/png", description='twitter image').id
images_id_lst.append(image_id)
i += 1
if len(tweet.entities['urls']) > 0:
toot_text = parse_url(toot_text)
url = tweet.entities['urls'][0]['expanded_url']
toot_text = f'{toot_text}\n{url}'
try:
toot_id = mastodon.status_post(toot_text, in_reply_to_id=reply_toot_id, media_ids=images_id_lst if is_media else None).id
write_db(toot_id, tweet_id, software)
except MastodonAPIError as a_error:
sys.stdout.write(f'\nbot: {t_username}\nerror: {str(a_error)}\n')
time.sleep(2)
i += 1
print('Done.')