"""Mirror new Mastodon toots (read from an RSS feed) to Twitter.

Reads the feed listed in config/db_config.txt, skips toots already recorded
in the PostgreSQL ``id`` table, and republishes the rest as tweets (split
into up to three parts when over 280 chars), polls, replies, or media posts.
"""

import logging
import os
import shutil
import sys
import time

import feedparser
import ffmpeg
import filetype
import psycopg2
import requests
import tweepy
from bs4 import BeautifulSoup
from mastodon import Mastodon
from tweepy import TweepyException

logger = logging.getLogger()


def get_toot_text(title):
    """Strip the HTML from a toot's summary, turning <br> tags into newlines.

    :param title: HTML string (the RSS entry's summary).
    :returns: plain-text toot body with a trailing newline per line.
    """
    soup = BeautifulSoup(title, features='html.parser')
    delimiter = '###'  # unambiguous string, unlikely to occur in a toot
    for line_break in soup.findAll('br'):  # loop through line break tags
        line_break.replaceWith(delimiter)  # replace br tags with delimiter
    lines = soup.get_text().split(delimiter)
    return ''.join(f'{line}\n' for line in lines)


def get_poll(tuit_text):
    """Detect a poll in the current toot and split its options out.

    Relies on the module-level globals ``entry`` (current feed entry) and
    ``title`` (its raw HTML summary) set by the main loop — TODO confirm
    callers always set them before calling.

    :param tuit_text: plain-text toot body from :func:`get_toot_text`.
    :returns: tuple ``(tuit_text, poll_options, options_lst, is_poll)`` where
        ``tuit_text`` has the poll options removed when a poll is present.
    """
    poll_options = 0
    options_lst = []
    is_poll = "input disable" in entry.summary
    if is_poll:
        # NOTE(review): this marker looks corrupted — it is an empty string
        # here, yet the ``+ 42`` slice offset below implies the original was
        # a 42-character HTML fragment preceding each poll option.
        # TODO: restore the real substring from upstream history.
        poll_substring = """"""
        poll_options = title.count(poll_substring)
        remaining_str = title
        i = poll_options
        # Walk the HTML right to left, peeling off one option per pass.
        while i > 0:
            last_option_index = remaining_str.rfind(poll_substring)
            if i == poll_options:
                # Right-most option carries a double newline to scrub.
                option_str = remaining_str[last_option_index + 42:].strip().replace('\n\n', '')
            else:
                option_str = remaining_str[last_option_index + 42:].strip().replace('\n', '')
            options_lst.append(option_str)
            remaining_str = remaining_str[:last_option_index]
            i -= 1
        options_lst.reverse()  # collected right-to-left; restore feed order
        # Cut the option text off the toot body itself.
        first_option_index = tuit_text.rfind(options_lst[0])
        tuit_text = tuit_text[:first_option_index - 1]
    return (tuit_text, poll_options, options_lst, is_poll)


def get_toot(title):
    """Return ``(text, poll_options, options_lst, is_poll)`` for a toot's HTML."""
    tuit_text = get_toot_text(title)
    tuit_text, poll_options, options_lst, is_poll = get_poll(tuit_text)
    return (tuit_text, poll_options, options_lst, is_poll)


def compose_poll(tuit_text, poll_options, options_lst, toot_id):
    """Publish a 3-day Twitter poll mirroring a Mastodon poll.

    Uses the module-level ``apiv2`` client. Records the toot/tweet id pair
    via :func:`write_db` on success; exits the process on failure.
    """
    try:
        tweet = apiv2.create_tweet(
            poll_duration_minutes=4320,  # 3 days
            poll_options=options_lst,
            text=tuit_text
        )
        write_db(toot_id, tweet.data['id'])
    except TweepyException as err:
        print('Error while trying to publish poll.\n')
        sys.exit(err)
    return tweet


