Changed multiprocessing library to Ray

This commit is contained in:
spla 2022-03-11 19:53:21 +01:00
pare 80e8815166
commit 7256cf2a30
S'han modificat 2 arxius amb 539 adicions i 470 eliminacions

Veure arxiu

@ -1,34 +1,16 @@
from multiprocessing import set_start_method
from multiprocessing import get_context
import time
import urllib3
from datetime import datetime
from mastodon import Mastodon
import os
import json
import sys
import os.path
import os
import time
from datetime import datetime
import urllib3
import requests
import aiohttp
import asyncio
import psycopg2
from itertools import product
import socket
from mastodon import Mastodon
import psycopg2
import matplotlib.pyplot as plt
import pdb
import ray
plt.style.use('seaborn')
start_time = time.time()
client_exceptions = (
aiohttp.ClientResponseError,
aiohttp.ClientConnectionError,
aiohttp.ClientConnectorError,
aiohttp.ClientError,
asyncio.TimeoutError,
socket.gaierror,
)
ray.init(num_cpus = 32) # Specify this system CPUs.
class Server:
@ -44,16 +26,15 @@ class Server:
self.version = self.soft_version
self.now = now
def get_alive_servers(self, *args):
@ray.remote
def get_alive_servers(server):
users = 0
downs = 0
fediverse_db, fediverse_db_user = db_config()
now = self.now
if len(args) != 0:
self.server = args[0]
try:
conn = None
@ -62,16 +43,16 @@ class Server:
cur = conn.cursor()
cur.execute("select alive, software, users_api, version, first_checked_at, downs from fediverse where server=(%s)", (self.server,))
cur.execute("select alive, software, users_api, version, first_checked_at, downs from fediverse where server=(%s)", (server,))
row = cur.fetchone()
if row is not None:
was_alive = row[0]
self.software = row[1]
self.api = row[2]
self.soft_version = row[3]
software = row[1]
api = row[2]
soft_version = row[3]
first_checked_at = row[4]
downs_qty = row[5]
@ -93,29 +74,29 @@ class Server:
user_agent = {'User-agent': 'Mozilla/5.0'}
data = requests.get('https://' + self.server + self.api, headers = user_agent, timeout=3)
data = requests.get('https://' + server + api, headers = user_agent, timeout=3)
try:
self.users = data.json()['usage']['users']['total']
users = data.json()['usage']['users']['total']
if self.users == 0:
if users == 0:
self.users = data.json()['usage']['users']['activeHalfyear']
users = data.json()['usage']['users']['activeHalfyear']
if self.software == 'socialhome':
if software == 'socialhome':
self.soft_version = data.json()['server']['version']
soft_version = data.json()['server']['version']
else:
self.soft_version = data.json()['software']['version']
soft_version = data.json()['software']['version']
if self.software == "wordpress" and "activitypub" in data.json()['protocols']:
if software == "wordpress" and "activitypub" in data.json()['protocols']:
alive = True
elif self.software == "wordpress" and "activitypub" not in data.json()['protocols']:
elif software == "wordpress" and "activitypub" not in data.json()['protocols']:
alive = False
@ -125,21 +106,18 @@ class Server:
except:
self.users = 0
self.soft_version = ""
soft_version = ""
else:
if self.api == '/api/v1/instance?':
if api == '/api/v1/instance?':
try:
self.users = data.json()['stats']['user_count']
self.soft_version = data.json()['version']
users = data.json()['stats']['user_count']
soft_version = data.json()['version']
alive = True
except:
self.users = 0
self.soft_version = ""
soft_version = ""
if alive:
@ -147,103 +125,62 @@ class Server:
downs = downs_qty
else:
if soft_version != "" and soft_version is not None:
downs = 0
if self.soft_version != "" and self.soft_version is not None:
print(f'\n** Server {self.server} ({self.software} {self.soft_version}) is alive! **')
print(f'\n** Server {server} ({software} {soft_version}) is alive! **')
else:
print(f'\n** Server {self.server} ({self.software}) is alive! **')
insert_sql = "INSERT INTO fediverse(server, users, updated_at, software, alive, users_api, version, first_checked_at, last_checked_at, downs) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING"
conn = None
try:
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
cur.execute(insert_sql, (self.server, self.users, now, self.software, alive, self.api, self.soft_version, now, now, downs))
if first_checked_at != None:
cur.execute("UPDATE fediverse SET users=(%s), updated_at=(%s), software=(%s), alive=(%s), users_api=(%s), version=(%s), last_checked_at=(%s), downs=(%s) where server=(%s)", (self.users, now, self.software, alive, self.api, self.soft_version, now, downs, self.server))
else:
cur.execute("UPDATE fediverse SET users=(%s), updated_at=(%s), software=(%s), alive=(%s), users_api=(%s), version=(%s), first_checked_at=(%s), last_checked_at=(%s), downs=(%s) where server=(%s)", (self.users, now, self.software, alive, self.api, self.soft_version, now, now, downs, self.server))
cur.execute("UPDATE world SET checked='t' where server=(%s)", (self.server,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
print(f'\n** Server {server} ({software}) is alive! **')
except urllib3.exceptions.ProtocolError as protoerr:
print_dead(self.server)
print_dead(server)
pass
except requests.exceptions.ChunkedEncodingError as chunkerr:
print_dead(self.server)
print_dead(server)
pass
except KeyError as e:
print_dead(self.server)
print_dead(server)
pass
except ValueError as verr:
print_dead(self.server)
print_dead(server)
pass
except requests.exceptions.SSLError as errssl:
print_dead(self.server)
print_dead(server)
pass
except requests.exceptions.HTTPError as errh:
print_dead(self.server)
print_dead(server)
pass
except requests.exceptions.ConnectionError as errc:
print_dead(self.server)
print_dead(server)
pass
except requests.exceptions.Timeout as errt:
print_dead(self.server)
print_dead(server)
pass
except requests.exceptions.RequestException as err:
print_dead(self.server)
print_dead(server)
pass
except socket.gaierror as gai_error:
print_dead(self.server)
print_dead(server)
pass
if not alive:
@ -256,24 +193,31 @@ class Server:
downs = 1
return (server, software, soft_version, alive, api, users, downs, first_checked_at)
def write_alive_server(server, software, soft_version, alive, api, users, downs, first_checked_at):
insert_sql = "INSERT INTO fediverse(server, users, updated_at, software, alive, users_api, version, first_checked_at, last_checked_at, downs) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING"
conn = None
try:
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="",
host="/var/run/postgresql", port="5432")
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
cur.execute(insert_sql, (server, users, now, software, alive, api, soft_version, now, now, downs))
if first_checked_at != None:
cur.execute("UPDATE fediverse SET updated_at=(%s), alive=(%s), first_checked_at=(%s), downs=(%s) where server=(%s)", (now, alive, now, downs, self.server))
cur.execute("UPDATE fediverse SET users=(%s), updated_at=(%s), software=(%s), alive=(%s), users_api=(%s), version=(%s), last_checked_at=(%s), downs=(%s) where server=(%s)", (users, now, software, alive, api, soft_version, now, downs, server))
else:
cur.execute("UPDATE fediverse SET updated_at=(%s), alive=(%s), downs=(%s) where server=(%s)", (now, alive, downs, self.server))
cur.execute("UPDATE fediverse SET users=(%s), updated_at=(%s), software=(%s), alive=(%s), users_api=(%s), version=(%s), first_checked_at=(%s), last_checked_at=(%s), downs=(%s) where server=(%s)", (users, now, software, alive, api, soft_version, now, now, downs, server))
cur.execute("UPDATE world SET checked='t' where server=(%s)", (self.server,))
cur.execute("UPDATE world SET checked='t' where server=(%s)", (server,))
conn.commit()
@ -289,6 +233,113 @@ class Server:
conn.close()
def write_not_alive_server(server, software, soft_version, alive, api, users, downs, first_checked_at):
conn = None
try:
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
if first_checked_at != None:
cur.execute("UPDATE fediverse SET updated_at=(%s), alive=(%s), downs=(%s) where server=(%s)", (now, alive, downs, server))
else:
cur.execute("UPDATE fediverse SET updated_at=(%s), alive=(%s), first_checked_at=(%s), downs=(%s) where server=(%s)", (now, alive, now, downs, server))
cur.execute("UPDATE world SET checked='t' where server=(%s)", (server,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def delete_dead_servers():
conn = None
try:
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
cur.execute("select server from fediverse where downs > '14' and not alive and now() - first_checked_at > interval '7 days'")
rows = cur.fetchall()
for row in rows:
print(f'Deleting server {row[0]}...')
cur.execute("delete from fediverse where server=(%s)", (row[0],))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_last_checked_servers():
############################################################################
# get last checked servers from fediverse DB
alive_servers = []
try:
conn = None
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
# get world servers list
cur.execute("select server from world where server in (select server from fediverse where users_api != '')")
alive_servers = []
for row in cur:
alive_servers.append(row[0])
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
return alive_servers
def print_dead(server):
print(f'\nServer {server} is dead :-(')
@ -322,29 +373,22 @@ def set_world_servers_check_to_false():
conn.close()
def get_last_checked_servers():
def set_world_servers_check_to_false():
############################################################################
# get last checked servers from fediverse DB
alive_servers = []
# set all world servers's checked column to False
try:
conn = None
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
# get world servers list
cur.execute("UPDATE world SET checked='f'")
cur.execute("select server from world where server in (select server from fediverse where users_api != '')")
alive_servers = []
for row in cur:
alive_servers.append(row[0])
conn.commit()
cur.close()
@ -358,8 +402,6 @@ def get_last_checked_servers():
conn.close()
return alive_servers
def mastodon():
# Load secrets from secrets file
@ -416,9 +458,6 @@ def get_parameter(parameter, file_path):
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
###############################################################################
# main
if __name__ == '__main__':
# usage modes
@ -431,36 +470,6 @@ if __name__ == '__main__':
if sys.argv[1] == '--multi':
set_start_method("spawn")
now = datetime.now()
mastodon, mastodon_hostname = mastodon()
fediverse_db, fediverse_db_user = db_config()
total_servers = 0
total_users = 0
set_world_servers_check_to_false()
alive_servers = get_last_checked_servers()
getservers = Server()
getservers.now = datetime.now()
###########################################################################
# multiprocessing!
with get_context("spawn").Pool(processes=32) as pool:
res = pool.starmap(getservers.get_alive_servers, product(alive_servers))
pool.close()
pool.join()
elif sys.argv[1] == '--mono':
now = datetime.now()
mastodon, mastodon_hostname = mastodon()
@ -479,19 +488,42 @@ if __name__ == '__main__':
getservers.now = datetime.now()
i = 0
start = time.time()
while i < len(alive_servers):
results = ray.get([getservers.get_alive_servers.remote(server) for server in alive_servers])
getservers.server = alive_servers[i]
print(f"duration = {time.time() - start}.\nprocessed servers: {len(results)}")
getservers.get_alive_servers()
all_servers = len(results)
i += 1
server_num = 1
for server in results:
servername = server[0]
software = server[1]
soft_version = server[2]
alive = server[3]
api = server[4]
users = server[5]
downs = server[6]
first_checked_at = server[7]
if alive:
print(f'** Saving server {server_num} of {all_servers}: {servername}, alive')
write_alive_server(servername, software, soft_version, alive, api, users, downs, first_checked_at)
else:
usage()
print(f'-- Saving server {server_num} of {all_servers}: {servername}, not alive')
first_checked_at = now if first_checked_at == None else first_checked_at
write_not_alive_server(servername, software, soft_version, alive, api, users, downs, first_checked_at)
server_num += 1
###########################################################################
# get current total servers and users, get users from every software
@ -507,6 +539,7 @@ if __name__ == '__main__':
try:
conn = None
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
@ -516,14 +549,19 @@ if __name__ == '__main__':
row = cur.fetchone()
total_servers = row[0]
total_users = row[1]
cur.execute(get_soft_totals_sql)
rows = cur.fetchall()
for row in rows:
soft_total_project.append(row[0])
soft_total_users.append(row[1])
soft_total_servers.append(row[2])
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
@ -540,11 +578,13 @@ if __name__ == '__main__':
# get last check values and write current total ones
select_sql = "select total_servers, total_users from totals order by datetime desc limit 1"
insert_sql = "INSERT INTO totals(datetime, total_servers, total_users) VALUES(%s,%s,%s)"
try:
conn = None
conn = psycopg2.connect(database=fediverse_db, user=fediverse_db_user, password="", host="/var/run/postgresql", port="5432")
cur = conn.cursor()
@ -556,11 +596,13 @@ if __name__ == '__main__':
if row is not None:
servers_before = row[0]
users_before = row[1]
else:
servers_before = 0
users_before = 0
cur.execute(insert_sql, (now, total_servers, total_users))
@ -570,6 +612,7 @@ if __name__ == '__main__':
cur.close()
evo_servers = total_servers - servers_before
evo_users = total_users - users_before
except (Exception, psycopg2.DatabaseError) as error:
@ -627,6 +670,7 @@ if __name__ == '__main__':
row = cur.fetchone()
last_update = row[0]
last_update = last_update.strftime('%m/%d/%Y, %H:%M:%S')
cur.close()
@ -764,30 +808,45 @@ if __name__ == '__main__':
# T O O T !
toot_text = "#fediverse alive servers stats" + " \n"
toot_text += "\n"
if evo_servers >= 0:
toot_text += "alive servers: " + str(f"{total_servers:,}") + " (+"+ str(f"{evo_servers:,}") + ") \n"
toot_text += "max: " + str(f"{max_servers:,}") + "\n"
elif evo_servers < 0:
toot_text += "alive servers: " + str(f"{total_servers:,}") + " ("+ str(f"{evo_servers:,}") + ") \n"
toot_text += "max: " + str(f"{max_servers:,}") + "\n"
if evo_users >= 0:
toot_text += "total users: " + str(f"{total_users:,}") + " (+"+ str(f"{evo_users:,}") + ") \n"
toot_text += "max: " + str(f"{max_users:,}") + "\n"
elif evo_users < 0:
toot_text += "total users: " + str(f"{total_users:,}") + " ("+ str(f"{evo_users:,}") + ") \n"
toot_text += "max: " + str(f"{max_users:,}") + "\n"
toot_text += "\n"
toot_text += "top ten (soft users servers):" + " \n"
toot_text += "\n"
if evo_servers >= 0:
toot_text += "alive servers: " + str(f"{total_servers:,}") + " (+"+ str(f"{evo_servers:,}") + ") \n"
toot_text += "max: " + str(f"{max_servers:,}") + "\n"
elif evo_servers < 0:
toot_text += "alive servers: " + str(f"{total_servers:,}") + " ("+ str(f"{evo_servers:,}") + ") \n"
toot_text += "max: " + str(f"{max_servers:,}") + "\n"
if evo_users >= 0:
toot_text += "total users: " + str(f"{total_users:,}") + " (+"+ str(f"{evo_users:,}") + ") \n"
toot_text += "max: " + str(f"{max_users:,}") + "\n"
elif evo_users < 0:
toot_text += "total users: " + str(f"{total_users:,}") + " ("+ str(f"{evo_users:,}") + ") \n"
toot_text += "max: " + str(f"{max_users:,}") + "\n"
toot_text += "\ntop ten (soft users servers):\n\n"
i = 0
while i < 10:
project_soft = soft_total_project[i]
project_users = soft_total_users[i]
project_servers = soft_total_servers[i]
len_pr_soft = len(project_soft)
toot_text += f":{project_soft}: {project_users:,} {project_servers:,}\n"
@ -795,6 +854,7 @@ if __name__ == '__main__':
i += 1
print("Tooting...")
print(toot_text)
servers_image_id = mastodon.media_post('servers.png', "image/png", description='servers graph').id
@ -802,3 +862,9 @@ if __name__ == '__main__':
users_image_id = mastodon.media_post('users.png', "image/png", description='users graph').id
mastodon.status_post(toot_text, in_reply_to_id=None, media_ids={servers_image_id, users_image_id})
delete_dead_servers()
else:
usage()

Veure arxiu

@ -4,3 +4,6 @@ aiohttp>=3.6.2
aiodns>=2.0.0
matplotlib>=3.3.4
humanfriendly>=9.2
urllib3>=1.26.8
requests>=2.27.1
ray>=1.11.0