diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 6ab9291..2e620ba 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -89,6 +89,7 @@ class Mastodon: """ __DEFAULT_BASE_URL = 'https://mastodon.social' __DEFAULT_TIMEOUT = 300 + __DEFAULT_STREAM_TIMEOUT = 300 __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5 __SUPPORTED_MASTODON_VERSION = "2.3.0" @@ -1430,45 +1431,45 @@ class Mastodon: # Streaming ### @api_version("1.1.0", "1.4.2") - def stream_user(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_user(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. """ - return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_public(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_public(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams public events. """ - return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_local(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_local(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Streams local public events. """ - return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("1.1.0", "1.4.2") - def stream_hashtag(self, tag, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_hashtag(self, tag, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream for all public statuses for the hashtag 'tag' seen by the connected instance. """ if tag.startswith("#"): raise MastodonIllegalArgumentError("Tag parameter should omit leading #") - return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) @api_version("2.1.0", "2.1.0") - def stream_list(self, id, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): + def stream_list(self, id, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Stream events for the current user, restricted to accounts on the given list. """ id = self.__unpack_id(id) - return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) + return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) ### # Internal helpers, dragons probably @@ -1710,7 +1711,7 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params={}, run_async=False, reconnect_async=False, reconnect_async_wait_sec=5): + def __stream(self, endpoint, listener, params={}, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): """ Internal streaming API helper. @@ -1742,7 +1743,9 @@ class Mastodon: # Connect function (called and then potentially passed to async handler) def connect_func(): headers = {"Authorization": "Bearer " + self.access_token} - connection = self.session.get(url + endpoint, headers = headers, data = params, stream = True) + connection = self.session.get(url + endpoint, headers = headers, data = params, stream = True, + timeout=(self.request_timeout, timeout)) + if connection.status_code != 200: raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) return connection @@ -1912,6 +1915,10 @@ class MastodonNetworkError(MastodonIOError): """Raised when network communication with the server fails""" pass +class MastodonReadTimeout(MastodonNetworkError): + """Raised when a stream times out""" + pass + class MastodonAPIError(MastodonError): """Raised when the mastodon API generates a response that cannot be handled""" diff --git a/mastodon/streaming.py b/mastodon/streaming.py index 1c73f48..3fbd569 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py @@ -6,8 +6,8 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A import json import six from mastodon import Mastodon -from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError -from requests.exceptions import ChunkedEncodingError +from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout +from requests.exceptions import ChunkedEncodingError, ReadTimeout class StreamListener(object): """Callbacks for the streaming API. Create a subclass, override the on_xxx @@ -68,7 +68,12 @@ class StreamListener(object): MastodonNetworkError("Server ceased communication."), err ) - + except MastodonReadTimeout as err: + six.raise_from( + MastodonReadTimeout("Timed out while reading from server."), + err + ) + def _parse_line(self, line, event): if line.startswith(':'): self.handle_heartbeat()