import os
import pdb
import sqlite3
import sys
from contextlib import closing
from datetime import datetime
from sqlite3 import Error
from urllib.request import pathname2url


class Database:
    """Small SQLite wrapper holding two tables: ``spamguard`` and ``evo``.

    ``spamguard`` stores one row per written user (``user``, ``created_at``)
    and ``evo`` stores user-count snapshots (``users``, ``updated_at``).

    Constructing an instance makes sure the database file exists: if it
    cannot be opened in read/write mode, its parent directory, the file and
    both tables are created.

    Every method opens its own short-lived connection and closes it before
    returning.  ``sqlite3.DatabaseError`` is printed rather than raised.
    """

    name = 'spamguard database manager'

    # Path used when the caller does not supply one.
    DEFAULT_DB_PATH = 'database/spamguard.db'

    def __init__(self, sqlite_db=None):
        """Remember the database path and create the database if missing.

        Args:
            sqlite_db: Path of the SQLite file.  ``None`` (or an empty
                value) selects ``DEFAULT_DB_PATH``.
        """
        self.sqlite_db = os.fspath(sqlite_db) if sqlite_db else self.DEFAULT_DB_PATH
        if self.__check_dbsetup():
            return
        # sqlite3 does not create missing directories, so do it up front.
        parent_dir = os.path.dirname(self.sqlite_db)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.__createdb()

    def write_user(self, user):
        """Insert ``user`` into ``spamguard`` with the current local time.

        NOTE(review): the schema created by this class declares no UNIQUE or
        PRIMARY KEY constraint, so ``ON CONFLICT DO NOTHING`` never triggers
        and repeated calls add duplicate rows - confirm whether a unique
        index on ``user`` was intended.

        Args:
            user: Value stored in the ``user`` column.
        """
        insert_sql = (
            "INSERT INTO spamguard(user, created_at) VALUES(?,?) "
            "ON CONFLICT DO NOTHING"
        )
        # Bind the text form explicitly: it is exactly what sqlite3's implicit
        # datetime adapter produced, without relying on that deprecated adapter.
        created_at = datetime.now().isoformat(" ")
        try:
            with closing(sqlite3.connect(self.sqlite_db)) as conn:
                print(f'Writing {user} data...')
                conn.execute(insert_sql, (user, created_at))
                conn.commit()
        except sqlite3.DatabaseError as db_error:
            print(db_error)

    def read_evo(self):
        """Return ``users`` from the most recently updated ``evo`` row.

        Returns:
            The stored value, ``0`` when the table is empty, or ``None`` when
            a database error occurred (the error is printed).
        """
        return self.__fetch_scalar(
            "select users from evo order by updated_at desc limit 1"
        )

    def signups_ratio(self):
        """Count ``spamguard`` rows newer than the SQL cut-off.

        The cut-off is ``datetime('now','+120 minutes','-60 minutes')``, i.e.
        SQLite's UTC "now" plus 60 minutes.

        NOTE(review): ``created_at`` is written from the naive local
        ``datetime.now()``; the offset looks like a hand-rolled UTC+2
        correction for a "last hour" window - confirm the intended time zone.

        Returns:
            The row count, or ``None`` when a database error occurred (the
            error is printed).
        """
        return self.__fetch_scalar(
            "select count(*) from spamguard where created_at > "
            "datetime('now','+120 minutes','-60 minutes')"
        )

    def update_evo(self, users):
        """Append a snapshot row to ``evo`` stamped with the current local time.

        Args:
            users: Value stored in the ``users`` column.
        """
        insert_sql = (
            "INSERT INTO evo(users, updated_at) VALUES(?,?) "
            "ON CONFLICT DO NOTHING"
        )
        updated_at = datetime.now().isoformat(" ")
        try:
            with closing(sqlite3.connect(self.sqlite_db)) as conn:
                conn.execute(insert_sql, (users, updated_at))
                print('Writing evo data...')
                conn.commit()
        except sqlite3.DatabaseError as db_error:
            print(db_error)

    def __fetch_scalar(self, query, default=0):
        """Run ``query`` and return the first column of its first row.

        Returns ``default`` when the query yields no row and ``None`` when a
        database error occurred (the error is printed).
        """
        try:
            with closing(sqlite3.connect(self.sqlite_db)) as conn:
                row = conn.execute(query).fetchone()
        except sqlite3.DatabaseError as db_error:
            print(db_error)
            return None
        return default if row is None else row[0]

    def __check_dbsetup(self):
        """Return True when the database file can be opened read/write.

        ``mode=rw`` makes SQLite fail instead of creating a missing file, so
        a successful connect means the file already exists.
        """
        dburi = 'file:{}?mode=rw'.format(pathname2url(self.sqlite_db))
        try:
            conn = sqlite3.connect(dburi, uri=True)
        except sqlite3.OperationalError as db_error:
            print(db_error)
            return False
        # The probe connection is only needed for the open attempt itself.
        conn.close()
        return True

    def __createdb(self):
        """Create the database file and then both tables."""
        try:
            with closing(sqlite3.connect(self.sqlite_db)):
                print(f"Database {self.sqlite_db} created!\n")
                self.__dbtables_schemes()
        except sqlite3.DatabaseError as db_error:
            print(db_error)

    def __dbtables_schemes(self):
        """Create the ``spamguard`` and ``evo`` tables, in that order."""
        table = "spamguard"
        sql = f"create table {table} (user varchar(30), created_at timestamptz)"
        self.__create_table(table, sql)

        table = "evo"
        sql = f"create table {table} (users INT, updated_at timestamptz)"
        self.__create_table(table, sql)

    def __create_table(self, table, sql):
        """Execute one ``create table`` statement and commit it.

        Args:
            table: Table name, used only in the progress messages.
            sql: The complete ``create table`` statement to execute.
        """
        try:
            with closing(sqlite3.connect(self.sqlite_db)) as conn:
                print(f"Creating table {table}")
                conn.execute(sql)
                conn.commit()
                print(f"Table {table} created!\n")
        except sqlite3.DatabaseError as db_error:
            print(db_error)