import os import sys import requests from getpass import getpass from requests.models import urlencode import json import pdb ### # Dict helper class. # Defined at top level so it can be pickled. ### class AttribAccessDict(dict): def __getattr__(self, attr): if attr in self: return self[attr] else: raise AttributeError("Attribute not found: " + str(attr)) def __setattr__(self, attr, val): if attr in self: raise AttributeError("Attribute-style access is read only") super(AttribAccessDict, self).__setattr__(attr, val) class Forgejo: name = 'Forgejo API wrapper' def __init__(self, api_base_url=None, access_token=None, session=None): self.__forgejo_config_path = "config/forgejo.txt" is_setup = self.__check_setup(self) if is_setup: self.api_base_url = self.__get_parameter("api_base_url", self.__forgejo_config_path) self.access_token = self.__get_parameter("access_token", self.__forgejo_config_path) else: self.api_base_url, self.access_token = self.__setup(self) if session: self.session = session else: self.session = requests.Session() def admin_users_create(self, email, username, passwd, full_name=None, must_change_password=True, restricted=False, send_notify=True, source_id=0, visibility="public"): data = { 'email':email, 'full_name':username, 'login_name':username, 'must_change_password':must_change_password, 'password':passwd, 'restricted':restricted, 'send_notify':send_notify, 'source_id':source_id, 'username':username, 'visibility':visibility } endpoint = self.api_base_url + '/api/v1/admin/users?token={0}'.format(self.access_token) response = self.__api_request('POST', endpoint, data) registered = response.ok response = self.__json_allow_dict_attrs(response.json()) return (registered, response) def admin_users_list(self, page=None, limit=None): params = dict() if page != None: params['page'] = page if limit != None: params['limit'] = limit params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/admin/users?{0}'.format(formatted_params) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response def notifications_new(self): endpoint = self.api_base_url + '/api/v1/notifications/new?token={0}'.format(self.access_token) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response ### ### repository ### def repos_get_repo(self, owner, repo): params = dict() params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}'.format(owner, repo, formatted_params) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response def repos_issues_search(self, owner, state=None, labels=None, q=None, milestones=None, priority_repo_id=None, issue_type=None, since=None, before=None, assigned=None, created=None, mentioned=None, review_requested=None, team=None, page=None, limit=None): params = dict() if state == None: params['state'] = 'open' else: params['state'] = state if labels != None: params['labels'] = labels if milestones != None: params['milestones'] = milestones if q != None: params['q'] = q if priority_repo_id != None: params['priority_repo_id'] = priority_repo_id if issue_type != None: params['type'] = issue_type if since != None: params['since'] = since if before != None: params['before'] = before if assigned != None: params['assigned'] = assigned if created != None: params['created'] = created if mentioned != None: params['mentioned'] = mentioned if review_requested != None: params['review_requested'] = review_requested params['owner'] = owner if team != None: params['team'] = team if page != None: params['page'] = page if limit != None: params['limit'] = limit params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/issues/search?{0}'.format(formatted_params) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response def repos_owner_repo_issues(self, owner, repo, state=None, labels=None, q=None, issue_type=None, milestones=None, since=None, before=None, created_by=None, assigned_by=None, mentioned_by=None, page=None, limit=None): """ List a repository's issues If since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00 """ params = dict() if state == None: params['state'] = 'open' else: params['state'] = state params['labels'] = labels if q != None: params['q'] = q params['issue_type'] = issue_type params['milestones'] = milestones if since != None: params['since'] = since if before != None: params['before'] = before if created_by != None: params['created_by'] = created_by if assigned_by != None: params['assigned_by'] = assigned_by if mentioned_by != None: params['mentioned_by'] = mentioned_by params['page'] = page params['limit'] = limit formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/issues?{2}'.format(owner, repo, formatted_params) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response def repos_owner_repo_issues_pinned(self, owner, repo): """ List a repo's pinned isssues """ params = dict() formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/issues?{2}'.format(owner, repo, formatted_params) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response def repos_owner_repo_issues_comments(self, owner, repo, since=None, before=None, page=None, limit=None): """ if since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00 """ params = dict() if since != None: params['since'] = since if before != None: params['before'] = before params['page'] = page params['limit'] = limit formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/issues/comments?{2}'.format(owner, repo, formatted_params) response = self.__api_request('GET', endpoint)#, data) response = self.__json_allow_dict_attrs(response.json()) return response def repo_owner_get_metadata(self, owner, repo, filepath): """ Gets the metadata and contents (if a file) of an entry in a repository, or list of entries if a dir """ params = dict() params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response def repo_owner_create_file(self, owner, repo, filepath, author_email, author_name, branch, content, message): """ Create a file in a repository """ content = json.dumps(content.decode('UTF-8')).replace('"','') data = {"author":{ "email":author_email, "name":author_name }, "branch":branch, "content":content, "message":message } params = dict() params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params) response = self.__api_request('POST', endpoint, data) response = self.__json_allow_dict_attrs(response.json()) return response def repo_owner_update_file(self, owner, repo, filepath, author_email, author_name, branch, content, message, sha): """ Update a file in a repository """ content = json.dumps(content.decode('UTF-8')).replace('"','') data = {"author":{ "email":author_email, "name":author_name }, "branch":branch, "content":content, "message":message, "sha":sha } params = dict() params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params) response = self.__api_request('PUT', endpoint, data) response = self.__json_allow_dict_attrs(response.json()) return response def repo_owner_delete_file(self, owner, repo, filepath, author_email, author_name, branch, message, sha): """ Delete a file in a repository """ data = {"author":{ "email":author_email, "name":author_name }, "branch":branch, "commiter":{ "email":author_email, "name":author_name }, "message":message, "new_branch":branch, 'sha':sha } params = dict() params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params) response = self.__api_request('DELETE', endpoint, data) response = self.__json_allow_dict_attrs(response.json()) return response def user(self): """ Get the authenticated user """ params = dict() params['token'] = self.access_token formatted_params = urlencode(params) endpoint = self.api_base_url + '/api/v1/user?{0}'.format(formatted_params) response = self.__api_request('GET', endpoint) response = self.__json_allow_dict_attrs(response.json()) return response @staticmethod def __check_setup(self): is_setup = False if not os.path.isfile(self.__forgejo_config_path): print(f"File {self.__forgejo_config_path} not found, running setup.") else: is_setup = True return is_setup @staticmethod def __setup(self): if not os.path.exists('config'): os.makedirs('config') self.api_base_url = input("Forgejo API base url, in ex. 'https://yourforgejo.instance': ") self.access_token = getpass("Forgejo access token: ") if not os.path.exists(self.__forgejo_config_path): with open(self.__forgejo_config_path, 'w'): pass print(f"{self.__forgejo_config_path} created!") with open(self.__forgejo_config_path, 'a') as the_file: print("Writing forgejo parameters to " + self.__forgejo_config_path) the_file.write(f'api_base_url: {self.api_base_url}\n'+f'access_token: {self.access_token}') return (self.api_base_url, self.access_token) @staticmethod def __get_parameter(parameter, file_path ): with open( file_path ) as f: for line in f: if line.startswith( parameter ): return line.replace(parameter + ":", "").strip() print(f'{file_path} Missing parameter {parameter}') sys.exit(0) def __api_request(self, method, endpoint, data={}): headers = { 'Accept':'application/json', 'Content-Type':'application/json' } response = None try: response = self.session.request(method, url = endpoint, headers=headers, json=data) except Exception as e: raise ForgejoNetworkError(f"Could not complete request: {e}") if response is None: raise ForgejoIllegalArgumentError("Illegal request.") if not response.ok: try: if isinstance(response, dict) and 'error' in response: error_msg = response['error'] elif isinstance(response, str): error_msg = response else: error_msg = None except ValueError: error_msg = None if response.status_code == 404: ex_type = ForgejoNotFoundError if not error_msg: error_msg = 'Endpoint not found.' # this is for compatibility with older versions # which raised ForgejoAPIError('Endpoint not found.') # on any 404 elif response.status_code == 401: ex_type = ForgejoUnauthorizedError elif response.status_code == 422: return response elif response.status_code == 500: ex_type = ForgejoInternalServerError elif response.status_code == 502: ex_type = ForgejoBadGatewayError elif response.status_code == 503: ex_type = ForgejoServiceUnavailableError elif response.status_code == 504: ex_type = ForgejoGatewayTimeoutError elif response.status_code >= 500 and \ response.status_code <= 511: ex_type = ForgejoServerError else: ex_type = ForgejoAPIError raise ex_type( 'Forgejo API returned error', response.status_code, response.reason, error_msg) else: return response @staticmethod def __json_allow_dict_attrs(json_object): """ Makes it possible to use attribute notation to access a dicts elements, while still allowing the dict to act as a dict. """ if isinstance(json_object, dict): return AttribAccessDict(json_object) return json_object ## # Exceptions ## class ForgejoError(Exception): """Base class for Forgejo.py exceptions""" class ForgejoIOError(IOError, ForgejoError): """Base class for Forgejo.py I/O errors""" class ForgejoNetworkError(ForgejoIOError): """Raised when network communication with the server fails""" pass class ForgejoAPIError(ForgejoError): """Raised when the forgejo API generates a response that cannot be handled""" pass class ForgejoServerError(ForgejoAPIError): """Raised if the Server is malconfigured and returns a 5xx error code""" pass class ForgejoInternalServerError(ForgejoServerError): """Raised if the Server returns a 500 error""" pass class ForgejoBadGatewayError(ForgejoServerError): """Raised if the Server returns a 502 error""" pass class ForgejoServiceUnavailableError(ForgejoServerError): """Raised if the Server returns a 503 error""" pass class ForgejoGatewayTimeoutError(ForgejoServerError): """Raised if the Server returns a 504 error""" pass class ForgejoNotFoundError(ForgejoAPIError): """Raised when the forgejo API returns a 404 Not Found error""" pass class ForgejoUnauthorizedError(ForgejoAPIError): """Raised when the forgejo API returns a 401 Unauthorized error This happens when an OAuth token is invalid or has been revoked, or when trying to access an endpoint that can't be used without authentication without providing credentials.""" pass