fetchservers now uses the Ray library; getpeers refactored

This commit is contained in:
spla 2022-03-13 18:06:07 +01:00
pare 0ad0003d49
commit 93b3a4334c
S'han modificat 3 arxius amb 241 adicions i 184 eliminacions

Veure arxiu

@ -162,8 +162,8 @@ if __name__ == '__main__':
sql = "create table "+table+" (datetime timestamptz PRIMARY KEY, servers INT, users INT)" sql = "create table "+table+" (datetime timestamptz PRIMARY KEY, servers INT, users INT)"
create_table(db, db_user, table, sql) create_table(db, db_user, table, sql)
table = "closed" table = "execution_time"
sql = "create table "+table+" (server varchar(200) PRIMARY KEY, software varchar(10), closed boolean)" sql = "create table "+table+" (program varchar(20) PRIMARY KEY, start timestamptz, finish timestamptz)"
create_table(db, db_user, table, sql) create_table(db, db_user, table, sql)
##################################### #####################################

Veure arxiu

@ -1,6 +1,3 @@
from multiprocessing import set_start_method
from multiprocessing import get_context
from itertools import product
import time import time
from datetime import datetime from datetime import datetime
import os import os
@ -8,24 +5,35 @@ import json
import sys import sys
import os.path import os.path
import psycopg2 import psycopg2
import aiohttp import requests
import asyncio import urllib3
import socket import socket
import pdb import ray
client_exceptions = ( ray.init(num_cpus = 32) # Specify this system CPUs.
aiohttp.ClientResponseError,
aiohttp.ClientConnectionError, apis = ['/api/v1/instance?',
aiohttp.ClientConnectorError, '/nodeinfo/2.0?',
aiohttp.ClientError, '/nodeinfo/2.0.json?',
asyncio.TimeoutError, '/main/nodeinfo/2.0?',
socket.gaierror, '/api/statusnet/config?',
) '/api/nodeinfo/2.0.json?',
'/api/nodeinfo?',
'/wp-json/nodeinfo/2.0?',
'/api/v1/instance/nodeinfo/2.0?',
'/.well-known/x-nodeinfo2?'
]
def is_json(myjson):
    """Return True if *myjson* can be parsed as a JSON document.

    Args:
        myjson: the candidate text (anything ``json.loads`` accepts).

    Returns:
        True when ``json.loads`` parses the value, False otherwise.
    """
    try:
        json.loads(myjson)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError (malformed text);
        # TypeError is raised for inputs that are not str/bytes/bytearray,
        # e.g. None, which previously escaped as an uncaught exception.
        return False
    return True
