""" Handlers for the Streaming API: https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md """ import json import logging import six from mastodon import Mastodon log = logging.getLogger(__name__) class MalformedEventError(Exception): """Raised when the server-sent event stream is malformed.""" pass class StreamListener(object): """Callbacks for the streaming API. Create a subclass, override the on_xxx methods for the kinds of events you're interested in, then pass an instance of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or Mastodon.hashtag_stream().""" def on_update(self, status): """A new status has appeared! 'status' is the parsed JSON dictionary describing the status.""" pass def on_notification(self, notification): """A new notification. 'notification' is the parsed JSON dictionary describing the notification.""" pass def on_delete(self, status_id): """A status has been deleted. status_id is the status' integer ID.""" pass def handle_heartbeat(self): """The server has sent us a keep-alive message. This callback may be useful to carry out periodic housekeeping tasks, or just to confirm that the connection is still open.""" pass def handle_stream(self, lines): """ Handles a stream of events from the Mastodon server. When each event is received, the corresponding .on_[name]() method is called. lines: an iterable of lines of bytes sent by the Mastodon server, as returned by requests.Response.iter_lines(). """ event = {} for raw_line in lines: try: line = raw_line.decode('utf-8') except UnicodeDecodeError as err: six.raise_from( MalformedEventError("Malformed UTF-8", line), err ) if line.startswith(':'): self.handle_heartbeat() elif line == '': # end of event self._dispatch(event) event = {} else: key, value = line.split(': ', 1) # According to the MDN spec, repeating the 'data' key # represents a newline(!) if key in event: event[key] += '\n' + value else: event[key] = value # end of stream if event: log.warn("outstanding partial event at end of stream: %s", event) def _dispatch(self, event): try: name = event['event'] data = event['data'] payload = json.loads(data, object_hook = Mastodon._Mastodon__json_hooks) except KeyError as err: six.raise_from( MalformedEventError('Missing field', err.args[0], event), err ) except ValueError as err: # py2: plain ValueError # py3: json.JSONDecodeError, a subclass of ValueError six.raise_from( MalformedEventError('Bad JSON', data), err ) handler_name = 'on_' + name try: handler = getattr(self, handler_name) except AttributeError: log.warn("Unhandled event '%s'", name) else: # TODO: allow handlers to return/raise to stop streaming cleanly handler(payload)