271 líneas
8,5 KiB
Python
271 líneas
8,5 KiB
Python
import os
|
|
import sys
|
|
import requests
|
|
from requests.models import urlencode
|
|
import pdb
|
|
|
|
###
|
|
# Dict helper class.
|
|
# Defined at top level so it can be pickled.
|
|
###
|
|
class AttribAccessDict(dict):
|
|
def __getattr__(self, attr):
|
|
if attr in self:
|
|
return self[attr]
|
|
else:
|
|
raise AttributeError("Attribute not found: " + str(attr))
|
|
|
|
def __setattr__(self, attr, val):
|
|
if attr in self:
|
|
raise AttributeError("Attribute-style access is read only")
|
|
super(AttribAccessDict, self).__setattr__(attr, val)
|
|
|
|
class Pixelfed:
|
|
|
|
name = 'Pixelfed API wrapper'
|
|
|
|
def __init__(self, api_base_url=None, access_token=None, pixelfed_path=None, session=None):
|
|
|
|
self.__pixelfed_config_path = "config/pixelfed.txt"
|
|
|
|
is_setup = self.__check_setup(self)
|
|
|
|
if is_setup:
|
|
|
|
self.api_base_url = self.__get_parameter("api_base_url", self.__pixelfed_config_path)
|
|
self.access_token = self.__get_parameter("access_token", self.__pixelfed_config_path)
|
|
self.pixelfed_path = self.__get_parameter("pixelfed_path", self.__pixelfed_config_path)
|
|
|
|
else:
|
|
|
|
self.api_base_url, self.access_token, self.pixelfed_path = self.__setup(self)
|
|
|
|
if session:
|
|
self.session = session
|
|
else:
|
|
self.session = requests.Session()
|
|
|
|
def email_verification(self, email, username):
|
|
|
|
data = {'email':email,
|
|
'username':username
|
|
}
|
|
ut = self.access_token
|
|
|
|
rt = self.access_token
|
|
|
|
endpoint = self.api_base_url + '/api/v1.1/auth/iarer?ut={0}&rt={1}'.format(ut, rt)
|
|
|
|
response = self.__api_request('GET', endpoint, data)
|
|
|
|
registered = response.ok
|
|
|
|
response = self.__json_allow_dict_attrs(response.json())
|
|
|
|
return (registered, response)
|
|
|
|
def verify_credentials(self):
|
|
|
|
endpoint = self.api_base_url + '/api/v1/accounts/verify_credentials'
|
|
|
|
response = self.__api_request('GET', endpoint)
|
|
|
|
response = self.__json_allow_dict_attrs(response.json())
|
|
|
|
return response
|
|
|
|
def pre_flight_check(self):
|
|
|
|
endpoint = self.api_base_url + '/api/v1.1/auth/iarpfc'
|
|
|
|
response = self.__api_request('GET', endpoint)
|
|
|
|
registered = response.ok
|
|
|
|
response = self.__json_allow_dict_attrs(response.json())
|
|
|
|
return (registered, response)
|
|
|
|
def in_app_registration(self, email, username, passwd):
|
|
|
|
data = {'email':email,
|
|
'username':username,
|
|
'password':passwd
|
|
}
|
|
|
|
endpoint = self.api_base_url + '/api/v1.1/auth/iar'
|
|
|
|
response = self.__api_request('POST', endpoint, data)
|
|
|
|
registered = response.ok
|
|
|
|
response = self.__json_allow_dict_attrs(response.json())
|
|
|
|
return (registered, response)
|
|
|
|
@staticmethod
|
|
def __check_setup(self):
|
|
|
|
is_setup = False
|
|
|
|
if not os.path.isfile(self.__pixelfed_config_path):
|
|
print(f"File {self.__pixelfed_config_path} not found, running setup.")
|
|
else:
|
|
is_setup = True
|
|
|
|
return is_setup
|
|
|
|
@staticmethod
|
|
def __setup(self):
|
|
|
|
if not os.path.exists('config'):
|
|
os.makedirs('config')
|
|
|
|
self.api_base_url = input("Pixelfed API base url, in ex. 'https://yourpixelfed.instance': ")
|
|
self.access_token = input("Pixelfed access token: ")
|
|
self.pixelfed_path = input("Pixelfed root dir: ")
|
|
|
|
if not os.path.exists(self.__pixelfed_config_path):
|
|
with open(self.__pixelfed_config_path, 'w'): pass
|
|
print(f"{self.__pixelfed_config_path} created!")
|
|
|
|
with open(self.__pixelfed_config_path, 'a') as the_file:
|
|
print("Writing pixelfed parameters to " + self.__pixelfed_config_path)
|
|
the_file.write(f'api_base_url: {self.api_base_url}\n'+f'access_token: {self.access_token}\n'+f'pixelfed_path: {self.pixelfed_path}\n')
|
|
|
|
return (self.api_base_url, self.access_token, self.pixelfed_path)
|
|
|
|
@staticmethod
|
|
def __get_parameter(parameter, file_path ):
|
|
|
|
with open( file_path ) as f:
|
|
for line in f:
|
|
if line.startswith( parameter ):
|
|
return line.replace(parameter + ":", "").strip()
|
|
|
|
print(f'{file_path} Missing parameter {parameter}')
|
|
sys.exit(0)
|
|
|
|
def __api_request(self, method, endpoint, data={}):
|
|
|
|
headers = {
|
|
'Authorization': f"Bearer {self.access_token}",
|
|
'Accept': 'application/json',
|
|
}
|
|
|
|
response = None
|
|
|
|
try:
|
|
|
|
kwargs = dict(data=data)
|
|
|
|
response = self.session.request(method, url = endpoint, headers = headers ,**kwargs)
|
|
|
|
except Exception as e:
|
|
|
|
raise PixelfedNetworkError(f"Could not complete request: {e}")
|
|
|
|
if response is None:
|
|
|
|
raise PixelfedIllegalArgumentError("Illegal request.")
|
|
|
|
if not response.ok:
|
|
|
|
try:
|
|
if isinstance(response, dict) and 'error' in response:
|
|
error_msg = response['error']
|
|
elif isinstance(response, str):
|
|
error_msg = response
|
|
else:
|
|
error_msg = None
|
|
except ValueError:
|
|
error_msg = None
|
|
|
|
if response.status_code == 404:
|
|
ex_type = PixelfedNotFoundError
|
|
if not error_msg:
|
|
error_msg = 'Endpoint not found.'
|
|
# this is for compatibility with older versions
|
|
# which raised PixelfedAPIError('Endpoint not found.')
|
|
# on any 404
|
|
elif response.status_code == 401:
|
|
ex_type = PixelfedUnauthorizedError
|
|
elif response.status_code == 422:
|
|
return response
|
|
elif response.status_code == 500:
|
|
ex_type = PixelfedInternalServerError
|
|
elif response.status_code == 502:
|
|
ex_type = PixelfedBadGatewayError
|
|
elif response.status_code == 503:
|
|
ex_type = PixelfedServiceUnavailableError
|
|
elif response.status_code == 504:
|
|
ex_type = PixelfedGatewayTimeoutError
|
|
elif response.status_code >= 500 and \
|
|
response.status_code <= 511:
|
|
ex_type = PixelfedServerError
|
|
else:
|
|
ex_type = PixelfedAPIError
|
|
|
|
raise ex_type(
|
|
'Pixelfed API returned error',
|
|
response.status_code,
|
|
response.reason,
|
|
error_msg)
|
|
|
|
else:
|
|
|
|
return response
|
|
|
|
@staticmethod
|
|
def __json_allow_dict_attrs(json_object):
|
|
"""
|
|
Makes it possible to use attribute notation to access a dicts
|
|
elements, while still allowing the dict to act as a dict.
|
|
"""
|
|
if isinstance(json_object, dict):
|
|
return AttribAccessDict(json_object)
|
|
return json_objecte
|
|
|
|
##
|
|
# Exceptions
|
|
##
|
|
class PixelfedError(Exception):
|
|
"""Base class for Pixelfed.py exceptions"""
|
|
|
|
class PixelfedIOError(IOError, PixelfedError):
|
|
"""Base class for Pixelfed.py I/O errors"""
|
|
|
|
class PixelfedNetworkError(PixelfedIOError):
|
|
"""Raised when network communication with the server fails"""
|
|
pass
|
|
class PixelfedAPIError(PixelfedError):
|
|
"""Raised when the pixelfed API generates a response that cannot be handled"""
|
|
pass
|
|
class PixelfedServerError(PixelfedAPIError):
|
|
"""Raised if the Server is malconfigured and returns a 5xx error code"""
|
|
pass
|
|
class PixelfedInternalServerError(PixelfedServerError):
|
|
"""Raised if the Server returns a 500 error"""
|
|
pass
|
|
|
|
class PixelfedBadGatewayError(PixelfedServerError):
|
|
"""Raised if the Server returns a 502 error"""
|
|
pass
|
|
|
|
class PixelfedServiceUnavailableError(PixelfedServerError):
|
|
"""Raised if the Server returns a 503 error"""
|
|
pass
|
|
class PixelfedGatewayTimeoutError(PixelfedServerError):
|
|
"""Raised if the Server returns a 504 error"""
|
|
pass
|
|
class PixelfedNotFoundError(PixelfedAPIError):
|
|
"""Raised when the pixelfed API returns a 404 Not Found error"""
|
|
pass
|
|
|
|
class PixelfedUnauthorizedError(PixelfedAPIError):
|
|
"""Raised when the pixelfed API returns a 401 Unauthorized error
|
|
|
|
This happens when an OAuth token is invalid or has been revoked,
|
|
or when trying to access an endpoint that can't be used without
|
|
authentication without providing credentials."""
|
|
pass
|