xmpp/ejabberdapi.py

187 líneas
5,5 KiB
Python

import os
import os.path
import requests
import string
import getpass
import secrets
from collections import OrderedDict
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Ejabberd:
name = 'Ejabberd API wrapper'
def __init__(self, api_base_url=None, local_vhost=None, admin_account=None, admin_pass=None):
self.ejabberd_config_path = "secrets/ejabberd_secrets.txt"
is_setup = self.check_setup(self)
if is_setup:
self.api_base_url = self.get_parameter("api_base_url", self.ejabberd_config_path)
self.local_vhost = self.get_parameter("local_vhost", self.ejabberd_config_path)
self.admin_account = self.get_parameter("admin_account", self.ejabberd_config_path)
self.admin_pass = self.get_parameter("admin_pass", self.ejabberd_config_path)
else:
self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass = self.setup(self)
def generate_pass(self):
alphabet = string.ascii_letters + string.digits
while True:
password = ''.join(secrets.choice(alphabet) for i in range(10))
if (any(c.islower() for c in password)
and any(c.isupper() for c in password)
and sum(c.isdigit() for c in password) >= 3):
break
return password
def register(self, username, host, user_password):
data = {'user':username,
'host':self.local_vhost,
'password':user_password,
}
API_ENDPOINT = self.api_base_url + '/api/register?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_registered = response.ok
if is_registered:
response_text = response.json()
else:
response_text = f"{response.json()['status']}: {response.json()['message']}"
return (is_registered, response_text)
def unregister(self, username, host):
is_unregistered = False
is_admin = False
if username+'@'+host == self.admin_account:
is_admin = True
return (is_unregistered, is_admin)
data = {'user':username,
'host':self.local_vhost,
}
API_ENDPOINT = self.api_base_url + '/api/unregister?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_unregistered = response.ok
return (is_unregistered, is_admin)
def stats(self):
names_temp = ["registeredusers","onlineusers","onlineusersnode","uptimeseconds","processes"]
names = OrderedDict.fromkeys(names_temp).keys()
stats_dict = {}
for name in names:
data = {
"name": name
}
API_ENDPOINT = self.api_base_url + '/api/stats?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
result = response.json()['stat']
stats_dict[name] = result
stats = self.__json_allow_dict_attrs(stats_dict)
return stats
@staticmethod
def check_setup(self):
is_setup = False
if not os.path.isfile(self.ejabberd_config_path):
print(f"File {self.ejabberd_config_path} not found, running setup.")
else:
is_setup = True
return is_setup
@staticmethod
def setup(self):
if not os.path.exists('secrets'):
os.makedirs('secrets')
self.api_base_url = input("api_base_url, in ex. 'http://127.0.0.1:5280': ")
self.local_vhost = input("local_vhost, in ex. 'ejabberd.server': ")
self.admin_account = input("admin_account, in ex. 'admin@ejabberd.server': ")
self.admin_pass = getpass.getpass("admin_pass, in ex. 'my_very_hard_secret_pass': ")
if not os.path.exists(self.ejabberd_config_path):
with open(self.ejabberd_config_path, 'w'): pass
print(f"{self.ejabberd_config_path} created!")
with open(self.ejabberd_config_path, 'a') as the_file:
print("Writing ejabberd secrets parameters to " + self.ejabberd_config_path)
the_file.write(f'api_base_url: {self.api_base_url}\n'+f'local_vhost: {self.local_vhost}\n'+f'admin_account: {self.admin_account}\n'+f'admin_pass: {self.admin_pass}\n')
return (self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass)
@staticmethod
def get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object