Merge branch 'master' into stream-timeout

This commit is contained in:
Lorenz Diener 2018-04-17 16:06:00 +02:00 cometido por GitHub
commit 2afc50c803
No se encontró ninguna clave conocida en la base de datos para esta firma
ID de clave GPG: 4AEE18F83AFDEB23
S'han modificat 4 arxius amb 110 adicions i 56 eliminacions

Veure arxiu

@ -725,18 +725,26 @@ Streaming
---------
These functions allow access to the streaming API.
If async is False, these methods block forever (or until an
exception is raised).
If `async` is False, these methods block forever (or until an error is encountered).
If async is True, the listener will listen on another thread and these methods
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling the handles close() method, and the
status of the connection can be verified calling is_alive() on the handle.
If `async` is True, the listener will listen on another thread and these methods
will return a handle corresponding to the open connection. If, in addition, `async_reconnect` is True,
the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting
`async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made
to "catch up" - toots made while the connection is broken will not be received.
The connection may be closed at any time by calling the handles close() method. The
current status of the handler thread can be checked with the handles is_alive() function,
and the streaming status can be checked by calling is_receiving().
The streaming functions take instances of `StreamListener` as the `listener` parameter.
A `CallbackStreamListener` class that allows you to specify function callbacks
directly is included for convenience.
When in not-async mode or async mode without async_reconnect, the stream functions may raise
various exceptions: `MastodonMalformedEventError` if a received event cannot be parsed and
`MastodonNetworkError` if any connection problems occur.
.. automethod:: Mastodon.stream_user
.. automethod:: Mastodon.stream_public
.. automethod:: Mastodon.stream_local

Veure arxiu

@ -90,6 +90,7 @@ class Mastodon:
__DEFAULT_BASE_URL = 'https://mastodon.social'
__DEFAULT_TIMEOUT = 300
__DEFAULT_STREAM_TIMEOUT = 300
__DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5
__SUPPORTED_MASTODON_VERSION = "2.2.0"
###
@ -1388,45 +1389,45 @@ class Mastodon:
# Streaming
###
@api_version("1.1.0", "1.4.2")
def stream_user(self, listener, async=False):
def stream_user(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
"""
Streams events that are relevant to the authorized user, i.e. home
timeline and notifications.
"""
return self.__stream('/api/v1/streaming/user', listener, async=async)
return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2")
def stream_public(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT):
def stream_public(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
"""
Streams public events.
"""
return self.__stream('/api/v1/streaming/public', listener, async=async, timeout=timeout)
return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2")
def stream_local(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT):
def stream_local(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
"""
Streams local public events.
"""
return self.__stream('/api/v1/streaming/public/local', listener, async=async, timeout=timeout)
return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("1.1.0", "1.4.2")
def stream_hashtag(self, tag, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT):
def stream_hashtag(self, tag, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
"""
Stream for all public statuses for the hashtag 'tag' seen by the connected
instance.
"""
if tag.startswith("#"):
raise MastodonIllegalArgumentError("Tag parameter should omit leading #")
return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, timeout=timeout)
return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
@api_version("2.1.0", "2.1.0")
def stream_list(self, id, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT):
def stream_list(self, id, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
"""
Stream events for the current user, restricted to accounts on the given
list.
"""
id = self.__unpack_id(id)
return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, timeout=timeout)
return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
###
# Internal helpers, dragons probably
@ -1668,7 +1669,7 @@ class Mastodon:
return response
def __stream(self, endpoint, listener, params={}, async=False, timeout=__DEFAULT_STREAM_TIMEOUT):
def __stream(self, endpoint, listener, params={}, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
"""
Internal streaming API helper.
@ -1696,19 +1697,29 @@ class Mastodon:
# The streaming server can't handle two slashes in a path, so remove trailing slashes
if url[-1] == '/':
url = url[:-1]
headers = {"Authorization": "Bearer " + self.access_token}
connection = requests.get(url + endpoint, headers = headers, data = params, stream = True,
# Connect function (called and then potentially passed to async handler)
def connect_func():
headers = {"Authorization": "Bearer " + self.access_token}
connection = requests.get(url + endpoint, headers = headers, data = params, stream = True,
timeout=(self.request_timeout, timeout))
if connection.status_code != 200:
raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
if connection.status_code != 200:
raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
return connection
connection = connect_func()
# Async stream handler
class __stream_handle():
def __init__(self, connection):
def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec):
self.closed = False
self.running = True
self.connection = connection
self.connect_func = connect_func
self.reconnect_async = reconnect_async
self.reconnect_async_wait_sec = reconnect_async_wait_sec
self.reconnecting = False
def close(self):
self.closed = True
self.connection.close()
@ -1716,19 +1727,48 @@ class Mastodon:
def is_alive(self):
return self._thread.is_alive()
def is_receiving(self):
if self.closed or not self.running or self.reconnecting or not self.is_alive():
return False
else:
return True
def _threadproc(self):
self._thread = threading.current_thread()
with closing(connection) as r:
try:
listener.handle_stream(r)
except AttributeError as e:
if not self.closed:
raise e
# Run until closed or until error if not autoreconnecting
while self.running:
with closing(self.connection) as r:
try:
listener.handle_stream(r)
except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e:
if not (self.closed or self.reconnect_async):
raise e
else:
if self.closed:
self.running = False
# Reconnect loop. Try immediately once, then with delays on error.
if self.reconnect_async and not self.closed:
self.reconnecting = True
connect_success = False
while not connect_success:
connect_success = True
try:
self.connection = self.connect_func()
if self.connection.status_code != 200:
time.sleep(self.reconnect_async_wait_sec)
connect_success = False
except:
time.sleep(self.reconnect_async_wait_sec)
connect_success = False
self.reconnecting = False
else:
self.running = False
return 0
handle = __stream_handle(connection)
if async:
if run_async:
handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec)
t = threading.Thread(args=(), daemon = True, target=handle._threadproc)
t.start()
return handle

Veure arxiu

@ -1,4 +1,4 @@
from mastodon.Mastodon import Mastodon
from mastodon.Mastodon import Mastodon, MastodonError, MastodonVersionError, MastodonIllegalArgumentError, MastodonIOError, MastodonFileNotFoundError, MastodonNetworkError, MastodonAPIError, MastodonNotFoundError, MastodonUnauthorizedError, MastodonRatelimitError, MastodonMalformedEventError
from mastodon.streaming import StreamListener, CallbackStreamListener
__all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener']
__all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener', 'MastodonError', 'MastodonVersionError', 'MastodonIllegalArgumentError', 'MastodonIOError', 'MastodonFileNotFoundError', 'MastodonNetworkError', 'MastodonAPIError', 'MastodonNotFoundError', 'MastodonUnauthorizedError', 'MastodonRatelimitError', 'MastodonMalformedEventError']

Veure arxiu

@ -6,7 +6,8 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A
import json
import six
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError
from requests.exceptions import ChunkedEncodingError
class StreamListener(object):
"""Callbacks for the streaming API. Create a subclass, override the on_xxx
@ -43,25 +44,31 @@ class StreamListener(object):
"""
event = {}
line_buffer = bytearray()
for chunk in response.iter_content(chunk_size = 1):
if chunk:
if chunk == b'\n':
try:
line = line_buffer.decode('utf-8')
except UnicodeDecodeError as err:
six.raise_from(
MastodonMalformedEventError("Malformed UTF-8"),
err
)
if line == '':
self._dispatch(event)
event = {}
try:
for chunk in response.iter_content(chunk_size = 1):
if chunk:
if chunk == b'\n':
try:
line = line_buffer.decode('utf-8')
except UnicodeDecodeError as err:
six.raise_from(
MastodonMalformedEventError("Malformed UTF-8"),
err
)
if line == '':
self._dispatch(event)
event = {}
else:
event = self._parse_line(line, event)
line_buffer = bytearray()
else:
event = self._parse_line(line, event)
line_buffer = bytearray()
else:
line_buffer.extend(chunk)
line_buffer.extend(chunk)
except ChunkedEncodingError as err:
six.raise_from(
MastodonNetworkError("Server ceased communication."),
err
)
def _parse_line(self, line, event):
if line.startswith(':'):
self.handle_heartbeat()
@ -102,7 +109,6 @@ class StreamListener(object):
err
)
else:
# TODO: allow handlers to return/raise to stop streaming cleanly
handler(payload)
class CallbackStreamListener(StreamListener):