Refactored and added PrettyTables!

This commit is contained in:
spla 2022-08-02 14:26:30 +02:00
pare 05f097a048
commit 60a5be7b3d
S'han modificat 2 arxius amb 125 adicions i 112 eliminacions

Veure arxiu

@ -8,123 +8,98 @@ import os
import sys import sys
import os.path import os.path
import psycopg2 import psycopg2
from prettytable import PrettyTable
##############################################################################
# ask what user to be edit
##############################################################################
def ask_what_user():
try:
useremail = input("Enter user email address: (press q to quit) ")
if useremail == 'q':
sys.exit()
else:
user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback = get_user(useremail)
return (user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback)
except:
if useremail != 'q':
print("Enter an existent email address!")
sys.exit("Good bye!")
###############################################################################
# get user row
###############################################################################
def get_user(email): def get_user(email):
try: found_it = False
conn = psycopg2.connect(database = mailing_db, user = mailing_db_user, password = "", host = "/var/run/postgresql", port = "5432") try:
cur = conn.cursor() conn = psycopg2.connect(database = mailing_db, user = mailing_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur.execute("SELECT account_id, username, email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, feedback FROM " + mailing_db_table + " where email=(%s)", (email,)) cur = conn.cursor()
row = cur.fetchone() cur.execute("SELECT account_id, username, email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, feedback FROM " + mailing_db_table + " where email=(%s)", (email,))
user_id = row[0] row = cur.fetchone()
user_name = row[1]
user_email = row[2]
emailed_at = row[3].replace(tzinfo=None)
deleted = row[4]
elapsed_days = row[5]
to_be_deleted = row[6]
recipient_error = row[7]
user_feedback = row[8]
cur.close() if row != None:
return (user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback)
except (Exception, psycopg2.DatabaseError) as error: found_it = True
print(error) user_id = row[0]
user_name = row[1]
user_email = row[2]
emailed_at = row[3].replace(tzinfo=None)
deleted = row[4]
elapsed_days = row[5]
to_be_deleted = row[6]
recipient_error = row[7]
user_feedback = row[8]
finally: else:
if conn is not None: user_id = ''
user_name = ''
user_email = ''
emailed_at = ''
deleted = False
elapsed_days = '0'
to_be_deleted = False
recipient_error = False
user_feedback = False
conn.close() cur.close()
################################################################################### return (found_it, user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback)
# write to database mailing status of inactive users
################################################################################### except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def update_user(will_be_deleted, recip_error, user_feedback, id): def update_user(will_be_deleted, recip_error, user_feedback, id):
conn = None conn = None
try: try:
conn = psycopg2.connect(database = mailing_db, user = mailing_db_user, password = "", host = "/var/run/postgresql", port = "5432") conn = psycopg2.connect(database = mailing_db, user = mailing_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor() cur = conn.cursor()
cur.execute("UPDATE " + mailing_db_table + " SET to_be_deleted=(%s), recipient_error=(%s), feedback=(%s) where account_id=(%s)", (will_be_deleted, recip_error, user_feedback, id)) cur.execute("UPDATE " + mailing_db_table + " SET to_be_deleted=(%s), recipient_error=(%s), feedback=(%s) where account_id=(%s)", (will_be_deleted, recip_error, user_feedback, id))
print("\n")
print("Updating user " + str(id))
conn.commit() print(f"\nUpdating user {str(id)}")
cur.close() conn.commit()
except (Exception, psycopg2.DatabaseError) as error: cur.close()
print(error) except (Exception, psycopg2.DatabaseError) as error:
finally: print(error)
if conn is not None: finally:
conn.close() if conn is not None:
################################################################################## conn.close()
# print user data
##################################################################################
def print_user(user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback):
print("\n")
print("--------------------------------------------------------------------------------------------------------------------------------")
print("| account_id: " + str(user_id) + " | username: " + str(user_name) + " | email: " + str(user_email) + " | emailed at: " + str(emailed_at) + " |")
print("| deleted: " + str(deleted) + " | elapsed days: " + str(elapsed_days) + " | to be deleted: " + str(to_be_deleted) + " | recipient error: " + str(recipient_error) + " | user feedback: " +str(user_feedback) + " |")
print("--------------------------------------------------------------------------------------------------------------------------------")
print("\n")
###############################################################################
# INITIALISATION
###############################################################################
# Returns the parameter from the specified file
def get_parameter( parameter, file_path ): def get_parameter( parameter, file_path ):
# Check if secrets file exists # Check if secrets file exists
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
if file_path == "secrets/secrets.txt": if file_path == "secrets/secrets.txt":
print("File %s not found, exiting. Run setup.py."%file_path) print(f"File {file_path} not found, exiting. Run setup.py.")
elif file_path == "config.txt": elif file_path == "config.txt":
print("File %s not found, exiting. Run db-setup.py."%file_path) print(f"File {file_path} not found, exiting. Run db-setup.py.")
sys.exit(0) sys.exit(0)
# Find parameter in file # Find parameter in file
@ -134,44 +109,81 @@ def get_parameter( parameter, file_path ):
return line.replace(parameter + ":", "").strip() return line.replace(parameter + ":", "").strip()
# Cannot find parameter, exit # Cannot find parameter, exit
print(file_path + " Missing parameter %s "%parameter) print(f"{file_path} Missing parameter {parameter}\nRun setup.py")
print("Run setup.py")
sys.exit(0) sys.exit(0)
# Load configuration from config file def db_config():
config_filepath = "config.txt"
mastodon_db = get_parameter("mastodon_db", config_filepath)
mastodon_db_user = get_parameter("mastodon_db_user", config_filepath)
mailing_db = get_parameter("mailing_db", config_filepath)
mailing_db_user = get_parameter("mailing_db_user", config_filepath)
mailing_db_table = get_parameter("mailing_db_table", config_filepath)
############################################################################### # Load configuration from config file
config_filepath = "config.txt"
mastodon_db = get_parameter("mastodon_db", config_filepath)
mastodon_db_user = get_parameter("mastodon_db_user", config_filepath)
mailing_db = get_parameter("mailing_db", config_filepath)
mailing_db_user = get_parameter("mailing_db_user", config_filepath)
mailing_db_table = get_parameter("mailing_db_table", config_filepath)
while True: return (mastodon_db, mastodon_db_user, mailing_db, mailing_db_user, mailing_db_table)
user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback = ask_what_user() # main
print_user(user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback) if __name__ == '__main__':
willdeleteit = input("Do you want to mark user to be deleted? (press t for True or f for False) ") mastodon_db, mastodon_db_user, mailing_db, mailing_db_user, mailing_db_table = db_config()
if willdeleteit == 'f':
willdeleteit = 'False'
elif willdeleteit == 't':
willdeleteit = 'True'
recip_error = input("Was the email refused? (press t for True or f for False) ") while True:
if recip_error == 'f':
recip_error = 'False' useremail = input("Enter user email address: (press q to quit) ")
elif recip_error == 't':
recip_error = 'True' if useremail == 'q':
sys.exit('Bye.')
else:
found_it, user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback = get_user(useremail)
if found_it:
print_table = PrettyTable()
print_table.field_names = ['id', 'username', 'email', 'emailed_at', 'deleted', 'elapsed_days', 'to_be_deleted', 'recipient_error', 'user_feedback']
print_table.add_row([user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback])
print(f'\n{print_table}\n')
willdeleteit = input("Do you want to mark user to be deleted? (press t for True or f for False) ")
if willdeleteit == 'f':
willdeleteit = 'False'
elif willdeleteit == 't':
willdeleteit = 'True'
recip_error = input("Was the email refused? (press t for True or f for False) ")
if recip_error == 'f':
recip_error = 'False'
elif recip_error == 't':
recip_error = 'True'
user_feed = input("Have the user replied? (press t for True or f for False) ")
if user_feed == 'f':
user_feed = 'False'
elif user_feed == 't':
user_feed = 'True'
update_user(willdeleteit, recip_error, user_feed, user_id)
found_it, user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback = get_user(useremail)
print_table = PrettyTable()
print_table.field_names = ['id', 'username', 'email', 'emailed_at', 'deleted', 'elapsed_days', 'to_be_deleted', 'recipient_error', 'user_feedback']
print_table.add_row([user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback])
print(f'\n{print_table}\n')
else:
print(f'email {useremail} not found!')
user_feed = input("Have the user replied? (press t for True or f for False) ")
if user_feed == 'f':
user_feed = 'False'
elif user_feed == 't':
user_feed = 'True'
update_user(willdeleteit, recip_error, user_feed, user_id)
user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback = get_user(user_email)
print_user(user_id, user_name, user_email, emailed_at, deleted, elapsed_days, to_be_deleted, recipient_error, user_feedback)

Veure arxiu

@ -1 +1,2 @@
psycopg2-binary psycopg2-binary
prettytable