import os
import sys

import requests
from requests.models import urlencode


###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
    """Dict whose keys can also be read with attribute notation.

    Reading ``d.key`` returns ``d['key']``. Assigning to an attribute whose
    name is already a key is rejected, so attribute access to the stored
    data is read only; item assignment (``d['key'] = value``) still works.
    """

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails, so real dict
        # attributes and methods are never shadowed by keys.
        if attr in self:
            return self[attr]
        raise AttributeError("Attribute not found: " + str(attr))

    def __setattr__(self, attr, val):
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super().__setattr__(attr, val)


class Gitea:
    """Thin wrapper around a subset of the Gitea REST API (``/api/v1``).

    Every public method performs one HTTP request through ``self.session``
    and returns the decoded JSON body; JSON objects are wrapped in
    ``AttribAccessDict`` so their fields can be read as attributes.
    The access token is sent as a query-string parameter.
    """

    name = 'Gitea API wrapper'

    def __init__(self, api_base_url=None, access_token=None, session=None):
        """Create a client.

        Args:
            api_base_url: Base URL of the instance, e.g.
                ``https://yourgitea.instance``. When omitted it is read from
                ``config/gitea.txt``, or asked for interactively if that
                file does not exist.
            access_token: API access token. Same fallback as above.
            session: Optional session object exposing ``request(method,
                url=..., **kwargs)``; a new ``requests.Session`` is created
                when none is given.
        """
        self.__gitea_config_path = "config/gitea.txt"

        # Explicit arguments win. The config file (or the interactive
        # setup) is only consulted for values the caller did not supply.
        if api_base_url is None or access_token is None:
            if self.__check_setup():
                if api_base_url is None:
                    api_base_url = self.__get_parameter(
                        "api_base_url", self.__gitea_config_path)
                if access_token is None:
                    access_token = self.__get_parameter(
                        "access_token", self.__gitea_config_path)
            else:
                prompted_url, prompted_token = self.__setup()
                if api_base_url is None:
                    api_base_url = prompted_url
                if access_token is None:
                    access_token = prompted_token

        # Endpoint paths start with '/', so drop trailing slashes here to
        # avoid producing 'https://host//api/v1/...'.
        self.api_base_url = api_base_url.rstrip('/')
        self.access_token = access_token
        self.session = session or requests.Session()

    ###
    ### admin
    ###

    def admin_users_create(self, email, username, passwd, full_name=None,
                           must_change_password=True, restricted=False,
                           send_notify=True, source_id='0',
                           visibility='private'):
        """Create a user (``POST /api/v1/admin/users``).

        ``full_name`` falls back to ``username`` when not given.
        Returns the decoded JSON response.
        """
        data = {
            'email': email,
            'full_name': full_name if full_name is not None else username,
            'login_name': username,
            'must_change_password': must_change_password,
            'password': passwd,
            'restricted': restricted,
            'send_notify': send_notify,
            'source_id': source_id,
            'username': username,
            'visibility': visibility,
        }
        return self.__request_json('POST', '/api/v1/admin/users', data=data,
                                   token_param='access_token')

    def admin_users_list(self, page=None, limit=None):
        """List users (``GET /api/v1/admin/users``), optionally paginated."""
        params = {'page': page, 'limit': limit}
        return self.__request_json('GET', '/api/v1/admin/users', params)

    ###
    ### notifications
    ###

    def notifications_new(self):
        """Query ``GET /api/v1/notifications/new`` for the token's user."""
        return self.__request_json('GET', '/api/v1/notifications/new')

    ###
    ### repository
    ###

    def repos_get_repo(self, owner, repo):
        """Get a repository (``GET /api/v1/repos/{owner}/{repo}``)."""
        return self.__request_json('GET', f'/api/v1/repos/{owner}/{repo}')

    def repos_issues_search(self, owner, state=None, labels=None, q=None,
                            milestones=None, priority_repo_id=None,
                            issue_type=None, since=None, before=None,
                            assigned=None, created=None, mentioned=None,
                            review_requested=None, team=None, page=None,
                            limit=None):
        """Search issues (``GET /api/v1/repos/issues/search``).

        ``state`` defaults to ``'open'``; ``issue_type`` is sent as the
        ``type`` query parameter. Filters left as ``None`` are not sent.
        """
        params = {
            'state': state if state is not None else 'open',
            'labels': labels,
            'milestones': milestones,
            'q': q,
            'priority_repo_id': priority_repo_id,
            'type': issue_type,
            'since': since,
            'before': before,
            'assigned': assigned,
            'created': created,
            'mentioned': mentioned,
            'review_requested': review_requested,
            'owner': owner,
            'team': team,
            'page': page,
            'limit': limit,
        }
        return self.__request_json('GET', '/api/v1/repos/issues/search',
                                   params)

    def repos_owner_repo_issues(self, owner, repo, state=None, labels=None,
                                q=None, issue_type=None, milestones=None,
                                since=None, before=None, created_by=None,
                                assigned_by=None, mentioned_by=None,
                                page=None, limit=None):
        """List a repository's issues
        (``GET /api/v1/repos/{owner}/{repo}/issues``).

        ``state`` defaults to ``'open'``. If ``since`` or ``before`` are
        given they must be timestamps with a UTC offset, for example
        ``2022-08-13T08:09:07+02:00``. Filters left as ``None`` are not
        sent, and ``issue_type`` is sent as the ``type`` query parameter.
        """
        params = {
            'state': state if state is not None else 'open',
            'labels': labels,
            'q': q,
            'type': issue_type,
            'milestones': milestones,
            'since': since,
            'before': before,
            'created_by': created_by,
            'assigned_by': assigned_by,
            'mentioned_by': mentioned_by,
            'page': page,
            'limit': limit,
        }
        return self.__request_json(
            'GET', f'/api/v1/repos/{owner}/{repo}/issues', params)

    def repos_owner_repo_issues_comments(self, owner, repo, since=None,
                                         before=None, page=None, limit=None):
        """List all issue comments in a repository
        (``GET /api/v1/repos/{owner}/{repo}/issues/comments``).

        If ``since`` or ``before`` are given they must be timestamps with a
        UTC offset, for example ``2022-08-13T08:09:07+02:00``.
        """
        params = {
            'since': since,
            'before': before,
            'page': page,
            'limit': limit,
        }
        return self.__request_json(
            'GET', f'/api/v1/repos/{owner}/{repo}/issues/comments', params)

    def repo_owner_get_metada(self, owner, repo, filepath):
        """
        Gets the metadata and contents (if a file) of an entry in a
        repository, or list of entries if a dir
        """
        return self.__request_json(
            'GET', f'/api/v1/repos/{owner}/{repo}/contents/{filepath}')

    def repo_owner_create_file(self, owner, repo, filepath, author_email,
                               author_name, branch, message, content=None):
        """
        Create a file in a repository

        ``content`` is the file body, already base64-encoded; it is only
        sent when given. The payload is sent as JSON because it contains a
        nested ``author`` object.
        """
        payload = {
            "author": {"email": author_email, "name": author_name},
            "branch": branch,
            "message": message,
        }
        if content is not None:
            payload["content"] = content
        return self.__request_json(
            'POST', f'/api/v1/repos/{owner}/{repo}/contents/{filepath}',
            json_body=payload)

    def repo_owner_update_file(self, owner, repo, filepath, author_email,
                               author_name, branch, message, sha,
                               content=None):
        """
        Update a file in a repository

        ``sha`` identifies the version being replaced. ``content`` is the
        new file body, already base64-encoded; it is only sent when given.
        """
        payload = {
            "author": {"email": author_email, "name": author_name},
            "branch": branch,
            "message": message,
            "sha": sha,
        }
        if content is not None:
            payload["content"] = content
        return self.__request_json(
            'PUT', f'/api/v1/repos/{owner}/{repo}/contents/{filepath}',
            json_body=payload)

    def repo_owner_delete_file(self, owner, repo, filepath, author_email,
                               author_name, branch, message, sha):
        """
        Delete a file in a repository

        The same identity is sent as both author and committer.
        """
        identity = {"email": author_email, "name": author_name}
        payload = {
            "author": identity,
            "branch": branch,
            "committer": dict(identity),
            "message": message,
            "new_branch": branch,
            "sha": sha,
        }
        return self.__request_json(
            'DELETE', f'/api/v1/repos/{owner}/{repo}/contents/{filepath}',
            json_body=payload)

    ###
    ### user
    ###

    def user(self):
        """
        Get the authenticated user
        """
        return self.__request_json('GET', '/api/v1/user')

    ###
    ### internal helpers
    ###

    def __check_setup(self):
        """Return True when the config file exists; announce setup if not."""
        if os.path.isfile(self.__gitea_config_path):
            return True
        print(f"File {self.__gitea_config_path} not found, running setup.")
        return False

    def __setup(self):
        """Ask for the connection parameters and append them to the config.

        Returns:
            Tuple ``(api_base_url, access_token)`` as typed by the user.
        """
        config_dir = os.path.dirname(self.__gitea_config_path)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        api_base_url = input(
            "Gitea API base url, in ex. 'https://yourgitea.instance': ")
        access_token = input("Gitea access token: ")

        newly_created = not os.path.exists(self.__gitea_config_path)
        # Append mode creates the file when missing and never truncates
        # anything that is already there.
        with open(self.__gitea_config_path, 'a') as the_file:
            if newly_created:
                print(f"{self.__gitea_config_path} created!")
            print("Writing gitea parameters to " + self.__gitea_config_path)
            the_file.write(f'api_base_url: {api_base_url}\n'
                           f'access_token: {access_token}\n')

        return (api_base_url, access_token)

    @staticmethod
    def __get_parameter(parameter, file_path):
        """Return the value stored as ``parameter: value`` in ``file_path``.

        The key must match exactly (not merely as a prefix). Only the first
        ':' separates key and value, so URLs are returned intact.

        Raises:
            SystemExit: with the error message (non-zero exit status) when
                the parameter is not present in the file.
        """
        with open(file_path) as f:
            for line in f:
                key, separator, value = line.partition(':')
                if separator and key.strip() == parameter:
                    return value.strip()

        sys.exit(f'{file_path} Missing parameter {parameter}')

    def __build_url(self, path, params=None, token_param='token'):
        """Return the absolute URL for ``path`` with its query string.

        Parameters whose value is ``None`` are left out, and the access
        token is appended under ``token_param``.
        """
        query = {key: value for key, value in (params or {}).items()
                 if value is not None}
        query[token_param] = self.access_token
        return f'{self.api_base_url}{path}?{urlencode(query)}'

    def __request_json(self, method, path, params=None, data=None,
                       json_body=None, token_param='token'):
        """Send one request and return its decoded, attribute-friendly JSON."""
        endpoint = self.__build_url(path, params, token_param)
        response = self.__api_request(method, endpoint, data=data,
                                      json_body=json_body)
        return self.__json_allow_dict_attrs(response.json())

    def __api_request(self, method, endpoint, data=None, json_body=None):
        """Perform the HTTP request and translate failures into exceptions.

        Args:
            method: HTTP verb.
            endpoint: Absolute URL including the query string.
            data: Optional form fields for the request body.
            json_body: Optional object to send as a JSON body instead.

        Returns:
            The response object when the request succeeded, and also for
            status 422, which is handed back to the caller unchanged.

        Raises:
            GiteaNetworkError: the request could not be completed.
            GiteaIllegalArgumentError: the session returned no response.
            GiteaAPIError (or a subclass chosen by status code): the server
                answered with any other error status.
        """
        kwargs = {'data': data}
        if json_body is not None:
            kwargs['json'] = json_body

        try:
            response = self.session.request(method, url=endpoint, **kwargs)
        except Exception as e:
            raise GiteaNetworkError(
                f"Could not complete request: {e}") from e

        if response is None:
            raise GiteaIllegalArgumentError("Illegal request.")

        if response.ok or response.status_code == 422:
            return response

        # Best effort: pull a human-readable message out of the error body.
        try:
            payload = response.json()
        except ValueError:
            payload = None
        if isinstance(payload, dict):
            error_msg = payload.get('error') or payload.get('message')
        elif isinstance(payload, str):
            error_msg = payload
        else:
            error_msg = None

        status = response.status_code
        specific_errors = {
            401: GiteaUnauthorizedError,
            404: GiteaNotFoundError,
            500: GiteaInternalServerError,
            502: GiteaBadGatewayError,
            503: GiteaServiceUnavailableError,
            504: GiteaGatewayTimeoutError,
        }
        if status in specific_errors:
            ex_type = specific_errors[status]
        elif 500 <= status <= 511:
            ex_type = GiteaServerError
        else:
            ex_type = GiteaAPIError

        if status == 404 and not error_msg:
            # this is for compatibility with older versions
            # which raised GiteaAPIError('Endpoint not found.')
            # on any 404
            error_msg = 'Endpoint not found.'

        raise ex_type('Gitea API returned error', status, response.reason,
                      error_msg)

    @staticmethod
    def __json_allow_dict_attrs(json_object):
        """
        Makes it possible to use attribute notation to access a dicts
        elements, while still allowing the dict to act as a dict.

        Anything that is not a dict (for example a list) is returned
        unchanged.
        """
        if isinstance(json_object, dict):
            return AttribAccessDict(json_object)
        return json_object


