Add async autoreconnect

This commit is contained in:
Lorenz Diener 2018-04-17 14:35:09 +02:00
pare 86ec5d7eca
commit d0ae9dcd05
S'han modificat 2 arxius amb 59 adicions i 29 eliminacions

Veure arxiu

@ -1387,45 +1387,45 @@ class Mastodon:
# Streaming # Streaming
### ###
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_user(self, listener, async=False): def stream_user(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
""" """
Streams events that are relevant to the authorized user, i.e. home Streams events that are relevant to the authorized user, i.e. home
timeline and notifications. timeline and notifications.
""" """
return self.__stream('/api/v1/streaming/user', listener, async=async) return self.__stream('/api/v1/streaming/user', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_public(self, listener, async=False): def stream_public(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
""" """
Streams public events. Streams public events.
""" """
return self.__stream('/api/v1/streaming/public', listener, async=async) return self.__stream('/api/v1/streaming/public', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_local(self, listener, async=False): def stream_local(self, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
""" """
Streams local public events. Streams local public events.
""" """
return self.__stream('/api/v1/streaming/public/local', listener, async=async) return self.__stream('/api/v1/streaming/public/local', listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_hashtag(self, tag, listener, async=False): def stream_hashtag(self, tag, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
""" """
Stream for all public statuses for the hashtag 'tag' seen by the connected Stream for all public statuses for the hashtag 'tag' seen by the connected
instance. instance.
""" """
if tag.startswith("#"): if tag.startswith("#"):
raise MastodonIllegalArgumentError("Tag parameter should omit leading #") raise MastodonIllegalArgumentError("Tag parameter should omit leading #")
return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async) return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("2.1.0", "2.1.0") @api_version("2.1.0", "2.1.0")
def stream_list(self, id, listener, async=False): def stream_list(self, id, listener, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
""" """
Stream events for the current user, restricted to accounts on the given Stream events for the current user, restricted to accounts on the given
list. list.
""" """
id = self.__unpack_id(id) id = self.__unpack_id(id)
return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async) return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
### ###
# Internal helpers, dragons probably # Internal helpers, dragons probably
@ -1667,7 +1667,7 @@ class Mastodon:
return response return response
def __stream(self, endpoint, listener, params={}, async=False): def __stream(self, endpoint, listener, params={}, async=False, reconnect_async=False, reconnect_async_wait_sec=5):
""" """
Internal streaming API helper. Internal streaming API helper.
@ -1695,18 +1695,27 @@ class Mastodon:
# The streaming server can't handle two slashes in a path, so remove trailing slashes # The streaming server can't handle two slashes in a path, so remove trailing slashes
if url[-1] == '/': if url[-1] == '/':
url = url[:-1] url = url[:-1]
# Connect function (called and then potentially passed to async handler)
def connect_func():
headers = {"Authorization": "Bearer " + self.access_token}
connection = requests.get(url + endpoint, headers = headers, data = params, stream = True)
headers = {"Authorization": "Bearer " + self.access_token} if connection.status_code != 200:
connection = requests.get(url + endpoint, headers = headers, data = params, stream = True) raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
return connection
if connection.status_code != 200: connection = connect_func()
raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
# Async stream handler
class __stream_handle(): class __stream_handle():
def __init__(self, connection): def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec):
self.closed = False self.closed = False
self.running = True
self.connection = connection self.connection = connection
self.connect_func = connect_func
self.reconnect_async = reconnect_async
self.reconnect_async_wait_sec = reconnect_async_wait_sec
def close(self): def close(self):
self.closed = True self.closed = True
self.connection.close() self.connection.close()
@ -1716,17 +1725,39 @@ class Mastodon:
def _threadproc(self): def _threadproc(self):
self._thread = threading.current_thread() self._thread = threading.current_thread()
with closing(connection) as r:
try: # Run until closed or until error if not autoreconnecting
listener.handle_stream(r) while self.running:
except AttributeError as e: with closing(self.connection) as r:
if not self.closed: try:
raise e listener.handle_stream(r)
except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e:
if not (self.closed or self.reconnect_async):
raise e
else:
if self.closed:
self.running = False
# Reconnect loop. Try immediately once, then with delays on error.
if self.reconnect_async and not self.closed:
connect_success = False
while not connect_success:
connect_success = True
try:
self.connection = self.connect_func()
if self.connection.status_code != 200:
time.sleep(self.reconnect_async_wait_sec)
connect_success = False
except:
time.sleep(self.reconnect_async_wait_sec)
connect_success = False
else:
self.running = False
return 0 return 0
handle = __stream_handle(connection)
if async: if async:
handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec)
t = threading.Thread(args=(), daemon = True, target=handle._threadproc) t = threading.Thread(args=(), daemon = True, target=handle._threadproc)
t.start() t.start()
return handle return handle

Veure arxiu

@ -6,7 +6,7 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A
import json import json
import six import six
from mastodon import Mastodon from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError
from requests.exceptions import ChunkedEncodingError from requests.exceptions import ChunkedEncodingError
class StreamListener(object): class StreamListener(object):
@ -109,7 +109,6 @@ class StreamListener(object):
err err
) )
else: else:
# TODO: allow handlers to return/raise to stop streaming cleanly
handler(payload) handler(payload)
class CallbackStreamListener(StreamListener): class CallbackStreamListener(StreamListener):