484 líneas
16 KiB

import os
import sys
import requests
from requests.models import urlencode
# Dict helper class.
# Defined at top level so it can be pickled.
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Gitea:
name = 'Gitea API wrapper'
def __init__(self, api_base_url=None, access_token=None, session=None):
self.__gitea_config_path = "config/gitea.txt"
is_setup = self.__check_setup(self)
if is_setup:
self.api_base_url = self.__get_parameter("api_base_url", self.__gitea_config_path)
self.access_token = self.__get_parameter("access_token", self.__gitea_config_path)
self.api_base_url, self.access_token = self.__setup(self)
if session:
self.session = session
self.session = requests.Session()
def admin_users_create(self, email, username, passwd, full_name=None, must_change_password=True, restricted=False, send_notify=True, source_id='0', visibility='private'):
data = {'email':email,
endpoint = self.api_base_url + '/api/v1/admin/users?access_token={0}'.format(self.access_token)
response = self.__api_request('POST', endpoint, data)
response = self.__json_allow_dict_attrs(response.json())
return response
def admin_users_list(self, page=None, limit=None):
params = dict()
if page != None:
params['page'] = page
if limit != None:
params['limit'] = limit
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/admin/users?{0}'.format(formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def notifications_new(self):
endpoint = self.api_base_url + '/api/v1/notifications/new?token={0}'.format(self.access_token)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
### repository
def repos_get_repo(self, owner, repo):
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}'.format(owner, repo, formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repos_issues_search(self, owner, state=None, labels=None, q=None, milestones=None, priority_repo_id=None, issue_type=None, since=None, before=None, assigned=None,
created=None, mentioned=None, review_requested=None, team=None, page=None, limit=None):
params = dict()
if state == None:
params['state'] = 'open'
params['state'] = state
if labels != None:
params['labels'] = labels
if milestones != None:
params['milestones'] = milestones
if q != None:
params['q'] = q
if priority_repo_id != None:
params['priority_repo_id'] = priority_repo_id
if issue_type != None:
params['type'] = issue_type
if since != None:
params['since'] = since
if before != None:
params['before'] = before
if assigned != None:
params['assigned'] = assigned
if created != None:
params['created'] = created
if mentioned != None:
params['mentioned'] = mentioned
if review_requested != None:
params['review_requested'] = review_requested
params['owner'] = owner
if team != None:
params['team'] = team
if page != None:
params['page'] = page
if limit != None:
params['limit'] = limit
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/issues/search?{0}'.format(formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repos_owner_repo_issues(self, owner, repo, state=None, labels=None, q=None, issue_type=None, milestones=None, since=None, before=None, created_by=None, assigned_by=None,
mentioned_by=None, page=None, limit=None):
if since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00
params = dict()
if state == None:
params['state'] = 'open'
params['state'] = state
params['labels'] = labels
if q != None:
params['q'] = q
params['issue_type'] = issue_type
params['milestones'] = milestones
if since != None:
params['since'] = since
if before != None:
params['before'] = before
if created_by != None:
params['created_by'] = created_by
if assigned_by != None:
params['assigned_by'] = assigned_by
if mentioned_by != None:
params['mentioned_by'] = mentioned_by
params['page'] = page
params['limit'] = limit
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/issues?{2}'.format(owner, repo, formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repos_owner_repo_issues_comments(self, owner, repo, since=None, before=None, page=None, limit=None):
if since or before are specified, they must have following format: in. ex. 2022-08-13T08:09:07+02:00
params = dict()
if since != None:
params['since'] = since
if before != None:
params['before'] = before
params['page'] = page
params['limit'] = limit
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/issues/comments?{2}'.format(owner, repo, formatted_params)
response = self.__api_request('GET', endpoint)#, data)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_get_metada(self, owner, repo, filepath):
Gets the metadata and contents (if a file) of an entry in a repository, or list of entries if a dir
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_create_file(self, owner, repo, filepath, author_email, author_name, branch, message):
Create a file in a repository
data = {"author":[{"email":author_email},{"name":author_name}],
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('POST', endpoint, data=data)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_update_file(self, owner, repo, filepath, author_email, author_name, branch, message, sha):
Update a file in a repository
data = {"author":[{"email":author_email},{"name":author_name}],
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('PUT', endpoint, data=data)
response = self.__json_allow_dict_attrs(response.json())
return response
def repo_owner_delete_file(self, owner, repo, filepath, author_email, author_name, branch, message, sha):
Delete a file in a repository
data = {"author":[{"email":author_email},{"name":author_name}],
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/repos/{0}/{1}/contents/{2}?{3}'.format(owner, repo, filepath, formatted_params)
response = self.__api_request('DELETE', endpoint, data=data)
response = self.__json_allow_dict_attrs(response.json())
return response
def user(self):
Get the authenticated user
params = dict()
params['token'] = self.access_token
formatted_params = urlencode(params)
endpoint = self.api_base_url + '/api/v1/user?{0}'.format(formatted_params)
response = self.__api_request('GET', endpoint)
response = self.__json_allow_dict_attrs(response.json())
return response
def __check_setup(self):
is_setup = False
if not os.path.isfile(self.__gitea_config_path):
print(f"File {self.__gitea_config_path} not found, running setup.")
is_setup = True
return is_setup
def __setup(self):
if not os.path.exists('config'):
self.api_base_url = input("Gitea API base url, in ex. 'https://yourgitea.instance': ")
self.access_token = input("Gitea access token: ")
if not os.path.exists(self.__gitea_config_path):
with open(self.__gitea_config_path, 'w'): pass
print(f"{self.__gitea_config_path} created!")
with open(self.__gitea_config_path, 'a') as the_file:
print("Writing gitea parameters to " + self.__gitea_config_path)
the_file.write(f'api_base_url: {self.api_base_url}\n'+f'access_token: {self.access_token}\n')
return (self.api_base_url, self.access_token)
def __get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
def __api_request(self, method, endpoint, data={}):
response = None
kwargs = dict(data=data)
response = self.session.request(method, url = endpoint, **kwargs)
except Exception as e:
raise GiteaNetworkError(f"Could not complete request: {e}")
if response is None:
raise GiteaIllegalArgumentError("Illegal request.")
if not response.ok:
if isinstance(response, dict) and 'error' in response:
error_msg = response['error']
elif isinstance(response, str):
error_msg = response
error_msg = None
except ValueError:
error_msg = None
if response.status_code == 404:
ex_type = GiteaNotFoundError
if not error_msg:
error_msg = 'Endpoint not found.'
# this is for compatibility with older versions
# which raised GiteaAPIError('Endpoint not found.')
# on any 404
elif response.status_code == 401:
ex_type = GiteaUnauthorizedError
elif response.status_code == 422:
return response
elif response.status_code == 500:
ex_type = GiteaInternalServerError
elif response.status_code == 502:
ex_type = GiteaBadGatewayError
elif response.status_code == 503:
ex_type = GiteaServiceUnavailableError
elif response.status_code == 504:
ex_type = GiteaGatewayTimeoutError
elif response.status_code >= 500 and \
response.status_code <= 511:
ex_type = GiteaServerError
ex_type = GiteaAPIError
raise ex_type(
'Gitea API returned error',
return response
def __json_allow_dict_attrs(json_object):
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_objecte
# Exceptions
class GiteaError(Exception):
"""Base class for exceptions"""
class GiteaIOError(IOError, GiteaError):
"""Base class for I/O errors"""
class GiteaNetworkError(GiteaIOError):
"""Raised when network communication with the server fails"""
class GiteaAPIError(GiteaError):
"""Raised when the gitea API generates a response that cannot be handled"""
class GiteaServerError(GiteaAPIError):
"""Raised if the Server is malconfigured and returns a 5xx error code"""
class GiteaInternalServerError(GiteaServerError):
"""Raised if the Server returns a 500 error"""
class GiteaBadGatewayError(GiteaServerError):
"""Raised if the Server returns a 502 error"""
class GiteaServiceUnavailableError(GiteaServerError):
"""Raised if the Server returns a 503 error"""
class GiteaGatewayTimeoutError(GiteaServerError):
"""Raised if the Server returns a 504 error"""
class GiteaNotFoundError(GiteaAPIError):
"""Raised when the gitea API returns a 404 Not Found error"""
class GiteaUnauthorizedError(GiteaAPIError):
"""Raised when the gitea API returns a 401 Unauthorized error
This happens when an OAuth token is invalid or has been revoked,
or when trying to access an endpoint that can't be used without
authentication without providing credentials."""