Add support for OAuth2

This commit is contained in:
Ansem 2017-04-07 21:59:39 +00:00
pare 853ae97dcc
commit ebfe65a295
S'han modificat 2 arxius amb 108 adicions i 32 eliminacions

2
.gitignore vendido
Veure arxiu

@ -90,3 +90,5 @@ ENV/
# Secret files (for credentials used in testing) # Secret files (for credentials used in testing)
*.secret *.secret
pytooter_clientcred.txt
pytooter_usercred.txt

Veure arxiu

@ -1,17 +1,20 @@
# coding: utf-8 # coding: utf-8
import requests
import os import os
from urllib.parse import urlencode
import os.path import os.path
import mimetypes import mimetypes
import time import time
import random import random
import string import string
import pytz
import datetime import datetime
import dateutil
import dateutil.parser import dateutil.parser
import pytz
import dateutil
import requests
class Mastodon: class Mastodon:
""" """
Super basic but thorough and easy to use mastodon.social Super basic but thorough and easy to use mastodon.social
@ -19,9 +22,10 @@ class Mastodon:
If anything is unclear, check the official API docs at If anything is unclear, check the official API docs at
https://github.com/Gargron/mastodon/wiki/API https://github.com/Gargron/mastodon/wiki/API
Presently, only username-password login is supported, somebody please Supported:
patch in Real Proper OAuth if desired. Username-Password Login
OAuth2
""" """
__DEFAULT_BASE_URL = 'https://mastodon.social' __DEFAULT_BASE_URL = 'https://mastodon.social'
__DEFAULT_TIMEOUT = 300 __DEFAULT_TIMEOUT = 300
@ -31,7 +35,7 @@ class Mastodon:
# Registering apps # Registering apps
### ###
@staticmethod @staticmethod
def create_app(client_name, scopes = ['read', 'write', 'follow'], redirect_uris = None, to_file = None, api_base_url = __DEFAULT_BASE_URL, request_timeout = __DEFAULT_TIMEOUT): def create_app(client_name, scopes = ['read', 'write', 'follow'], redirect_uris = None, website = None, to_file = None, api_base_url = __DEFAULT_BASE_URL, request_timeout = __DEFAULT_TIMEOUT):
""" """
Create a new app with given client_name and scopes (read, write, follow) Create a new app with given client_name and scopes (read, write, follow)
@ -50,12 +54,15 @@ class Mastodon:
} }
try: try:
if redirect_uris != None: if redirect_uris is not None:
request_data['redirect_uris'] = redirect_uris; request_data['redirect_uris'] = redirect_uris;
else: else:
request_data['redirect_uris'] = 'urn:ietf:wg:oauth:2.0:oob'; request_data['redirect_uris'] = 'urn:ietf:wg:oauth:2.0:oob';
if website is not None:
response = requests.post(api_base_url + '/api/v1/apps', data = request_data, timeout = request_timeout).json() request_data['website'] = website
response = requests.post(api_base_url + '/api/v1/apps', data = request_data, timeout = request_timeout)
response = response.json()
except Exception as e: except Exception as e:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -99,6 +106,8 @@ class Mastodon:
self.access_token = access_token self.access_token = access_token
self.debug_requests = debug_requests self.debug_requests = debug_requests
self.ratelimit_method = ratelimit_method self.ratelimit_method = ratelimit_method
self._token_expired = datetime.datetime.now()
self._refresh_token = None
self.ratelimit_limit = 150 self.ratelimit_limit = 150
self.ratelimit_reset = time.time() self.ratelimit_reset = time.time()
@ -122,32 +131,97 @@ class Mastodon:
if self.access_token != None and os.path.isfile(self.access_token): if self.access_token != None and os.path.isfile(self.access_token):
with open(self.access_token, 'r') as token_file: with open(self.access_token, 'r') as token_file:
self.access_token = token_file.readline().rstrip() self.access_token = token_file.readline().rstrip()
@property
def token_expired(self) -> bool:
if self._token_expired < datetime.datetime.now():
return True
else:
return False
@token_expired.setter
def token_expired(self, value: int):
self._token_expired = datetime.datetime.now() + datetime.timedelta(seconds=value)
return
@property
def refresh_token(self) -> str:
return self._refresh_token
@refresh_token.setter
def refresh_token(self, value):
self._refresh_token = value
return
def log_in(self, username, password, scopes = ['read', 'write', 'follow'], to_file = None): def auth_request_url(self, client_id: str = None, redirect_uris: str = "urn:ietf:wg:oauth:2.0:oob") -> str:
"""Returns the url that a client needs to request the grant from the server.
https://mastodon.social/oauth/authorize?client_id=XXX&response_type=code&redirect_uris=YYY
""" """
Log in and sets access_token to what was returned. Note that your if client_id is None:
username is the e-mail you use to log in into mastodon. client_id = self.client_id
else:
if os.path.isfile(client_id):
with open(client_id, 'r') as secret_file:
client_id = secret_file.readline().rstrip()
params = {}
params['client_id'] = client_id
params['response_type'] = "code"
params['redirect_uri'] = redirect_uris
formatted_params = urlencode(params)
return "".join([self.api_base_url, "/oauth/authorize?", formatted_params])
Can persist access token to file, to be used in the constructor. def log_in(self, username: str = None, password: str = None,\
code: str = None, redirect_uri: str = "urn:ietf:wg:oauth:2.0:oob", refresh_token: str = None,\
Will throw a MastodonIllegalArgumentError if username / password scopes: list = ['read', 'write', 'follow'], to_file: str = None) -> str:
are wrong, scopes are not valid or granted scopes differ from requested.
Returns the access_token.
""" """
params = self.__generate_params(locals()) Docs: https://github.com/doorkeeper-gem/doorkeeper/wiki/Interacting-as-an-OAuth-client-with-Doorkeeper
Notes:
Your username is the e-mail you use to log in into mastodon.
Can persist access token to file, to be used in the constructor.
Supports refresh_token but Mastodon.social doesn't implement it at the moment.
Handles password, authorization_code, and refresh_token authentication.
Will throw a MastodonIllegalArgumentError if username / password
are wrong, scopes are not valid or granted scopes differ from requested.
Returns:
{
'scope': 'read',
'created_at': 1491599341,
'access_token': 'd8daf46d...',
'token_type': 'bearer'
}
"""
if username is not None and password is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'code', 'refresh_token'])
params['grant_type'] = 'password'
elif code is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'username', 'password', 'refresh_token'])
params['grant_type'] = 'authorization_code'
elif refresh_token is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'username', 'password', 'code'])
params['grant_type'] = 'refresh_token'
else:
raise MastodonIllegalArgumentError('Invalid user name, password, redirect_uris or scopes')
params['client_id'] = self.client_id params['client_id'] = self.client_id
params['client_secret'] = self.client_secret params['client_secret'] = self.client_secret
params['grant_type'] = 'password'
params['scope'] = " ".join(scopes)
try: try:
response = self.__api_request('POST', '/oauth/token', params, do_ratelimiting = False) response = self.__api_request('POST', '/oauth/token', params, do_ratelimiting = False)
self.access_token = response['access_token'] self.access_token = response['access_token']
self.refresh_token = response.get('refresh_token')
self.token_expired = int(response.get('expires_in', 0))
except Exception as e: except Exception as e:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
raise MastodonIllegalArgumentError('Invalid user name, password or scopes: %s' % e) raise MastodonIllegalArgumentError('Invalid user name, password, redirect_uris or scopes: %s' % e)
requested_scopes = " ".join(sorted(scopes)) requested_scopes = " ".join(sorted(scopes))
received_scopes = " ".join(sorted(response["scope"].split(" "))) received_scopes = " ".join(sorted(response["scope"].split(" ")))
@ -157,7 +231,7 @@ class Mastodon:
if to_file != None: if to_file != None:
with open(to_file, 'w') as token_file: with open(to_file, 'w') as token_file:
token_file.write(response['access_token'] + '\n') token_file.write(response + '\n')
return response['access_token'] return response['access_token']
@ -700,13 +774,13 @@ class Mastodon:
if self.ratelimit_method == "throw": if self.ratelimit_method == "throw":
raise MastodonRatelimitError("Hit rate limit.") raise MastodonRatelimitError("Hit rate limit.")
if self.ratelimit_method == "wait" or self.ratelimit_method == "pace": if self.ratelimit_method == "wait" or self.ratelimit_method == "pace":
to_next = self.ratelimit_reset - time.time() to_next = self.ratelimit_reset - time.time()
if to_next > 0: if to_next > 0:
# As a precaution, never sleep longer than 5 minutes # As a precaution, never sleep longer than 5 minutes
to_next = min(to_next, 5 * 60) to_next = min(to_next, 5 * 60)
time.sleep(to_next) time.sleep(to_next)
request_complete = False request_complete = False
return response return response