def compose_tweet(tuit_text, with_images, is_reply):
    """Publish a tweet, splitting it into up to three parts when too long.

    Uses the module-level ``apiv1`` client, ``images_list`` (filenames under
    images/, when ``with_images``) and ``tweet_id`` (parent tweet when
    ``is_reply``) set by the main loop.

    :returns: the last tweet object published.
    """
    images_id_lst = []
    if with_images:
        for image in images_list:
            kind = filetype.guess('images/' + image)
            # BUGFIX: filetype.guess() returns None for unrecognized files,
            # which previously crashed on ``.mime``; treat unknown as image.
            is_video = kind is not None and kind.mime == 'video/mp4'
            if is_video:
                try:
                    probe = ffmpeg.probe(f'images/{image}')
                    video_duration = float(probe['streams'][0]['duration'])
                except Exception as e:
                    print(f'Error while trying to probe {image}\n{e}')
                    sys.exit(e)
                if video_duration > 139:
                    # Twitter rejects videos over ~140 s; skip this one.
                    print(f'video duration is too large: {video_duration}')
                    continue
            try:
                media_upload = apiv1.media_upload(
                    f'images/{image}',
                    media_category='tweet_video' if is_video else 'tweet_image'
                )
                if is_video:
                    # Attach the video only once Twitter finished processing.
                    if media_upload.processing_info['state'] == 'succeeded':
                        images_id_lst.append(media_upload.media_id)
                else:
                    images_id_lst.append(media_upload.media_id)
            except TweepyException as err:
                print('Error while uploading media!\n')
                sys.exit(err)

    # Compose tuit
    tuit_text2 = ''
    three_parts = False
    if len(tuit_text) > 280:
        # Leave headroom for the trailing '...' (and media when attached).
        tuit_max_length = 250 if with_images else 273
        tuit_text1 = '{0}...'.format(tuit_text[:tuit_max_length].rsplit(' ', 1)[0])
        tuit_text2 = '{0}'.format(tuit_text[len(tuit_text1) - 2:])
        if len(tuit_text2) > 250:
            three_parts = True
            # Split the remainder at its last hashtag to keep tags together.
            tuit_text2 = '{0}'.format(tuit_text[len(tuit_text1) - 2:].rsplit('#', 1)[0])
            tuit_text3 = '#{0}'.format(
                tuit_text[len(tuit_text1) - 2:].rsplit('#', 1)[1].rsplit(' ', 2)[0]
            )
        try:
            first_tweet = apiv1.update_status(
                status=tuit_text1,
                in_reply_to_status_id=tweet_id if is_reply else '',
                media_ids=images_id_lst
            )
            tweet = apiv1.update_status(
                status=tuit_text2,
                in_reply_to_status_id=first_tweet.id
                #media_ids=images_id_lst
            )
            if three_parts:
                tweet = apiv1.update_status(
                    status=tuit_text3,
                    in_reply_to_status_id=tweet.id
                )
        except TweepyException as err:
            print('Error while trying to publish split tweet.\n')
            sys.exit(err)
    else:
        try:
            tweet = apiv1.update_status(
                status=tuit_text,
                in_reply_to_status_id=tweet_id if is_reply else '',
                media_ids=images_id_lst
            )
        except TweepyException as err:
            print('Error while trying to publish tweet.\n')
            sys.exit(err)
    return tweet


def get_tweet_id(toot_id):
    """Return the tweet id previously recorded for ``toot_id`` (0 if none)."""
    tweet_id = 0
    conn = None
    try:
        conn = psycopg2.connect(database=feeds_db, user=feeds_db_user,
                                password="", host="/var/run/postgresql",
                                port="5432")
        cur = conn.cursor()
        cur.execute('select tweet_id from id where toot_id=(%s)', (toot_id,))
        row = cur.fetchone()
        if row is not None:
            tweet_id = row[0]
        cur.close()
        return tweet_id
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()


def write_db(toot_id, tweet_id):
    """Record a published toot/tweet id pair in the ``id`` table."""
    sql_insert_ids = 'INSERT INTO id(toot_id, tweet_id) VALUES (%s,%s)'
    conn = None
    try:
        conn = psycopg2.connect(database=feeds_db, user=feeds_db_user,
                                password="", host="/var/run/postgresql",
                                port="5432")
        cur = conn.cursor()
        cur.execute(sql_insert_ids, (toot_id, tweet_id))
        conn.commit()
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()


def write_image(image_url):
    """Download ``image_url`` into the images/ directory; return its filename."""
    if not os.path.exists('images'):
        os.makedirs('images')
    filename = image_url.split("/")[-1]
    # BUGFIX: close the HTTP connection when done (it was leaked before).
    with requests.get(image_url, stream=True) as r:
        r.raw.decode_content = True
        with open('images/' + filename, 'wb') as f:
            shutil.copyfileobj(r.raw, f)
    return filename


def create_api_v1():
    """Authenticate against the Twitter v1.1 API (needed for media/statuses).

    Uses the module-level key globals set in ``__main__``.
    :returns: ``(apiv1, logged_in)``.
    """
    auth = tweepy.OAuthHandler(api_key, api_key_secret)
    auth.set_access_token(access_token, access_token_secret)
    apiv1 = tweepy.API(auth)
    try:
        apiv1.verify_credentials()
        logged_in = True
    except Exception as e:
        logger.error("Error creating API", exc_info=True)
        raise e
    logger.info("API created")
    return (apiv1, logged_in)