def write_api(server, software, users, alive, api, soft_version): def write_api(server, software, users, alive, api, soft_version):
now = datetime.now()
fediverse_db, fediverse_db_user = get_db_config() fediverse_db, fediverse_db_user = get_db_config()
insert_sql = "INSERT INTO fediverse(server, updated_at, software, users, alive, users_api, version) VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING" insert_sql = "INSERT INTO fediverse(server, updated_at, software, users, alive, users_api, version) VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING"
@ -63,80 +71,139 @@ def write_api(server, software, users, alive, api, soft_version):
conn.close() conn.close()
async def getsoft(server): @ray.remote
def getsoft(server):
fediverse_db, fediverse_db_user = get_db_config() if server.find(".") == -1:
return
try: if server.find("@") != -1:
return
socket.gethostbyname(server) if server.find("/") != -1:
return
except socket.gaierror as g_error: if server.find(":") != -1:
print(f'Server {server} error: {g_error}')
pass
return return
soft = '' soft = ''
is_nodeinfo = False
url = 'https://' + server url = 'https://' + server
user_agent = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"} user_agent = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"}
timeout = aiohttp.ClientTimeout(total=3) try:
async with aiohttp.ClientSession(timeout=timeout, headers=user_agent) as session: response = requests.get(url + '/.well-known/nodeinfo', headers = user_agent, timeout=3)
if response.status_code == 200:
try: try:
async with session.get(url + '/.well-known/nodeinfo') as response: response_json = response.json()
if response.status == 200:
try:
response_json = await response.json()
nodeinfo = response_json['links'][0]['href'].replace(f'https://{server}','') nodeinfo = response_json['links'][0]['href'].replace(f'https://{server}','')
try:
nodeinfo_data = requests.get(url + nodeinfo, headers = user_agent, timeout=3)
if nodeinfo_data.status_code == 200:
nodeinfo_json = nodeinfo_data.json()
is_nodeinfo = True
else:
print(f"Server {server}'s nodeinfo not responding: error code {nodeinfo_data.status_code}")
except: except:
pass
except:
print(f'Server {server} not responding: error code {response.status_code}')
print('*********************************************************************')
pass pass
else: else:
print(f'Server {server} not responding: {response.status}') for api in apis:
pass
async with session.get(url + nodeinfo) as nodeinfo_response:
if nodeinfo_response.status == 200:
try: try:
nodeinfo_json = await nodeinfo_response.json() response = requests.get(url + api, headers = user_agent, timeout=3)
if is_json(response.text):
nodeinfo_json = response.json()
if 'software' in nodeinfo_json:
nodeinfo = api
is_nodeinfo = True
break
elif 'title' in nodeinfo_json:
if nodeinfo_json['title'] == 'Zap':
nodeinfo = api
is_nodeinfo = True
soft = 'zap'
break
elif 'version' in nodeinfo_json:
nodeinfo = api
is_nodeinfo = True
break
except: except:
pass pass
else: except requests.exceptions.SSLError as errssl:
print(f"Server {server}'s nodeinfo not responding: {response.status}")
pass pass
except aiohttp.ClientConnectorError as cc_err: except requests.exceptions.HTTPError as errh:
pass pass
except aiohttp.client_exceptions.ClientConnectorSSLError as ccssl_as: except requests.exceptions.ConnectionError as errc:
pass
except requests.exceptions.ReadTimeout as to_err:
pass
except requests.exceptions.TooManyRedirects as tmr_err:
pass
except urllib3.exceptions.LocationParseError as lp_err:
pass
except requests.exceptions.InvalidURL as iu_err:
pass pass
else: else:
if nodeinfo_response.status == 200 and nodeinfo != '/api/v1/instance?': if is_nodeinfo:
if nodeinfo != '/api/v1/instance?':
if nodeinfo != '/.well-known/x-nodeinfo2?': if nodeinfo != '/.well-known/x-nodeinfo2?':
@ -152,8 +219,7 @@ async def getsoft(server):
write_api(server, soft, users, alive, nodeinfo, soft_version) write_api(server, soft, users, alive, nodeinfo, soft_version)
print('*********************************************************************') print(f"Server {server} ({soft} {soft_version}) is alive!")
print("Server " + server + " (" + soft + " " + soft_version + ") is alive!")
print('*********************************************************************') print('*********************************************************************')
return return
@ -176,10 +242,10 @@ async def getsoft(server):
if soft == 'socialhome': if soft == 'socialhome':
write_api(server, soft, users, alive, api, soft_version) write_api(server, soft, users, alive, nodeinfo, soft_version)
print('*********************************************************************') print('*********************************************************************')
print("Server " + server + " (" + soft + " " + soft_version + ") is alive!") print(f"Server {serve}r ({soft} {soft_version}) is alive!")
print('*********************************************************************') print('*********************************************************************')
return return
@ -188,49 +254,53 @@ async def getsoft(server):
pass pass
if nodeinfo_response.status == 200 and soft == '' and nodeinfo == "/api/v1/instance?": if soft == '' and nodeinfo == "/api/v1/instance?":
soft = 'mastodon' soft = 'mastodon'
try:
users = nodeinfo_json['stats']['user_count'] users = nodeinfo_json['stats']['user_count']
soft_version = nodeinfo_json['version']
if users > 1000000: if users > 1000000:
return return
except:
users = 0
try:
soft_version = nodeinfo_json['version']
except:
soft_version = 'unknown'
alive = True alive = True
write_api(server, soft, users, alive, api) write_api(server, soft, users, alive, nodeinfo, soft_version)
print('*********************************************************************') print('*********************************************************************')
print("Server " + server + " (" + soft + ") is alive!") print(f"Server {server} ({soft}) is alive!")
elif soft == 'zap' and nodeinfo == "/api/v1/instance?":
soft = 'zap'
users = nodeinfo_json['stats']['user_count']
soft_version = nodeinfo_json['version']
alive = True
print(server, soft, users, alive, api)
print('*********************************************************************') print('*********************************************************************')
print(f"Server {server} ({soft}) is alive!")
else: else:
print(f'Server {server} is dead') print(f'Server {server} is dead')
print('*********************************************************************')
def getserver(server, *args):
if len(args) != 0:
server = server[0].rstrip('.').lower()
if server.find(".") == -1:
return
if server.find("@") != -1:
return
if server.find("/") != -1:
return
if server.find(":") != -1:
return
try:
loop = asyncio.get_event_loop()
coroutines = [getsoft(server)]
soft = loop.run_until_complete(asyncio.gather(*coroutines, return_exceptions=True))
except:
pass
def get_world_servers(): def get_world_servers():
@ -246,7 +316,7 @@ def get_world_servers():
# get world servers list # get world servers list
cur.execute("select server from world where checked='f'") cur.execute("select server from world")# where checked='f'")
rows = cur.fetchall() rows = cur.fetchall()
@ -287,11 +357,6 @@ def get_parameter(parameter, file_path):
print(file_path + " Missing parameter %s " % parameter) print(file_path + " Missing parameter %s " % parameter)
sys.exit(0) sys.exit(0)
def usage():
print('usage: python ' + sys.argv[0] + ' --multi' + ' (multiprocessing, fast)')
print('usage: python ' + sys.argv[0] + ' --mono' + ' (one process, slow)')
def get_config(): def get_config():
# Load configuration from config file # Load configuration from config file
@ -312,50 +377,18 @@ def get_db_config():
if __name__ == '__main__': if __name__ == '__main__':
# usage modes ## name: fetchservers.py
if len(sys.argv) == 1:
usage()
elif len(sys.argv) == 2:
if sys.argv[1] == '--multi':
now = datetime.now() now = datetime.now()
start_time = time.time()
mastodon_hostname = get_config() mastodon_hostname = get_config()
fediverse_db, fediverse_db_user = get_db_config() fediverse_db, fediverse_db_user = get_db_config()
world_servers = get_world_servers() world_servers = get_world_servers()
with get_context("spawn").Pool(processes=32) as pool: start = time.time()
res = pool.starmap(getserver, product(world_servers)) results = ray.get([getsoft.remote(server) for server in world_servers])
pool.close() print(f"duration = {time.time() - start}.\nprocessed servers: {len(results)}")
pool.join()
print('Done.')
elif sys.argv[1] == '--mono':
now = datetime.now()
start_time = time.time()
mastodon_hostname = get_config()
fediverse_db, fediverse_db_user = get_db_config()
world_servers = get_world_servers()
for server in world_servers:
getserver(server)
print('Done.')

