Added notificactions, me and account methods

This commit is contained in:
spla 2022-08-07 16:50:15 +02:00
pare 4d19b21940
commit 1354d25b4b
S'han modificat 1 arxius amb 134 adicions i 24 eliminacions

158
akkoma.py
Veure arxiu

@ -77,7 +77,7 @@ class AttribAccessDict(dict):
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
@ -145,7 +145,8 @@ class Akkoma:
__DICT_VERSION_POLL = "2.8.0"
__DICT_VERSION_STATUS = bigger_version(bigger_version(bigger_version(bigger_version(bigger_version("3.1.0",
__DICT_VERSION_MEDIA), __DICT_VERSION_ACCOUNT), __DICT_VERSION_APPLICATION), __DICT_VERSION_MENTION), __DICT_VERSION_POLL)
__DICT_VERSION_NOTIFICATION = bigger_version(bigger_version("1.0.0", __DICT_VERSION_ACCOUNT), __DICT_VERSION_STATUS)
@staticmethod
def create_app(app_name, scopes=__DEFAULT_SCOPES, redirect_uris=None, website=None, to_file=None, api_base_url=__DEFAULT_BASE_URL,
request_timeout=__DEFAULT_TIMEOUT, session=None):
@ -233,13 +234,13 @@ class Akkoma:
expect to be installed on the server. The function will throw an error if an unparseable
Version is specified. If no version is specified, Akkoma.py will set `akkoma_version` to the
detected version.
The version check mode can be set to "created" (the default behaviour), "changed" or "none". If set to
"created", Akkoma.py will throw an error if the version of Akkoma it is connected to is too old
to have an endpoint. If it is set to "changed", it will throw an error if the endpoints behaviour has
changed after the version of Akkoma that is connected has been released. If it is set to "none",
version checking is disabled.
`feature_set` can be used to enable behaviour specific to non-mainline Akkoma API implementations.
Details are documented in the functions that provide such functionality. Currently supported feature
sets are `mainline`, `fedibird` and `pleroma`.
@ -247,7 +248,7 @@ class Akkoma:
self.api_base_url = None
if not api_base_url is None:
self.api_base_url = Akkoma.__protocolize(api_base_url)
self.client_id = client_id
self.client_secret = client_secret
self.access_token = access_token
@ -255,9 +256,9 @@ class Akkoma:
self.ratelimit_method = ratelimit_method
self._token_expired = datetime.datetime.now()
self._refresh_token = None
self.__logged_in_id = None
self.ratelimit_limit = 300
self.ratelimit_reset = time.time()
self.ratelimit_remaining = 300
@ -274,14 +275,14 @@ class Akkoma:
self.feature_set = feature_set
if not self.feature_set in ["mainline", "fedibird", "pleroma"]:
raise AkkomaIllegalArgumentError('Requested invalid feature set')
# Token loading
if self.client_id is not None:
if os.path.isfile(self.client_id):
with open(self.client_id, 'r') as secret_file:
self.client_id = secret_file.readline().rstrip()
self.client_secret = secret_file.readline().rstrip()
try_base_url = secret_file.readline().rstrip()
if (not try_base_url is None) and len(try_base_url) != 0:
try_base_url = Akkoma.__protocolize(try_base_url)
@ -295,14 +296,14 @@ class Akkoma:
if self.access_token is not None and os.path.isfile(self.access_token):
with open(self.access_token, 'r') as token_file:
self.access_token = token_file.readline().rstrip()
try_base_url = token_file.readline().rstrip()
if (not try_base_url is None) and len(try_base_url) != 0:
try_base_url = Akkoma.__protocolize(try_base_url)
if not (self.api_base_url is None or try_base_url == self.api_base_url):
raise AkkomaIllegalArgumentError('Mismatch in base URLs between files and/or specified')
self.api_base_url = try_base_url
# Versioning
if akkoma_version == None:
self.retrieve_akkoma_version()
@ -311,11 +312,11 @@ class Akkoma:
self.akkoma_major, self.akkoma_minor, self.akkoma_patch = parse_version_string(akkoma_version)
except:
raise AkkomaVersionError("Bad version specified")
if not version_check_mode in ["created", "changed", "none"]:
raise AkkomaIllegalArgumentError("Invalid version check method.")
self.version_check_mode = version_check_mode
# Ratelimiting parameter check
if ratelimit_method not in ["throw", "wait", "pace"]:
raise AkkomaIllegalArgumentError("Invalid ratelimit method.")
@ -335,16 +336,35 @@ class Akkoma:
self.akkoma_major, self.akkoma_minor, self.akkoma_patch = parse_version_string(version_str)
return version_str
def verify_minimum_version(self, version_str, cached=False):
"""
Update version info from server and verify that at least the specified version is present.
If you specify "cached", the version info update part is skipped.
Returns True if version requirement is satisfied, False if not.
"""
if not cached:
self.retrieve_akkoma_version()
major, minor, patch = parse_version_string(version_str)
if major > self.akkoma_major:
return False
elif major == self.akkoma_major and minor > self.akkoma_minor:
return False
elif major == self.akkoma_major and minor == self.akkoma_minor and patch > self.akkoma_patch:
return False
return True
def log_in(self, client_id=None, client_secret=None, grant_type=None, username=None, password=None, code=None, redirect_uri="urn:ietf:wg:oauth:2.0:oob", refresh_token=None, scopes=__DEFAULT_SCOPES, to_file=None):
"""
Get the access token for a user.
The username is the e-mail used to log in into akkoma.
Can persist access token to file `to_file`, to be used in the constructor.
Handles password and OAuth-based authorization.
Will throw a `AkkomaIllegalArgumentError` if the OAuth or the
username / password credentials given are incorrect, and
`AkkomaAPIError` if all of the requested scopes were not granted.
@ -391,7 +411,7 @@ class Akkoma:
for scope_set in self.__SCOPE_SETS.keys():
if scope_set in received_scopes:
received_scopes += self.__SCOPE_SETS[scope_set]
if not set(scopes) <= set(received_scopes):
raise AkkomaAPIError(
'Granted scopes "' + " ".join(received_scopes) + '" do not contain all of the requested scopes "' + " ".join(scopes) + '".')
@ -400,14 +420,80 @@ class Akkoma:
with open(to_file, 'w') as token_file:
token_file.write(response['access_token'] + "\n")
token_file.write(self.api_base_url + "\n")
self.__logged_in_id = None
return response['access_token']
###
# Reading data: Notifications
###
#@api_version("1.0.0", "2.9.0", __DICT_VERSION_NOTIFICATION)
def notifications(self, id=None, account_id=None, max_id=None, min_id=None, since_id=None, limit=None, mentions_only=None):
"""
Fetch notifications (mentions, favourites, reblogs, follows) for the logged-in
user. Pass `account_id` to get only notifications originating from the given account.
Can be passed an `id` to fetch a single notification.
Returns a list of `notification dicts`_.
"""
if max_id != None:
max_id = self.__unpack_id(max_id)
if min_id != None:
min_id = self.__unpack_id(min_id)
if since_id != None:
since_id = self.__unpack_id(since_id)
if account_id != None:
account_id = self.__unpack_id(account_id)
if id is None:
params = self.__generate_params(locals(), ['id'])
return self.__api_request('GET', '/api/v1/notifications', params)
else:
id = self.__unpack_id(id)
url = '/api/v1/notifications/{0}'.format(str(id))
return self.__api_request('GET', url)
###
# Reading data: Accounts
###
@api_version("1.0.0", "1.0.0", __DICT_VERSION_ACCOUNT)
def account(self, id):
"""
Fetch account information by user `id`.
Does not require authentication for publicly visible accounts.
Returns a `user dict`_.
"""
id = self.__unpack_id(id)
url = '/api/v1/accounts/{0}'.format(str(id))
return self.__api_request('GET', url)
@api_version("1.0.0", "2.1.0", __DICT_VERSION_ACCOUNT)
def account_verify_credentials(self):
"""
Fetch logged-in user's account information.
Returns a `user dict`_ (Starting from 2.1.0, with an additional "source" field).
"""
return self.__api_request('GET', '/api/v1/accounts/verify_credentials')
@api_version("1.0.0", "2.1.0", __DICT_VERSION_ACCOUNT)
def me(self):
"""
Get this users account. Symonym for `account_verify_credentials()`, does exactly
the same thing, just exists becase `account_verify_credentials()` has a confusing
name.
"""
return self.account_verify_credentials()
###
# Internal helpers, dragons probably
###
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
@ -435,7 +521,7 @@ class Akkoma:
except:
raise AkkomaAPIError('Encountered invalid date.')
return json_object
@staticmethod
def __json_truefalse_parse(json_object):
"""
@ -887,7 +973,27 @@ class Akkoma:
params = self.__generate_params(params_initial, ['idempotency_key'])
return self.__api_request('POST', '/api/v1/statuses', params, headers = headers, use_json = use_json)
###
# Writing data: Notifications
###
#@api_version("1.0.0", "1.0.0", "1.0.0")
def notifications_clear(self):
"""
Clear out a users notifications
"""
self.__api_request('POST', '/api/v1/notifications/clear')
#@api_version("1.3.0", "2.9.2", "2.9.2")
def notifications_dismiss(self, id):
"""
Deletes a single notification
"""
id = self.__unpack_id(id)
url = '/api/v1/notifications/{0}/dismiss'.format(str(id))
self.__api_request('POST', url)
###
# Writing data: Media
###
@ -924,7 +1030,7 @@ class Akkoma:
if focus != None:
focus = str(focus[0]) + "," + str(focus[1])
media_file_description = (file_name, media_file, mime_type)
return self.__api_request('POST', '/api/v1/media',
files={'file': media_file_description},
@ -933,7 +1039,7 @@ class Akkoma:
def __unpack_id(self, id):
"""
Internal object-to-id converter
Checks if id is a dict that contains id and
returns the id inside, otherwise just returns
the id straight.
@ -952,7 +1058,7 @@ class Akkoma:
"""Internal helper for oauth code"""
self._refresh_token = value
return
@staticmethod
def __protocolize(base_url):
"""Internal add-protocol-to-url helper"""
@ -992,6 +1098,10 @@ class AkkomaAPIError(AkkomaError):
"""Raised when the akkoma API generates a response that cannot be handled"""
pass
class AkkomaNotFoundError(AkkomaAPIError):
"""Raised when the akkoma API returns a 404 Not Found error"""
pass
class AkkomaMalformedEventError(AkkomaError):
"""Raised when the server-sent event stream is malformed"""
pass