Changed Threads processing to Ray parallel processing
This commit is contained in:
pare
1636e86b6d
commit
0ad0003d49
S'han modificat 1 arxius amb 14 adicions i 32 eliminacions
44
getpeers.py
44
getpeers.py
|
@ -3,10 +3,11 @@ import sys
|
|||
import time
|
||||
from datetime import datetime
|
||||
import requests
|
||||
import threading
|
||||
import json
|
||||
import psycopg2
|
||||
import pdb
|
||||
import ray
|
||||
|
||||
ray.init(num_cpus = 32) # Specify this system CPUs.
|
||||
|
||||
def write_servers(world_servers):
|
||||
|
||||
|
@ -14,8 +15,6 @@ def write_servers(world_servers):
|
|||
|
||||
while i < len(world_servers):
|
||||
|
||||
saved_at = datetime.now()
|
||||
|
||||
insert_sql = "INSERT INTO world(server, federated_with, updated_at, saved_at) VALUES(%s,%s,%s,%s) ON CONFLICT DO NOTHING"
|
||||
|
||||
conn = None
|
||||
|
@ -26,7 +25,7 @@ def write_servers(world_servers):
|
|||
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute(insert_sql, (world_servers[i], peer, updated_at, saved_at,))
|
||||
cur.execute(insert_sql, (world_servers[i], peer, now, now,))
|
||||
|
||||
print(f'writing to database {str(i)} of {str(len(world_servers))}: {world_servers[i]}')
|
||||
|
||||
|
@ -46,6 +45,7 @@ def write_servers(world_servers):
|
|||
|
||||
i += 1
|
||||
|
||||
@ray.remote
|
||||
def get_peers(peer):
|
||||
|
||||
try:
|
||||
|
@ -69,6 +69,7 @@ def get_peers(peer):
|
|||
if exist_count == 0:
|
||||
|
||||
world_peers.append(peer_peer)
|
||||
|
||||
except:
|
||||
|
||||
pass
|
||||
|
@ -76,20 +77,8 @@ def get_peers(peer):
|
|||
|
||||
pass
|
||||
|
||||
class Peerthreads(threading.Thread):
|
||||
def __init__(self, peer, peers_semaphore):
|
||||
threading.Thread.__init__(self)
|
||||
self.sql_conn = None
|
||||
self.peer = peer
|
||||
self.peers_semaphore = peers_semaphore
|
||||
|
||||
def run(self):
|
||||
self.peers_semaphore.acquire()
|
||||
get_peers(self.peer)
|
||||
self.peers_semaphore.release()
|
||||
|
||||
# Returns the parameter from the specified file
|
||||
def get_parameter( parameter, file_path ):
|
||||
|
||||
# Check if secrets file exists
|
||||
if not os.path.isfile(file_path):
|
||||
print("File %s not found, exiting."%file_path)
|
||||
|
@ -110,9 +99,7 @@ def get_parameter( parameter, file_path ):
|
|||
|
||||
if __name__ == '__main__':
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
updated_at = datetime.now()
|
||||
now = datetime.now()
|
||||
|
||||
peers_api = '/api/v1/instance/peers?'
|
||||
|
||||
|
@ -141,16 +128,11 @@ if __name__ == '__main__':
|
|||
|
||||
world_peers.append(peer)
|
||||
|
||||
peers_semaphore = threading.Semaphore(100)
|
||||
peer_threads = []
|
||||
for peer in hostname_peers:
|
||||
p_thread = Peerthreads(peer, peers_semaphore)
|
||||
peer_threads.append(p_thread)
|
||||
p_thread.start()
|
||||
for p_thread in peer_threads:
|
||||
p_thread.join()
|
||||
start = time.time()
|
||||
|
||||
results = ray.get([get_peers.remote(server) for server in hostname_peers])
|
||||
|
||||
print(f"duration = {time.time() - start}.\nprocessed servers: {len(results)}")
|
||||
|
||||
write_servers(world_peers)
|
||||
|
||||
exec_time = str(round((time.time() - start_time), 2))
|
||||
print(exec_time)
|
||||
|
|
Loading…
Referencia en una nova incidència