# File-viewer header (residue of the web capture): 200 lines, 5.9 KiB, Python
import getpass
import os
import os.path
import secrets
import string
import sys
from collections import OrderedDict

import requests

###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
    """A ``dict`` whose keys can additionally be read as attributes.

    Reading ``d.key`` returns ``d["key"]``. Assigning ``d.key = value`` is
    rejected when ``key`` is already a dictionary key; otherwise the value is
    stored as an ordinary instance attribute (not as a dictionary entry).
    """

    def __getattr__(self, attr):
        """Return the dictionary value stored under ``attr``.

        Raises:
            AttributeError: if ``attr`` is not a key of the dictionary.
        """
        # Guard clause first: anything that is not a key is a missing attribute.
        if attr not in self:
            raise AttributeError("Attribute not found: " + str(attr))
        return self[attr]

    def __setattr__(self, attr, val):
        """Set an instance attribute unless ``attr`` is a dictionary key.

        Raises:
            AttributeError: if ``attr`` is already a key of the dictionary.
        """
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super().__setattr__(attr, val)


class Ejabberd:
    """Small wrapper around an ejabberd server's HTTP ``/api/<command>`` endpoints.

    Connection settings (API base URL, local virtual host, admin account and
    admin password) come from, in order of precedence:

    1. the constructor arguments that are not ``None``;
    2. the secrets file ``secrets/ejabberd_secrets.txt``, if it exists;
    3. an interactive ``setup`` prompt, which also writes the secrets file.
    """

    name = 'Ejabberd API wrapper'

    def __init__(self, api_base_url=None, local_vhost=None, admin_account=None, admin_pass=None):
        """Resolve the connection settings.

        Args:
            api_base_url: Base URL of the API, e.g. ``http://127.0.0.1:5280``.
            local_vhost: Virtual host used for register/unregister calls.
            admin_account: Account used for HTTP basic auth.
            admin_pass: Password used for HTTP basic auth.

        Any argument left as ``None`` is read from the secrets file, or asked
        for interactively when that file does not exist. When all four are
        supplied, neither the file nor the prompt is used.
        """
        self.__ejabberd_config_path = "secrets/ejabberd_secrets.txt"

        # Same order as the tuple returned by setup().
        keys = ("api_base_url", "local_vhost", "admin_account", "admin_pass")
        settings = dict(zip(keys, (api_base_url, local_vhost, admin_account, admin_pass)))
        missing = [key for key in keys if settings[key] is None]

        if missing:
            if self.__check_setup():
                # Only look up what the caller did not supply, so an explicit
                # argument never triggers a "missing parameter" exit.
                for key in missing:
                    settings[key] = self.__get_parameter(key, self.__ejabberd_config_path)
            else:
                prompted = dict(zip(keys, self.setup(self)))
                for key in missing:
                    settings[key] = prompted[key]

        self.__api_base_url = settings["api_base_url"]
        self.__local_vhost = settings["local_vhost"]
        self.__admin_account = settings["admin_account"]
        self.__admin_pass = settings["admin_pass"]

    def generate_pass(self, length=10):
        """Return a random alphanumeric password.

        The password is drawn with ``secrets`` from ASCII letters and digits
        and is re-drawn until it has at least one lowercase letter, one
        uppercase letter and three digits.

        Args:
            length: Number of characters (default 10).

        Raises:
            ValueError: if ``length`` is below 5, the minimum that can satisfy
                the composition rule (otherwise the loop would never end).
        """
        if length < 5:
            raise ValueError("length must be at least 5")

        alphabet = string.ascii_letters + string.digits

        while True:
            password = ''.join(secrets.choice(alphabet) for _ in range(length))
            if (any(c.islower() for c in password)
                    and any(c.isupper() for c in password)
                    and sum(c.isdigit() for c in password) >= 3):
                return password

    def register(self, username, host, user_password):
        """Register ``username`` on the configured local vhost.

        Args:
            username: Local part of the account to create.
            host: Accepted for interface compatibility; the request always
                uses the configured local vhost.
            user_password: Password for the new account.

        Returns:
            ``(is_registered, response_text)``. On success ``response_text``
            is the decoded JSON body; on failure it is ``"<status>: <message>"``
            taken from the JSON error body, or ``"<http status>: <raw body>"``
            when the body is not in that shape.
        """
        data = {
            'user': username,
            'host': self.__local_vhost,
            'password': user_password,
        }

        response = self.__post('register', data)
        is_registered = response.ok

        # Decode once; an error reply is not guaranteed to be JSON.
        try:
            payload = response.json()
        except ValueError:
            payload = None

        if is_registered:
            response_text = payload if payload is not None else response.text
        elif isinstance(payload, dict) and 'status' in payload and 'message' in payload:
            response_text = f"{payload['status']}: {payload['message']}"
        else:
            response_text = f"{response.status_code}: {response.text}"

        return (is_registered, response_text)

    def unregister(self, username, host):
        """Unregister ``username`` from the configured local vhost.

        The admin account is never unregistered.

        Args:
            username: Local part of the account to delete.
            host: Host the caller believes the account lives on. It is used
                for the admin check; the request itself always targets the
                configured local vhost.

        Returns:
            ``(is_unregistered, is_admin)``. ``is_admin`` is ``True`` (and no
            request is sent) when the account is the admin account.
        """
        # Check both the address the caller named and the one the request
        # would actually delete, so a mismatched ``host`` cannot slip the
        # admin account past the guard.
        candidates = (f"{username}@{host}", f"{username}@{self.__local_vhost}")
        if self.__admin_account in candidates:
            return (False, True)

        data = {
            'user': username,
            'host': self.__local_vhost,
        }
        response = self.__post('unregister', data)

        return (response.ok, False)

    def stats(self):
        """Fetch a fixed set of server statistics.

        Sends one ``stats`` request per statistic name and collects the
        ``stat`` field of each JSON reply.

        Returns:
            A mapping from statistic name to value that also allows
            attribute-style access.
        """
        stat_names = ("registeredusers", "onlineusers", "onlineusersnode", "uptimeseconds", "processes")

        stats_dict = {}
        for stat_name in stat_names:
            response = self.__post('stats', {"name": stat_name})
            stats_dict[stat_name] = response.json()['stat']

        return self.__json_allow_dict_attrs(stats_dict)

    def status(self):
        """Call the ``status`` command and return its decoded JSON reply."""
        return self.__post('status', {}).json()

    def __post(self, command, data):
        """POST ``data`` as JSON to ``/api/<command>`` using admin basic auth."""
        endpoint = self.__api_base_url + '/api/' + command + '?'
        return requests.post(url=endpoint, json=data, auth=(self.__admin_account, self.__admin_pass))

    def __check_setup(self):
        """Return ``True`` if the secrets file exists; otherwise print a notice and return ``False``."""
        if os.path.isfile(self.__ejabberd_config_path):
            return True

        print(f"File {self.__ejabberd_config_path} not found, running setup.")
        return False

    # NOTE: kept as a staticmethod that takes the instance explicitly so the
    # existing call forms ``obj.setup(obj)`` / ``Ejabberd.setup(obj)`` keep working.
    @staticmethod
    def setup(self):
        """Prompt for the connection settings and write them to the secrets file.

        Args:
            self: The ``Ejabberd`` instance to configure (passed explicitly).

        Returns:
            ``(api_base_url, local_vhost, admin_account, admin_pass)``.
        """
        config_path = self.__ejabberd_config_path
        config_dir = os.path.dirname(config_path)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        self.__api_base_url = input("api_base_url, in ex. 'http://127.0.0.1:5280': ")
        self.__local_vhost = input("local_vhost, in ex. 'ejabberd.server': ")
        self.__admin_account = input("admin_account, in ex. 'admin@ejabberd.server': ")
        self.__admin_pass = getpass.getpass("admin_pass, in ex. 'my_very_hard_secret_pass': ")

        is_new_file = not os.path.exists(config_path)

        # Truncate rather than append: the reader returns the first matching
        # line, so appended values would be shadowed by stale ones.
        with open(config_path, 'w') as the_file:
            if is_new_file:
                print(f"{config_path} created!")
            print("Writing ejabberd secrets parameters to " + config_path)
            the_file.write(
                f'api_base_url: {self.__api_base_url}\n'
                f'local_vhost: {self.__local_vhost}\n'
                f'admin_account: {self.__admin_account}\n'
                f'admin_pass: {self.__admin_pass}\n'
            )

        return (self.__api_base_url, self.__local_vhost, self.__admin_account, self.__admin_pass)

    @staticmethod
    def __get_parameter(parameter, file_path):
        """Return the value of ``parameter`` from a ``key: value`` file.

        The first line whose key (the text before the first ``:``) equals
        ``parameter`` wins. The value is everything after that first ``:``,
        stripped, so values containing ``:`` (such as URLs) are kept whole.

        If no line matches, a message is printed and the process exits with
        status 1.
        """
        with open(file_path) as f:
            for line in f:
                key, separator, value = line.partition(":")
                if separator and key.strip() == parameter:
                    return value.strip()

        print(f'{file_path} Missing parameter {parameter}')
        sys.exit(1)

    @staticmethod
    def __json_allow_dict_attrs(json_object):
        """
        Makes it possible to use attribute notation to access a dicts
        elements, while still allowing the dict to act as a dict.
        """
        if isinstance(json_object, dict):
            return AttribAccessDict(json_object)
        return json_object