Add async parameter to streaming API calls. If true, calls the streaming API on a separate thread and returns the Response object to the user so they can close it at their discretion.

This commit is contained in:
Chronister 2017-08-12 22:21:37 -07:00
pare fccc4e1986
commit a6a1ddbed1

Veure arxiu

@ -16,6 +16,7 @@ import dateutil
import dateutil.parser
import re
import copy
import threading
class Mastodon:
"""
@ -818,46 +819,63 @@ class Mastodon:
###
# Streaming
###
def user_stream(self, listener):
def user_stream(self, listener, async=False):
"""
Streams events that are relevant to the authorized user, i.e. home
timeline and notifications. 'listener' should be a subclass of
StreamListener.
StreamListener which will receive callbacks for incoming events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a requests.Response instance corresponding to the open
connection. The connection may be closed at any time by calling its
close() method.
"""
return self.__stream('/api/v1/streaming/user', listener)
return self.__stream('/api/v1/streaming/user', listener, async=async)
def public_stream(self, listener):
def public_stream(self, listener, async=False):
"""
Streams public events. 'listener' should be a subclass of
StreamListener.
Streams public events. 'listener' should be a subclass of StreamListener
which will receive callbacks for incoming events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a requests.Response instance corresponding to the open
connection. The connection may be closed at any time by calling its
close() method.
"""
return self.__stream('/api/v1/streaming/public', listener)
return self.__stream('/api/v1/streaming/public', listener, async=async)
def local_stream(self, listener):
def local_stream(self, listener, async=False):
"""
Streams local events. 'listener' should be a subclass of
StreamListener.
Streams local events. 'listener' should be a subclass of StreamListener
which will receive callbacks for incoming events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a requests.Response instance corresponding to the open
connection. The connection may be closed at any time by calling its
close() method.
"""
return self.__stream('/api/v1/streaming/public/local', listener)
return self.__stream('/api/v1/streaming/public/local', listener, async=async)
def hashtag_stream(self, tag, listener):
def hashtag_stream(self, tag, listener, async=False):
"""
Returns all public statuses for the hashtag 'tag'. 'listener' should be
a subclass of StreamListener.
a subclass of StreamListener which will receive callbacks for incoming
events.
This method blocks forever, calling callbacks on 'listener' for
incoming events.
If async is False, this method blocks forever.
If async is True, 'listener' will listen on another thread and this method
will return a requests.Response instance corresponding to the open
connection. The connection may be closed at any time by calling its
close() method.
"""
return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag})
return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}, async=async)
###
# Internal helpers, dragons probably
@ -1017,19 +1035,39 @@ class Mastodon:
return response
def __stream(self, endpoint, listener, params = {}):
def __stream(self, endpoint, listener, params = {}, async=False):
"""
Internal streaming API helper.
Returns the requests.Response instance corresponding to the open websocket
connection.
"""
headers = {}
if self.access_token != None:
headers = {'Authorization': 'Bearer ' + self.access_token}
url = self.api_base_url + endpoint
with closing(requests.get(url, headers = headers, data = params, stream = True)) as r:
listener.handle_stream(r.iter_lines())
connection = requests.get(url, headers = headers, data = params, stream = True)
def __stream_threadproc():
with closing(connection) as r:
try:
listener.handle_stream(r.iter_lines())
except AttributeError as e:
# TODO If the user closes the connection early, requests gets
# confused and throws an AttributeError
pass
return 0
if async:
t = threading.Thread(args=(), target=__stream_threadproc)
t.start()
return connection
else:
# Blocking, never returns (can only leave via exception)
return __stream_threadproc()
def __generate_params(self, params, exclude = []):
"""