Total refactoring

This commit is contained in:
spla 2023-05-06 15:05:41 +02:00
pare 6f0c2a7678
commit 89c5460ed3
S'han modificat 7 arxius amb 412 adicions i 451 eliminacions

Veure arxiu

@ -15,12 +15,9 @@ Within Python Virtual Environment:
1. Run `pip install -r requirements.txt` to install all needed libraries. 1. Run `pip install -r requirements.txt` to install all needed libraries.
2. Run `python db-setup.py` to create needed database and table. It's needed to control what feeds are already published. db-setup.py 2. Run `python mastofeeds.py` to start publishing feeds. First time run will ask parameters to configure everything.
will also ask you the feed's url.
3. Run `python setup.py` to get your bot's access token of an existing user. It will be saved to 'secrets/secrets.txt' for further use. 3. Use your favourite scheduling method to set `python mastofeeds.py` to run regularly.
4. Run `python mastofeeds.py` to start publishing feeds. 6.5.2023 - Total refactor
5. Use your favourite scheduling method to set `python mastofeeds.py` to run regularly.

228
app/libraries/database.py Normal file
Veure arxiu

@ -0,0 +1,228 @@
import os
import sys
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import uuid
from datetime import datetime
import pytz
import pdb
tz = pytz.timezone('Europe/Madrid')
class Database():
name = 'mastofeeds database library'
def __init__(self, config_file=None, mastofeeds_db=None, mastofeeds_db_user=None, mastofeeds_db_user_password=None, feeds_url=None):
self.config_file = "config/db_config.txt"
self.mastofeeds_db = self.__get_parameter("mastofeeds_db", self.config_file)
self.mastofeeds_db_user = self.__get_parameter("mastofeeds_db_user", self.config_file)
self.mastofeeds_db_user_password = self.__get_parameter("mastofeeds_db_user_password", self.config_file)
self.feeds_url = self.__get_parameter("feeds_url", self.config_file)
db_setup = self.__check_dbsetup(self)
if not db_setup:
self.mastofeeds_db = input("\nmastofeeds database name: ")
self.mastofeeds_db_user = input("\nmastofeeds database user: ")
self.mastofeeds_db_user_password = input("\nmastofeeds database user password: ")
self.feeds_url = input("\nenter feed URL: ")
self.__createdb(self)
self.__create_config(self)
self.__write_config(self)
def published(self, link):
# check database if feed is already published
publish = False
try:
conn = None
conn = psycopg2.connect(database = self.mastofeeds_db, user = self.mastofeeds_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute('select link from mastofeeds where link=(%s)', (link,))
row = cur.fetchone()
if row == None:
publish = True
cur.close()
return publish
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def write_feed(self, link):
insert_line = 'INSERT INTO mastofeeds(link) VALUES (%s)'
conn = None
try:
conn = psycopg2.connect(database = self.mastofeeds_db, user = self.mastofeeds_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(insert_line, (link,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
@staticmethod
def __check_dbsetup(self):
db_setup = False
try:
conn = None
conn = psycopg2.connect(database = self.mastofeeds_db, user = self.mastofeeds_db_user, password = self.mastofeeds_db_user_password, host = "/var/run/postgresql", port = "5432")
db_setup = True
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return db_setup
@staticmethod
def __createdb(self):
conn = None
try:
conn = psycopg2.connect(dbname='postgres',
user=self.mastofeeds_db_user, host='',
password=self.mastofeeds_db_user_password)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
print(f"Creating database {self.mastofeeds_db}. Please wait...")
cur.execute(sql.SQL("CREATE DATABASE {}").format(
sql.Identifier(self.mastofeeds_db))
)
print(f"Database {self.mastofeeds_db} created!\n")
self.__dbtables_schemes(self)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
@staticmethod
def __dbtables_schemes(self):
db = self.mastofeeds_db
table = "mastofeeds"
sql = "create table "+table+" (link varchar(300) PRIMARY KEY)"
self.__create_table(self, table, sql)
@staticmethod
def __create_table(self, table, sql):
conn = None
try:
conn = psycopg2.connect(database = self.mastofeeds_db, user = self.mastofeeds_db_user, password = self.mastofeeds_db_user_password, host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
print(f"Creating table {table}")
cur.execute(sql)
conn.commit()
print(f"Table {table} created!\n")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def __get_parameter(self, parameter, config_file):
if not os.path.isfile(config_file):
print(f"File {config_file} not found..")
return
with open( config_file ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f"{config_file} Missing parameter {parameter}")
sys.exit(0)
@staticmethod
def __create_config(self):
if not os.path.exists('config'):
os.makedirs('config')
if not os.path.exists(self.config_file):
print(self.config_file + " created!")
with open(self.config_file, 'w'): pass
@staticmethod
def __write_config(self):
with open(self.config_file, 'a') as the_file:
the_file.write(f'mastofeeds_db: {self.mastofeeds_db}\nmastofeeds_db_user: {self.mastofeeds_db_user}\nmastofeeds_db_user_password: {self.mastofeeds_db_user_password}\nfeeds_url: {self.feeds_url}')
print(f"adding parameters to {self.config_file}\n")

157
app/libraries/setup.py Normal file
Veure arxiu

@ -0,0 +1,157 @@
import os
import sys
from datetime import datetime
import pytz
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
import pdb
class Setup():
name = 'fediverse setup'
def __init__(self, config_file=None, mastodon_hostname=None, peers_api=None, user_agent=None, secrets_filepath=None, mastodon_app_token=None):
self.config_file = "config/config.txt"
self.mastodon_hostname = self.__get_parameter("mastodon_hostname", self.config_file)
self.peers_api = '/api/v1/instance/peers?'
self.user_agent = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"}
self.secrets_filepath = 'secrets/secrets.txt'
is_setup = self.__check_mastodon_setup(self)
if is_setup:
self.mastodon_app_token = self.__get_mastodon_parameter("mastodon_app_token", self.secrets_filepath)
else:
self.mastodon_app_token = self.mastodon_setup(self)
@staticmethod
def __check_mastodon_setup(self):
is_setup = False
if not os.path.isfile(self.secrets_filepath):
print(f"File {self.secrets_filepath} not found, running setup.")
else:
is_setup = True
return is_setup
@staticmethod
def mastodon_setup(self):
if not os.path.exists('secrets'):
os.makedirs('secrets')
self.mastodon_user = input("Mastodon user login? ")
self.mastodon_password = input("Mastodon user password? ")
self.app_name = 'fediverse'
self.mastodon_app_token = self.mastodon_log_in()
if not os.path.exists(self.secrets_filepath):
with open(self.secrets_filepath, 'w'): pass
print(f"{self.secrets_filepath} created!")
with open(self.secrets_filepath, 'a') as the_file:
print("Writing Mastodon parameters to " + self.secrets_filepath)
the_file.write(f'mastodon_app_token: {self.mastodon_app_token}')
return self.mastodon_app_token
def mastodon_log_in(self):
token = ''
try:
response = Mastodon.create_app(
self.app_name,
scopes=["read","write"],
to_file=None,
api_base_url=self.mastodon_hostname
)
client_id = response[0]
client_secret = response[1]
mastodon = Mastodon(client_id = client_id, client_secret = client_secret, api_base_url = self.mastodon_hostname)
token = mastodon.log_in(
self.mastodon_user,
self.mastodon_password,
scopes = ["read", "write"],
to_file = None
)
print('Log in succesful!')
except MastodonIllegalArgumentError as i_error:
sys.stdout.write(f'\n{str(i_error)}\n')
except MastodonNetworkError as n_error:
sys.stdout.write(f'\n{str(n_error)}\n')
except MastodonReadTimeout as r_error:
sys.stdout.write(f'\n{str(r_error)}\n')
except MastodonAPIError as a_error:
sys.stdout.write(f'\n{str(a_error)}\n')
finally:
return token
def __get_parameter(self, parameter, config_file):
if not os.path.isfile(config_file):
print(f"File {config_file} not found..")
self.mastodon_hostname = input("\nMastodon hostname: ")
self.__create_config(self)
self.__write_config(self)
with open( self.config_file ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
def __get_mastodon_parameter(self, parameter, secrets_filepath):
if not os.path.isfile(secrets_filepath):
print(f"File {secrets_filepath} not found..")
self.sign_in()
with open( self.secrets_filepath ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
@staticmethod
def __create_config(self):
if not os.path.exists('config'):
os.makedirs('config')
if not os.path.exists(self.config_file):
print(self.config_file + " created!")
with open(self.config_file, 'w'): pass
@staticmethod
def __write_config(self):
with open(self.config_file, 'a') as the_file:
the_file.write(f'mastodon_hostname: {self.mastodon_hostname}')
print(f"adding parameters to {self.config_file}\n")

Veure arxiu

@ -1,150 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import getpass
import os
import sys
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# Returns the parameter from the specified file
def get_parameter( parameter, file_path ):
# Check if secrets file exists
if not os.path.isfile(file_path):
print("File %s not found, asking."%file_path)
write_parameter( parameter, file_path )
#sys.exit(0)
# Find parameter in file
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
def write_parameter( parameter, file_path ):
print("Setting up newsfeed parameters...")
print("\n")
feeds_db = input("feeds db name: ")
feeds_db_user = input("feeds db user: ")
feeds_url = input("enter feeds url: ")
with open(file_path, "w") as text_file:
print("feeds_db: {}".format(feeds_db), file=text_file)
print("feeds_db_user: {}".format(feeds_db_user), file=text_file)
print("feeds_url: {}".format(feeds_url), file=text_file)
def create_table(db, db_user, table, sql):
conn = None
try:
conn = psycopg2.connect(database = db, user = db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
print("Creating table.. "+table)
# Create the table in PostgreSQL database
cur.execute(sql)
conn.commit()
print("Table "+table+" created!")
print("\n")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
###############################################################################
# main
if __name__ == '__main__':
# Load configuration from config file
config_filepath = "db_config.txt"
feeds_db = get_parameter("feeds_db", config_filepath)
feeds_db_user = get_parameter("feeds_db_user", config_filepath)
feeds_url = get_parameter("feeds_url", config_filepath)
############################################################
# create database
############################################################
conn = None
try:
conn = psycopg2.connect(dbname='postgres', user=feeds_db_user, host='', password='')
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
print("Creating database " + feeds_db + ". Please wait...")
cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(feeds_db)))
print("Database " + feeds_db + " created!")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
#############################################################################################
try:
conn = None
conn = psycopg2.connect(database = feeds_db, user = feeds_db_user, password = "", host = "/var/run/postgresql", port = "5432")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
# Load configuration from config file
os.remove("db_config.txt")
print("Exiting. Run setup again with right parameters")
sys.exit(0)
if conn is not None:
print("\n")
print("Host parameters saved to config.txt!")
print("\n")
############################################################
# Create needed tables
############################################################
print("Creating table...")
#####################################
db = feeds_db
db_user = feeds_db_user
table = "feeds"
sql = "create table "+table+" (link varchar(300) PRIMARY KEY)"
create_table(db, db_user, table, sql)
#####################################
print("Done!")
print("Now you can run setup.py!")
print("\n")

Veure arxiu

@ -1,66 +1,33 @@
import os import os
import sys
import time
from app.libraries.database import Database
from app.libraries.setup import Setup
import feedparser import feedparser
from mastodon import Mastodon from mastodon import Mastodon
import psycopg2 import psycopg2
import sys import pdb
import time
# Returns the parameter from the specified file
def get_parameter( parameter, file_path ):
# Check if secrets file exists
if not os.path.isfile(file_path):
print("File %s not found, exiting."%file_path)
sys.exit(0)
# Find parameter in file
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
###############################################################################
# main # main
if __name__ == '__main__': if __name__ == '__main__':
# Load secrets from secrets file pdb.set_trace()
secrets_filepath = "secrets/secrets.txt" db = Database()
uc_client_id = get_parameter("uc_client_id", secrets_filepath)
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
uc_access_token = get_parameter("uc_access_token", secrets_filepath)
# Load configuration from config file setup = Setup()
db_config_filepath = "db_config.txt"
feeds_db = get_parameter("feeds_db", db_config_filepath)
feeds_db_user = get_parameter("feeds_db_user", db_config_filepath)
feeds_url = get_parameter("feeds_url", db_config_filepath)
# Load configuration from config file
config_filepath = "config.txt"
mastodon_hostname = get_parameter("mastodon_hostname", config_filepath) # E.g., mastodon.social
# Initialise Mastodon API
mastodon = Mastodon( mastodon = Mastodon(
client_id = uc_client_id, access_token = setup.mastodon_app_token,
client_secret = uc_client_secret, api_base_url= setup.mastodon_hostname
access_token = uc_access_token, )
api_base_url = 'https://' + mastodon_hostname,
)
# Initialise access headers #publish = 0
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
#######################################################################
publish = 0
try: try:
newsfeeds = feedparser.parse(feeds_url) newsfeeds = feedparser.parse(db.feeds_url)
print(newsfeeds.status) print(newsfeeds.status)
except: except:
@ -74,77 +41,28 @@ if __name__ == '__main__':
id = entry['id'] id = entry['id']
link = entry['link'] link = entry['link']
###################################################################
# check database if feed is already published # check database if feed is already published
try: publish = db.published(link)
conn = None if publish:
conn = psycopg2.connect(database = feeds_db, user = feeds_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute('select link from feeds where link=(%s)', (link,))
row = cur.fetchone()
if row == None:
publish = 1
else:
publish = 0
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
###########################################################
if publish == 1:
toot_text = str(title)+'\n' toot_text = str(title)+'\n'
toot_text += str(link) toot_text += str(link)
print("Tooting...") print("Tooting...")
print(toot_text) print(toot_text)
mastodon.status_post(toot_text, in_reply_to_id=None,) mastodon.status_post(toot_text, in_reply_to_id=None,)
time.sleep(2) time.sleep(2)
######################################################### # write feed
insert_line = 'INSERT INTO feeds(id, link) VALUES (%s, %s)' db.write_feed(link)
conn = None
try:
conn = psycopg2.connect(database = feeds_db, user = feeds_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute(insert_line, (id, link,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
else: else:
print("Any new feeds") print("Any new feeds")

Veure arxiu

@ -1,3 +1,4 @@
Mastodon.py>=1.5.1 pytz
psycopg2-binary>=2.8.4 Mastodon.py
feedparser>=5.2.1 psycopg2
feedparser

190
setup.py
Veure arxiu

@ -1,190 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import getpass
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
import fileinput,re
import os
import sys
def create_dir():
if not os.path.exists('secrets'):
os.makedirs('secrets')
def create_file():
if not os.path.exists('secrets/secrets.txt'):
with open('secrets/secrets.txt', 'w'): pass
print(secrets_filepath + " created!")
def create_config():
if not os.path.exists(config_filepath):
print(config_filepath + " created!")
with open('config.txt', 'w'): pass
def write_params():
with open(secrets_filepath, 'a') as the_file:
print("Writing secrets parameter names to " + secrets_filepath)
the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')
def write_config():
with open(config_filepath, 'a') as the_file:
the_file.write('mastodon_hostname: \n')
print("adding parameter name 'mastodon_hostname' to "+ config_filepath)
def read_client_lines(self):
client_path = 'app_clientcred.txt'
with open(client_path) as fp:
line = fp.readline()
cnt = 1
while line:
if cnt == 1:
print("Writing client id to " + secrets_filepath)
modify_file(secrets_filepath, "uc_client_id: ", value=line.rstrip())
elif cnt == 2:
print("Writing client secret to " + secrets_filepath)
modify_file(secrets_filepath, "uc_client_secret: ", value=line.rstrip())
line = fp.readline()
cnt += 1
def read_token_line(self):
token_path = 'app_usercred.txt'
with open(token_path) as fp:
line = fp.readline()
print("Writing access token to " + secrets_filepath)
modify_file(secrets_filepath, "uc_access_token: ", value=line.rstrip())
def read_config_line():
with open(config_filepath) as fp:
line = fp.readline()
modify_file(config_filepath, "mastodon_hostname: ", value=hostname)
def log_in():
error = 0
try:
global hostname
hostname = input("Enter Mastodon hostname: ")
user_name = input("User name, ex. user@" + hostname +"? ")
user_password = getpass.getpass("User password? ")
app_name = input("This app name? ")
Mastodon.create_app(app_name, scopes=["read","write"],
to_file="app_clientcred.txt", api_base_url=hostname)
mastodon = Mastodon(client_id = "app_clientcred.txt", api_base_url = hostname)
mastodon.log_in(
user_name,
user_password,
scopes = ["read", "write"],
to_file = "app_usercred.txt"
)
except MastodonIllegalArgumentError as i_error:
error = 1
if os.path.exists("secrets/secrets.txt"):
print("Removing secrets/secrets.txt file..")
os.remove("secrets/secrets.txt")
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt file..")
os.remove("app_clientcred.txt")
sys.exit(i_error)
except MastodonNetworkError as n_error:
error = 1
if os.path.exists("secrets/secrets.txt"):
print("Removing secrets/secrets.txt file..")
os.remove("secrets/secrets.txt")
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt file..")
os.remove("app_clientcred.txt")
sys.exit(n_error)
except MastodonReadTimeout as r_error:
error = 1
if os.path.exists("secrets/secrets.txt"):
print("Removing secrets/secrets.txt file..")
os.remove("secrets/secrets.txt")
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt file..")
os.remove("app_clientcred.txt")
sys.exit(r_error)
except MastodonAPIError as a_error:
error = 1
if os.path.exists("secrets/secrets.txt"):
print("Removing secrets/secrets.txt file..")
os.remove("secrets/secrets.txt")
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt file..")
os.remove("app_clientcred.txt")
sys.exit(a_error)
finally:
if error == 0:
create_dir()
create_file()
write_params()
client_path = 'app_clientcred.txt'
read_client_lines(client_path)
token_path = 'app_usercred.txt'
read_token_line(token_path)
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt temp file..")
os.remove("app_clientcred.txt")
if os.path.exists("app_usercred.txt"):
print("Removing app_usercred.txt temp file..")
os.remove("app_usercred.txt")
print("Secrets setup done!\n")
def modify_file(file_name,pattern,value=""):
fh=fileinput.input(file_name,inplace=True)
for line in fh:
replacement=pattern + value
line=re.sub(pattern,replacement,line)
sys.stdout.write(line)
fh.close()
# Returns the parameter from the specified file
def get_parameter( parameter, file_path ):
# Check if secrets file exists
if not os.path.isfile(file_path):
print("File %s not found, creating it."%file_path)
log_in()
# Find parameter in file
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
# Returns the parameter from the specified file
def get_hostname( parameter, config_filepath ):
# Check if secrets file exists
if not os.path.isfile(config_filepath):
print("File %s not found, creating it."%config_filepath)
create_config()
# Find parameter in file
with open( config_filepath ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit
print(config_filepath + " Missing parameter %s "%parameter)
write_config()
read_config_line()
print("hostname setup done!")
sys.exit(0)
###############################################################################
# main
if __name__ == '__main__':
# Load secrets from secrets file
secrets_filepath = "secrets/secrets.txt"
uc_client_id = get_parameter("uc_client_id", secrets_filepath)
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
uc_access_token = get_parameter("uc_access_token", secrets_filepath)
# Load configuration from config file
config_filepath = "config.txt"
mastodon_hostname = get_hostname("mastodon_hostname", config_filepath)