Merge pull request #69 from Chronister/async_streaming

Add async parameter to streaming API calls.
This commit is contained in:
Lorenz Diener 2017-09-08 15:03:15 +02:00 cometido por GitHub
commit 4fbeb7245f

Veure arxiu

@ -15,6 +15,7 @@ import dateutil
import dateutil.parser
import re
import copy
import threading
class Mastodon:
@ -110,9 +111,9 @@ class Mastodon:
self._token_expired = datetime.datetime.now()
self._refresh_token = None
self.ratelimit_limit = 300
self.ratelimit_limit = 150
self.ratelimit_reset = time.time()
self.ratelimit_remaining = 300
self.ratelimit_remaining = 150
self.ratelimit_lastcall = time.time()
self.ratelimit_pacefactor = ratelimit_pacefactor
@ -864,47 +865,59 @@ class Mastodon:
###
# Streaming
###
def user_stream(self, listener):
def user_stream(self, listener, async=False):
"""
Streams events that are relevant to the authorized user, i.e. home
timeline and notifications. 'listener' should be a subclass of
StreamListener.
StreamListener which will receive callbacks for incoming events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
"""
return self.__stream('/api/v1/streaming/user', listener)
return self.__stream('/api/v1/streaming/user', listener, async=async)
def public_stream(self, listener):
def public_stream(self, listener, async=False):
"""
Streams public events. 'listener' should be a subclass of
StreamListener.
Streams public events. 'listener' should be a subclass of StreamListener
which will receive callbacks for incoming events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
"""
return self.__stream('/api/v1/streaming/public', listener)
return self.__stream('/api/v1/streaming/public', listener, async=async)
def local_stream(self, listener):
def local_stream(self, listener, async=False):
"""
Streams local events. 'listener' should be a subclass of
StreamListener.
Streams local events. 'listener' should be a subclass of StreamListener
which will receive callbacks for incoming events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
"""
return self.__stream('/api/v1/streaming/public/local', listener)
return self.__stream('/api/v1/streaming/public/local', listener, async=async)
def hashtag_stream(self, tag, listener):
def hashtag_stream(self, tag, listener, async=False):
"""
Returns all public statuses for the hashtag 'tag'. 'listener' should be
a subclass of StreamListener.
a subclass of StreamListener which will receive callbacks for incoming
events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a handle corresponding to the open connection. The
connection may be closed at any time by calling its close() method.
"""
return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener)
###
# Internal helpers, dragons probably
###
@ -1080,17 +1093,46 @@ class Mastodon:
return response
def __stream(self, endpoint, listener, params={}):
def __stream(self, endpoint, listener, params={}, async=False):
"""
Internal streaming API helper.
Returns a handle to the open connection that the user can close if they
wish to terminate it.
"""
headers = {}
if self.access_token is not None:
headers = {'Authorization': 'Bearer ' + self.access_token}
url = self.api_base_url + endpoint
with closing(requests.get(url, headers=headers, data=params, stream=True)) as r:
connection = requests.get(url, headers = headers, data = params, stream = True)
class __stream_handle():
def __init__(self, connection):
self.connection = connection
def close(self):
self.connection.close()
def _threadproc(self):
with closing(connection) as r:
try:
listener.handle_stream(r.iter_lines())
except AttributeError as e:
# Eat AttributeError from requests if user closes early
pass
return 0
handle = __stream_handle(connection)
if async:
t = threading.Thread(args=(), target=handle._threadproc)
t.start()
return handle
else:
# Blocking, never returns (can only leave via exception)
with closing(connection) as r:
listener.handle_stream(r.iter_lines())
def __generate_params(self, params, exclude=[]):