# mastotuit/mastotuit.py
#
# 471 lines
# 11 KiB
# Python

import os
import feedparser
from bs4 import BeautifulSoup
from mastodon import Mastodon
import psycopg2
import sys
import time
import requests
import shutil
import tweepy
from tweepy import TweepyException
import logging
import filetype
import ffmpeg
# Root logger (no name given); create_api_v1() and create_api_v2() log through it.
logger = logging.getLogger()
def get_toot_text(title):
    """Convert a toot's HTML into plain text with one line per ``<br>``.

    Args:
        title: HTML fragment of the toot (the feed entry summary).

    Returns:
        The text content of ``title`` with every ``<br>`` tag turned into a
        newline, followed by one trailing newline.
    """
    soup = BeautifulSoup(title, features='html.parser')

    # Swap each <br> for a real newline. Going through a placeholder string
    # such as '###' would also split toots that contain that string literally.
    for line_break in soup.find_all('br'):
        line_break.replace_with('\n')

    # The trailing newline is kept because every line, including the last
    # one, has always been terminated with '\n'.
    return f'{soup.get_text()}\n'
def get_poll(tuit_text):
    """Detect poll options written as ``[ ] option`` and separate them.

    Args:
        tuit_text: Plain text of the toot.

    Returns:
        A tuple ``(tuit_text, poll_options, options_lst, is_poll)``:
        ``poll_options`` is the number of ``[ ]`` markers found,
        ``options_lst`` holds the option labels in their original order and
        ``is_poll`` is True when at least one marker exists. For a poll the
        returned text is what precedes the first marker, with newlines
        removed; otherwise the text is returned untouched.
    """
    marker = '[ ]'
    poll_options = tuit_text.count(marker)
    is_poll = poll_options != 0

    # Work on a single-line copy and peel the options off from the end.
    flattened = tuit_text.replace('\n', '')
    reversed_options = []
    for _ in range(poll_options):
        flattened, _sep, label = flattened.rpartition(marker)
        reversed_options.append(label.strip())

    if not is_poll:
        return (tuit_text, poll_options, [], is_poll)

    # Options were collected last-to-first; restore their reading order.
    options_lst = reversed_options[::-1]
    return (flattened, poll_options, options_lst, is_poll)
def get_toot(title):
    """Turn a toot's HTML into text and extract its poll, if any.

    Args:
        title: HTML fragment of the toot.

    Returns:
        The ``(tuit_text, poll_options, options_lst, is_poll)`` tuple
        produced by get_poll() for the text returned by get_toot_text().
    """
    return get_poll(get_toot_text(title))
def compose_poll(tuit_text, poll_options, options_lst, toot_id):
    """Publish a poll tweet and record the toot/tweet id pair.

    Uses the module-level ``apiv2`` client.

    Args:
        tuit_text: Text of the tweet carrying the poll.
        poll_options: Number of options (not used here).
        options_lst: Option labels passed to the poll.
        toot_id: Id of the source toot, stored next to the new tweet id.

    Returns:
        The value returned by ``apiv2.create_tweet``.

    Exits the process via ``sys.exit`` when a TweepyException is raised.
    """
    poll_minutes = 4320  # 72 hours

    try:
        tweet = apiv2.create_tweet(
            text=tuit_text,
            poll_options=options_lst,
            poll_duration_minutes=poll_minutes,
        )
        new_tweet_id = tweet.data['id']
        write_db(toot_id, new_tweet_id)
    except TweepyException as err:
        print('Error while trying to publish poll.\n')
        sys.exit(err)

    return tweet
def _upload_media(filenames):
    """Upload the files found under images/ and return the usable media ids."""
    media_ids = []

    for image in filenames:
        kind = filetype.guess('images/' + image)
        # filetype.guess() returns None for unrecognised content; treat that
        # as "not a video" instead of failing on kind.mime.
        is_video = kind is not None and kind.mime == 'video/mp4'

        if is_video:
            try:
                probe = ffmpeg.probe(f'images/{image}')
                video_duration = float(probe['streams'][0]['duration'])
            except Exception as e:
                print(f'Error while trying to probe {image}\n{e}')
                sys.exit(e)

            # Videos over the limit are skipped, the tweet is still sent.
            if video_duration > 139:
                print(f'video duration is too large: {video_duration}')
                continue

        try:
            media_upload = apiv1.media_upload(
                f'images/{image}',
                media_category='tweet_video' if is_video else 'tweet_image'
            )
            if is_video:
                # Only attach a video once its processing has succeeded.
                if media_upload.processing_info['state'] == 'succeeded':
                    media_ids.append(media_upload.media_id)
            else:
                media_ids.append(media_upload.media_id)
        except TweepyException as err:
            print('Error while uploading media!\n')
            sys.exit(err)

    return media_ids


