"""PostgreSQL-backed storage for user records, configured from a text file."""

import os
import pdb
import random
import sys
from getpass import getpass

import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT


class Database():
    """Store, list and delete user records in a PostgreSQL table.

    Connection settings are read from a ``key: value`` text file.  When the
    configured table cannot be found, the constructor interactively asks for
    the settings, creates the database, table and index, and writes the
    settings back to the config file.
    """

    name = 'database library'

    # Defaults shared by every connection opened by this class.
    _DEFAULT_CONFIG_FILE = "config/db_config.txt"
    _SOCKET_DIR = "/var/run/postgresql"
    _PORT = "5432"
    _INDEX_NAME = "users_pkey"

    def __init__(self, config_file=None, db=None, db_user=None,
                 db_user_password=None, table=None, service=None):
        """Load the settings and run the interactive setup when needed.

        Args:
            config_file: Path of the settings file; defaults to
                ``config/db_config.txt``.
            db: Database name; overrides the ``db`` entry of the file.
            db_user: Role name; overrides the ``db_user`` entry.
            db_user_password: Password; overrides ``db_user_password``.
            table: Table name; overrides the ``db_table`` entry.
            service: Service label; overrides the ``service`` entry.

        If the table is not found, every setting is replaced by the value
        typed at the prompt, including those passed as arguments.
        """
        self.config_file = config_file or self._DEFAULT_CONFIG_FILE
        self.db = self.__setting("db", db)
        self.db_user = self.__setting("db_user", db_user)
        self.db_user_password = self.__setting(
            "db_user_password", db_user_password)
        self.db_table = self.__setting("db_table", table)
        self.service = self.__setting("service", service)

        if not self.__check_dbsetup():
            self.db = input("\ndatabase name: ")
            self.db_user = input("\ndatabase user: ")
            self.db_user_password = getpass("\ndatabase user password: ")
            self.db_table = input("\ndatabase's table name: ")
            self.service = input("\nservice: ")
            self.__createdb()
            self.__create_config()
            self.__write_config()

    def user_save(self, id, username, date, service) -> None:
        """Insert one user row; a conflicting row is left untouched.

        Errors are printed, not raised.
        """
        insert_sql = sql.SQL(
            "INSERT INTO {}(id, username, created_at, service) "
            "VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"
        ).format(sql.Identifier(self.db_table))
        self.__execute(insert_sql, (id, username, date, service))

    def load_users(self) -> list:
        """Return all users ordered by ``created_at`` then ``id``.

        Returns:
            A list of dicts with the keys ``id``, ``username``,
            ``created_at`` and ``service``.  The list is empty when the
            table is empty or when the query fails (the error is printed).
        """
        users = []
        conn = None
        try:
            select_sql = sql.SQL(
                "select id, username, created_at, service from {} "
                "order by created_at, id asc"
            ).format(sql.Identifier(self.db_table))
            conn = self.__connect()
            with conn.cursor() as cur:
                cur.execute(select_sql)
                users = [
                    {
                        "id": row[0],
                        "username": row[1],
                        "created_at": row[2],
                        "service": row[3],
                    }
                    for row in cur.fetchall()
                ]
        except Exception as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()
        return users

    def delete_user(self, id, username) -> None:
        """Delete the row matching both ``id`` and ``username``.

        Errors are printed, not raised.
        """
        delete_sql = sql.SQL(
            "delete from {} where id=(%s) and username=(%s)"
        ).format(sql.Identifier(self.db_table))
        self.__execute(delete_sql, (id, username))

    def __setting(self, parameter, override):
        """Return ``override`` when given, else the config-file value."""
        if override is not None:
            return override
        return self.__get_parameter(parameter, self.config_file)

    def __connect(self):
        """Open a connection to the configured database over the socket."""
        return psycopg2.connect(
            database=self.db,
            user=self.db_user,
            password=self.db_user_password,
            host=self._SOCKET_DIR,
            port=self._PORT,
        )

    def __execute(self, statement, params=None) -> bool:
        """Run one statement and commit it.

        Returns:
            True on success; False when an error occurred (it is printed).
        """
        conn = None
        try:
            conn = self.__connect()
            with conn.cursor() as cur:
                cur.execute(statement, params)
            conn.commit()
            return True
        except Exception as error:
            print(error)
            return False
        finally:
            if conn is not None:
                conn.close()

    def __check_dbsetup(self) -> bool:
        """Return True when the configured table exists in the database.

        Any connection or query error is printed and reported as False.
        """
        conn = None
        try:
            conn = self.__connect()
            with conn.cursor() as cur:
                cur.execute(
                    "select 1 from information_schema.tables "
                    "where table_name=(%s) limit 1",
                    (self.db_table,),
                )
                return cur.fetchone() is not None
        except Exception as error:
            print(error)
            return False
        finally:
            if conn is not None:
                conn.close()

    def __createdb(self) -> None:
        """Create the database if it is missing, then its table and index.

        Nothing further is attempted when the maintenance connection or the
        CREATE DATABASE statement fails; the error is printed.
        """
        conn = None
        try:
            conn = psycopg2.connect(dbname='postgres', user=self.db_user,
                                    host='', password=self.db_user_password)
            # CREATE DATABASE cannot run inside a transaction block.
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            with conn.cursor() as cur:
                cur.execute("select 1 from pg_database where datname=(%s)",
                            (self.db,))
                if cur.fetchone() is None:
                    print(f"Creating database {self.db}. Please wait...")
                    cur.execute(sql.SQL("CREATE DATABASE {}").format(
                        sql.Identifier(self.db)))
                    print(f"Database {self.db} created!\n")
                else:
                    # An existing database may still lack the table.
                    print(f"Database {self.db} already exists.\n")
        except Exception as error:
            print(error)
            return
        finally:
            if conn is not None:
                conn.close()
        self.__dbtables_schemes()

    def __dbtables_schemes(self) -> None:
        """Build the table and index statements and execute them."""
        table = self.db_table
        index = self._INDEX_NAME
        # Identifiers come from user input, so they are quoted, not pasted.
        table_ident = sql.Identifier(table)
        sql_table = sql.SQL(
            "create table if not exists {} (id bigint, username varchar, "
            "created_at date, service varchar)"
        ).format(table_ident)
        self.__create_table(table, sql_table)
        sql_index = sql.SQL(
            "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} (id, username)"
        ).format(sql.Identifier(index), table_ident)
        self.__create_index(table, index, sql_index)

    def __create_table(self, table, sql_table) -> None:
        """Execute ``sql_table`` and report progress for ``table``."""
        print(f"Creating table {table}")
        if self.__execute(sql_table):
            print(f"Table {table} created!\n")

    def __create_index(self, table, index, sql_index) -> None:
        """Execute ``sql_index`` and report progress for ``index``."""
        print(f"Creating index...{index}")
        # Create the index in the PostgreSQL database.
        if self.__execute(sql_index):
            print(f"Index {index} created!\n")

    def __get_parameter(self, parameter, config_file):
        """Return the value stored for ``parameter`` in ``config_file``.

        Only lines starting with ``parameter:`` match, so ``db`` does not
        pick up ``db_user`` or ``db_table`` lines.

        Returns:
            The stripped value, or None when the file does not exist.

        Raises:
            SystemExit: the file exists but has no line for ``parameter``.
        """
        if not os.path.isfile(config_file):
            print(f"File {config_file} not found..")
            return None
        prefix = parameter + ":"
        with open(config_file) as f:
            for line in f:
                if line.startswith(prefix):
                    return line[len(prefix):].strip()
        print(f"{config_file} Missing parameter {parameter}")
        # NOTE(review): exit status 0 signals success although this is an
        # error path; kept unchanged for backward compatibility.
        sys.exit(0)

    def __create_config(self) -> None:
        """Create the config file and its directory when they are missing."""
        config_dir = os.path.dirname(self.config_file)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)
        if not os.path.exists(self.config_file):
            with open(self.config_file, 'w'):
                pass
            print(self.config_file + " created!")

    def __write_config(self) -> None:
        """Write the current settings, replacing any previous content.

        Overwriting prevents stale values from an earlier run from being
        read first.  The password is stored in plain text.
        """
        settings = (
            ("db", self.db),
            ("db_user", self.db_user),
            ("db_user_password", self.db_user_password),
            ("db_table", self.db_table),
            ("service", self.service),
        )
        with open(self.config_file, 'w') as the_file:
            the_file.writelines(
                f"{key}: {value}\n" for key, value in settings)
        print(f"adding parameters to {self.config_file}\n")