Merge pull request #128 from codl/stream-timeout

add timeouts to streams
This commit is contained in:
Lorenz Diener 2018-05-06 15:58:37 +02:00 cometido por GitHub
commit 1a62b6a5a6
No se encontró ninguna clave conocida en la base de datos para esta firma
ID de clave GPG: 4AEE18F83AFDEB23
S'han modificat 2 arxius amb 27 adicions i 15 eliminacions

Veure arxiu

@ -89,6 +89,7 @@ class Mastodon:
""" """
__DEFAULT_BASE_URL = 'https://mastodon.social' __DEFAULT_BASE_URL = 'https://mastodon.social'
__DEFAULT_TIMEOUT = 300 __DEFAULT_TIMEOUT = 300
__DEFAULT_STREAM_TIMEOUT = 300
__DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5 __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5
__SUPPORTED_MASTODON_VERSION = "2.3.0" __SUPPORTED_MASTODON_VERSION = "2.3.0"
@ -1430,45 +1431,45 @@ class Mastodon:
# Streaming # Streaming
### ###
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_user(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): def stream_user(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
""" """
Streams events that are relevant to the authorized user, i.e. home Streams events that are relevant to the authorized user, i.e. home
timeline and notifications. timeline and notifications.
""" """
return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_public(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): def stream_public(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
""" """
Streams public events. Streams public events.
""" """
return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_local(self, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): def stream_local(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
""" """
Streams local public events. Streams local public events.
""" """
return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2") @api_version("1.1.0", "1.4.2")
def stream_hashtag(self, tag, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): def stream_hashtag(self, tag, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
""" """
Stream for all public statuses for the hashtag 'tag' seen by the connected Stream for all public statuses for the hashtag 'tag' seen by the connected
instance. instance.
""" """
if tag.startswith("#"): if tag.startswith("#"):
raise MastodonIllegalArgumentError("Tag parameter should omit leading #") raise MastodonIllegalArgumentError("Tag parameter should omit leading #")
return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("2.1.0", "2.1.0") @api_version("2.1.0", "2.1.0")
def stream_list(self, id, listener, run_async=False, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC): def stream_list(self, id, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
""" """
Stream events for the current user, restricted to accounts on the given Stream events for the current user, restricted to accounts on the given
list. list.
""" """
id = self.__unpack_id(id) id = self.__unpack_id(id)
return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec) return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
### ###
# Internal helpers, dragons probably # Internal helpers, dragons probably
@ -1710,7 +1711,7 @@ class Mastodon:
return response return response
def __stream(self, endpoint, listener, params={}, run_async=False, reconnect_async=False, reconnect_async_wait_sec=5): def __stream(self, endpoint, listener, params={}, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
""" """
Internal streaming API helper. Internal streaming API helper.
@ -1742,7 +1743,9 @@ class Mastodon:
# Connect function (called and then potentially passed to async handler) # Connect function (called and then potentially passed to async handler)
def connect_func(): def connect_func():
headers = {"Authorization": "Bearer " + self.access_token} headers = {"Authorization": "Bearer " + self.access_token}
connection = self.session.get(url + endpoint, headers = headers, data = params, stream = True) connection = self.session.get(url + endpoint, headers = headers, data = params, stream = True,
timeout=(self.request_timeout, timeout))
if connection.status_code != 200: if connection.status_code != 200:
raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
return connection return connection
@ -1912,6 +1915,10 @@ class MastodonNetworkError(MastodonIOError):
"""Raised when network communication with the server fails""" """Raised when network communication with the server fails"""
pass pass
class MastodonReadTimeout(MastodonNetworkError):
"""Raised when a stream times out"""
pass
class MastodonAPIError(MastodonError): class MastodonAPIError(MastodonError):
"""Raised when the mastodon API generates a response that cannot be handled""" """Raised when the mastodon API generates a response that cannot be handled"""

Veure arxiu

@ -6,8 +6,8 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A
import json import json
import six import six
from mastodon import Mastodon from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout
from requests.exceptions import ChunkedEncodingError from requests.exceptions import ChunkedEncodingError, ReadTimeout
class StreamListener(object): class StreamListener(object):
"""Callbacks for the streaming API. Create a subclass, override the on_xxx """Callbacks for the streaming API. Create a subclass, override the on_xxx
@ -68,6 +68,11 @@ class StreamListener(object):
MastodonNetworkError("Server ceased communication."), MastodonNetworkError("Server ceased communication."),
err err
) )
except MastodonReadTimeout as err:
six.raise_from(
MastodonReadTimeout("Timed out while reading from server."),
err
)
def _parse_line(self, line, event): def _parse_line(self, line, event):
if line.startswith(':'): if line.startswith(':'):