Akkoma and EjabberdAPI classes are now downloable from PyPi

This commit is contained in:
spla 2022-08-29 10:41:50 +02:00
parent a6a6ff3f15
commit 791ebc2cb9
4 changed files with 5 additions and 1480 deletions


File diff suppressed because it is too large Load diff

View file

@ -1,373 +0,0 @@
import os
import os.path
import requests
import string
import getpass
import secrets
from collections import OrderedDict
import pdb
# Dict helper class.
# Defined at top level so it can be pickled.
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Ejabberd:
name = 'Ejabberd API wrapper'
def __init__(self, api_base_url=None, local_vhost=None, admin_account=None, admin_pass=None):
self.__ejabberd_config_path = "secrets/ejabberd_secrets.txt"
is_setup = self.__check_setup(self)
if is_setup:
self.__api_base_url = self.__get_parameter("api_base_url", self.__ejabberd_config_path)
self.__local_vhost = self.__get_parameter("local_vhost", self.__ejabberd_config_path)
self.__admin_account = self.__get_parameter("admin_account", self.__ejabberd_config_path)
self.__admin_pass = self.__get_parameter("admin_pass", self.__ejabberd_config_path)
self.__api_base_url, self.__local_vhost, self.__admin_account, self.__admin_pass = self.setup(self)
def generate_pass(self):
alphabet = string.ascii_letters + string.digits
while True:
password = ''.join(secrets.choice(alphabet) for i in range(10))
if (any(c.islower() for c in password)
and any(c.isupper() for c in password)
and sum(c.isdigit() for c in password) >= 3):
return password
def check_account(self, username, host):
data = {'user':username,
endpoint = self.__api_base_url + '/api/check_account?'
response = self.__api_request(endpoint, data)
account_exists = True if response.json() == 0 else False
return account_exists
def register(self, username, host, user_password):
account_exists = self.check_account(username, host)
if not account_exists:
data = {'user':username,
endpoint = self.__api_base_url + '/api/register?'
response = self.__api_request(endpoint, data)
is_registered = response.ok
if is_registered:
response_text = response.json()
response_text = f"{response.json()['status']}: {response.json()['message']}"
is_registered = False
response_text = f"el compte {username}@{host} ja existeix!"
return (is_registered, response_text)
def unregister(self, username, host):
is_unregistered = False
is_admin = False
if username+'@'+host == self.__admin_account:
is_admin = True
return (is_unregistered, is_admin)
data = {'user':username,
endpoint = self.__api_base_url + '/api/unregister?'
response = self.__api_request(endpoint, data)
is_unregistered = response.ok
return (is_unregistered, is_admin)
def stats(self):
names_temp = ["registeredusers","onlineusers","onlineusersnode","uptimeseconds","processes"]
names = OrderedDict.fromkeys(names_temp).keys()
stats_dict = {}
for name in names:
data = {
"name": name
endpoint = self.__api_base_url + '/api/stats?'
response = self.__api_request(endpoint, data)
result = response.json()['stat']
stats_dict[name] = result
stats = self.__json_allow_dict_attrs(stats_dict)
return stats
def status(self):
data = {
endpoint = self.__api_base_url + '/api/status?'
response = self.__api_request(endpoint, data)
result = response.json()
return result
def user_sessions_info(self, username, host):
temp_dict = {}
sessions_dict = {}
data = {'user':username,
endpoint = self.__api_base_url + '/api/user_sessions_info?'
response = self.__api_request(endpoint, data)
i = 0
while i < len(response.json()):
temp_dict['connection'] = response.json()[i]['connection']
temp_dict['ip'] = response.json()[i]['ip']
temp_dict['port'] = response.json()[i]['port']
temp_dict['priority'] = response.json()[i]['priority']
temp_dict['node'] = response.json()[i]['node']
temp_dict['uptime'] = response.json()[i]['uptime']
temp_dict['status'] = response.json()[i]['status']
temp_dict['resource'] = response.json()[i]['resource']
temp_dict['statustext'] = response.json()[i]['statustext']
if len(sessions_dict) > 0:
ds = [temp_dict, sessions_dict]
sessions_temp = {}
for k in temp_dict.keys():
sessions_temp[k] = tuple(sessions_temp[k] for sessions_temp in ds)
sessions_dict = temp_dict.copy()
sessions_temp = sessions_dict.copy()
i += 1
sessions = self.__json_allow_dict_attrs(sessions_temp)
return sessions
def __api_request(self, endpoint, data):
response = requests.post(url = endpoint, json = data, auth=(self._Ejabberd__admin_account, self._Ejabberd__admin_pass))
except Exception as e:
raise EjabberdNetworkError(f"Could not complete request: {e}")
if response is None:
raise EjabberdIllegalArgumentError("Illegal request.")
if not response.ok:
if isinstance(response, dict) and 'error' in response:
error_msg = response['error']
elif isinstance(response, str):
error_msg = response
error_msg = None
except ValueError:
error_msg = None
if response.status_code == 404:
ex_type = EjabberdNotFoundError
if not error_msg:
error_msg = 'Endpoint not found.'
# this is for compatibility with older versions
# which raised EjabberdAPIError('Endpoint not found.')
# on any 404
elif response.status_code == 401:
ex_type = EjabberdUnauthorizedError
elif response.status_code == 500:
ex_type = EjabberdInternalServerError
elif response.status_code == 502:
ex_type = EjabberdBadGatewayError
elif response.status_code == 503:
ex_type = EjabberdServiceUnavailableError
elif response.status_code == 504:
ex_type = EjabberdGatewayTimeoutError
elif response.status_code >= 500 and \
response.status_code <= 511:
ex_type = EjabberdServerError
ex_type = EjabberdAPIError
raise ex_type(
'Ejabberd API returned error',
return response
def __check_setup(self):
is_setup = False
if not os.path.isfile(self.__ejabberd_config_path):
print(f"File {self.__ejabberd_config_path} not found, running setup.")
is_setup = True
return is_setup
def setup(self):
if not os.path.exists('secrets'):
self.__api_base_url = input("api_base_url, in ex. '': ")
self.__local_vhost = input("local_vhost, in ex. 'ejabberd.server': ")
self.__admin_account = input("admin_account, in ex. 'admin@ejabberd.server': ")
self.__admin_pass = getpass.getpass("admin_pass, in ex. 'my_very_hard_secret_pass': ")
if not os.path.exists(self.__ejabberd_config_path):
with open(self.__ejabberd_config_path, 'w'): pass
print(f"{self.__ejabberd_config_path} created!")
with open(self.__ejabberd_config_path, 'a') as the_file:
print("Writing ejabberd secrets parameters to " + self.__ejabberd_config_path)
the_file.write(f'api_base_url: {self.__api_base_url}\n'+f'local_vhost: {self.__local_vhost}\n'+f'admin_account: {self.__admin_account}\n'+f'admin_pass: {self.__admin_pass}\n')
return (self.__api_base_url, self.__local_vhost, self.__admin_account, self.__admin_pass)
def __get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
def __json_allow_dict_attrs(json_object):
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object
# Exceptions
class EjabberdError(Exception):
"""Base class for Mastodon.py exceptions"""
class EjabberdIOError(IOError, EjabberdError):
"""Base class for Mastodon.py I/O errors"""
class EjabberdNetworkError(EjabberdIOError):
"""Raised when network communication with the server fails"""
class EjabberdAPIError(EjabberdError):
"""Raised when the mastodon API generates a response that cannot be handled"""
class EjabberdServerError(EjabberdAPIError):
"""Raised if the Server is malconfigured and returns a 5xx error code"""
class EjabberdInternalServerError(EjabberdServerError):
"""Raised if the Server returns a 500 error"""
class EjabberdBadGatewayError(EjabberdServerError):
"""Raised if the Server returns a 502 error"""
class EjabberdServiceUnavailableError(EjabberdServerError):
"""Raised if the Server returns a 503 error"""
class EjabberdGatewayTimeoutError(EjabberdServerError):
"""Raised if the Server returns a 504 error"""
class EjabberdNotFoundError(EjabberdAPIError):
"""Raised when the ejabberd API returns a 404 Not Found error"""
class EjabberdUnauthorizedError(EjabberdAPIError):
"""Raised when the ejabberd API returns a 401 Unauthorized error
This happens when an OAuth token is invalid or has been revoked,
or when trying to access an endpoint that can't be used without
authentication without providing credentials."""

View file

@ -3,3 +3,5 @@ requests

View file

@ -9,7 +9,9 @@ if __name__ == '__main__':
bot = Akkomabot()
ejabberd = Ejabberd()
ejabberd_secrets_file = 'secrets/ejabberd_secrets.txt'
ejabberd = Ejabberd(ejabberd_secrets_file)