Fix streaming API to be more stable (closes #117)

This commit is contained in:
Lorenz Diener 2017-12-19 13:49:00 +01:00
pare e5c50ea80d
commit 9f9a7826d7
S'han modificat 2 arxius amb 36 adicions i 28 eliminacions

Veure arxiu

@ -1684,7 +1684,7 @@ class Mastodon:
self._thread = threading.current_thread() self._thread = threading.current_thread()
with closing(connection) as r: with closing(connection) as r:
try: try:
listener.handle_stream(r.iter_lines(chunk_size = 1, decode_unicode = True)) listener.handle_stream(r)
except AttributeError as e: except AttributeError as e:
if not self.closed: if not self.closed:
raise e raise e
@ -1699,7 +1699,7 @@ class Mastodon:
else: else:
# Blocking, never returns (can only leave via exception) # Blocking, never returns (can only leave via exception)
with closing(connection) as r: with closing(connection) as r:
listener.handle_stream(r.iter_lines()) listener.handle_stream(r)
def __generate_params(self, params, exclude=[]): def __generate_params(self, params, exclude=[]):
""" """

Veure arxiu

@ -34,38 +34,46 @@ class StreamListener(object):
that the connection is still open.""" that the connection is still open."""
pass pass
def handle_stream(self, lines): def handle_stream(self, response):
""" """
Handles a stream of events from the Mastodon server. When each event Handles a stream of events from the Mastodon server. When each event
is received, the corresponding .on_[name]() method is called. is received, the corresponding .on_[name]() method is called.
lines: an iterable of lines of bytes sent by the Mastodon server, as response; a requests response object with the open stream for reading.
returned by requests.Response.iter_lines().
""" """
event = {} self.event = {}
for raw_line in lines: line_buffer = bytearray()
try: for chunk in response.iter_content(chunk_size = 1):
line = raw_line.decode('utf-8') if chunk:
except UnicodeDecodeError as err: if chunk == b'\n':
six.raise_from( self.handle_line(line_buffer)
MastodonMalformedEventError("Malformed UTF-8", line), line_buffer = bytearray()
err
)
if line.startswith(':'):
self.handle_heartbeat()
elif line == '':
# end of event
self._dispatch(event)
event = {}
else:
key, value = line.split(': ', 1)
# According to the MDN spec, repeating the 'data' key
# represents a newline(!)
if key in event:
event[key] += '\n' + value
else: else:
event[key] = value line_buffer.extend(chunk)
def handle_line(self, raw_line):
try:
line = raw_line.decode('utf-8')
except UnicodeDecodeError as err:
six.raise_from(
MastodonMalformedEventError("Malformed UTF-8", line),
err
)
if line.startswith(':'):
self.handle_heartbeat()
elif line == '':
# end of event
self._dispatch(self.event)
self.event = {}
else:
key, value = line.split(': ', 1)
# According to the MDN spec, repeating the 'data' key
# represents a newline(!)
if key in self.event:
self.event[key] += '\n' + value
else:
self.event[key] = value
def _dispatch(self, event): def _dispatch(self, event):
try: try: