From a0b7b6a3675f78da99541ba6aac60871f670530e Mon Sep 17 00:00:00 2001 From: spla Date: Mon, 8 Aug 2022 22:36:12 +0200 Subject: [PATCH] Bot for register xmpp ejabberd accounts from Akkoma instance --- README.md | 55 +++----- akkomabot.py | 357 +++++++++++++++++++++++++++++++++++++++++++++++ ejabberdapi.py | 187 +++++++++++++++++++++++++ requirements.txt | 5 + xmpp.py | 86 ++++++++++++ 5 files changed, 657 insertions(+), 33 deletions(-) create mode 100644 akkomabot.py create mode 100644 ejabberdapi.py create mode 100644 requirements.txt create mode 100644 xmpp.py diff --git a/README.md b/README.md index 4a1b10e..c812189 100644 --- a/README.md +++ b/README.md @@ -1,43 +1,32 @@ -# Akkoma.py -Python wrapper for the Akkoma (https://akkoma.dev/AkkomaGang/akkoma) API. +# xmpp Akkoma bot +bot to manage an xmpp ejabberd node by posting keywords to it from your Akkoma account. - # Register your app! This only needs to be done once. Uncomment the code and substitute in your information. - - from akkoma import Akkoma +The bot only listen keywords from your Akkoma instance local users. They can register themselves to your xmpp server, unregister and also get your xmpp node stats. +The keywords that Akkoma instance local users can use are: - ''' - client_id, client_secret = Akkoma.create_app( - 'app_name', - to_file="app_clientcred.txt", - api_base_url = 'https://yourakkoma.instance' - ) - ''' +@your_bot register +@your_bot unregister +@your_bot stats - # Then login. This can be done every time, or use persisted. +The bot will process the keyword thanks to the wrapper for ejabberd (included) and the wrapper for Akkoma (also included) but first time you run `python xmpp.py` it will ask for the needed parameters like: - from akkoma import Akkoma - - akkoma = Akkoma(client_id = "app_clientcred.txt", api_base_url = 'https://yourakkoma.instance') +- api_base_url: http://127.0.0.1:5280 +- local_vhost: your local ejabberd vhost +- admin_account: the ejabberd admin account, in exemple admin@ejabberd.server +- admin_pass: ejabberd admin account password +- Akkoma hostname: in ex. akkoma.host - grant_type = 'password' +Before running `python xmpp.py`: - akkoma.log_in( - client_id, - client_secret, - grant_type, - 'user', - 'password', - to_file = "app_usercred.txt" - ) +1. git clone https://git.mastodont.cat/spla/xmpp.git target_dir. +2. `cd target_dir` +3. create the Python Virtual Environment with `python3.x -m venv .` +4. activate it with `source bin/activate` +5. run `pip install -r requirements.txt` to install required libraries. +6. set up your contrab to run `python xmpp.py` every minute. - # To post, create an actual API instance. +Enjoy! - from akkoma import Akkoma - - akkoma = Akkoma( - access_token = 'app_usercred.txt', - api_base_url = 'https://yourakkoma.instance' - ) - akkoma.status_post('Posting from python using Akkoma.py !') + diff --git a/akkomabot.py b/akkomabot.py new file mode 100644 index 0000000..71a0a1f --- /dev/null +++ b/akkomabot.py @@ -0,0 +1,357 @@ +from akkoma import Akkoma +from akkoma import AkkomaMalformedEventError, AkkomaNetworkError, AkkomaReadTimeout, AkkomaAPIError, AkkomaIllegalArgumentError +import getpass +import unidecode +import fileinput,re +import os +import sys +import os.path +import pdb + +### +# Dict helper class. +# Defined at top level so it can be pickled. +### +class AttribAccessDict(dict): + def __getattr__(self, attr): + if attr in self: + return self[attr] + else: + raise AttributeError("Attribute not found: " + str(attr)) + + def __setattr__(self, attr, val): + if attr in self: + raise AttributeError("Attribute-style access is read only") + super(AttribAccessDict, self).__setattr__(attr, val) + +class Akkomabot: + + name = 'Akkomabot' + + def __init__(self, akkoma=None, akkoma_hostname=None): + + file_path = "secrets/secrets.txt" + + is_setup = self.check_setup(file_path) + + if is_setup: + + self.uc_client_id = self.get_parameter("uc_client_id", file_path) + self.uc_client_secret = self.get_parameter("uc_client_secret", file_path) + self.uc_access_token = self.get_parameter("uc_access_token", file_path) + + self.akkoma, self.akkoma_hostname = self.log_in(self) + + else: + + while(True): + + logged_in, self.akkoma, self.akkoma_hostname = self.setup() + + if not logged_in: + + print("\nLog in failed! Try again.\n") + + else: + + break + + @staticmethod + def log_in(self): + + file_path = "secrets/secrets.txt" + uc_client_id = self.get_parameter("uc_client_id", file_path) + uc_client_secret = self.get_parameter("uc_client_secret", file_path) + uc_access_token = self.get_parameter("uc_access_token", file_path) + + file_path = "config/config.txt" + self.akkoma_hostname = self.get_parameter("akkoma_hostname", file_path) + + self.akkoma = Akkoma( + client_id = uc_client_id, + client_secret = uc_client_secret, + access_token = uc_access_token, + api_base_url = 'https://' + self.akkoma_hostname, + ) + + headers={ 'Authorization': 'Bearer %s'%uc_access_token } + + return (self.akkoma, self.akkoma_hostname) + + @staticmethod + def check_setup(file_path): + + is_setup = False + + if not os.path.isfile(file_path): + print(f"File {file_path} not found, running setup.") + return + else: + is_setup = True + return is_setup + + def setup(self): + + logged_in = False + + try: + + self.akkoma_hostname = input("Enter Akkoma hostname (or 'q' to exit): ") + + if self.akkoma_hostname == 'q': + + sys.exit("Bye") + + user_name = input("User name, ex. john? ") + user_password = getpass.getpass("User password? ") + app_name = input("App name? ") + + client_id, client_secret = Akkoma.create_app( + app_name, + to_file="app_clientcred.txt", + api_base_url=self.akkoma_hostname + ) + + akkoma = Akkoma(client_id = "app_clientcred.txt", api_base_url = self.akkoma_hostname) + + grant_type = 'password' + + akkoma.log_in( + client_id, + client_secret, + grant_type, + user_name, + user_password, + scopes = ["read", "write"], + to_file = "app_usercred.txt" + ) + + if os.path.isfile("app_usercred.txt"): + + print(f"Log in succesful!") + logged_in = True + + if not os.path.exists('secrets'): + os.makedirs('secrets') + + secrets_filepath = 'secrets/secrets.txt' + + if not os.path.exists(secrets_filepath): + with open(secrets_filepath, 'w'): pass + print(f"{secrets_filepath} created!") + + with open(secrets_filepath, 'a') as the_file: + print("Writing secrets parameter names to " + secrets_filepath) + the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n') + + client_path = 'app_clientcred.txt' + + with open(client_path) as fp: + + line = fp.readline() + cnt = 1 + + while line: + + if cnt == 1: + + print("Writing client id to " + secrets_filepath) + self.modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip()) + + elif cnt == 2: + + print("Writing client secret to " + secrets_filepath) + self.modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip()) + + line = fp.readline() + cnt += 1 + + token_path = 'app_usercred.txt' + + with open(token_path) as fp: + + line = fp.readline() + print("Writing access token to " + secrets_filepath) + self.modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip()) + + if os.path.exists("app_clientcred.txt"): + + print("Removing app_clientcred.txt temp file..") + os.remove("app_clientcred.txt") + + if os.path.exists("app_usercred.txt"): + + print("Removing app_usercred.txt temp file..") + os.remove("app_usercred.txt") + + self.config_filepath = 'config/config.txt' + + self.create_config(self) + self.write_config(self) + self.read_config_line(self) + + print("Secrets setup done!\n") + + except AkkomaIllegalArgumentError as i_error: + + sys.stdout.write(f'\n{str(i_error)}\n') + + except AkkomaNetworkError as n_error: + + sys.stdout.write(f'\n{str(n_error)}\n') + + except AkkomaReadTimeout as r_error: + + sys.stdout.write(f'\n{str(r_error)}\n') + + except AkkomaAPIError as a_error: + + sys.stdout.write(f'\n{str(a_error)}\n') + + return (logged_in, akkoma, self.akkoma_hostname) + + @staticmethod + def get_parameter(parameter, file_path ): + + with open( file_path ) as f: + for line in f: + if line.startswith( parameter ): + return line.replace(parameter + ":", "").strip() + + print(f'{file_path} Missing parameter {parameter}') + sys.exit(0) + + @staticmethod + def modify_file(self, file_name, pattern,value=""): + + fh=fileinput.input(file_name,inplace=True) + + for line in fh: + + replacement=pattern + value + line=re.sub(pattern,replacement,line) + sys.stdout.write(line) + + fh.close() + + @staticmethod + def create_config(self): + + if not os.path.exists('config'): + + os.makedirs('config') + + if not os.path.exists(self.config_filepath): + + print(self.config_filepath + " created!") + with open('config/config.txt', 'w'): pass + + @staticmethod + def write_config(self): + + with open(self.config_filepath, 'a') as the_file: + + the_file.write('akkoma_hostname: \n') + print(f"adding parameter 'akkoma_hostname' to {self.config_filepath}") + + @staticmethod + def read_config_line(self): + + with open(self.config_filepath) as fp: + + line = fp.readline() + self.modify_file(self, self.config_filepath, "akkoma_hostname: ", value=self.akkoma_hostname) + + def get_data(self, notif): + + id = notif.id + + account_id = notif.account.id + + acct = notif.account.acct + + status_id = notif.status.id + + text = notif.status.content + + visibility = notif.status.visibility + + reply, question = self.get_question(self, text) + + mention_dict = {'reply': reply, 'question': question, 'id': id, 'account_id': account_id, 'acct': acct, 'status_id': status_id, 'text': text, 'visibility': visibility} + + mention = self.__json_allow_dict_attrs(mention_dict) + + return mention + + @staticmethod + def get_question(self, text): + + reply = False + + keyword = '' + + content = self.cleanhtml(self, text) + + content = self.unescape(self, content) + + try: + + start = content.index("@") + end = content.index(" ") + if len(content) > end: + + content = content[0: start:] + content[end +1::] + + cleanit = content.count('@') + + i = 0 + while i < cleanit : + + start = content.rfind("@") + end = len(content) + content = content[0: start:] + content[end +1::] + i += 1 + + content = content.lower() + question = content + + #keyword_length = 8 + keyword = question + + if keyword == 'registre' or keyword == 'baixa' or keyword == 'info': + + keyword_length = len(keyword) + + if unidecode.unidecode(question)[0:keyword_length] == keyword: + + reply = True + + except: + + pass + + return (reply, question) + + @staticmethod + def cleanhtml(self, raw_html): + + cleanr = re.compile('<.*?>') + cleantext = re.sub(cleanr, '', raw_html) + return cleantext + + @staticmethod + def unescape(self, s): + + s = s.replace("'", "'") + return s + + @staticmethod + def __json_allow_dict_attrs(json_object): + """ + Makes it possible to use attribute notation to access a dicts + elements, while still allowing the dict to act as a dict. + """ + if isinstance(json_object, dict): + return AttribAccessDict(json_object) + return json_object diff --git a/ejabberdapi.py b/ejabberdapi.py new file mode 100644 index 0000000..8febc78 --- /dev/null +++ b/ejabberdapi.py @@ -0,0 +1,187 @@ +import os +import os.path +import requests +import string +import getpass +import secrets +from collections import OrderedDict +import pdb + +### +# Dict helper class. +# Defined at top level so it can be pickled. +### +class AttribAccessDict(dict): + def __getattr__(self, attr): + if attr in self: + return self[attr] + else: + raise AttributeError("Attribute not found: " + str(attr)) + + def __setattr__(self, attr, val): + if attr in self: + raise AttributeError("Attribute-style access is read only") + super(AttribAccessDict, self).__setattr__(attr, val) + +class Ejabberd: + + name = 'Ejabberd API wrapper' + + def __init__(self, api_base_url=None, local_vhost=None, admin_account=None, admin_pass=None): + + self.ejabberd_config_path = "secrets/ejabberd_secrets.txt" + + is_setup = self.check_setup(self) + + if is_setup: + + self.api_base_url = self.get_parameter("api_base_url", self.ejabberd_config_path) + self.local_vhost = self.get_parameter("local_vhost", self.ejabberd_config_path) + self.admin_account = self.get_parameter("admin_account", self.ejabberd_config_path) + self.admin_pass = self.get_parameter("admin_pass", self.ejabberd_config_path) + + else: + + self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass = self.setup(self) + + def generate_pass(self): + + alphabet = string.ascii_letters + string.digits + + while True: + + password = ''.join(secrets.choice(alphabet) for i in range(10)) + + if (any(c.islower() for c in password) + and any(c.isupper() for c in password) + and sum(c.isdigit() for c in password) >= 3): + break + + return password + + def register(self, username, host, user_password): + + data = {'user':username, + 'host':self.local_vhost, + 'password':user_password, + } + + API_ENDPOINT = self.api_base_url + '/api/register?' + + response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass)) + + is_registered = response.ok + + if is_registered: + + response_text = response.json() + + else: + + response_text = f"{response.json()['status']}: {response.json()['message']}" + + return (is_registered, response_text) + + def unregister(self, username, host): + + is_unregistered = False + + if username+'@'+host == self.admin_account: + + message = "ets l'admin, no puc esborrar el teu compte!" + + return (is_unregistered, message) + + data = {'user':username, + 'host':self.local_vhost, + } + + API_ENDPOINT = self.api_base_url + '/api/unregister?' + + response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass)) + + is_unregistered = response.ok + + message = "eliminat amb èxit!" + + return (is_unregistered, message) + + def stats(self): + + names_temp = ["registeredusers","onlineusers","onlineusersnode","uptimeseconds","processes"] + + names = OrderedDict.fromkeys(names_temp).keys() + + stats_dict = {} + + for name in names: + + data = { + "name": name + } + + API_ENDPOINT = self.api_base_url + '/api/stats?' + + response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass)) + + result = response.json()['stat'] + + stats_dict[name] = result + + stats = self.__json_allow_dict_attrs(stats_dict) + + return stats + + @staticmethod + def check_setup(self): + + is_setup = False + + if not os.path.isfile(self.ejabberd_config_path): + print(f"File {self.ejabberd_config_path} not found, running setup.") + else: + is_setup = True + + return is_setup + + @staticmethod + def setup(self): + + if not os.path.exists('secrets'): + os.makedirs('secrets') + + self.api_base_url = input("api_base_url, in ex. 'http://127.0.0.1:5280': ") + self.local_vhost = input("local_vhost, in ex. 'ejabberd.server': ") + self.admin_account = input("admin_account, in ex. 'admin@ejabberd.server': ") + self.admin_pass = getpass.getpass("admin_pass, in ex. 'my_very_hard_secret_pass': ") + + if not os.path.exists(self.ejabberd_config_path): + with open(self.ejabberd_config_path, 'w'): pass + print(f"{self.ejabberd_config_path} created!") + + with open(self.ejabberd_config_path, 'a') as the_file: + print("Writing ejabberd secrets parameters to " + self.ejabberd_config_path) + the_file.write(f'api_base_url: {self.api_base_url}\n'+f'local_vhost: {self.local_vhost}\n'+f'admin_account: {self.admin_account}\n'+f'admin_pass: {self.admin_pass}\n') + + return (self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass) + + @staticmethod + def get_parameter(parameter, file_path ): + + with open( file_path ) as f: + for line in f: + if line.startswith( parameter ): + return line.replace(parameter + ":", "").strip() + + print(f'{file_path} Missing parameter {parameter}') + sys.exit(0) + + @staticmethod + def __json_allow_dict_attrs(json_object): + """ + Makes it possible to use attribute notation to access a dicts + elements, while still allowing the dict to act as a dict. + """ + if isinstance(json_object, dict): + return AttribAccessDict(json_object) + return json_object diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c393357 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +pytz +requests +python-dateutil +decorator +unidecode diff --git a/xmpp.py b/xmpp.py new file mode 100644 index 0000000..199fdaa --- /dev/null +++ b/xmpp.py @@ -0,0 +1,86 @@ +from akkomabot import Akkomabot +from ejabberdapi import Ejabberd +import pdb + +# main + +if __name__ == '__main__': + + bot = Akkomabot() + + ejabberd = Ejabberd() + + notifications = bot.akkoma.notifications() + + for notif in notifications: + + if notif.type != 'mention': + + print(f"Dismissing notification id {notif.id}") + + bot.akkoma.notifications_dismiss(notif.id) + + else: + + mention = bot.get_data(notif) + + if mention.reply and '@' not in mention.acct: + + if mention.question == 'registre': + + password = ejabberd.generate_pass() + + is_registered, text = ejabberd.register(mention.acct, bot.akkoma_hostname, password) + + if is_registered: + + post = f"@{mention.acct} compte xmpp registrat amb èxit!\n\nusuari: {mention.acct}@{bot.akkoma_hostname}\n" + + post += f"contrasenya: {password}\nservidor: {bot.akkoma_hostname}" + + bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility='direct') + + else: + + bot.akkoma.status_post(f'@{mention.acct}, {text}', in_reply_to_id=mention.status_id, visibility='direct') + + elif mention.question == 'baixa': + + is_unregistered, message = ejabberd.unregister(mention.acct, bot.akkoma_hostname) + + if is_unregistered: + + bot.akkoma.status_post(f"@{mention.acct}, compte xmpp {mention.acct}@{bot.akkoma_hostname}: {message}", in_reply_to_id=mention.status_id, visibility='direct') + + else: + + bot.akkoma.status_post(f'@{mention.acct}, {message}', in_reply_to_id=mention.status_id, visibility='direct') + + elif mention.question == 'info': + + stats = ejabberd.stats() + + post = f'@{mention.acct}, estadístiques del node #xmpp a {bot.akkoma_hostname}:\n\n' + + post += f'usuaris registrats: {stats.registeredusers}\n' + + post += f'usuaris en línia: {stats.onlineusers}\n' + + post += f'usuaris del node: {stats.onlineusersnode}\n' + + post += f'temps en línia (uptime): {stats.uptimeseconds}\n' + + post += f'processos: {stats.processes}\n' + + bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility=mention.visibility) + + print(f"Dismissing notification id {mention.id}") + + bot.akkoma.notifications_dismiss(mention.id) + + else: + + print(f"Dismissing notification id {mention.id}") + + bot.akkoma.notifications_dismiss(mention.id) +