blocksoft/app/libraries/peers.py

265 líneas
6,6 KiB
Python
Original Vista normal Històric

2023-04-13 18:01:18 +02:00
import time
from datetime import datetime
import os
import json
import sys
import os.path
from app.libraries.setup import Setup
from app.libraries.database import Database
import requests
import urllib3
import socket
class Peers():
name = "Get peers from server"
def __init__(self, server=None, db=None, setup=None):
self.server = server
self.db = Database()
self.setup = Setup()
def getpeers(self, server):
try:
res = requests.get(f'https://{server}/{self.setup.peers_api}, headers = self.setup.user_agent, timeout=3')
peers = res.json()
if isinstance(peers, list):
self.db.update_peer(server)
return peers
else:
return None
except requests.exceptions.SSLError as errssl:
pass
except requests.exceptions.HTTPError as errh:
pass
except requests.exceptions.ConnectionError as errc:
pass
except requests.exceptions.ReadTimeout as to_err:
pass
except requests.exceptions.TooManyRedirects as tmr_err:
pass
except urllib3.exceptions.LocationParseError as lp_err:
pass
except requests.exceptions.InvalidURL as iu_err:
pass
except requests.exceptions.ChunkedEncodingError as chunk_err:
print(f'ChunkedEncodingError! {server}')
pass
except requests.exceptions.JSONDecodeError as jsondec_err:
print(jsondec_err)
pass
print('*** Not peers api!')
return None
def getapi(self, server):
if server.find(".") == -1:
return
if server.find("@") != -1:
return
if server.find("/") != -1:
return
if server.find(":") != -1:
return
is_nodeinfo = False
nodeinfo = ''
url = 'https://' + server
try:
response = requests.get(url + '/.well-known/nodeinfo', headers = self.setup.user_agent, timeout=3)
if response.status_code == 200:
try:
response_json = response.json()
if len(response_json['links']) == 1:
start = [pos for pos, char in enumerate(response_json['links'][0]['href']) if char == '/'][1]
end = [pos for pos, char in enumerate(response_json['links'][0]['href']) if char == '/'][2]
server = response_json['links'][0]['href'][start+1:end]
server_idx = response_json['links'][0]['href'].index(server)
nodeinfo = response_json['links'][0]['href'][server_idx:].replace(server, '')
elif len(response_json['links']) == 2:
start = [pos for pos, char in enumerate(response_json['links'][1]['href']) if char == '/'][1]
end = [pos for pos, char in enumerate(response_json['links'][1]['href']) if char == '/'][2]
server = response_json['links'][1]['href'][start+1:end]
server_idx = response_json['links'][1]['href'].index(server)
nodeinfo = response_json['links'][1]['href'][server_idx:].replace(server, '')
except:
pass
except requests.exceptions.ChunkedEncodingError as chunk_err:
pass
except requests.exceptions.InvalidSchema as invalid_err:
pass
except requests.exceptions.SSLError as errssl:
pass
except requests.exceptions.HTTPError as errh:
pass
except requests.exceptions.ConnectionError as errc:
pass
except requests.exceptions.ReadTimeout as to_err:
pass
except requests.exceptions.TooManyRedirects as tmr_err:
pass
except urllib3.exceptions.LocationParseError as lp_err:
pass
except requests.exceptions.InvalidURL as iu_err:
pass
except requests.exceptions.ChunkedEncodingError as chunk_err:
print(f'ChunkedEncodingError! {server}')
pass
return nodeinfo
def updateapi(self, server):
if server.find(".") == -1:
return
if server.find("@") != -1:
return
if server.find("/") != -1:
return
if server.find(":") != -1:
return
nodeinfo = ''
url = 'https://' + server
try:
response = requests.get(url + '/.well-known/nodeinfo', headers = self.setup.user_agent, timeout=3)
if response.status_code == 200:
try:
response_json = response.json()
if len(response_json['links']) == 1:
start = [pos for pos, char in enumerate(response_json['links'][0]['href']) if char == '/'][1]
end = [pos for pos, char in enumerate(response_json['links'][0]['href']) if char == '/'][2]
server = response_json['links'][0]['href'][start+1:end]
server_idx = response_json['links'][0]['href'].index(server)
nodeinfo = response_json['links'][0]['href'][server_idx:].replace(server, '')
elif len(response_json['links']) == 2:
start = [pos for pos, char in enumerate(response_json['links'][1]['href']) if char == '/'][1]
end = [pos for pos, char in enumerate(response_json['links'][1]['href']) if char == '/'][2]
server = response_json['links'][1]['href'][start+1:end]
server_idx = response_json['links'][1]['href'].index(server)
nodeinfo = response_json['links'][1]['href'][server_idx:].replace(server, '')
except:
pass
except requests.exceptions.SSLError as errssl:
pass
except requests.exceptions.HTTPError as errh:
pass
except requests.exceptions.ConnectionError as errc:
pass
except requests.exceptions.ReadTimeout as to_err:
pass
except requests.exceptions.TooManyRedirects as tmr_err:
pass
except urllib3.exceptions.LocationParseError as lp_err:
pass
except requests.exceptions.InvalidURL as iu_err:
pass
except requests.exceptions.ChunkedEncodingError as chunk_err:
print(f'ChunkedEncodingError! {server}')
pass
return server, nodeinfo