Bot for register xmpp ejabberd accounts from Akkoma instance

This commit is contained in:
spla 2022-08-08 22:36:12 +02:00
pare 1354d25b4b
commit a0b7b6a367
S'han modificat 5 arxius amb 657 adicions i 33 eliminacions

Veure arxiu

@ -1,43 +1,32 @@
# Akkoma.py
Python wrapper for the Akkoma (https://akkoma.dev/AkkomaGang/akkoma) API.
# xmpp Akkoma bot
bot to manage an xmpp ejabberd node by posting keywords to it from your Akkoma account.
# Register your app! This only needs to be done once. Uncomment the code and substitute in your information.
from akkoma import Akkoma
The bot only listen keywords from your Akkoma instance local users. They can register themselves to your xmpp server, unregister and also get your xmpp node stats.
The keywords that Akkoma instance local users can use are:
'''
client_id, client_secret = Akkoma.create_app(
'app_name',
to_file="app_clientcred.txt",
api_base_url = 'https://yourakkoma.instance'
)
'''
@your_bot register
@your_bot unregister
@your_bot stats
# Then login. This can be done every time, or use persisted.
The bot will process the keyword thanks to the wrapper for ejabberd (included) and the wrapper for Akkoma (also included) but first time you run `python xmpp.py` it will ask for the needed parameters like:
from akkoma import Akkoma
akkoma = Akkoma(client_id = "app_clientcred.txt", api_base_url = 'https://yourakkoma.instance')
- api_base_url: http://127.0.0.1:5280
- local_vhost: your local ejabberd vhost
- admin_account: the ejabberd admin account, in exemple admin@ejabberd.server
- admin_pass: ejabberd admin account password
- Akkoma hostname: in ex. akkoma.host
grant_type = 'password'
Before running `python xmpp.py`:
akkoma.log_in(
client_id,
client_secret,
grant_type,
'user',
'password',
to_file = "app_usercred.txt"
)
1. git clone https://git.mastodont.cat/spla/xmpp.git target_dir.
2. `cd target_dir`
3. create the Python Virtual Environment with `python3.x -m venv .`
4. activate it with `source bin/activate`
5. run `pip install -r requirements.txt` to install required libraries.
6. set up your contrab to run `python xmpp.py` every minute.
# To post, create an actual API instance.
Enjoy!
from akkoma import Akkoma
akkoma = Akkoma(
access_token = 'app_usercred.txt',
api_base_url = 'https://yourakkoma.instance'
)
akkoma.status_post('Posting from python using Akkoma.py !')

357
akkomabot.py Normal file
Veure arxiu

@ -0,0 +1,357 @@
from akkoma import Akkoma
from akkoma import AkkomaMalformedEventError, AkkomaNetworkError, AkkomaReadTimeout, AkkomaAPIError, AkkomaIllegalArgumentError
import getpass
import unidecode
import fileinput,re
import os
import sys
import os.path
import pdb
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Akkomabot:
name = 'Akkomabot'
def __init__(self, akkoma=None, akkoma_hostname=None):
file_path = "secrets/secrets.txt"
is_setup = self.check_setup(file_path)
if is_setup:
self.uc_client_id = self.get_parameter("uc_client_id", file_path)
self.uc_client_secret = self.get_parameter("uc_client_secret", file_path)
self.uc_access_token = self.get_parameter("uc_access_token", file_path)
self.akkoma, self.akkoma_hostname = self.log_in(self)
else:
while(True):
logged_in, self.akkoma, self.akkoma_hostname = self.setup()
if not logged_in:
print("\nLog in failed! Try again.\n")
else:
break
@staticmethod
def log_in(self):
file_path = "secrets/secrets.txt"
uc_client_id = self.get_parameter("uc_client_id", file_path)
uc_client_secret = self.get_parameter("uc_client_secret", file_path)
uc_access_token = self.get_parameter("uc_access_token", file_path)
file_path = "config/config.txt"
self.akkoma_hostname = self.get_parameter("akkoma_hostname", file_path)
self.akkoma = Akkoma(
client_id = uc_client_id,
client_secret = uc_client_secret,
access_token = uc_access_token,
api_base_url = 'https://' + self.akkoma_hostname,
)
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
return (self.akkoma, self.akkoma_hostname)
@staticmethod
def check_setup(file_path):
is_setup = False
if not os.path.isfile(file_path):
print(f"File {file_path} not found, running setup.")
return
else:
is_setup = True
return is_setup
def setup(self):
logged_in = False
try:
self.akkoma_hostname = input("Enter Akkoma hostname (or 'q' to exit): ")
if self.akkoma_hostname == 'q':
sys.exit("Bye")
user_name = input("User name, ex. john? ")
user_password = getpass.getpass("User password? ")
app_name = input("App name? ")
client_id, client_secret = Akkoma.create_app(
app_name,
to_file="app_clientcred.txt",
api_base_url=self.akkoma_hostname
)
akkoma = Akkoma(client_id = "app_clientcred.txt", api_base_url = self.akkoma_hostname)
grant_type = 'password'
akkoma.log_in(
client_id,
client_secret,
grant_type,
user_name,
user_password,
scopes = ["read", "write"],
to_file = "app_usercred.txt"
)
if os.path.isfile("app_usercred.txt"):
print(f"Log in succesful!")
logged_in = True
if not os.path.exists('secrets'):
os.makedirs('secrets')
secrets_filepath = 'secrets/secrets.txt'
if not os.path.exists(secrets_filepath):
with open(secrets_filepath, 'w'): pass
print(f"{secrets_filepath} created!")
with open(secrets_filepath, 'a') as the_file:
print("Writing secrets parameter names to " + secrets_filepath)
the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')
client_path = 'app_clientcred.txt'
with open(client_path) as fp:
line = fp.readline()
cnt = 1
while line:
if cnt == 1:
print("Writing client id to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip())
elif cnt == 2:
print("Writing client secret to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip())
line = fp.readline()
cnt += 1
token_path = 'app_usercred.txt'
with open(token_path) as fp:
line = fp.readline()
print("Writing access token to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt temp file..")
os.remove("app_clientcred.txt")
if os.path.exists("app_usercred.txt"):
print("Removing app_usercred.txt temp file..")
os.remove("app_usercred.txt")
self.config_filepath = 'config/config.txt'
self.create_config(self)
self.write_config(self)
self.read_config_line(self)
print("Secrets setup done!\n")
except AkkomaIllegalArgumentError as i_error:
sys.stdout.write(f'\n{str(i_error)}\n')
except AkkomaNetworkError as n_error:
sys.stdout.write(f'\n{str(n_error)}\n')
except AkkomaReadTimeout as r_error:
sys.stdout.write(f'\n{str(r_error)}\n')
except AkkomaAPIError as a_error:
sys.stdout.write(f'\n{str(a_error)}\n')
return (logged_in, akkoma, self.akkoma_hostname)
@staticmethod
def get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def modify_file(self, file_name, pattern,value=""):
fh=fileinput.input(file_name,inplace=True)
for line in fh:
replacement=pattern + value
line=re.sub(pattern,replacement,line)
sys.stdout.write(line)
fh.close()
@staticmethod
def create_config(self):
if not os.path.exists('config'):
os.makedirs('config')
if not os.path.exists(self.config_filepath):
print(self.config_filepath + " created!")
with open('config/config.txt', 'w'): pass
@staticmethod
def write_config(self):
with open(self.config_filepath, 'a') as the_file:
the_file.write('akkoma_hostname: \n')
print(f"adding parameter 'akkoma_hostname' to {self.config_filepath}")
@staticmethod
def read_config_line(self):
with open(self.config_filepath) as fp:
line = fp.readline()
self.modify_file(self, self.config_filepath, "akkoma_hostname: ", value=self.akkoma_hostname)
def get_data(self, notif):
id = notif.id
account_id = notif.account.id
acct = notif.account.acct
status_id = notif.status.id
text = notif.status.content
visibility = notif.status.visibility
reply, question = self.get_question(self, text)
mention_dict = {'reply': reply, 'question': question, 'id': id, 'account_id': account_id, 'acct': acct, 'status_id': status_id, 'text': text, 'visibility': visibility}
mention = self.__json_allow_dict_attrs(mention_dict)
return mention
@staticmethod
def get_question(self, text):
reply = False
keyword = ''
content = self.cleanhtml(self, text)
content = self.unescape(self, content)
try:
start = content.index("@")
end = content.index(" ")
if len(content) > end:
content = content[0: start:] + content[end +1::]
cleanit = content.count('@')
i = 0
while i < cleanit :
start = content.rfind("@")
end = len(content)
content = content[0: start:] + content[end +1::]
i += 1
content = content.lower()
question = content
#keyword_length = 8
keyword = question
if keyword == 'registre' or keyword == 'baixa' or keyword == 'info':
keyword_length = len(keyword)
if unidecode.unidecode(question)[0:keyword_length] == keyword:
reply = True
except:
pass
return (reply, question)
@staticmethod
def cleanhtml(self, raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
@staticmethod
def unescape(self, s):
s = s.replace("&apos;", "'")
return s
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object

187
ejabberdapi.py Normal file
Veure arxiu

@ -0,0 +1,187 @@
import os
import os.path
import requests
import string
import getpass
import secrets
from collections import OrderedDict
import pdb
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Ejabberd:
name = 'Ejabberd API wrapper'
def __init__(self, api_base_url=None, local_vhost=None, admin_account=None, admin_pass=None):
self.ejabberd_config_path = "secrets/ejabberd_secrets.txt"
is_setup = self.check_setup(self)
if is_setup:
self.api_base_url = self.get_parameter("api_base_url", self.ejabberd_config_path)
self.local_vhost = self.get_parameter("local_vhost", self.ejabberd_config_path)
self.admin_account = self.get_parameter("admin_account", self.ejabberd_config_path)
self.admin_pass = self.get_parameter("admin_pass", self.ejabberd_config_path)
else:
self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass = self.setup(self)
def generate_pass(self):
alphabet = string.ascii_letters + string.digits
while True:
password = ''.join(secrets.choice(alphabet) for i in range(10))
if (any(c.islower() for c in password)
and any(c.isupper() for c in password)
and sum(c.isdigit() for c in password) >= 3):
break
return password
def register(self, username, host, user_password):
data = {'user':username,
'host':self.local_vhost,
'password':user_password,
}
API_ENDPOINT = self.api_base_url + '/api/register?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_registered = response.ok
if is_registered:
response_text = response.json()
else:
response_text = f"{response.json()['status']}: {response.json()['message']}"
return (is_registered, response_text)
def unregister(self, username, host):
is_unregistered = False
if username+'@'+host == self.admin_account:
message = "ets l'admin, no puc esborrar el teu compte!"
return (is_unregistered, message)
data = {'user':username,
'host':self.local_vhost,
}
API_ENDPOINT = self.api_base_url + '/api/unregister?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_unregistered = response.ok
message = "eliminat amb èxit!"
return (is_unregistered, message)
def stats(self):
names_temp = ["registeredusers","onlineusers","onlineusersnode","uptimeseconds","processes"]
names = OrderedDict.fromkeys(names_temp).keys()
stats_dict = {}
for name in names:
data = {
"name": name
}
API_ENDPOINT = self.api_base_url + '/api/stats?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
result = response.json()['stat']
stats_dict[name] = result
stats = self.__json_allow_dict_attrs(stats_dict)
return stats
@staticmethod
def check_setup(self):
is_setup = False
if not os.path.isfile(self.ejabberd_config_path):
print(f"File {self.ejabberd_config_path} not found, running setup.")
else:
is_setup = True
return is_setup
@staticmethod
def setup(self):
if not os.path.exists('secrets'):
os.makedirs('secrets')
self.api_base_url = input("api_base_url, in ex. 'http://127.0.0.1:5280': ")
self.local_vhost = input("local_vhost, in ex. 'ejabberd.server': ")
self.admin_account = input("admin_account, in ex. 'admin@ejabberd.server': ")
self.admin_pass = getpass.getpass("admin_pass, in ex. 'my_very_hard_secret_pass': ")
if not os.path.exists(self.ejabberd_config_path):
with open(self.ejabberd_config_path, 'w'): pass
print(f"{self.ejabberd_config_path} created!")
with open(self.ejabberd_config_path, 'a') as the_file:
print("Writing ejabberd secrets parameters to " + self.ejabberd_config_path)
the_file.write(f'api_base_url: {self.api_base_url}\n'+f'local_vhost: {self.local_vhost}\n'+f'admin_account: {self.admin_account}\n'+f'admin_pass: {self.admin_pass}\n')
return (self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass)
@staticmethod
def get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object

5
requirements.txt Normal file
Veure arxiu

@ -0,0 +1,5 @@
pytz
requests
python-dateutil
decorator
unidecode

86
xmpp.py Normal file
Veure arxiu

@ -0,0 +1,86 @@
from akkomabot import Akkomabot
from ejabberdapi import Ejabberd
import pdb
# main
if __name__ == '__main__':
bot = Akkomabot()
ejabberd = Ejabberd()
notifications = bot.akkoma.notifications()
for notif in notifications:
if notif.type != 'mention':
print(f"Dismissing notification id {notif.id}")
bot.akkoma.notifications_dismiss(notif.id)
else:
mention = bot.get_data(notif)
if mention.reply and '@' not in mention.acct:
if mention.question == 'registre':
password = ejabberd.generate_pass()
is_registered, text = ejabberd.register(mention.acct, bot.akkoma_hostname, password)
if is_registered:
post = f"@{mention.acct} compte xmpp registrat amb èxit!\n\nusuari: {mention.acct}@{bot.akkoma_hostname}\n"
post += f"contrasenya: {password}\nservidor: {bot.akkoma_hostname}"
bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility='direct')
else:
bot.akkoma.status_post(f'@{mention.acct}, {text}', in_reply_to_id=mention.status_id, visibility='direct')
elif mention.question == 'baixa':
is_unregistered, message = ejabberd.unregister(mention.acct, bot.akkoma_hostname)
if is_unregistered:
bot.akkoma.status_post(f"@{mention.acct}, compte xmpp {mention.acct}@{bot.akkoma_hostname}: {message}", in_reply_to_id=mention.status_id, visibility='direct')
else:
bot.akkoma.status_post(f'@{mention.acct}, {message}', in_reply_to_id=mention.status_id, visibility='direct')
elif mention.question == 'info':
stats = ejabberd.stats()
post = f'@{mention.acct}, estadístiques del node #xmpp a {bot.akkoma_hostname}:\n\n'
post += f'usuaris registrats: {stats.registeredusers}\n'
post += f'usuaris en línia: {stats.onlineusers}\n'
post += f'usuaris del node: {stats.onlineusersnode}\n'
post += f'temps en línia (uptime): {stats.uptimeseconds}\n'
post += f'processos: {stats.processes}\n'
bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility=mention.visibility)
print(f"Dismissing notification id {mention.id}")
bot.akkoma.notifications_dismiss(mention.id)
else:
print(f"Dismissing notification id {mention.id}")
bot.akkoma.notifications_dismiss(mention.id)