diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index f073a66..a74f59a 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -15,6 +15,7 @@ import dateutil import dateutil.parser import re import copy +import threading class Mastodon: @@ -110,9 +111,9 @@ class Mastodon: self._token_expired = datetime.datetime.now() self._refresh_token = None - self.ratelimit_limit = 300 + self.ratelimit_limit = 150 self.ratelimit_reset = time.time() - self.ratelimit_remaining = 300 + self.ratelimit_remaining = 150 self.ratelimit_lastcall = time.time() self.ratelimit_pacefactor = ratelimit_pacefactor @@ -864,47 +865,59 @@ class Mastodon: ### # Streaming ### - def user_stream(self, listener): + def user_stream(self, listener, async=False): """ Streams events that are relevant to the authorized user, i.e. home timeline and notifications. 'listener' should be a subclass of - StreamListener. + StreamListener which will receive callbacks for incoming events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ - return self.__stream('/api/v1/streaming/user', listener) + return self.__stream('/api/v1/streaming/user', listener, async=async) - def public_stream(self, listener): + def public_stream(self, listener, async=False): """ - Streams public events. 'listener' should be a subclass of - StreamListener. + Streams public events. 'listener' should be a subclass of StreamListener + which will receive callbacks for incoming events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ - return self.__stream('/api/v1/streaming/public', listener) + return self.__stream('/api/v1/streaming/public', listener, async=async) - def local_stream(self, listener): + def local_stream(self, listener, async=False): """ - Streams local events. 'listener' should be a subclass of - StreamListener. + Streams local events. 'listener' should be a subclass of StreamListener + which will receive callbacks for incoming events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ - return self.__stream('/api/v1/streaming/public/local', listener) + return self.__stream('/api/v1/streaming/public/local', listener, async=async) - def hashtag_stream(self, tag, listener): + def hashtag_stream(self, tag, listener, async=False): """ Returns all public statuses for the hashtag 'tag'. 'listener' should be - a subclass of StreamListener. + a subclass of StreamListener which will receive callbacks for incoming + events. - This method blocks forever, calling callbacks on 'listener' for - incoming events. + If async is False, this method blocks forever. + + If async is True, 'listener' will listen on another thread and this method + will return a handle corresponding to the open connection. The + connection may be closed at any time by calling its close() method. """ return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener) - ### # Internal helpers, dragons probably ### @@ -1080,18 +1093,47 @@ class Mastodon: return response - def __stream(self, endpoint, listener, params={}): + def __stream(self, endpoint, listener, params={}, async=False): """ Internal streaming API helper. + + Returns a handle to the open connection that the user can close if they + wish to terminate it. """ headers = {} if self.access_token is not None: headers = {'Authorization': 'Bearer ' + self.access_token} - url = self.api_base_url + endpoint - with closing(requests.get(url, headers=headers, data=params, stream=True)) as r: - listener.handle_stream(r.iter_lines()) + + connection = requests.get(url, headers = headers, data = params, stream = True) + + class __stream_handle(): + def __init__(self, connection): + self.connection = connection + + def close(self): + self.connection.close() + + def _threadproc(self): + with closing(connection) as r: + try: + listener.handle_stream(r.iter_lines()) + except AttributeError as e: + # Eat AttributeError from requests if user closes early + pass + return 0 + + handle = __stream_handle(connection) + + if async: + t = threading.Thread(args=(), target=handle._threadproc) + t.start() + return handle + else: + # Blocking, never returns (can only leave via exception) + with closing(connection) as r: + listener.handle_stream(r.iter_lines()) def __generate_params(self, params, exclude=[]): """