def _split_tuit_text(tuit_text, with_images):
    """Split a text longer than 280 characters into two or three tweet texts."""
    tuit_max_length = 250 if with_images else 273

    # Cut at the last space inside the window so no word is broken; the first
    # part is then marked with an ellipsis.
    head = tuit_text[:tuit_max_length].rsplit(' ', 1)[0]
    rest = tuit_text[len(head):]
    # Drop the separating space only when there is one, so that a window
    # without any space does not lose a real character.
    if rest.startswith(' '):
        rest = rest[1:]

    parts = [f'{head}...']

    if len(rest) <= 250:
        parts.append(rest)
    elif '#' in rest:
        # Move what follows the last '#' into a third tweet.
        # NOTE(review): rsplit(' ', 2)[0] discards the last two
        # space-separated tokens of that tail; kept as it was - confirm intent.
        body, _sep, tags = rest.rpartition('#')
        parts.append(body)
        parts.append('#{0}'.format(tags.rsplit(' ', 2)[0]))
    else:
        # No hashtag to split on: fall back to a word boundary rather than
        # failing with IndexError.
        body = rest[:250].rsplit(' ', 1)[0]
        parts.append(body)
        parts.append(rest[len(body):].lstrip(' '))

    return parts


def compose_tweet(tuit_text, with_images, is_reply):
    """Publish a toot's text (and media) as one tweet or a short thread.

    Reads the module-level names ``apiv1``, ``images_list`` (only when
    ``with_images`` is true) and ``tweet_id`` (only when ``is_reply`` is true).

    Args:
        tuit_text: Text to publish. Texts longer than 280 characters are
            split into two or three tweets, each replying to the previous one.
        with_images: Whether the files listed in ``images_list`` must be
            uploaded and attached to the first tweet.
        is_reply: Whether the first tweet replies to ``tweet_id``.

    Returns:
        The status returned by the last ``apiv1.update_status`` call.

    Exits the process via ``sys.exit`` when probing a video fails or when a
    TweepyException is raised while uploading or publishing.
    """
    images_id_lst = _upload_media(images_list) if with_images else []
    reply_to = tweet_id if is_reply else ''

    # Compose tuit
    if len(tuit_text) > 280:
        parts = _split_tuit_text(tuit_text, with_images)
        try:
            tweet = apiv1.update_status(
                status=parts[0],
                in_reply_to_status_id=reply_to,
                media_ids=images_id_lst
            )
            # Remaining parts are chained as replies; media stays on the first.
            for part in parts[1:]:
                tweet = apiv1.update_status(
                    status=part,
                    in_reply_to_status_id=tweet.id
                )
        except TweepyException as err:
            print('Error while trying to publish split tweet.\n')
            sys.exit(err)
    else:
        try:
            tweet = apiv1.update_status(
                status=tuit_text,
                in_reply_to_status_id=reply_to,
                media_ids=images_id_lst
            )
        except TweepyException as err:
            print('Error while trying to publish tweet.\n')
            sys.exit(err)

    return tweet
