You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

473 lines
10 KiB

12 months ago
import os
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
12 months ago
import psycopg2
import sys
import time
import requests
import shutil
import tweepy
from tweepy.errors import (
BadRequest, Forbidden, HTTPException, TooManyRequests, TwitterServerError,
Unauthorized
)
from tweepy import TweepyException
import logging
import pdb
logger = logging.getLogger()
def load_bots():
bot_id_lst = []
hostname_lst = []
username_lst = []
client_id_lst = []
client_secret_lst = []
client_token_lst = []
hostname_soft_lst = []
sql = 'select bot_id, hostname, username, client_id, client_secret, client_token, hostname_soft from maccounts where active'
12 months ago
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql)
rows = cur.fetchall()
for row in rows:
bot_id_lst.append(row[0])
hostname_lst.append(row[1])
username_lst.append(row[2])
client_id_lst.append(row[3])
client_secret_lst.append(row[4])
client_token_lst.append(row[5])
hostname_soft_lst.append(row[6])
12 months ago
cur.close()
return(bot_id_lst, hostname_lst, username_lst, client_id_lst, client_secret_lst, client_token_lst, hostname_soft_lst)
12 months ago
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_tusername(bot_id):
t_username = ''
sql = 'select username from taccounts where bot_id=(%s)'
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql, (bot_id,))
row = cur.fetchone()
if row != 0:
t_username = row[0]
cur.close()
return(t_username)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_software(bot_id):
software = ''
sql = 'select hostname_soft from maccounts where bot_id=(%s)'
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql, (bot_id,))
row = cur.fetchone()
if row != 0:
software = row[0]
cur.close()
return(software)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
12 months ago
def parse_url(toot_text):
sub_str = 'http'
find_link = toot_text.find(sub_str)
if find_link != -1:
tuit_text = toot_text[:toot_text.index(sub_str)]
else:
tuit_text = toot_text
return tuit_text
def get_toot_id(tweet_id, software):
12 months ago
toot_id = 0
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
if software == 'mastodon':
cur.execute('select toot_id, tweet_id from id where tweet_id=(%s)', (tweet_id,))
elif software == 'pleroma':
cur.execute('select toot_id, tweet_id from pleroma_id where tweet_id=(%s)', (tweet_id,))
12 months ago
row = cur.fetchone()
if row != None:
toot_id = row[0]
cur.close()
return(toot_id)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def write_db(toot_id, tweet_id, softname):
if softname == 'mastodon':
sql_insert_ids = 'INSERT INTO id(toot_id, tweet_id) VALUES (%s,%s)'
12 months ago
elif softname == 'pleroma':
sql_insert_ids = 'INSERT INTO pleroma_id(toot_id, tweet_id) VALUES (%s,%s)'
12 months ago
conn = None
try:
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql_insert_ids, (toot_id, tweet_id))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def write_image(image_url):
if not os.path.exists('images'):
os.makedirs('images')
filename = image_url.split("/") [-1]
r = requests.get(image_url, stream = True)
r.raw.decode_content = True
with open('images/' + filename, 'wb') as f:
shutil.copyfileobj(r.raw, f)
return filename
def oauth2():
auth = tweepy.OAuthHandler(api_key, api_key_secret)
try:
apiv1 = tweepy.API(auth)
logged_in = True
print(f'\nTwitter API: authentication succesfull!\n')
except Exception as e:
logger.error("Error creating API", exc_info=True)
raise e
logger.info("API created")
return (apiv1, logged_in)
def mastodon_log_in(id, secret, token, hostname):
# Initialise Mastodon API
mastodon = Mastodon(
client_id=id,
client_secret=secret,
access_token=token,
api_base_url='https://' + hostname,
)
# Initialise access headers
headers = {'Authorization': 'Bearer %s'%token}
return (mastodon)
def db_config():
# Load db configuration from config file
db_config_filepath = "config/db_config.txt"
replicator_db = get_parameter("replicator_db", db_config_filepath)
replicator_db_user = get_parameter("replicator_db_user", db_config_filepath)
return (replicator_db, replicator_db_user)
def twitter_config():
sql = 'select api_key, api_key_secret from tsecrets'
try:
conn = None
conn = psycopg2.connect(database = replicator_db, user = replicator_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(sql)
row = cur.fetchone()
if row != None:
api_key = row[0]
api_key_secret = row[1]
cur.close()
return(api_key, api_key_secret)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_parameter( parameter, file_path ):
12 months ago
# Check if secrets file exists
if not os.path.isfile(file_path):
print("File %s not found, exiting."%file_path)
sys.exit(0)
# Find parameter in file
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
# main
if __name__ == '__main__':
replicator_db, replicator_db_user = db_config()
bot_id_lst, hostname_lst, username_lst, client_id_lst, client_secret_lst, client_token_lst, hostname_soft_lst = load_bots()
12 months ago
logged_in = False
api_key, api_key_secret = twitter_config()
i = 0
while i < len(bot_id_lst):
mastodon = mastodon_log_in(client_id_lst[i], client_secret_lst[i], client_token_lst[i], hostname_lst[i])
t_username = get_tusername(bot_id_lst[i])
software = get_software(bot_id_lst[i])
12 months ago
# check new tweets
if not logged_in:
apiv1, logged_in = oauth2()
try:
print(f'Checking {t_username} for new tweets')
user_id = apiv1.get_user(screen_name=t_username).id
user_tweets = apiv1.user_timeline(user_id=user_id, tweet_mode='extended')
except tweepy.errors.Unauthorized as non_auth_error:
sys.exit(f'\n{non_auth_error}\n\nCheck your Twitter API key & secret!')
except tweepy.errors.TweepyException as tweepy_exception:
sys.exit(f'\n{tweepy_exception}')
for tweet in reversed(user_tweets):
if not tweet.retweeted and tweet.entities['user_mentions'] == []:
tweet_id = tweet.id
toot_id = get_toot_id(tweet_id, software)
12 months ago
if toot_id == 0:
print(f'{t_username}, tweet id: {tweet.id}, {tweet.full_text}')
12 months ago
is_reply = False
is_media = False
toot_text = tweet.full_text
if tweet.in_reply_to_status_id != None and tweet.in_reply_to_user_id == user_id:
tweet_reply_id = tweet.in_reply_to_status_id
reply_toot_id = get_toot_id(tweet_reply_id, software)
12 months ago
if reply_toot_id != 0:
is_reply = True
else:
reply_toot_id = None
else:
reply_toot_id = None
if 'media' in tweet.entities:
is_media = True
toot_text = toot_text.replace(tweet.entities['media'][0]['url'],'')
images_id_lst = []
media_qty = len(tweet.entities['media'])
i = 0
while i < media_qty:
media_url = tweet.entities['media'][0]['media_url']
image_filename = write_image(media_url)
image_id = mastodon.media_post('images/'+ image_filename, "image/png", description='twitter image').id
images_id_lst.append(image_id)
i += 1
if len(tweet.entities['urls']) > 0:
toot_text = parse_url(toot_text)
url = tweet.entities['urls'][0]['expanded_url']
toot_text = f'{toot_text}\n{url}'
try:
toot_id = mastodon.status_post(toot_text, in_reply_to_id=reply_toot_id, media_ids=images_id_lst if is_media else None).id
write_db(toot_id, tweet_id, software)
except MastodonAPIError as a_error:
12 months ago
sys.stdout.write(f'\nbot: {t_username}\nerror: {str(a_error)}\n')
12 months ago
time.sleep(2)
i += 1
print('Done.')