corpus/database.py
2022-12-16 11:57:21 +01:00

600 líneas
13 KiB
Python

import os
import sys
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import uuid
from datetime import datetime
import pytz
import pdb
# Time zone object for Europe/Madrid.
# NOTE(review): nothing in the visible part of this file references `tz`;
# confirm whether timestamps were meant to be created with it.
tz = pytz.timezone('Europe/Madrid')
class Database():
    """Manage the PostgreSQL database that stores the corpus.

    Connection settings (database name and user) are read from
    ``config/db_config.txt``. If a connection cannot be opened with those
    settings, the user is prompted for them, the database and its tables
    are created, and the settings are appended to the config file.

    Every public method opens its own short-lived connection, prints any
    error instead of raising it, and returns a neutral value on failure
    (``False``, ``0``, an empty list or ``(False, False)``).
    """

    name = 'Corpus database manager'

    def __init__(self, config_file=None, corpus_db=None, corpus_db_user=None):
        """Load the settings and bootstrap the database when needed.

        NOTE(review): ``config_file``, ``corpus_db`` and ``corpus_db_user``
        are accepted for backward compatibility but are not used; the
        values always come from ``config/db_config.txt`` or from the
        interactive prompts.
        """
        self.config_file = "config/db_config.txt"
        self.corpus_db = self.__get_parameter("corpus_db", self.config_file)
        self.corpus_db_user = self.__get_parameter("corpus_db_user", self.config_file)

        if not self.__check_dbsetup():
            self.corpus_db = input("\nCorpus database name: ")
            self.corpus_db_user = input("\nCorpus database user: ")
            self.__createdb()
            self.__create_config()
            self.__write_config()

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    def __connect(self):
        """Open a new connection to the corpus database.

        A host value that starts with ``/`` makes libpq use the Unix-domain
        socket found in that directory.
        """
        return psycopg2.connect(
            database=self.corpus_db,
            user=self.corpus_db_user,
            password="",
            host="/var/run/postgresql",
            port="5432",
        )

    def __write(self, query, params=None):
        """Execute one data-modifying statement and commit it.

        Returns ``True`` on success; prints the error and returns ``False``
        otherwise.
        """
        conn = None
        try:
            conn = self.__connect()
            with conn.cursor() as cur:
                cur.execute(query, params)
            conn.commit()
            return True
        except Exception as error:
            print(error)
            return False
        finally:
            if conn is not None:
                conn.close()

    def __fetch_one(self, query, params=None):
        """Run a query and return its first row.

        Returns ``None`` when there is no row or when the query fails (the
        error is printed).
        """
        conn = None
        try:
            conn = self.__connect()
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchone()
        except Exception as error:
            print(error)
            return None
        finally:
            if conn is not None:
                conn.close()

    def __fetch_all(self, query, params=None):
        """Run a query and return all rows as a list of tuples.

        Returns an empty list when the query fails (the error is printed).
        """
        conn = None
        try:
            conn = self.__connect()
            with conn.cursor() as cur:
                cur.execute(query, params)
                return list(cur.fetchall())
        except Exception as error:
            print(error)
            return []
        finally:
            if conn is not None:
                conn.close()

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def check_user(self, username):
        """Look up ``username`` in the ``users`` table.

        Returns a ``(found_it, pending)`` tuple; ``(False, False)`` when the
        user does not exist or the lookup fails.
        """
        row = self.__fetch_one(
            "select username, pending from users where username = (%s)",
            (username,),
        )
        if row is None:
            return (False, False)
        return (True, row[1])

    def add_user(self, username, post_id):
        """Insert ``username`` as a pending user linked to ``post_id``.

        Returns ``True`` when the row was inserted and committed.
        """
        query = "INSERT INTO users(id, username, pending, post_id) VALUES(%s, %s, %s, %s)"
        unique_id = str(uuid.uuid4())
        return self.__write(query, (unique_id, username, True, post_id))

    def confirm_user(self, username):
        """Clear the ``pending`` flag of ``username``.

        Returns ``True`` when the update was committed.
        """
        query = "UPDATE users set pending = (%s) where username = (%s)"
        return self.__write(query, (False, username))

    def del_user(self, username):
        """Delete ``username`` from the ``users`` table.

        Returns ``True`` when the delete was committed.
        """
        return self.__write("delete from users where username = (%s)", (username,))

    # ------------------------------------------------------------------
    # Corpus posts
    # ------------------------------------------------------------------

    def save_post(self, username, post):
        """Store ``post`` for ``username`` in the ``corpus`` table.

        The row gets a fresh UUID and the current ``datetime.now()`` value
        (a naive local timestamp). Returns ``True`` when committed.
        """
        query = "INSERT INTO corpus(id, username, text, created_at) VALUES(%s, %s, %s, %s)"
        unique_id = str(uuid.uuid4())
        now = datetime.now()
        return self.__write(query, (unique_id, username, post, now))

    def del_user_posts(self, username):
        """Delete every ``corpus`` row that belongs to ``username``.

        Returns ``True`` when the delete was committed.
        """
        return self.__write("delete from corpus where username = (%s)", (username,))

    def users(self):
        """Return the distinct usernames found in ``corpus``, sorted ascending.

        Each element is the one-column row tuple returned by the driver.
        """
        return self.__fetch_all("select distinct username from corpus order by 1 asc")

    def user_posts(self, username):
        """Return the texts stored for ``username``.

        Each element is the one-column row tuple returned by the driver.
        """
        return self.__fetch_all(
            "select text from corpus where username = (%s)", (username,)
        )

    def total_users(self):
        """Return the number of rows in ``users`` (``0`` if the query fails)."""
        row = self.__fetch_one("select count(username) from users")
        # The original returned an unassigned local here when the query
        # failed, raising UnboundLocalError instead of reporting zero.
        return 0 if row is None else row[0]

    def total_posts(self):
        """Return the number of texts in ``corpus`` (``0`` if the query fails)."""
        row = self.__fetch_one("select count(text) from corpus")
        return 0 if row is None else row[0]

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    def export(self, username, post, lt_errors, language):
        """Store an exported post together with its error count and language.

        The row gets a fresh UUID and the current ``datetime.now()`` value
        as ``exported_at``. Returns ``True`` when committed.
        """
        query = "INSERT INTO exported(id, username, text, lt_errors, language, exported_at) VALUES(%s, %s, %s, %s, %s, %s)"
        unique_id = str(uuid.uuid4())
        now = datetime.now()
        return self.__write(query, (unique_id, username, post, lt_errors, language, now))

    def csv_save(self, filename):
        """Dump the ``exported`` table to ``filename`` as ';'-delimited CSV.

        Returns ``True`` when the file was written.
        """
        query = "copy (select * from exported) to stdout with csv delimiter ';'"
        conn = None
        try:
            conn = self.__connect()
            with conn.cursor() as cur, open(filename, "w") as file:
                cur.copy_expert(query, file)
            return True
        except Exception as error:
            print(error)
            return False
        finally:
            if conn is not None:
                conn.close()

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------

    def __check_dbsetup(self):
        """Return ``True`` when a connection to the corpus database opens."""
        try:
            conn = self.__connect()
        except Exception as error:
            print(error)
            return False
        # Close the probe connection; the original left it open.
        conn.close()
        return True

    def __createdb(self):
        """Create the corpus database and then its tables."""
        conn = None
        try:
            conn = psycopg2.connect(dbname='postgres',
                                    user=self.corpus_db_user, host='',
                                    password='')
            # CREATE DATABASE cannot run inside a transaction block, so the
            # connection is switched to autocommit first.
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cur = conn.cursor()
            print(f"Creating database {self.corpus_db}. Please wait...")
            cur.execute(sql.SQL("CREATE DATABASE {}").format(
                sql.Identifier(self.corpus_db))
            )
            print(f"Database {self.corpus_db} created!\n")
            self.__dbtables_schemes()
        except Exception as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()

    def __dbtables_schemes(self):
        """Create the ``corpus``, ``users`` and ``exported`` tables."""
        schemes = (
            ("corpus",
             "create table corpus (id uuid, username varchar(50), text varchar(500), created_at timestamptz, exported boolean default False, PRIMARY KEY (id))"),
            ("users",
             "create table users (id uuid, username varchar(50), pending boolean default True, post_id bigint, PRIMARY KEY (id))"),
            # exported_at is required by export(); without this column every
            # insert into the table fails.
            ("exported",
             "create table exported (id uuid, username varchar(50), text varchar(500), lt_errors int, language varchar(2), exported_at timestamptz, PRIMARY KEY (id))"),
        )
        for table, statement in schemes:
            self.__create_table(table, statement)

    def __create_table(self, table, statement):
        """Execute one ``create table`` statement, reporting progress."""
        conn = None
        try:
            conn = self.__connect()
            with conn.cursor() as cur:
                print(f"Creating table {table}")
                cur.execute(statement)
            conn.commit()
            print(f"Table {table} created!\n")
        except Exception as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()

    # ------------------------------------------------------------------
    # Configuration file
    # ------------------------------------------------------------------

    def __get_parameter(self, parameter, config_file):
        """Return the value of ``parameter`` read from ``config_file``.

        Lines have the form ``<parameter>: <value>``. Returns ``None`` (after
        printing a notice) when the file does not exist, and terminates the
        process through ``sys.exit(0)`` when the file exists but lacks the
        parameter.
        """
        if not os.path.isfile(config_file):
            print(f"File {config_file} not found..")
            return None

        # Match on "<parameter>:" so that a name which is a prefix of
        # another one ("corpus_db" vs "corpus_db_user") cannot pick up the
        # wrong line.
        prefix = parameter + ":"
        with open(config_file) as f:
            for line in f:
                if line.startswith(prefix):
                    return line[len(prefix):].strip()

        print(f"{config_file} Missing parameter {parameter}")
        sys.exit(0)

    def __create_config(self):
        """Create the config directory and an empty config file if missing."""
        os.makedirs('config', exist_ok=True)
        if not os.path.exists(self.config_file):
            print(self.config_file + " created!")
            with open(self.config_file, 'w'):
                pass

    def __write_config(self):
        """Append the current database name and user to the config file."""
        with open(self.config_file, 'a') as the_file:
            # The trailing newline keeps a later append from being glued to
            # the last line.
            the_file.write(f'corpus_db: {self.corpus_db}\ncorpus_db_user: {self.corpus_db_user}\n')
            print(f"adding parameters to {self.config_file}\n")