Merge pull request #239 from arittner/stream_close

Resilient stream.close handling and early close() while retry sleeps
This commit is contained in:
Aljoscha Rittner 2022-06-25 22:19:30 +02:00 cometido por GitHub
commit cab72d0489
No se encontró ninguna clave conocida en la base de datos para esta firma
ID de clave GPG: 4AEE18F83AFDEB23

Veure arxiu

@ -3610,7 +3610,8 @@ class Mastodon:
def close(self): def close(self):
self.closed = True self.closed = True
self.connection.close() if not self.connection is None:
self.connection.close()
def is_alive(self): def is_alive(self):
return self._thread.is_alive() return self._thread.is_alive()
@ -3621,6 +3622,14 @@ class Mastodon:
else: else:
return True return True
def _sleep_attentive(self):
if self._thread != threading.current_thread():
raise RuntimeError ("Illegal call from outside the stream_handle thread")
time_remaining = self.reconnect_async_wait_sec
while time_remaining>0 and not self.closed:
time.sleep(0.5)
time_remaining -= 0.5
def _threadproc(self): def _threadproc(self):
self._thread = threading.current_thread() self._thread = threading.current_thread()
@ -3642,16 +3651,26 @@ class Mastodon:
self.reconnecting = True self.reconnecting = True
connect_success = False connect_success = False
while not connect_success: while not connect_success:
connect_success = True if self.closed:
# Someone from outside stopped the streaming
self.running = False
break
try: try:
self.connection = self.connect_func() the_connection = self.connect_func()
if self.connection.status_code != 200: if the_connection.status_code != 200:
time.sleep(self.reconnect_async_wait_sec) exception = MastodonNetworkError(f"Could not connect to server. "
connect_success = False f"HTTP status: {the_connection.status_code}")
exception = MastodonNetworkError("Could not connect to server.")
listener.on_abort(exception) listener.on_abort(exception)
self._sleep_attentive()
if self.closed:
# Here we have maybe a rare race condition. Exactly on connect, someone
# stopped the streaming before. We close the previous established connection:
the_connection.close()
else:
self.connection = the_connection
connect_success = True
except: except:
time.sleep(self.reconnect_async_wait_sec) self._sleep_attentive()
connect_success = False connect_success = False
self.reconnecting = False self.reconnecting = False
else: else: