# Listing metadata from the code viewer: 489 lines, 16 KiB, Python
import os
|
|
import sys
|
|
import requests
|
|
from requests.models import urlencode
|
|
|
|
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
|
|
class AttribAccessDict(dict):
    """
    A ``dict`` whose keys can also be read as attributes (``d.key``).

    Attribute-style access is read only for keys that exist in the dict;
    names that are not keys are stored as ordinary instance attributes.
    """

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails, so fall back to
        # the dict contents.
        if attr not in self:
            raise AttributeError("Attribute not found: " + str(attr))
        return self[attr]

    def __setattr__(self, attr, val):
        # Refuse to shadow an existing key through attribute assignment.
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super().__setattr__(attr, val)
|
|
|
|
class Forgejo:
    """
    Small client for a handful of Forgejo REST endpoints under ``/api/v1``.

    Connection settings (``api_base_url``, ``access_token``, ``stats_repo``)
    are read from ``config/forgejo.txt``. When that file does not exist they
    are asked for interactively and written to it. HTTP calls go through the
    ``requests``-style session stored in ``self.session``.
    """

    name = 'Forgejo API wrapper'

    def __init__(self, api_base_url=None, access_token=None, session=None):
        """
        Load (or interactively create) the configuration and set up the session.

        :param api_base_url: optional base URL; when given it overrides the
            value read from the config file (it used to be silently ignored).
        :param access_token: optional token; when given it overrides the value
            read from the config file (it used to be silently ignored).
        :param session: optional session object exposing ``request()``; a new
            ``requests.Session`` is created when omitted.
        """
        self.__forgejo_config_path = "config/forgejo.txt"

        if self.__check_setup():
            config_path = self.__forgejo_config_path
            self.api_base_url = self.__get_parameter("api_base_url", config_path)
            self.access_token = self.__get_parameter("access_token", config_path)
            self.stats_repo = self.__get_parameter("stats_repo", config_path)
        else:
            self.api_base_url, self.access_token, self.stats_repo = self.__setup()

        # Explicit constructor arguments win over the stored configuration.
        if api_base_url is not None:
            self.api_base_url = api_base_url
        if access_token is not None:
            self.access_token = access_token

        self.session = session if session else requests.Session()

    def admin_users_create(self, email, username, passwd, full_name=None, must_change_password=True, restricted=False, send_notify=True, source_id='0', visibility='private'):
        """
        Create a user through ``POST /api/v1/admin/users``.

        ``full_name`` falls back to ``username`` when not supplied.

        :return: tuple ``(registered, response)`` where ``registered`` is the
            HTTP ``ok`` flag and ``response`` the decoded JSON body.
        """
        data = {
            'email': email,
            'full_name': username if full_name is None else full_name,
            'login_name': username,
            'must_change_password': must_change_password,
            'password': passwd,
            'restricted': restricted,
            'send_notify': send_notify,
            'source_id': source_id,
            'username': username,
            'visibility': visibility,
        }

        endpoint = self.__build_url('/api/v1/admin/users', token_param='access_token')

        response = self.__api_request('POST', endpoint, data)

        # A 422 is handed back by __api_request instead of raising, so
        # ``ok`` tells the caller whether the account was really created.
        registered = response.ok

        return (registered, self.__json_allow_dict_attrs(response.json()))

    def admin_users_list(self, page=None, limit=None):
        """List users through ``GET /api/v1/admin/users`` (optional paging)."""
        return self.__get('/api/v1/admin/users', {'page': page, 'limit': limit})

    def notifications_new(self):
        """Query ``GET /api/v1/notifications/new`` and return the decoded body."""
        return self.__get('/api/v1/notifications/new')

    ###
    ### repository
    ###

    def repos_get_repo(self, owner, repo):
        """Fetch ``GET /api/v1/repos/{owner}/{repo}``, authenticated with the token."""
        # The token used to be built but never appended to the URL.
        return self.__get('/api/v1/repos/{0}/{1}'.format(owner, repo))

    def repos_issues_search(self, owner, state=None, labels=None, q=None, milestones=None, priority_repo_id=None, issue_type=None, since=None, before=None, assigned=None,
                            created=None, mentioned=None, review_requested=None, team=None, page=None, limit=None):
        """
        Search issues through ``GET /api/v1/repos/issues/search``.

        ``state`` defaults to ``'open'``; every other filter is only sent when
        it is not ``None``. ``issue_type`` is sent as the ``type`` parameter.
        """
        params = {
            'state': 'open' if state is None else state,
            'labels': labels,
            'milestones': milestones,
            'q': q,
            'priority_repo_id': priority_repo_id,
            'type': issue_type,
            'since': since,
            'before': before,
            'assigned': assigned,
            'created': created,
            'mentioned': mentioned,
            'review_requested': review_requested,
            'owner': owner,
            'team': team,
            'page': page,
            'limit': limit,
        }
        return self.__get('/api/v1/repos/issues/search', params)

    def repos_owner_repo_issues(self, owner, repo, state=None, labels=None, q=None, issue_type=None, milestones=None, since=None, before=None, created_by=None, assigned_by=None,
                                mentioned_by=None, page=None, limit=None):
        """
        List a repository's issues through ``GET /api/v1/repos/{owner}/{repo}/issues``.

        if since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00

        ``state`` defaults to ``'open'``. Filters left at ``None`` are omitted
        (they used to be sent as the literal string ``"None"``), and
        ``issue_type`` is sent as ``type`` to match ``repos_issues_search``.
        """
        params = {
            'state': 'open' if state is None else state,
            'labels': labels,
            'q': q,
            'type': issue_type,
            'milestones': milestones,
            'since': since,
            'before': before,
            'created_by': created_by,
            'assigned_by': assigned_by,
            'mentioned_by': mentioned_by,
            'page': page,
            'limit': limit,
        }
        # NOTE(review): unlike most calls here this request carries no token;
        # kept as-is, confirm whether that is intentional.
        return self.__get('/api/v1/repos/{0}/{1}/issues'.format(owner, repo), params, token_param=None)

    def repos_owner_repo_issues_comments(self, owner, repo, since=None, before=None, page=None, limit=None):
        """
        List issue comments through ``GET /api/v1/repos/{owner}/{repo}/issues/comments``.

        if since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00

        Filters left at ``None`` are omitted from the query string.
        """
        params = {'since': since, 'before': before, 'page': page, 'limit': limit}
        # NOTE(review): sent without a token, as in the original code.
        return self.__get('/api/v1/repos/{0}/{1}/issues/comments'.format(owner, repo), params, token_param=None)

    def repo_owner_get_metada(self, owner, repo, filepath):
        """
        Gets the metadata and contents (if a file) of an entry in a repository, or list of entries if a dir
        """
        return self.__get('/api/v1/repos/{0}/{1}/contents/{2}'.format(owner, repo, filepath))

    def repo_owner_create_file(self, owner, repo, filepath, author_email, author_name, branch, content, message):
        """
        Create a file in a repository

        The payload is sent as a JSON body so the nested ``author`` identity
        survives; form encoding flattened it to its key names.
        NOTE(review): the API is expected to want ``content`` base64 encoded;
        this method forwards it untouched.
        """
        payload = {
            "author": {"email": author_email, "name": author_name},
            "branch": branch,
            "content": content,
            "message": message,
        }
        endpoint = self.__build_url('/api/v1/repos/{0}/{1}/contents/{2}'.format(owner, repo, filepath))

        response = self.__api_request('POST', endpoint, json_body=payload)

        return self.__json_allow_dict_attrs(response.json())

    def repo_owner_update_file(self, owner, repo, filepath, author_email, author_name, branch, message, sha, content=None):
        """
        Update a file in a repository

        :param sha: blob SHA of the file version being replaced.
        :param content: new file content; only sent when not ``None``. Added
            as an optional trailing argument because the original signature
            had no way to pass the new content.
        """
        payload = {
            "author": {"email": author_email, "name": author_name},
            "branch": branch,
            "message": message,
            "sha": sha,
        }
        if content is not None:
            payload["content"] = content

        endpoint = self.__build_url('/api/v1/repos/{0}/{1}/contents/{2}'.format(owner, repo, filepath))

        response = self.__api_request('PUT', endpoint, json_body=payload)

        return self.__json_allow_dict_attrs(response.json())

    def repo_owner_delete_file(self, owner, repo, filepath, author_email, author_name, branch, message, sha):
        """
        Delete a file in a repository

        The same identity is sent as both ``author`` and ``committer`` (the
        key was previously misspelled ``commiter``).
        """
        identity = {"email": author_email, "name": author_name}
        payload = {
            "author": identity,
            "branch": branch,
            "committer": dict(identity),
            "message": message,
            "new_branch": branch,
            "sha": sha,
        }
        endpoint = self.__build_url('/api/v1/repos/{0}/{1}/contents/{2}'.format(owner, repo, filepath))

        response = self.__api_request('DELETE', endpoint, json_body=payload)

        return self.__json_allow_dict_attrs(response.json())

    def user(self):
        """
        Get the authenticated user
        """
        return self.__get('/api/v1/user')

    def __build_url(self, path, params=None, token_param='token'):
        """
        Join the base URL, ``path`` and an encoded query string.

        Entries of ``params`` whose value is ``None`` are dropped. The access
        token is appended under ``token_param`` unless that is ``None``.
        """
        query = {key: value for key, value in (params or {}).items() if value is not None}
        if token_param is not None:
            query[token_param] = self.access_token

        url = self.api_base_url + path
        encoded = urlencode(query)
        return url + '?' + encoded if encoded else url

    def __get(self, path, params=None, token_param='token'):
        """Issue a GET for ``path`` and return the attribute-friendly JSON body."""
        response = self.__api_request('GET', self.__build_url(path, params, token_param))
        return self.__json_allow_dict_attrs(response.json())

    def __check_setup(self):
        """Return True when the config file exists; otherwise announce that setup will run."""
        if os.path.isfile(self.__forgejo_config_path):
            return True

        print(f"File {self.__forgejo_config_path} not found, running setup.")
        return False

    def __setup(self):
        """
        Ask for the connection settings on stdin and append them to the config file.

        :return: tuple ``(api_base_url, access_token, stats_repo)``.
        """
        config_path = self.__forgejo_config_path

        os.makedirs(os.path.dirname(config_path), exist_ok=True)

        self.api_base_url = input("Forgejo API base url, in ex. 'https://yourforgejo.instance': ")
        self.access_token = input("Forgejo access token: ")
        self.stats_repo = input("Stats repo: ")

        if not os.path.exists(config_path):
            with open(config_path, 'w'):
                pass
            print(f"{config_path} created!")

        with open(config_path, 'a') as the_file:
            print("Writing forgejo parameters to " + config_path)
            the_file.write(f'api_base_url: {self.api_base_url}\n'+f'access_token: {self.access_token}\nstats_repo: {self.stats_repo}\n')

        return (self.api_base_url, self.access_token, self.stats_repo)

    @staticmethod
    def __get_parameter(parameter, file_path):
        """
        Return the value of ``parameter`` from a ``key: value`` config file.

        The key must match exactly (a plain prefix match would let
        ``access_token_old`` satisfy ``access_token``). If the key is missing
        a message is printed and the process exits.
        """
        with open(file_path) as f:
            for line in f:
                key, separator, value = line.partition(":")
                if separator and key.strip() == parameter:
                    return value.strip()

        print(f'{file_path} Missing parameter {parameter}')
        # NOTE(review): exits with status 0 even though this is an error path.
        sys.exit(0)

    def __api_request(self, method, endpoint, data=None, json_body=None):
        """
        Send one HTTP request through ``self.session`` and map failures to exceptions.

        :param data: form payload (defaults to an empty dict).
        :param json_body: optional payload sent as a JSON body instead.
        :return: the response object for successful requests and for 422
            responses (callers inspect those themselves).
        :raises ForgejoNetworkError: when the request itself fails.
        :raises ForgejoAPIError: (or a subclass chosen by status code) for any
            other non-OK response.
        """
        request_kwargs = {'data': {} if data is None else data}
        if json_body is not None:
            request_kwargs['json'] = json_body

        try:
            response = self.session.request(method, url=endpoint, **request_kwargs)
        except Exception as e:
            raise ForgejoNetworkError(f"Could not complete request: {e}") from e

        if response is None:
            # NOTE(review): ForgejoIllegalArgumentError is not defined in the
            # visible part of this file; confirm it exists elsewhere.
            raise ForgejoIllegalArgumentError("Illegal request.")

        if response.ok or response.status_code == 422:
            return response

        # Pull a human-readable message out of the error body when there is one.
        try:
            body = response.json()
        except ValueError:
            body = None

        if isinstance(body, dict):
            # 'error' was the key the original code looked for; 'message' is
            # presumably what Forgejo uses in its error bodies (confirm).
            error_msg = body.get('error') or body.get('message')
        elif isinstance(body, str):
            error_msg = body
        else:
            error_msg = None

        status = response.status_code
        specific_errors = {
            401: ForgejoUnauthorizedError,
            404: ForgejoNotFoundError,
            500: ForgejoInternalServerError,
            502: ForgejoBadGatewayError,
            503: ForgejoServiceUnavailableError,
            504: ForgejoGatewayTimeoutError,
        }
        ex_type = specific_errors.get(status)
        if ex_type is None:
            ex_type = ForgejoServerError if 500 <= status <= 511 else ForgejoAPIError

        if status == 404 and not error_msg:
            # this is for compatibility with older versions
            # which raised ForgejoAPIError('Endpoint not found.')
            # on any 404
            error_msg = 'Endpoint not found.'

        raise ex_type(
            'Forgejo API returned error',
            response.status_code,
            response.reason,
            error_msg)

    @staticmethod
    def __json_allow_dict_attrs(json_object):
        """
        Makes it possible to use attribute notation to access a dicts
        elements, while still allowing the dict to act as a dict.

        Only a top-level dict is wrapped; any other decoded value (lists,
        strings, numbers) is returned unchanged.
        """
        if isinstance(json_object, dict):
            return AttribAccessDict(json_object)
        return json_object