def get_tweet_id(toot_id):
    """Look up the tweet id stored for a toot.

    Uses the module-level ``feeds_db`` and ``feeds_db_user`` settings.

    Args:
        toot_id: Id of the toot to look up in the ``id`` table.

    Returns:
        The stored ``tweet_id``, or 0 when no row matches. When any exception
        is caught it is printed and the function returns None implicitly.
    """
    conn = None
    found_id = 0

    try:
        conn = psycopg2.connect(
            database=feeds_db,
            user=feeds_db_user,
            password="",
            host="/var/run/postgresql",
            port="5432",
        )
        cursor = conn.cursor()
        cursor.execute('select tweet_id from id where toot_id=(%s)', (toot_id,))

        record = cursor.fetchone()
        if record is not None:
            found_id = record[0]

        cursor.close()
        return found_id

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()
def write_db(toot_id, tweet_id):
    """Store a toot id / tweet id pair in the ``id`` table.

    Uses the module-level ``feeds_db`` and ``feeds_db_user`` settings. Any
    exception is printed and swallowed; nothing is returned.

    Args:
        toot_id: Id of the toot.
        tweet_id: Id of the tweet published for that toot.
    """
    insert_statement = 'INSERT INTO id(toot_id, tweet_id) VALUES (%s,%s)'
    connection_settings = {
        'database': feeds_db,
        'user': feeds_db_user,
        'password': "",
        'host': "/var/run/postgresql",
        'port': "5432",
    }

    conn = None
    try:
        conn = psycopg2.connect(**connection_settings)
        cursor = conn.cursor()
        cursor.execute(insert_statement, (toot_id, tweet_id))
        conn.commit()
        cursor.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
