Many fixes for streaming stuff

This commit is contained in:
Lorenz Diener 2017-11-24 15:08:34 +01:00
pare cea4d4251a
commit e220e7cc60
S'han modificat 4 arxius amb 51 adicions i 34 eliminacions

Veure arxiu

@ -37,7 +37,7 @@ Mastodon.py
) )
mastodon.toot('Tooting from python using #mastodonpy !') mastodon.toot('Tooting from python using #mastodonpy !')
`Mastodon`_ is an ostatus based twitter-like federated social `Mastodon`_ is an ActivityPub and OStatus based twitter-like federated social
network node. It has an API that allows you to interact with its network node. It has an API that allows you to interact with its
every aspect. This is a simple python wrapper for that api, provided every aspect. This is a simple python wrapper for that api, provided
as a single python module. By default, it talks to the as a single python module. By default, it talks to the
@ -531,11 +531,18 @@ Streaming
--------- ---------
These functions allow access to the streaming API. These functions allow access to the streaming API.
If async is False, these methods block forever (or until an
exception is raised).
If async is True, the listener will listen on another thread and these methods
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
.. automethod:: Mastodon.user_stream .. automethod:: Mastodon.user_stream
.. automethod:: Mastodon.public_stream .. automethod:: Mastodon.public_stream
.. automethod:: Mastodon.local_stream
.. automethod:: Mastodon.hashtag_stream .. automethod:: Mastodon.hashtag_stream
.. _Mastodon: https://github.com/tootsuite/mastodon .. _Mastodon: https://github.com/tootsuite/mastodon
.. _Mastodon flagship instance: http://mastodon.social/ .. _Mastodon flagship instance: http://mastodon.social/
.. _Mastodon api docs: https://github.com/tootsuite/documentation/ .. _Mastodon api docs: https://github.com/tootsuite/documentation/

Veure arxiu

@ -17,6 +17,7 @@ import re
import copy import copy
import threading import threading
import sys import sys
try: try:
from urllib.parse import urlparse from urllib.parse import urlparse
except ImportError: except ImportError:
@ -1014,12 +1015,6 @@ class Mastodon:
Streams events that are relevant to the authorized user, i.e. home Streams events that are relevant to the authorized user, i.e. home
timeline and notifications. 'listener' should be a subclass of timeline and notifications. 'listener' should be a subclass of
StreamListener which will receive callbacks for incoming events. StreamListener which will receive callbacks for incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
""" """
return self.__stream('/api/v1/streaming/user', listener, async=async) return self.__stream('/api/v1/streaming/user', listener, async=async)
@ -1027,12 +1022,6 @@ class Mastodon:
""" """
Streams public events. 'listener' should be a subclass of StreamListener Streams public events. 'listener' should be a subclass of StreamListener
which will receive callbacks for incoming events. which will receive callbacks for incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
""" """
return self.__stream('/api/v1/streaming/public', listener, async=async) return self.__stream('/api/v1/streaming/public', listener, async=async)
@ -1041,11 +1030,6 @@ class Mastodon:
Streams local events. 'listener' should be a subclass of StreamListener Streams local events. 'listener' should be a subclass of StreamListener
which will receive callbacks for incoming events. which will receive callbacks for incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
""" """
return self.__stream('/api/v1/streaming/public/local', listener, async=async) return self.__stream('/api/v1/streaming/public/local', listener, async=async)
@ -1054,12 +1038,6 @@ class Mastodon:
Returns all public statuses for the hashtag 'tag'. 'listener' should be Returns all public statuses for the hashtag 'tag'. 'listener' should be
a subclass of StreamListener which will receive callbacks for incoming a subclass of StreamListener which will receive callbacks for incoming
events. events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
""" """
return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener) return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener)

Veure arxiu

@ -1,4 +1,4 @@
from mastodon.Mastodon import Mastodon from mastodon.Mastodon import Mastodon
from mastodon.streaming import StreamListener from mastodon.streaming import StreamListener, CallbackStreamListener
__all__ = ['Mastodon', 'StreamListener'] __all__ = ['Mastodon', 'StreamListener', 'CallbackStreamListener']

Veure arxiu

@ -67,10 +67,6 @@ class StreamListener(object):
else: else:
event[key] = value event[key] = value
# end of stream
if event:
log.warn("outstanding partial event at end of stream: %s", event)
def _dispatch(self, event): def _dispatch(self, event):
try: try:
name = event['event'] name = event['event']
@ -92,8 +88,44 @@ class StreamListener(object):
handler_name = 'on_' + name handler_name = 'on_' + name
try: try:
handler = getattr(self, handler_name) handler = getattr(self, handler_name)
except AttributeError: except AttributeError as err:
log.warn("Unhandled event '%s'", name) six.raise_from(
MastodonMalformedEventError('Bad event type', name),
err
)
else: else:
# TODO: allow handlers to return/raise to stop streaming cleanly # TODO: allow handlers to return/raise to stop streaming cleanly
handler(payload) handler(payload)
class CallbackStreamListener(StreamListener):
"""
Simple callback stream handler class.
Can optionally additionally send local update events to a separate handler.
"""
def __init__(self, update_handler = None, local_update_handler = None, delete_handler = None, notification_handler = None):
super(CallbackStreamListener, self).__init__()
self.update_handler = update_handler
self.local_update_handler = local_update_handler
self.delete_handler = delete_handler
self.notification_handler = notification_handler
def on_update(self, status):
if self.update_handler != None:
self.update_handler(status)
try:
if self.local_update_handler != None and not "@" in status["account"]["acct"]:
self.local_update_handler(status)
except Exception as err:
six.raise_from(
MastodonMalformedEventError('received bad update', status),
err
)
def on_delete(self, deleted_id):
if self.delete_handler != None:
self.delete_handler(deleted_id)
def on_notification(self, notification):
if self.notification_handler != None:
self.notification_handler(notification)