blocksoft/sqlite-bs.py

482 lines
13 KiB
Python

import os
import sys
from mastodon import Mastodon
from mastodon.Mastodon import MastodonNotFoundError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonUnauthorizedError, MastodonIllegalArgumentError
import sqlite3
from sqlite3 import Error
import getpass
import fileinput,re
import requests
class DomainBlocks():
name = 'Domain blocks for Mastodon'
def __init__(self, mastodon_hostname=None, domain_block_api=None, session=None):
self.domain_blocks_api = '/api/v1/admin/domain_blocks'
self.config_file = "config/config.txt"
self.secrets_file = "secrets/secrets.txt"
is_setup = self.__check_setup(self.secrets_file)
if is_setup:
self.__uc_client_id = self.__get_parameter("uc_client_id", self.secrets_file)
self.__uc_client_secret = self.__get_parameter("uc_client_secret", self.secrets_file)
self.__uc_access_token = self.__get_parameter("uc_access_token", self.secrets_file)
self.sqlite_db = 'database/blocksoft.db'
self.mastodon, self.mastodon_hostname, self.headers = self.log_in()
else:
while(True):
logged_in, self.mastodon, self.mastodon_hostname = self.setup()
if not logged_in:
print("\nLog in failed! Try again.\n")
else:
db_setup = self.__check_dbsetup(self)
break
if not os.path.exists('database'):
os.makedirs('database')
self.__check_dbsetup(self)
if session:
self.session = session
else:
self.session = requests.Session()
def get_servers(self, software):
servers_list = []
try:
conn = None
conn = sqlite3.connect(self.sqlite_db)
cur = conn.cursor()
cur.execute("select server from blocker where software=?", (software,))
rows = cur.fetchall()
for row in rows:
servers_list.append(row[0])
cur.close()
except sqlite3.DatabaseError as db_error:
sys.exit(db_error)
finally:
if conn is not None:
conn.close()
return (servers_list)
def log_in(self):
uc_client_id = self.__get_parameter("uc_client_id", self.secrets_file)
uc_client_secret = self.__get_parameter("uc_client_secret", self.secrets_file)
uc_access_token = self.__get_parameter("uc_access_token", self.secrets_file)
self.mastodon_hostname = self.__get_parameter("mastodon_hostname", self.config_file)
mastodon = Mastodon(
client_id = uc_client_id,
client_secret = uc_client_secret,
access_token = uc_access_token,
api_base_url = 'https://' + self.mastodon_hostname,
)
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
return (mastodon, self.mastodon_hostname, headers)
def domain_blocks_create(self, server):
data = {
'domain': server,
'severity': 'suspend',
'reject_media': 'true',
'reject_reports': 'true',
'obfuscate': 'true',
}
endpoint = f'https://{self.mastodon_hostname}/{self.domain_blocks_api}'
response = self.__api_request('POST', endpoint, data)
if response.ok:
print(f"Done, {server} is now suspended")
else:
pass
@staticmethod
def __check_setup(file_path):
is_setup = False
if not os.path.isfile(file_path):
print(f"File {file_path} not found, running setup.")
return
else:
is_setup = True
return is_setup
def setup(self):
logged_in = False
try:
self.mastodon_hostname = input("Enter Mastodon hostname (or 'q' to exit): ")
if self.mastodon_hostname == 'q':
sys.exit("Bye")
user_name = input("Bot user name, ex. john? ")
user_password = getpass.getpass("Bot password? ")
app_name = input("Bot App name? ")
client_id, client_secret = Mastodon.create_app(
app_name,
scopes = ["read", "write", "admin:read", "admin:write"],
to_file="app_clientcred.txt",
api_base_url=self.mastodon_hostname
)
mastodon = Mastodon(client_id = "app_clientcred.txt", api_base_url = self.mastodon_hostname)
mastodon.log_in(
user_name,
user_password,
scopes = ["read", "write", "admin:read", "admin:write"],
to_file = "app_usercred.txt"
)
if os.path.isfile("app_usercred.txt"):
print(f"Log in succesful!")
logged_in = True
if not os.path.exists('secrets'):
os.makedirs('secrets')
if not os.path.exists(self.secrets_file):
with open(self.secrets_file, 'w'): pass
print(f"{self.secrets_file} created!")
with open(self.secrets_file, 'a') as the_file:
print(f"Writing secrets parameter names to {self.secrets_file}")
the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')
client_path = 'app_clientcred.txt'
with open(client_path) as fp:
line = fp.readline()
cnt = 1
while line:
if cnt == 1:
print(f"Writing client id to {self.secrets_file}")
self.__modify_file(self, self.secrets_file, "uc_client_id: ", value=line.rstrip())
elif cnt == 2:
print(f"Writing client secret to {self.secrets_file}")
self.__modify_file(self, self.secrets_file, "uc_client_secret: ", value=line.rstrip())
line = fp.readline()
cnt += 1
token_path = 'app_usercred.txt'
with open(token_path) as fp:
line = fp.readline()
print(f"Writing access token to {self.secrets_file}")
self.__modify_file(self, self.secrets_file, "uc_access_token: ", value=line.rstrip())
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt temp file..")
os.remove("app_clientcred.txt")
if os.path.exists("app_usercred.txt"):
print("Removing app_usercred.txt temp file..")
os.remove("app_usercred.txt")
self.__create_config(self)
self.__write_config(self)
print("Secrets setup done!\n")
print("Blocker setup done!\n")
except MastodonIllegalArgumentError as i_error:
sys.stdout.write(f'\n{str(i_error)}\n')
except MastodonNetworkError as n_error:
sys.stdout.write(f'\n{str(n_error)}\n')
except MastodonReadTimeout as r_error:
sys.stdout.write(f'\n{str(r_error)}\n')
except MastodonAPIError as a_error:
sys.stdout.write(f'\n{str(a_error)}\n')
return (logged_in, mastodon, self.mastodon_hostname)
def __api_request(self, method, endpoint, data={}):
response = None
try:
kwargs = dict(data=data)
response = self.session.request(method, url = endpoint, headers = self.headers, **kwargs)
except Exception as e:
raise MastodonNetworkError(f"Could not complete request: {e}")
if response is None:
raise MastodonIllegalArgumentError("Illegal request.")
if not response.ok:
try:
if isinstance(response, dict) and 'error' in response:
error_msg = response['error']
elif isinstance(response, str):
error_msg = response
else:
error_msg = None
except ValueError:
error_msg = None
if response.status_code == 404:
ex_type = MastodonNotFoundError
if not error_msg:
error_msg = 'Endpoint not found.'
# this is for compatibility with older versions
# which raised MastodonAPIError('Endpoint not found.')
# on any 404
elif response.status_code == 401:
ex_type = MastodonUnauthorizedError
elif response.status_code == 422:
return response
elif response.status_code == 500:
ex_type = MastodonInternalServerError
elif response.status_code == 502:
ex_type = MastodonBadGatewayError
elif response.status_code == 503:
ex_type = MastodonServiceUnavailableError
elif response.status_code == 504:
ex_type = MastodonGatewayTimeoutError
elif response.status_code >= 500 and \
response.status_code <= 511:
ex_type = MastodonServerError
else:
ex_type = MastodonAPIError
raise ex_type(
'Mastodon API returned error',
response.status_code,
response.reason,
error_msg)
else:
return response
@staticmethod
def __check_dbsetup(self):
if not os.path.exists('database'):
os.makedirs('database')
dbsetup = False
try:
conn = None
conn = sqlite3.connect(self.sqlite_db)
dbsetup = True
self.__createdb(self)
except sqlite3.DatabaseError as db_error:
print(db_error)
return dbsetup
@staticmethod
def __createdb(self):
conn = None
try:
conn = sqlite3.connect(self.sqlite_db)
print(f"Database {self.sqlite_db} created!\n")
self.__dbtables_schemes(self)
except sqlite3.DatabaseError as db_error:
print(db_error)
finally:
if conn is not None:
conn.close()
@staticmethod
def __dbtables_schemes(self):
table = "blocker"
sql = "create table "+table+" (server varchar(200), users int, updated_at timestamptz, software varchar(50), alive boolean, users_api varchar(50), version varchar(100), first_checked_at timestamptz, last_checked_at timestamptz, downs int)"
self.__create_table(self, table, sql)
table = "execution_time"
sql = "create table "+table+" (program varchar(30), start timestamptz, finish timestamptz)"
self.__create_table(self, table, sql)
@staticmethod
def __create_table(self, table, sql):
conn = None
try:
conn = sqlite3.connect(self.sqlite_db)
cur = conn.cursor()
print(f"Creating table {table}")
cur.execute(sql)
conn.commit()
print(f"Table {table} created!\n")
except sqlite3.DatabaseError as db_error:
print(db_error)
finally:
if conn is not None:
conn.close()
def __get_parameter(self, parameter, config_file):
if not os.path.isfile(config_file):
print(f"File {config_file} not found, exiting.")
sys.exit(0)
with open( config_file ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f"{config_file} Missing parameter {parameter}")
sys.exit(0)
@staticmethod
def __modify_file(self, file_name, pattern,value=""):
fh=fileinput.input(file_name,inplace=True)
for line in fh:
replacement=pattern + value
line=re.sub(pattern,replacement,line)
sys.stdout.write(line)
fh.close()
@staticmethod
def __create_config(self):
if not os.path.exists('config'):
os.makedirs('config')
if not os.path.exists(self.config_file):
print(self.config_file + " created!")
with open('config/config.txt', 'w'): pass
@staticmethod
def __write_config(self):
with open(self.config_file, 'a') as the_file:
the_file.write(f'mastodon_hostname: {self.mastodon_hostname}\n')
print(f"adding parameter 'mastodon_hostname' to {self.config_file}\n")
###############################################################################
# main
if __name__ == '__main__':
blocker = DomainBlocks()
soft_list = 'software.txt'
soft_file = open(soft_list, 'r')
Lines = soft_file.readlines()
for software in Lines:
software = software.replace('\n', '')
print(f'checking software {software}...')
servers_list = blocker.get_servers(software)
for server in servers_list:
blocker.domain_blocks_create(server)