mastotuit/mastotuit.py

241 líneas
6,4 KiB
Python

import os
import feedparser
import re
from mastodon import Mastodon
import psycopg2
import sys
import time
import tweepy
from tweepy import TweepError
import logging
import pdb
logger = logging.getLogger()
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
def unescape(s):
s = s.replace("&apos;", "'")
s = s.replace('&quot;', '"')
return s
def create_api():
auth = tweepy.OAuthHandler(api_key, api_key_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth, wait_on_rate_limit=True,
wait_on_rate_limit_notify=True)
try:
api.verify_credentials()
logged_in = True
except Exception as e:
logger.error("Error creating API", exc_info=True)
raise e
logger.info("API created")
return (api, logged_in)
def mastodon():
# Load secrets from secrets file
secrets_filepath = "secrets/secrets.txt"
uc_client_id = get_parameter("uc_client_id", secrets_filepath)
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
uc_access_token = get_parameter("uc_access_token", secrets_filepath)
# Load configuration from config file
config_filepath = "config/config.txt"
mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
# Initialise Mastodon API
mastodon = Mastodon(
client_id=uc_client_id,
client_secret=uc_client_secret,
access_token=uc_access_token,
api_base_url='https://' + mastodon_hostname,
)
# Initialise access headers
headers = {'Authorization': 'Bearer %s'%uc_access_token}
return (mastodon, mastodon_hostname)
def db_config():
# Load db configuration from config file
db_config_filepath = "config/db_config.txt"
feeds_db = get_parameter("feeds_db", db_config_filepath)
feeds_db_user = get_parameter("feeds_db_user", db_config_filepath)
feeds_url = get_parameter("feeds_url", db_config_filepath)
return (feeds_db, feeds_db_user, feeds_url)
def twitter_config():
twitter_config_filepath = "config/keys_config.txt"
api_key = get_parameter("api_key", twitter_config_filepath)
api_key_secret = get_parameter("api_key_secret", twitter_config_filepath)
access_token = get_parameter("access_token", twitter_config_filepath)
access_token_secret = get_parameter("access_token_secret", twitter_config_filepath)
return(api_key, api_key_secret, access_token, access_token_secret)
# Returns the parameter from the specified file
def get_parameter( parameter, file_path ):
# Check if secrets file exists
if not os.path.isfile(file_path):
print("File %s not found, exiting."%file_path)
sys.exit(0)
# Find parameter in file
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
###############################################################################
# main
if __name__ == '__main__':
#######################################################################
#mastodon, mastodon_hostname = mastodon()
feeds_db, feeds_db_user, feeds_url = db_config()
api_key, api_key_secret, access_token, access_token_secret = twitter_config()
publish = 0
logged_in = False
try:
newsfeeds = feedparser.parse(feeds_url)
except:
print(newsfeeds.status)
sys.exit(0)
for entry in newsfeeds.entries:
title = entry['summary']
id = entry['id']
link = entry['link']
###################################################################
# check database if feed is already published
try:
conn = None
conn = psycopg2.connect(database = feeds_db, user = feeds_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute('select link from feeds where link=(%s)', (link,))
row = cur.fetchone()
if row == None:
publish = 1
else:
publish = 0
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
###########################################################
if publish == 1:
toot_text = str(title)+'\n'
toot_text = cleanhtml(toot_text)
toot_text = unescape(toot_text)
toot_text += '\n' + str(link) + '\n'
print("Tooting...")
print(toot_text)
if not logged_in:
api, logged_in = create_api()
if len(toot_text) < 280:
try:
api.update_status(toot_text)
except TweepError as err:
print('\n')
sys.exit(err)
else:
toot_text1 = toot_text[:275].rsplit(' ', 1)[0] + ' (1/2)'
toot_text2 = toot_text[int(len(toot_text1)-6):] + ' (2/2)'
try:
first_tweet = api.update_status(toot_text1)
api.update_status(toot_text2, in_reply_to_status_id=first_tweet.id)
except TweepError as err:
print('\n')
sys.exit(err)
time.sleep(2)
#########################################################
insert_line = 'INSERT INTO feeds(link) VALUES (%s)'
conn = None
try:
conn = psycopg2.connect(database = feeds_db, user = feeds_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(insert_line, (link,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
else:
print("Any new feeds")
sys.exit(0)