Merge branch 'master' into streaming

This commit is contained in:
Lorenz Diener 2017-04-10 10:33:25 +02:00 cometido por GitHub
commit 965d514de1
S'han modificat 2 arxius amb 111 adicions i 33 eliminacions

2
.gitignore vendido
Veure arxiu

@ -90,3 +90,5 @@ ENV/
# Secret files (for credentials used in testing) # Secret files (for credentials used in testing)
*.secret *.secret
pytooter_clientcred.txt
pytooter_usercred.txt

Veure arxiu

@ -1,19 +1,18 @@
# coding: utf-8 # coding: utf-8
import requests
import os import os
from urllib.parse import urlencode
import os.path import os.path
import mimetypes import mimetypes
import time import time
import random import random
import string import string
import pytz
import datetime import datetime
import dateutil import dateutil
import dateutil.parser import dateutil.parser
from contextlib import closing from contextlib import closing
import requests
class Mastodon: class Mastodon:
""" """
@ -22,9 +21,10 @@ class Mastodon:
If anything is unclear, check the official API docs at If anything is unclear, check the official API docs at
https://github.com/Gargron/mastodon/wiki/API https://github.com/Gargron/mastodon/wiki/API
Presently, only username-password login is supported, somebody please Supported:
patch in Real Proper OAuth if desired. Username-Password Login
OAuth2
""" """
__DEFAULT_BASE_URL = 'https://mastodon.social' __DEFAULT_BASE_URL = 'https://mastodon.social'
__DEFAULT_TIMEOUT = 300 __DEFAULT_TIMEOUT = 300
@ -34,7 +34,7 @@ class Mastodon:
# Registering apps # Registering apps
### ###
@staticmethod @staticmethod
def create_app(client_name, scopes = ['read', 'write', 'follow'], redirect_uris = None, to_file = None, api_base_url = __DEFAULT_BASE_URL, request_timeout = __DEFAULT_TIMEOUT): def create_app(client_name, scopes = ['read', 'write', 'follow'], redirect_uris = None, website = None, to_file = None, api_base_url = __DEFAULT_BASE_URL, request_timeout = __DEFAULT_TIMEOUT):
""" """
Create a new app with given client_name and scopes (read, write, follow) Create a new app with given client_name and scopes (read, write, follow)
@ -53,12 +53,15 @@ class Mastodon:
} }
try: try:
if redirect_uris != None: if redirect_uris is not None:
request_data['redirect_uris'] = redirect_uris; request_data['redirect_uris'] = redirect_uris;
else: else:
request_data['redirect_uris'] = 'urn:ietf:wg:oauth:2.0:oob'; request_data['redirect_uris'] = 'urn:ietf:wg:oauth:2.0:oob';
if website is not None:
response = requests.post(api_base_url + '/api/v1/apps', data = request_data, timeout = request_timeout).json() request_data['website'] = website
response = requests.post(api_base_url + '/api/v1/apps', data = request_data, timeout = request_timeout)
response = response.json()
except Exception as e: except Exception as e:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -102,6 +105,8 @@ class Mastodon:
self.access_token = access_token self.access_token = access_token
self.debug_requests = debug_requests self.debug_requests = debug_requests
self.ratelimit_method = ratelimit_method self.ratelimit_method = ratelimit_method
self._token_expired = datetime.datetime.now()
self._refresh_token = None
self.ratelimit_limit = 150 self.ratelimit_limit = 150
self.ratelimit_reset = time.time() self.ratelimit_reset = time.time()
@ -125,32 +130,92 @@ class Mastodon:
if self.access_token != None and os.path.isfile(self.access_token): if self.access_token != None and os.path.isfile(self.access_token):
with open(self.access_token, 'r') as token_file: with open(self.access_token, 'r') as token_file:
self.access_token = token_file.readline().rstrip() self.access_token = token_file.readline().rstrip()
@property
def token_expired(self) -> bool:
if self._token_expired < datetime.datetime.now():
return True
else:
return False
@token_expired.setter
def token_expired(self, value: int):
self._token_expired = datetime.datetime.now() + datetime.timedelta(seconds=value)
return
@property
def refresh_token(self) -> str:
return self._refresh_token
@refresh_token.setter
def refresh_token(self, value):
self._refresh_token = value
return
def log_in(self, username, password, scopes = ['read', 'write', 'follow'], to_file = None): def auth_request_url(self, client_id: str = None, redirect_uris: str = "urn:ietf:wg:oauth:2.0:oob") -> str:
"""Returns the url that a client needs to request the grant from the server.
https://mastodon.social/oauth/authorize?client_id=XXX&response_type=code&redirect_uris=YYY
""" """
Log in and sets access_token to what was returned. Note that your if client_id is None:
username is the e-mail you use to log in into mastodon. client_id = self.client_id
else:
if os.path.isfile(client_id):
with open(client_id, 'r') as secret_file:
client_id = secret_file.readline().rstrip()
params = {}
params['client_id'] = client_id
params['response_type'] = "code"
params['redirect_uri'] = redirect_uris
formatted_params = urlencode(params)
return "".join([self.api_base_url, "/oauth/authorize?", formatted_params])
Can persist access token to file, to be used in the constructor. def log_in(self, username: str = None, password: str = None,\
code: str = None, redirect_uri: str = "urn:ietf:wg:oauth:2.0:oob", refresh_token: str = None,\
Will throw a MastodonIllegalArgumentError if username / password scopes: list = ['read', 'write', 'follow'], to_file: str = None) -> str:
are wrong, scopes are not valid or granted scopes differ from requested.
Returns the access_token.
""" """
params = self.__generate_params(locals()) Docs: https://github.com/doorkeeper-gem/doorkeeper/wiki/Interacting-as-an-OAuth-client-with-Doorkeeper
Notes:
Your username is the e-mail you use to log in into mastodon.
Can persist access token to file, to be used in the constructor.
Supports refresh_token but Mastodon.social doesn't implement it at the moment.
Handles password, authorization_code, and refresh_token authentication.
Will throw a MastodonIllegalArgumentError if username / password
are wrong, scopes are not valid or granted scopes differ from requested.
Returns:
str @access_token
"""
if username is not None and password is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'code', 'refresh_token'])
params['grant_type'] = 'password'
elif code is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'username', 'password', 'refresh_token'])
params['grant_type'] = 'authorization_code'
elif refresh_token is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'username', 'password', 'code'])
params['grant_type'] = 'refresh_token'
else:
raise MastodonIllegalArgumentError('Invalid user name, password, redirect_uris or scopes')
params['client_id'] = self.client_id params['client_id'] = self.client_id
params['client_secret'] = self.client_secret params['client_secret'] = self.client_secret
params['grant_type'] = 'password'
params['scope'] = " ".join(scopes)
try: try:
response = self.__api_request('POST', '/oauth/token', params, do_ratelimiting = False) response = self.__api_request('POST', '/oauth/token', params, do_ratelimiting = False)
self.access_token = response['access_token'] self.access_token = response['access_token']
self.refresh_token = response.get('refresh_token')
self.token_expired = int(response.get('expires_in', 0))
except Exception as e: except Exception as e:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
raise MastodonIllegalArgumentError('Invalid user name, password or scopes: %s' % e) raise MastodonIllegalArgumentError('Invalid user name, password, redirect_uris or scopes: %s' % e)
requested_scopes = " ".join(sorted(scopes)) requested_scopes = " ".join(sorted(scopes))
received_scopes = " ".join(sorted(response["scope"].split(" "))) received_scopes = " ".join(sorted(response["scope"].split(" ")))
@ -335,6 +400,17 @@ class Mastodon:
""" """
params = self.__generate_params(locals()) params = self.__generate_params(locals())
return self.__api_request('GET', '/api/v1/accounts/search', params) return self.__api_request('GET', '/api/v1/accounts/search', params)
def content_search(self, q, resolve = False):
"""
Fetch matching hashtags, accounts and statuses. Will search federated
instances if resolve is True.
Returns a dict of lists.
"""
params = self.__generate_params(locals())
return self.__api_request('GET', '/api/v1/search', params)
### ###
# Reading data: Mutes and Blocks # Reading data: Mutes and Blocks
@ -568,7 +644,7 @@ class Mastodon:
Returns a media dict. This contains the id that can be used in Returns a media dict. This contains the id that can be used in
status_post to attach the media file to a toot. status_post to attach the media file to a toot.
""" """
if os.path.isfile(media_file) and mime_type == None: if mime_type == None and os.path.isfile(media_file):
mime_type = mimetypes.guess_type(media_file)[0] mime_type = mimetypes.guess_type(media_file)[0]
media_file = open(media_file, 'rb') media_file = open(media_file, 'rb')
@ -734,13 +810,13 @@ class Mastodon:
if self.ratelimit_method == "throw": if self.ratelimit_method == "throw":
raise MastodonRatelimitError("Hit rate limit.") raise MastodonRatelimitError("Hit rate limit.")
if self.ratelimit_method == "wait" or self.ratelimit_method == "pace": if self.ratelimit_method == "wait" or self.ratelimit_method == "pace":
to_next = self.ratelimit_reset - time.time() to_next = self.ratelimit_reset - time.time()
if to_next > 0: if to_next > 0:
# As a precaution, never sleep longer than 5 minutes # As a precaution, never sleep longer than 5 minutes
to_next = min(to_next, 5 * 60) to_next = min(to_next, 5 * 60)
time.sleep(to_next) time.sleep(to_next)
request_complete = False request_complete = False
return response return response