Check and store data from sign-ups registered on your Mastodon server in the last seven days.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
spamcheck/torips.py

129 lines
2.8 KiB

import os
import datetime
import psycopg2
import requests
import re
import sys
def insert_tor_ip(tor_ip):
    """Insert one Tor exit address into the ``torexit_ips`` table.

    A new database connection is opened for every call, using the
    module-level names ``spamcheck_db`` and ``spamcheck_db_user`` for the
    database and user, and the module-level ``now`` as the ``created_at``
    value. Rows that conflict with an existing one are skipped by the
    ``ON CONFLICT DO NOTHING`` clause.

    Args:
        tor_ip: The address to store in the ``ip`` column.

    Returns:
        None. Any exception raised while connecting or inserting is printed
        and not re-raised.
    """
    insert_sql = 'INSERT INTO torexit_ips(created_at, ip) VALUES(%s,%s) ON CONFLICT DO NOTHING'
    connection = None
    try:
        connection = psycopg2.connect(
            database=spamcheck_db,
            user=spamcheck_db_user,
            password="",
            host="/var/run/postgresql",
            port="5432",
        )
        cursor = connection.cursor()
        cursor.execute(insert_sql, (now, tor_ip))
        connection.commit()
        print(f'Tor IP {tor_ip} saved to database')
        cursor.close()
    except (Exception, psycopg2.DatabaseError) as db_error:
        # Best effort: report the problem and let the caller carry on with
        # the next address.
        print(db_error)
    finally:
        # The connection is released whether or not the insert succeeded.
        if connection is not None:
            connection.close()
def get_torbulk_list():
    """Download the Tor exit-address list and save it as ``torbulkexitlist``.

    Requests https://check.torproject.org/exit-addresses with a 3 second
    timeout and writes the raw response body to the file ``torbulkexitlist``
    in the current working directory.

    Returns:
        True when the list was downloaded and written to disk; False when the
        request itself failed (connection error, timeout or any other
        ``requests`` exception), in which case the error is printed.

    Raises:
        SystemExit: If the server answers with a status code other than 200.
    """
    user_agent = {'User-agent': 'Mozilla/5.0'}
    try:
        response = requests.get(
            'https://check.torproject.org/exit-addresses',
            headers=user_agent,
            timeout=3,
        )
    except requests.exceptions.RequestException as request_error:
        # RequestException is the common base of ConnectionError and Timeout.
        # Catching only ConnectionError let a read timeout (which the
        # timeout=3 above can trigger) escape as an unhandled traceback.
        print(f'{request_error}')
        return False

    if response.status_code != 200:
        sys.exit(f'download error: {response.status_code}')

    with open('torbulkexitlist', 'wb') as f:
        f.write(response.content)
    return True
def db_config():
    """Load the database settings from ``config/config.txt``.

    Returns:
        A ``(spamcheck_db, spamcheck_db_user)`` tuple holding the values that
        get_parameter() returns for those two keys, in that order.
    """
    config_filepath = "config/config.txt"
    wanted_keys = ("spamcheck_db", "spamcheck_db_user")
    # Keys are looked up in order, so the tuple layout matches wanted_keys.
    return tuple(get_parameter(key, config_filepath) for key in wanted_keys)
def get_parameter( parameter, file_path ):
    """Return the value of ``parameter`` from a ``name: value`` config file.

    The file is scanned top to bottom and the first line that begins with
    ``<parameter>:`` wins; the text after the colon is returned with
    surrounding whitespace removed.

    Args:
        parameter: Name of the setting to look up.
        file_path: Path of the configuration file to read.

    Returns:
        The setting's value as a stripped string.

    Raises:
        SystemExit: If ``file_path`` is not an existing file or no line
            defines ``parameter``. A message is printed first.
    """
    # NOTE(review): both failure paths exit with status 0, so a calling shell
    # cannot tell them from success; left unchanged to keep existing behaviour.
    if not os.path.isfile(file_path):
        print(f"File {file_path} not found, exiting.")
        sys.exit(0)

    # Match on "name:" rather than the bare name. A bare prefix test made a
    # lookup of "spamcheck_db" also match a "spamcheck_db_user: ..." line and
    # return that whole line as the value.
    prefix = parameter + ":"
    with open( file_path ) as f:
        for line in f:
            if line.startswith(prefix):
                return line[len(prefix):].strip()

    # Cannot find parameter, exit
    print(f"{file_path} Missing parameter {parameter} ")
    sys.exit(0)
if __name__ == '__main__':
    # insert_tor_ip() reads these three names as module-level globals, so
    # they have to be bound before the first insert is attempted.
    spamcheck_db, spamcheck_db_user = db_config()
    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    is_saved = get_torbulk_list()
    if not is_saved:
        sys.exit('error downloading torbulkexitlist file')

    filepath = 'torbulkexitlist'
    # Compiled once instead of on every matching line.
    pattern = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
    cnt = 0
    with open(filepath) as fp:
        # Iterating the file visits every line, including the first, and does
        # not stop at a blank line. The previous readline() loop skipped the
        # first line and ended as soon as a line stripped down to ''.
        for line in fp:
            if 'ExitAddress' not in line:
                continue
            match = pattern.search(line)
            if match is None:
                # An ExitAddress line without a dotted-quad address is
                # skipped rather than crashing on None[0].
                continue
            insert_tor_ip(match[0])
            cnt += 1
    # Counts the addresses handed to insert_tor_ip().
    print(f'Tor exit nodes: {cnt}')