From 91186b68496937244b3207001434a738d688bfaa Mon Sep 17 00:00:00 2001 From: spla Date: Mon, 19 Dec 2022 19:13:11 +0100 Subject: [PATCH] =?UTF-8?q?dynuapi:=20primera=20versi=C3=B3!?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dynuapi.py | 251 +++++++++++++++++++++++++++++++++++++++++++++++ ipupdate.py | 36 +++++++ requeriments.txt | 1 + 3 files changed, 288 insertions(+) create mode 100644 dynuapi.py create mode 100644 ipupdate.py create mode 100644 requeriments.txt diff --git a/dynuapi.py b/dynuapi.py new file mode 100644 index 0000000..ebafa81 --- /dev/null +++ b/dynuapi.py @@ -0,0 +1,251 @@ +import os +import requests +import json +import pdb + +### +# Dict helper class. +# Defined at top level so it can be pickled. +### +class AttribAccessDict(dict): + def __getattr__(self, attr): + if attr in self: + return self[attr] + else: + raise AttributeError("Attribute not found: " + str(attr)) + + def __setattr__(self, attr, val): + if attr in self: + raise AttributeError("Attribute-style access is read only") + super(AttribAccessDict, self).__setattr__(attr, val) + +class Dynu: + + name = 'Python wrapper for Dynu {d}DNS service' + + def __init__(self, api_base_url=None, api_key=None, session=None): + + self.file_path = "secrets/secrets.txt" + + is_setup = self.__check_setup(self) + + if is_setup: + + self.api_base_url = self.__get_parameter("api_base_url", self.file_path) + self.api_key = self.__get_parameter("api_key", self.file_path) + + else: + + self.api_base_url, self.api_key = self.__setup(self) + + if session: + self.session = session + else: + self.session = requests.Session() + + def dns(self): + + endpoint = self.api_base_url + '/dns' + + headers = { + "Accept": "application/json", + "API-Key": self.api_key + } + + response = self.__api_request('GET', endpoint, headers=headers) + + response = self.__json_allow_dict_attrs(response.json()) + + return response + + def update(self, id, name, group, ipv4Address, ipv6Address=None, ttl=90, ipv4=True, ipv6=None, ipv4WildcardAlias=None, ipv6WildcardAlias=None, allowZoneTransfer=None, dnssec=None): + + endpoint = self.api_base_url + '/dns/{0}'.format(id) + + headers = { + "Accept": "application/json", + "API-Key": self.api_key + } + + data = {'name':name, + 'group':group, + 'ipv4Address':ipv4Address, + 'ipv6Address':'', + 'ttl': ttl, + 'ipv4':True, + 'ipv6':False, + 'ipv4WildcardAlias':True, + 'ipv6WildcardAlias':False, + 'allowZoneTransfer':False, + "dnssec":False + } + + payload = json.dumps(data) + + response = self.__api_request('POST', endpoint, headers=headers, data=payload) + + response = self.__json_allow_dict_attrs(response.json()) + + return response + + + @staticmethod + def __check_setup(self): + + is_setup = False + + if not os.path.isfile(self.file_path): + print(f"File {self.file_path} not found, running setup.") + else: + is_setup = True + + return is_setup + + @staticmethod + def __setup(self): + + if not os.path.exists('secrets'): + os.makedirs('secrets') + + self.api_base_url = input("Dynu {d}DNS API base url, in ex. 'https://api.dynu.com/v2': ") + self.api_key = input("Dynu {d}DNS API key: ") + + if not os.path.exists(self.file_path): + with open(self.file_path, 'w'): pass + print(f"{self.file_path} created!") + + with open(self.file_path, 'a') as the_file: + print("Writing Dynu parameters to " + self.file_path) + the_file.write(f'api_base_url: {self.api_base_url}\n'+f'api_key: {self.api_key}\n') + + return (self.api_base_url, self.api_key) + + @staticmethod + def __get_parameter(parameter, file_path ): + + with open( file_path ) as f: + for line in f: + if line.startswith( parameter ): + return line.replace(parameter + ":", "").strip() + + print(f'{file_path} Missing parameter {parameter}') + sys.exit(0) + + def __api_request(self, method, endpoint, headers={}, data={}): + + response = None + + try: + + kwargs = dict(headers=headers, data=data) + + response = self.session.request(method, url = endpoint, **kwargs) + + except Exception as e: + + raise DynuNetworkError(f"Could not complete request: {e}") + + if response is None: + + raise DynuIllegalArgumentError("Illegal request.") + + if not response.ok: + + try: + if isinstance(response, dict) and 'error' in response: + error_msg = response['error'] + elif isinstance(response, str): + error_msg = response + else: + error_msg = None + except ValueError: + error_msg = None + + if response.status_code == 404: + ex_type = DynuNotFoundError + if not error_msg: + error_msg = 'Endpoint not found.' + # this is for compatibility with older versions + # which raised DynuAPIError('Endpoint not found.') + # on any 404 + elif response.status_code == 401: + ex_type = DynuUnauthorizedError + elif response.status_code == 422: + return response + elif response.status_code == 500: + ex_type = DynuInternalServerError + elif response.status_code == 502: + ex_type = DynuBadGatewayError + elif response.status_code == 503: + ex_type = DynuServiceUnavailableError + elif response.status_code == 504: + ex_type = DynuGatewayTimeoutError + elif response.status_code >= 500 and \ + response.status_code <= 511: + ex_type = DynuServerError + else: + ex_type = DynuAPIError + + raise ex_type( + 'Dynu API returned error', + response.status_code, + response.reason, + error_msg) + + else: + + return response + + @staticmethod + def __json_allow_dict_attrs(json_object): + """ + Makes it possible to use attribute notation to access a dicts + elements, while still allowing the dict to act as a dict. + """ + if isinstance(json_object, dict): + return AttribAccessDict(json_object) + return json_objecte + +## +# Exceptions +## +class DynuError(Exception): + """Base class for Dynu.py exceptions""" + +class DynuIOError(IOError, DynuError): + """Base class for Dynu.py I/O errors""" + +class DynuNetworkError(DynuIOError): + """Raised when network communication with the server fails""" + pass +class DynuAPIError(DynuError): + """Raised when the gitea API generates a response that cannot be handled""" + pass +class DynuServerError(DynuAPIError): + """Raised if the Server is malconfigured and returns a 5xx error code""" + pass +class DynuInternalServerError(DynuServerError): + """Raised if the Server returns a 500 error""" + pass + +class DynuBadGatewayError(DynuServerError): + """Raised if the Server returns a 502 error""" + pass + +class DynuServiceUnavailableError(DynuServerError): + """Raised if the Server returns a 503 error""" + pass +class DynuGatewayTimeoutError(DynuServerError): + """Raised if the Server returns a 504 error""" + pass +class DynuNotFoundError(DynuAPIError): + """Raised when the gitea API returns a 404 Not Found error""" + pass + +class DynuUnauthorizedError(DynuAPIError): + """Raised when the gitea API returns a 401 Unauthorized error + + This happens when an OAuth token is invalid or has been revoked, + or when trying to access an endpoint that can't be used without + authentication without providing credentials.""" + pass diff --git a/ipupdate.py b/ipupdate.py new file mode 100644 index 0000000..8afcdf5 --- /dev/null +++ b/ipupdate.py @@ -0,0 +1,36 @@ +from dynuapi import Dynu +import requests +import json +import pdb + +def get(): + endpoint = 'https://ipinfo.io/json' + response = requests.get(endpoint, verify = True) + + if response.status_code != 200: + return 'Status:', response.status_code, 'Problem with the request. Exiting.' + exit() + + data = response.json() + + return data['ip'] + +if __name__ == '__main__': + + obj = Dynu() + + result = obj.dns() + + domain_id = result.domains[0]['id'] + + domain_name = result.domains[0]['name'] + + group = 'Raspberry' + + ipv4_address = get() + + print(f"IP pública: {ipv4_address}") + + is_updated = obj.update(domain_id, domain_name, group, ipv4_address) + + print(is_updated) diff --git a/requeriments.txt b/requeriments.txt new file mode 100644 index 0000000..f229360 --- /dev/null +++ b/requeriments.txt @@ -0,0 +1 @@ +requests