diff --git a/getpeers.py b/getpeers.py index 9114f59..d6c655c 100644 --- a/getpeers.py +++ b/getpeers.py @@ -3,19 +3,18 @@ import sys import time from datetime import datetime import requests -import threading import json import psycopg2 -import pdb - +import ray + +ray.init(num_cpus = 32) # Specify this system CPUs. + def write_servers(world_servers): i = 0 while i < len(world_servers): - saved_at = datetime.now() - insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING" conn = None @@ -26,7 +25,7 @@ def write_servers(world_servers): cur = conn.cursor() - cur.execute(insert_sql, (world_servers[i], peer, updated_at, saved_at,)) + cur.execute(insert_sql, (world_servers[i], peer, now, now,)) print(f'writing to database {str(i)} of {str(len(world_servers))}: {world_servers[i]}') @@ -46,6 +45,7 @@ def write_servers(world_servers): i += 1 +@ray.remote def get_peers(peer): try: @@ -69,6 +69,7 @@ def get_peers(peer): if exist_count == 0: world_peers.append(peer_peer) + except: pass @@ -76,20 +77,8 @@ def get_peers(peer): pass -class Peerthreads(threading.Thread): - def __init__(self, peer, peers_semaphore): - threading.Thread.__init__(self) - self.sql_conn = None - self.peer = peer - self.peers_semaphore = peers_semaphore - - def run(self): - self.peers_semaphore.acquire() - get_peers(self.peer) - self.peers_semaphore.release() - -# Returns the parameter from the specified file def get_parameter( parameter, file_path ): + # Check if secrets file exists if not os.path.isfile(file_path): print("File %s not found, exiting."%file_path) @@ -110,9 +99,7 @@ def get_parameter( parameter, file_path ): if __name__ == '__main__': - start_time = time.time() - - updated_at = datetime.now() + now = datetime.now() peers_api = '/api/v1/instance/peers?' @@ -141,16 +128,11 @@ if __name__ == '__main__': world_peers.append(peer) - peers_semaphore = threading.Semaphore(100) - peer_threads = [] - for peer in hostname_peers: - p_thread = Peerthreads(peer, peers_semaphore) - peer_threads.append(p_thread) - p_thread.start() - for p_thread in peer_threads: - p_thread.join() + start = time.time() + + results = ray.get([get_peers.remote(server) for server in hostname_peers]) + + print(f"duration = {time.time() - start}.\nprocessed servers: {len(results)}") write_servers(world_peers) - exec_time = str(round((time.time() - start_time), 2)) - print(exec_time)