def create_api_v2():
    """Build the Twitter v2 client (used to publish polls).

    :returns: ``(apiv2, logged_in)``.
    """
    try:
        apiv2 = tweepy.Client(
            consumer_key=api_key,
            consumer_secret=api_key_secret,
            access_token=access_token,
            access_token_secret=access_token_secret
        )
        logged_in = True
    except Exception as e:
        logger.error("Error creating API", exc_info=True)
        raise e
    logger.info("API v2 created")
    return (apiv2, logged_in)


def mastodon():
    """Build the Mastodon API client from the secrets and config files.

    :returns: ``(mastodon, mastodon_hostname)``.
    """
    # Load secrets from secrets file
    secrets_filepath = "secrets/secrets.txt"
    uc_client_id = get_parameter("uc_client_id", secrets_filepath)
    uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
    uc_access_token = get_parameter("uc_access_token", secrets_filepath)
    # Load configuration from config file
    config_filepath = "config/config.txt"
    mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
    # Initialise Mastodon API
    mastodon = Mastodon(
        client_id=uc_client_id,
        client_secret=uc_client_secret,
        access_token=uc_access_token,
        api_base_url='https://' + mastodon_hostname,
    )
    return (mastodon, mastodon_hostname)


def db_config():
    """Load database name, user and feed URL from config/db_config.txt."""
    db_config_filepath = "config/db_config.txt"
    feeds_db = get_parameter("feeds_db", db_config_filepath)
    feeds_db_user = get_parameter("feeds_db_user", db_config_filepath)
    feeds_url = get_parameter("feeds_url", db_config_filepath)
    return (feeds_db, feeds_db_user, feeds_url)


def twitter_config():
    """Load the four Twitter OAuth keys from config/keys_config.txt."""
    twitter_config_filepath = "config/keys_config.txt"
    api_key = get_parameter("api_key", twitter_config_filepath)
    api_key_secret = get_parameter("api_key_secret", twitter_config_filepath)
    access_token = get_parameter("access_token", twitter_config_filepath)
    access_token_secret = get_parameter("access_token_secret", twitter_config_filepath)
    return (api_key, api_key_secret, access_token, access_token_secret)


def get_parameter(parameter, file_path):
    """Read the ``parameter:`` value from ``file_path``; exit when missing."""
    # Check if secrets file exists
    if not os.path.isfile(file_path):
        print("File %s not found, exiting." % file_path)
        sys.exit(0)
    # Find parameter in file
    with open(file_path) as f:
        for line in f:
            if line.startswith(parameter):
                return line.replace(parameter + ":", "").strip()
    # Cannot find parameter, exit
    print(file_path + " Missing parameter %s " % parameter)
    sys.exit(0)


# main
if __name__ == '__main__':
    mastodon, mastodon_hostname = mastodon()
    feeds_db, feeds_db_user, feeds_url = db_config()
    api_key, api_key_secret, access_token, access_token_secret = twitter_config()
    logged_in = False
    try:
        newsfeeds = feedparser.parse(feeds_url)
    except Exception as err:
        # BUGFIX: the old handler printed ``newsfeeds.status`` — but
        # ``newsfeeds`` is unbound when parse() raises. Report the error.
        print(err)
        sys.exit(0)
    for entry in newsfeeds.entries:
        publish = False
        with_images = False
        is_reply = False
        title = entry['summary']
        link = entry['link']
        toot_id = link.rsplit('/')[4]
        tweet_id = get_tweet_id(toot_id)
        if tweet_id == 0:
            publish = True
            reply_id = mastodon.status(toot_id).in_reply_to_id
            if reply_id is not None:
                is_reply = True
                tweet_id = get_tweet_id(reply_id)
        if "media_content" in entry:
            with_images = True
            images_list = []
            for media in entry.media_content:
                images_list.append(write_image(media['url']))
        ###########################################################
        if publish:
            tuit_text, poll_options, options_lst, is_poll = get_toot(title)
            print("Tooting...")
            print(tuit_text)
            if not logged_in:
                apiv1, logged_in = create_api_v1()
                apiv2, logged_in = create_api_v2()
            if is_poll:
                tweet = compose_poll(tuit_text, poll_options, options_lst, toot_id)
            else:
                tweet = compose_tweet(tuit_text, with_images, is_reply)
                write_db(toot_id, tweet.id)
            time.sleep(2)
        else:
            # NOTE(review): presumably entries arrive newest-first, so the
            # first already-mirrored toot means nothing new remains — confirm.
            print("Any new feeds")
            sys.exit(0)