|
|
|
|
##
# Exceptions
##
|
|
class ForgejoError(Exception):
    """Root of the exception hierarchy raised by this Forgejo wrapper."""
    pass
|
|
|
|
class ForgejoIOError(IOError, ForgejoError):
    """Common parent for the wrapper's I/O related errors."""
    pass
|
|
|
|
class ForgejoNetworkError(ForgejoIOError):
    """Signals that network communication with the server failed."""
|
|
class ForgejoAPIError(ForgejoError):
    """Signals a response from the forgejo API that cannot be handled."""
|
|
class ForgejoServerError(ForgejoAPIError):
    """Signals a misconfigured server that answered with a 5xx status code."""
|
|
class ForgejoInternalServerError(ForgejoServerError):
    """Signals that the server answered with a 500 error."""
|
|
|
|
class ForgejoBadGatewayError(ForgejoServerError):
    """Signals that the server answered with a 502 error."""
|
|
|
|
class ForgejoServiceUnavailableError(ForgejoServerError):
    """Signals that the server answered with a 503 error."""
|
|
class ForgejoGatewayTimeoutError(ForgejoServerError):
    """Signals that the server answered with a 504 error."""
|
|
class ForgejoNotFoundError(ForgejoAPIError):
    """Signals that the forgejo API answered with a 404 Not Found error."""
|
|
|
|
class ForgejoUnauthorizedError(ForgejoAPIError):
    """
    Signals that the forgejo API answered with a 401 Unauthorized error.

    This occurs when an OAuth token is invalid or has been revoked, or when
    an endpoint that requires authentication is called without credentials.
    """
|