##
# Exceptions
##
class GiteaError(Exception):
    """Base class for Gitea.py exceptions"""


class GiteaIllegalArgumentError(ValueError, GiteaError):
    """Raised when a request could not be formed or yielded no response"""
    pass


class GiteaIOError(IOError, GiteaError):
    """Base class for Gitea.py I/O errors"""


class GiteaNetworkError(GiteaIOError):
    """Raised when network communication with the server fails"""
    pass


class GiteaAPIError(GiteaError):
    """Raised when the gitea API generates a response that cannot be handled"""
    pass


class GiteaServerError(GiteaAPIError):
    """Raised if the Server is malconfigured and returns a 5xx error code"""
    pass


class GiteaInternalServerError(GiteaServerError):
    """Raised if the Server returns a 500 error"""
    pass


class GiteaBadGatewayError(GiteaServerError):
    """Raised if the Server returns a 502 error"""
    pass


class GiteaServiceUnavailableError(GiteaServerError):
    """Raised if the Server returns a 503 error"""
    pass


class GiteaGatewayTimeoutError(GiteaServerError):
    """Raised if the Server returns a 504 error"""
    pass


class GiteaNotFoundError(GiteaAPIError):
    """Raised when the gitea API returns a 404 Not Found error"""
    pass


class GiteaUnauthorizedError(GiteaAPIError):
    """Raised when the gitea API returns a 401 Unauthorized error

    This happens when an OAuth token is invalid or has been revoked,
    or when trying to access an endpoint that can't be used without
    authentication without providing credentials."""
    pass