def write_image(image_url):
    """Download ``image_url`` into the local ``images`` directory.

    Args:
        image_url: URL of the media file; the part after the last ``/`` is
            used as the local file name.

    Returns:
        The file name (without directory) the media was saved under.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs('images', exist_ok=True)

    filename = image_url.split("/")[-1]

    # The context manager releases the connection even if writing fails.
    with requests.get(image_url, stream=True) as r:
        # Fail here instead of saving an HTTP error page as if it were media.
        r.raise_for_status()
        # Let urllib3 undo any Content-Encoding before the bytes are copied.
        r.raw.decode_content = True
        with open('images/' + filename, 'wb') as f:
            shutil.copyfileobj(r.raw, f)

    return filename
def create_api_v1():
    """Build the tweepy v1.1 API object and verify the credentials.

    Uses the module-level ``api_key``, ``api_key_secret``, ``access_token``
    and ``access_token_secret`` values.

    Returns:
        A tuple ``(apiv1, logged_in)`` where ``logged_in`` is True.

    Raises:
        Exception: Whatever ``verify_credentials`` raised, after logging it.
    """
    handler = tweepy.OAuthHandler(api_key, api_key_secret)
    handler.set_access_token(access_token, access_token_secret)
    apiv1 = tweepy.API(handler)

    try:
        apiv1.verify_credentials()
    except Exception:
        logger.exception("Error creating API")
        raise
    else:
        logged_in = True

    logger.info("API created")
    return (apiv1, logged_in)
def create_api_v2():
    """Build the tweepy v2 client.

    Uses the module-level ``api_key``, ``api_key_secret``, ``access_token``
    and ``access_token_secret`` values.

    Returns:
        A tuple ``(apiv2, logged_in)`` where ``logged_in`` is True.

    Raises:
        Exception: Whatever the client constructor raised, after logging it.
    """
    credentials = {
        'consumer_key': api_key,
        'consumer_secret': api_key_secret,
        'access_token': access_token,
        'access_token_secret': access_token_secret,
    }

    try:
        apiv2 = tweepy.Client(**credentials)
    except Exception:
        logger.exception("Error creating API")
        raise
    else:
        logged_in = True

    logger.info("API v2 created")
    return (apiv2, logged_in)
def mastodon():
    """Create the Mastodon API client from the secrets and config files.

    Reads ``uc_client_id``, ``uc_client_secret`` and ``uc_access_token`` from
    ``secrets/secrets.txt`` and ``mastodon_hostname`` from
    ``config/config.txt``.

    Returns:
        A tuple ``(client, mastodon_hostname)``.
    """
    # Load secrets from secrets file
    secrets_filepath = "secrets/secrets.txt"
    uc_client_id = get_parameter("uc_client_id", secrets_filepath)
    uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
    uc_access_token = get_parameter("uc_access_token", secrets_filepath)

    # Load configuration from config file
    config_filepath = "config/config.txt"
    mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)

    # Initialise Mastodon API. The local is not called `mastodon` so that it
    # does not shadow this function's own name inside its body.
    client = Mastodon(
        client_id=uc_client_id,
        client_secret=uc_client_secret,
        access_token=uc_access_token,
        api_base_url='https://' + mastodon_hostname,
    )

    return (client, mastodon_hostname)
def db_config():
    """Read the database and feed settings from ``config/db_config.txt``.

    Returns:
        A tuple ``(feeds_db, feeds_db_user, feeds_url)``.
    """
    db_config_filepath = "config/db_config.txt"
    wanted = ("feeds_db", "feeds_db_user", "feeds_url")
    return tuple(get_parameter(name, db_config_filepath) for name in wanted)
def twitter_config():
    """Read the Twitter credentials from ``config/keys_config.txt``.

    Returns:
        A tuple ``(api_key, api_key_secret, access_token,
        access_token_secret)``.
    """
    twitter_config_filepath = "config/keys_config.txt"
    wanted = (
        "api_key",
        "api_key_secret",
        "access_token",
        "access_token_secret",
    )
    return tuple(get_parameter(name, twitter_config_filepath) for name in wanted)
def get_parameter(parameter, file_path):
    """Return the value of ``parameter`` from a ``name: value`` text file.

    Args:
        parameter: Name of the setting to look for.
        file_path: Path of the file to read.

    Returns:
        The text following ``parameter:`` on the first matching line, with
        surrounding whitespace removed.

    Exits the process with status 0 (after printing a message) when the file
    does not exist or no line defines the parameter.
    """
    # Check if the file exists
    if not os.path.isfile(file_path):
        print("File %s not found, exiting." % file_path)
        sys.exit(0)

    # Match the name together with its colon: a bare prefix test would let
    # "api_key" pick up an "api_key_secret: ..." line.
    prefix = parameter + ":"

    # Find parameter in file
    with open(file_path) as f:
        for line in f:
            if line.startswith(prefix):
                # Slice instead of replace() so that a value which itself
                # contains "<parameter>:" is returned intact.
                return line[len(prefix):].strip()

    # Cannot find parameter, exit
    print(file_path + " Missing parameter %s " % parameter)
    sys.exit(0)
# main
if __name__ == '__main__':
    # Everything assigned here is a module-level global on purpose: the helper
    # functions read feeds_db, feeds_db_user, the Twitter keys, apiv1, apiv2,
    # images_list and tweet_id directly instead of taking them as arguments.
    mastodon, mastodon_hostname = mastodon()
    feeds_db, feeds_db_user, feeds_url = db_config()
    api_key, api_key_secret, access_token, access_token_secret = twitter_config()

    logged_in = False

    try:
        newsfeeds = feedparser.parse(feeds_url)
    except Exception as err:
        # `newsfeeds` is unbound when parse() raises, so report the exception
        # itself rather than a status read from it.
        print(err)
        sys.exit(0)

    for entry in newsfeeds.entries:
        publish = False
        with_images = False
        is_reply = False

        title = entry['summary']
        entry_id = entry['id']
        link = entry['link']
        # The toot id is taken from the fifth '/'-separated piece of the link.
        toot_id = link.split('/')[4]

        # 0 means no tweet is recorded for this toot yet.
        tweet_id = get_tweet_id(toot_id)

        if tweet_id == 0:
            publish = True

            reply_id = mastodon.status(toot_id).in_reply_to_id
            if reply_id is not None:
                is_reply = True
                # From here on tweet_id is the tweet to reply to.
                tweet_id = get_tweet_id(reply_id)

            if "media_content" in entry:
                with_images = True
                images_list = [
                    write_image(media['url']) for media in entry.media_content
                ]

        ###########################################################

        if publish:
            tuit_text, poll_options, options_lst, is_poll = get_toot(title)

            print("Tooting...")
            print(tuit_text)

            # The Twitter clients are created once, on the first publication.
            if not logged_in:
                apiv1, logged_in = create_api_v1()
                apiv2, logged_in = create_api_v2()

            if is_poll:
                # compose_poll() stores the id pair itself.
                tweet = compose_poll(tuit_text, poll_options, options_lst, toot_id)
            else:
                tweet = compose_tweet(tuit_text, with_images, is_reply)
                write_db(toot_id, tweet.id)

            time.sleep(2)
        else:
            # The first entry that already has a tweet ends the run.
            print("Any new feeds")
            sys.exit(0)