Veure arxiu

@ -6,14 +6,11 @@ import requests
import json import json
import psycopg2 import psycopg2
import ray import ray
import pdb
ray.init(num_cpus = 32) # Specify this system CPUs. ray.init(num_cpus = 32) # Specify this system CPUs.
def write_servers(world_servers): def write_server(server, federated_with):
i = 0
while i < len(world_servers):
insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING" insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"
@ -25,9 +22,9 @@ def write_servers(world_servers):
cur = conn.cursor() cur = conn.cursor()
cur.execute(insert_sql, (world_servers[i], peer, now, now,)) cur.execute(insert_sql, (server, federated_with, now, now,))
print(f'writing to database {str(i)} of {str(len(world_servers))}: {world_servers[i]}') print(f'writing {server} to world database')
conn.commit() conn.commit()
@ -43,8 +40,6 @@ def write_servers(world_servers):
conn.close() conn.close()
i += 1
@ray.remote @ray.remote
def get_peers(peer): def get_peers(peer):
@ -64,11 +59,7 @@ def get_peers(peer):
for peer_peer in response_json: for peer_peer in response_json:
exist_count = world_peers.count(peer_peer) write_server(peer_peer, peer)
if exist_count == 0:
world_peers.append(peer_peer)
except: except:
@ -77,6 +68,36 @@ def get_peers(peer):
pass pass
def save_time(program, start, finish):
    """Record the start and finish timestamps of a program run.

    Writes one row per program into the ``execution_time`` table: the row is
    inserted if the program has none yet, otherwise its ``start`` and
    ``finish`` columns are overwritten.

    Database errors are printed and swallowed (best effort), so a failure
    here never aborts the caller.

    Args:
        program: name used as the row key in ``execution_time``.
        start: timestamp stored in the ``start`` column.
        finish: timestamp stored in the ``finish`` column.
    """
    # Single upsert instead of "INSERT ... DO NOTHING" followed by an
    # unconditional UPDATE: same final row, one statement, one round trip.
    # NOTE(review): relies on a unique/primary-key constraint on `program`,
    # as declared by the table-creation script — confirm on existing databases.
    upsert_sql = (
        "INSERT INTO execution_time(program, start, finish) VALUES(%s,%s,%s) "
        "ON CONFLICT (program) DO UPDATE SET start=EXCLUDED.start, finish=EXCLUDED.finish"
    )

    conn = None
    cur = None

    try:

        conn = psycopg2.connect(database = fediverse_db, user = fediverse_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        cur.execute(upsert_sql, (program, start, finish))

        conn.commit()

    except Exception as error:
        # psycopg2.DatabaseError derives from Exception, so this catches the
        # same set of errors as the original handler.
        print(error)

    finally:

        # Release the cursor and the connection on both success and failure.
        if cur is not None:
            cur.close()

        if conn is not None:
            conn.close()
def get_parameter( parameter, file_path ): def get_parameter( parameter, file_path ):
# Check if secrets file exists # Check if secrets file exists
@ -112,27 +133,30 @@ if __name__ == '__main__':
fediverse_db = get_parameter("fediverse_db", db_config_filepath) fediverse_db = get_parameter("fediverse_db", db_config_filepath)
fediverse_db_user = get_parameter("fediverse_db_user", db_config_filepath) fediverse_db_user = get_parameter("fediverse_db_user", db_config_filepath)
world_peers = []
user_agent = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"} user_agent = {'User-agent': "fediverse's stats (fediverse@mastodont.cat)"}
res = requests.get('https://' + mastodon_hostname + peers_api, headers = user_agent, timeout=3) res = requests.get('https://' + mastodon_hostname + peers_api, headers = user_agent, timeout=3)
hostname_peers = res.json() hostname_peers = res.json()
start = datetime.now()
program = 'getpeers'
finish = start
save_time(program, start, finish)
for peer in hostname_peers: for peer in hostname_peers:
exist_count = world_peers.count(peer) write_server(peer, mastodon_hostname)
if exist_count == 0:
world_peers.append(peer)
start = time.time()
results = ray.get([get_peers.remote(server) for server in hostname_peers]) results = ray.get([get_peers.remote(server) for server in hostname_peers])
print(f"duration = {time.time() - start}.\nprocessed servers: {len(results)}") finish = datetime.now()
print(f"duration = {finish - start}.\nprocessed servers: {len(results)}")
save_time(program, start, finish)
write_servers(world_peers)