ab60931620
This is missing any error handling and rate-limiting around the stream itself, but once the stream is established, the full range of events are supported. Fixes issue #14.
107 líneas
3,4 KiB
Python
107 líneas
3,4 KiB
Python
'''
|
|
Handlers for the Streaming API:
|
|
https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md
|
|
'''
|
|
|
|
import json
|
|
import logging
|
|
import six
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class MalformedEventError(Exception):
|
|
'''Raised when the server-sent event stream is malformed.'''
|
|
pass
|
|
|
|
|
|
class StreamListener(object):
|
|
'''Callbacks for the streaming API. Create a subclass, override the on_xxx
|
|
methods for the kinds of events you're interested in, then pass an instance
|
|
of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or
|
|
Mastodon.hashtag_stream().'''
|
|
|
|
def on_update(self, status):
|
|
'''A new status has appeared! 'status' is the parsed JSON dictionary
|
|
describing the status.'''
|
|
pass
|
|
|
|
def on_notification(self, notification):
|
|
'''A new notification. 'notification' is the parsed JSON dictionary
|
|
describing the notification.'''
|
|
pass
|
|
|
|
def on_delete(self, status_id):
|
|
'''A status has been deleted. status_id is the status' integer ID.'''
|
|
pass
|
|
|
|
def handle_heartbeat(self):
|
|
'''The server has sent us a keep-alive message. This callback may be
|
|
useful to carry out periodic housekeeping tasks, or just to confirm
|
|
that the connection is still open.'''
|
|
|
|
def handle_stream(self, lines):
|
|
'''
|
|
Handles a stream of events from the Mastodon server. When each event
|
|
is received, the corresponding .on_[name]() method is called.
|
|
|
|
lines: an iterable of lines of bytes sent by the Mastodon server, as
|
|
returned by requests.Response.iter_lines().
|
|
'''
|
|
event = {}
|
|
for raw_line in lines:
|
|
try:
|
|
line = raw_line.decode('utf-8')
|
|
except UnicodeDecodeError as err:
|
|
six.raise_from(
|
|
MalformedEventError("Malformed UTF-8", line),
|
|
err
|
|
)
|
|
|
|
if line.startswith(':'):
|
|
self.handle_heartbeat()
|
|
elif line == '':
|
|
# end of event
|
|
self._despatch(event)
|
|
event = {}
|
|
else:
|
|
key, value = line.split(': ', 1)
|
|
# According to the MDN spec, repeating the 'data' key
|
|
# represents a newline(!)
|
|
if key in event:
|
|
event[key] += '\n' + value
|
|
else:
|
|
event[key] = value
|
|
|
|
# end of stream
|
|
if event:
|
|
log.warn("outstanding partial event at end of stream: %s", event)
|
|
|
|
def _despatch(self, event):
|
|
try:
|
|
name = event['event']
|
|
data = event['data']
|
|
payload = json.loads(data)
|
|
except KeyError as err:
|
|
six.raise_from(
|
|
MalformedEventError('Missing field', err.args[0], event),
|
|
err
|
|
)
|
|
except ValueError as err:
|
|
# py2: plain ValueError
|
|
# py3: json.JSONDecodeError, a subclass of ValueError
|
|
six.raise_from(
|
|
MalformedEventError('Bad JSON', data),
|
|
err
|
|
)
|
|
|
|
handler_name = 'on_' + name
|
|
try:
|
|
handler = getattr(self, handler_name)
|
|
except AttributeError:
|
|
log.warn("Unhandled event '%s'", name)
|
|
else:
|
|
# TODO: allow handlers to return/raise to stop streaming cleanly
|
|
handler(payload)
|
|
|