stats/app/libraries/forgejo.py

489 líneas
16 KiB
Python

import os
import sys
import requests
from requests.models import urlencode
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Forgejo:
name = 'Forgejo API wrapper'
def __init__(self, api_base_url=None, access_token=None, session=None):
self.__forgejo_config_path = "config/forgejo.txt"
is_setup = self.__check_setup(self)
if is_setup:
self.api_base_url = self.__get_parameter("api_base_url", self.__forgejo_config_path)
self.access_token = self.__get_parameter("access_token", self.__forgejo_config_path)
self.stats_repo = self.__get_parameter("stats_repo", self.__forgejo_config_path)
else:
self.api_base_url, self.access_token, self.stats_repo = self.__setup(self)
if session:
self.session = session
else:
self.session = requests.Session()
def admin_users_create(self, email, username, passwd, full_name=None, must_change_password=True, restricted=False, send_notify=True, source_id='0', visibility='private'):
data = {'email':email,
'full_name':username,
'login_name':username,
'must_change_password':must_change_password,
'password':passwd,
'restricted':restricted,
'send_notify':send_notify,
'source_id':source_id,
'username':username,
'visibility':visibility
}
endpoint = self.api_base_url + '/api/v1/admin/users?access_token={0}'.format(self.access_token)
response = self.__api_request('POST', endpoint, data)
registered = response.ok
response = self.__json_allow_dict_attrs(response.json())
return (registered, response)
def admin_users_list(self, page=None, limit=None):
params = dict()
if page != None:
params['page'] = page
if limit != None:
params['limit'] = limit
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/admin/users?{0}'.format(formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def notifications_new(self):
endpoint = self.api_base_url + '/api/v1/notifications/new?token={0}'.format(self.access_token)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
###
### repository
###
def repos_get_repo(self, owner, repo):
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}'.format(owner, repo, formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repos_issues_search(self, owner, state=None, labels=None, q=None, milestones=None, priority_repo_id=None, issue_type=None, since=None, before=None, assigned=None,
created=None, mentioned=None, review_requested=None, team=None, page=None, limit=None):
params = dict()
if state == None:
params['state'] = 'open'
else:
params['state'] = state
if labels != None:
params['labels'] = labels
if milestones != None:
params['milestones'] = milestones
if q != None:
params['q'] = q
if priority_repo_id != None:
params['priority_repo_id'] = priority_repo_id
if issue_type != None:
params['type'] = issue_type
if since != None:
params['since'] = since
if before != None:
params['before'] = before
if assigned != None:
params['assigned'] = assigned
if created != None:
params['created'] = created
if mentioned != None:
params['mentioned'] = mentioned
if review_requested != None:
params['review_requested'] = review_requested
params['owner'] = owner
if team != None:
params['team'] = team
if page != None:
params['page'] = page
if limit != None:
params['limit'] = limit
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/issues/search?{0}'.format(formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repos_owner_repo_issues(self, owner, repo, state=None, labels=None, q=None, issue_type=None, milestones=None, since=None, before=None, created_by=None, assigned_by=None,
mentioned_by=None, page=None, limit=None):
"""
if since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00
"""
params = dict()
if state == None:
params['state'] = 'open'
else:
params['state'] = state
params['labels'] = labels
if q != None:
params['q'] = q
params['issue_type'] = issue_type
params['milestones'] = milestones
if since != None:
params['since'] = since
if before != None:
params['before'] = before
if created_by != None:
params['created_by'] = created_by
if assigned_by != None:
params['assigned_by'] = assigned_by
if mentioned_by != None:
params['mentioned_by'] = mentioned_by
params['page'] = page
params['limit'] = limit
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/issues?{2}'.format(owner, repo, formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repos_owner_repo_issues_comments(self, owner, repo, since=None, before=None, page=None, limit=None):
"""
if since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00
"""
params = dict()
if since != None:
params['since'] = since
if before != None:
params['before'] = before
params['page'] = page
params['limit'] = limit
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/issues/comments?{2}'.format(owner, repo, formatted_params)
response = self.__api_request('GET', endpoint)#, data)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_get_metada(self, owner, repo, filepath):
"""
Gets the metadata and contents (if a file) of an entry in a repository, or list of entries if a dir
"""
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_create_file(self, owner, repo, filepath, author_email, author_name, branch, content, message):
"""
Create a file in a repository
"""
data = {"author":[{"email":author_email},{"name":author_name}],
"branch":branch,
"content":content,
"message":message
}
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('POST', endpoint, data=data)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_update_file(self, owner, repo, filepath, author_email, author_name, branch, message, sha):
"""
Update a file in a repository
"""
data = {"author":[{"email":author_email},{"name":author_name}],
"branch":branch,
"message":message,
"sha":sha
}
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('PUT', endpoint, data=data)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_delete_file(self, owner, repo, filepath, author_email, author_name, branch, message, sha):
"""
Delete a file in a repository
"""
data = {"author":[{"email":author_email},{"name":author_name}],
"branch":branch,
"commiter":[{"email":author_email},{"name":author_name}],
"message":message,
"new_branch":branch,
'sha':sha
}
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('DELETE', endpoint, data=data)
response = self.__json_allow_dict_attrs(response.json())
return response
def user(self):
"""
Get the authenticated user
"""
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/user?{0}'.format(formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
@staticmethod
def __check_setup(self):
is_setup = False
if not os.path.isfile(self.__forgejo_config_path):
print(f"File {self.__forgejo_config_path} not found, running setup.")
else:
is_setup = True
return is_setup
@staticmethod
def __setup(self):
if not os.path.exists('config'):
os.makedirs('config')
self.api_base_url = input("Forgejo API base url, in ex. 'https://yourforgejo.instance': ")
self.access_token = input("Forgejo access token: ")
self.stats_repo = input("Stats repo: ")
if not os.path.exists(self.__forgejo_config_path):
with open(self.__forgejo_config_path, 'w'): pass
print(f"{self.__forgejo_config_path} created!")
with open(self.__forgejo_config_path, 'a') as the_file:
print("Writing forgejo parameters to " + self.__forgejo_config_path)
the_file.write(f'api_base_url: {self.api_base_url}\n'+f'access_token: {self.access_token}\nstats_repo: {self.stats_repo}\n')
return (self.api_base_url, self.access_token, self.stats_repo)
@staticmethod
def __get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
def __api_request(self, method, endpoint, data={}):
response = None
try:
kwargs = dict(data=data)
response = self.session.request(method, url = endpoint, **kwargs)
except Exception as e:
raise ForgejoNetworkError(f"Could not complete request: {e}")
if response is None:
raise ForgejoIllegalArgumentError("Illegal request.")
if not response.ok:
try:
if isinstance(response, dict) and 'error' in response:
error_msg = response['error']
elif isinstance(response, str):
error_msg = response
else:
error_msg = None
except ValueError:
error_msg = None
if response.status_code == 404:
ex_type = ForgejoNotFoundError
if not error_msg:
error_msg = 'Endpoint not found.'
# this is for compatibility with older versions
# which raised ForgejoAPIError('Endpoint not found.')
# on any 404
elif response.status_code == 401:
ex_type = ForgejoUnauthorizedError
elif response.status_code == 422:
return response
elif response.status_code == 500:
ex_type = ForgejoInternalServerError
elif response.status_code == 502:
ex_type = ForgejoBadGatewayError
elif response.status_code == 503:
ex_type = ForgejoServiceUnavailableError
elif response.status_code == 504:
ex_type = ForgejoGatewayTimeoutError
elif response.status_code >= 500 and \
response.status_code <= 511:
ex_type = ForgejoServerError
else:
ex_type = ForgejoAPIError
raise ex_type(
'Forgejo API returned error',
response.status_code,
response.reason,
error_msg)
else:
return response
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_objecte
##
# Exceptions
##
class ForgejoError(Exception):
"""Base class for Forgejo.py exceptions"""
class ForgejoIOError(IOError, ForgejoError):
"""Base class for Forgejo.py I/O errors"""
class ForgejoNetworkError(ForgejoIOError):
"""Raised when network communication with the server fails"""
pass
class ForgejoAPIError(ForgejoError):
"""Raised when the forgejo API generates a response that cannot be handled"""
pass
class ForgejoServerError(ForgejoAPIError):
"""Raised if the Server is malconfigured and returns a 5xx error code"""
pass
class ForgejoInternalServerError(ForgejoServerError):
"""Raised if the Server returns a 500 error"""
pass
class ForgejoBadGatewayError(ForgejoServerError):
"""Raised if the Server returns a 502 error"""
pass
class ForgejoServiceUnavailableError(ForgejoServerError):
"""Raised if the Server returns a 503 error"""
pass
class ForgejoGatewayTimeoutError(ForgejoServerError):
"""Raised if the Server returns a 504 error"""
pass
class ForgejoNotFoundError(ForgejoAPIError):
"""Raised when the forgejo API returns a 404 Not Found error"""
pass
class ForgejoUnauthorizedError(ForgejoAPIError):
"""Raised when the forgejo API returns a 401 Unauthorized error
This happens when an OAuth token is invalid or has been revoked,
or when trying to access an endpoint that can't be used without
authentication without providing credentials."""
pass