Comparar commits

...

5 commits

Autor SHA1 Mensaje Fecha
spla a0b7b6a367 Bot for register xmpp ejabberd accounts from Akkoma instance 2022-08-08 22:36:12 +02:00
spla 1354d25b4b Added notificactions, me and account methods 2022-08-07 16:50:15 +02:00
spla 4d19b21940 Update README.md 2022-07-24 21:54:08 +02:00
spla 4fced75d90 Updated 2022-07-24 21:18:19 +02:00
spla 05ebc69894 Add LICENSE and README.md 2022-07-24 21:14:42 +02:00
S'han modificat 7 arxius amb 822 adicions i 24 eliminacions

21
LICENSE Normal file
Veure arxiu

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Lorenz Diener / Mastodon.py contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

32
README.md Normal file
Veure arxiu

@ -0,0 +1,32 @@
# xmpp Akkoma bot
bot to manage an xmpp ejabberd node by posting keywords to it from your Akkoma account.
The bot only listen keywords from your Akkoma instance local users. They can register themselves to your xmpp server, unregister and also get your xmpp node stats.
The keywords that Akkoma instance local users can use are:
@your_bot register
@your_bot unregister
@your_bot stats
The bot will process the keyword thanks to the wrapper for ejabberd (included) and the wrapper for Akkoma (also included) but first time you run `python xmpp.py` it will ask for the needed parameters like:
- api_base_url: http://127.0.0.1:5280
- local_vhost: your local ejabberd vhost
- admin_account: the ejabberd admin account, in exemple admin@ejabberd.server
- admin_pass: ejabberd admin account password
- Akkoma hostname: in ex. akkoma.host
Before running `python xmpp.py`:
1. git clone https://git.mastodont.cat/spla/xmpp.git target_dir.
2. `cd target_dir`
3. create the Python Virtual Environment with `python3.x -m venv .`
4. activate it with `source bin/activate`
5. run `pip install -r requirements.txt` to install required libraries.
6. set up your contrab to run `python xmpp.py` every minute.
Enjoy!

110
akkoma.py
Veure arxiu

@ -145,6 +145,7 @@ class Akkoma:
__DICT_VERSION_POLL = "2.8.0"
__DICT_VERSION_STATUS = bigger_version(bigger_version(bigger_version(bigger_version(bigger_version("3.1.0",
__DICT_VERSION_MEDIA), __DICT_VERSION_ACCOUNT), __DICT_VERSION_APPLICATION), __DICT_VERSION_MENTION), __DICT_VERSION_POLL)
__DICT_VERSION_NOTIFICATION = bigger_version(bigger_version("1.0.0", __DICT_VERSION_ACCOUNT), __DICT_VERSION_STATUS)
@staticmethod
def create_app(app_name, scopes=__DEFAULT_SCOPES, redirect_uris=None, website=None, to_file=None, api_base_url=__DEFAULT_BASE_URL,
@ -335,6 +336,25 @@ class Akkoma:
self.akkoma_major, self.akkoma_minor, self.akkoma_patch = parse_version_string(version_str)
return version_str
def verify_minimum_version(self, version_str, cached=False):
"""
Update version info from server and verify that at least the specified version is present.
If you specify "cached", the version info update part is skipped.
Returns True if version requirement is satisfied, False if not.
"""
if not cached:
self.retrieve_akkoma_version()
major, minor, patch = parse_version_string(version_str)
if major > self.akkoma_major:
return False
elif major == self.akkoma_major and minor > self.akkoma_minor:
return False
elif major == self.akkoma_major and minor == self.akkoma_minor and patch > self.akkoma_patch:
return False
return True
def log_in(self, client_id=None, client_secret=None, grant_type=None, username=None, password=None, code=None, redirect_uri="urn:ietf:wg:oauth:2.0:oob", refresh_token=None, scopes=__DEFAULT_SCOPES, to_file=None):
"""
Get the access token for a user.
@ -404,6 +424,72 @@ class Akkoma:
self.__logged_in_id = None
return response['access_token']
###
# Reading data: Notifications
###
#@api_version("1.0.0", "2.9.0", __DICT_VERSION_NOTIFICATION)
def notifications(self, id=None, account_id=None, max_id=None, min_id=None, since_id=None, limit=None, mentions_only=None):
"""
Fetch notifications (mentions, favourites, reblogs, follows) for the logged-in
user. Pass `account_id` to get only notifications originating from the given account.
Can be passed an `id` to fetch a single notification.
Returns a list of `notification dicts`_.
"""
if max_id != None:
max_id = self.__unpack_id(max_id)
if min_id != None:
min_id = self.__unpack_id(min_id)
if since_id != None:
since_id = self.__unpack_id(since_id)
if account_id != None:
account_id = self.__unpack_id(account_id)
if id is None:
params = self.__generate_params(locals(), ['id'])
return self.__api_request('GET', '/api/v1/notifications', params)
else:
id = self.__unpack_id(id)
url = '/api/v1/notifications/{0}'.format(str(id))
return self.__api_request('GET', url)
###
# Reading data: Accounts
###
@api_version("1.0.0", "1.0.0", __DICT_VERSION_ACCOUNT)
def account(self, id):
"""
Fetch account information by user `id`.
Does not require authentication for publicly visible accounts.
Returns a `user dict`_.
"""
id = self.__unpack_id(id)
url = '/api/v1/accounts/{0}'.format(str(id))
return self.__api_request('GET', url)
@api_version("1.0.0", "2.1.0", __DICT_VERSION_ACCOUNT)
def account_verify_credentials(self):
"""
Fetch logged-in user's account information.
Returns a `user dict`_ (Starting from 2.1.0, with an additional "source" field).
"""
return self.__api_request('GET', '/api/v1/accounts/verify_credentials')
@api_version("1.0.0", "2.1.0", __DICT_VERSION_ACCOUNT)
def me(self):
"""
Get this users account. Symonym for `account_verify_credentials()`, does exactly
the same thing, just exists becase `account_verify_credentials()` has a confusing
name.
"""
return self.account_verify_credentials()
###
# Internal helpers, dragons probably
###
@ -888,6 +974,26 @@ class Akkoma:
params = self.__generate_params(params_initial, ['idempotency_key'])
return self.__api_request('POST', '/api/v1/statuses', params, headers = headers, use_json = use_json)
###
# Writing data: Notifications
###
#@api_version("1.0.0", "1.0.0", "1.0.0")
def notifications_clear(self):
"""
Clear out a users notifications
"""
self.__api_request('POST', '/api/v1/notifications/clear')
#@api_version("1.3.0", "2.9.2", "2.9.2")
def notifications_dismiss(self, id):
"""
Deletes a single notification
"""
id = self.__unpack_id(id)
url = '/api/v1/notifications/{0}/dismiss'.format(str(id))
self.__api_request('POST', url)
###
# Writing data: Media
###
@ -992,6 +1098,10 @@ class AkkomaAPIError(AkkomaError):
"""Raised when the akkoma API generates a response that cannot be handled"""
pass
class AkkomaNotFoundError(AkkomaAPIError):
"""Raised when the akkoma API returns a 404 Not Found error"""
pass
class AkkomaMalformedEventError(AkkomaError):
"""Raised when the server-sent event stream is malformed"""
pass

357
akkomabot.py Normal file
Veure arxiu

@ -0,0 +1,357 @@
from akkoma import Akkoma
from akkoma import AkkomaMalformedEventError, AkkomaNetworkError, AkkomaReadTimeout, AkkomaAPIError, AkkomaIllegalArgumentError
import getpass
import unidecode
import fileinput,re
import os
import sys
import os.path
import pdb
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Akkomabot:
name = 'Akkomabot'
def __init__(self, akkoma=None, akkoma_hostname=None):
file_path = "secrets/secrets.txt"
is_setup = self.check_setup(file_path)
if is_setup:
self.uc_client_id = self.get_parameter("uc_client_id", file_path)
self.uc_client_secret = self.get_parameter("uc_client_secret", file_path)
self.uc_access_token = self.get_parameter("uc_access_token", file_path)
self.akkoma, self.akkoma_hostname = self.log_in(self)
else:
while(True):
logged_in, self.akkoma, self.akkoma_hostname = self.setup()
if not logged_in:
print("\nLog in failed! Try again.\n")
else:
break
@staticmethod
def log_in(self):
file_path = "secrets/secrets.txt"
uc_client_id = self.get_parameter("uc_client_id", file_path)
uc_client_secret = self.get_parameter("uc_client_secret", file_path)
uc_access_token = self.get_parameter("uc_access_token", file_path)
file_path = "config/config.txt"
self.akkoma_hostname = self.get_parameter("akkoma_hostname", file_path)
self.akkoma = Akkoma(
client_id = uc_client_id,
client_secret = uc_client_secret,
access_token = uc_access_token,
api_base_url = 'https://' + self.akkoma_hostname,
)
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
return (self.akkoma, self.akkoma_hostname)
@staticmethod
def check_setup(file_path):
is_setup = False
if not os.path.isfile(file_path):
print(f"File {file_path} not found, running setup.")
return
else:
is_setup = True
return is_setup
def setup(self):
logged_in = False
try:
self.akkoma_hostname = input("Enter Akkoma hostname (or 'q' to exit): ")
if self.akkoma_hostname == 'q':
sys.exit("Bye")
user_name = input("User name, ex. john? ")
user_password = getpass.getpass("User password? ")
app_name = input("App name? ")
client_id, client_secret = Akkoma.create_app(
app_name,
to_file="app_clientcred.txt",
api_base_url=self.akkoma_hostname
)
akkoma = Akkoma(client_id = "app_clientcred.txt", api_base_url = self.akkoma_hostname)
grant_type = 'password'
akkoma.log_in(
client_id,
client_secret,
grant_type,
user_name,
user_password,
scopes = ["read", "write"],
to_file = "app_usercred.txt"
)
if os.path.isfile("app_usercred.txt"):
print(f"Log in succesful!")
logged_in = True
if not os.path.exists('secrets'):
os.makedirs('secrets')
secrets_filepath = 'secrets/secrets.txt'
if not os.path.exists(secrets_filepath):
with open(secrets_filepath, 'w'): pass
print(f"{secrets_filepath} created!")
with open(secrets_filepath, 'a') as the_file:
print("Writing secrets parameter names to " + secrets_filepath)
the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')
client_path = 'app_clientcred.txt'
with open(client_path) as fp:
line = fp.readline()
cnt = 1
while line:
if cnt == 1:
print("Writing client id to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip())
elif cnt == 2:
print("Writing client secret to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip())
line = fp.readline()
cnt += 1
token_path = 'app_usercred.txt'
with open(token_path) as fp:
line = fp.readline()
print("Writing access token to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt temp file..")
os.remove("app_clientcred.txt")
if os.path.exists("app_usercred.txt"):
print("Removing app_usercred.txt temp file..")
os.remove("app_usercred.txt")
self.config_filepath = 'config/config.txt'
self.create_config(self)
self.write_config(self)
self.read_config_line(self)
print("Secrets setup done!\n")
except AkkomaIllegalArgumentError as i_error:
sys.stdout.write(f'\n{str(i_error)}\n')
except AkkomaNetworkError as n_error:
sys.stdout.write(f'\n{str(n_error)}\n')
except AkkomaReadTimeout as r_error:
sys.stdout.write(f'\n{str(r_error)}\n')
except AkkomaAPIError as a_error:
sys.stdout.write(f'\n{str(a_error)}\n')
return (logged_in, akkoma, self.akkoma_hostname)
@staticmethod
def get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def modify_file(self, file_name, pattern,value=""):
fh=fileinput.input(file_name,inplace=True)
for line in fh:
replacement=pattern + value
line=re.sub(pattern,replacement,line)
sys.stdout.write(line)
fh.close()
@staticmethod
def create_config(self):
if not os.path.exists('config'):
os.makedirs('config')
if not os.path.exists(self.config_filepath):
print(self.config_filepath + " created!")
with open('config/config.txt', 'w'): pass
@staticmethod
def write_config(self):
with open(self.config_filepath, 'a') as the_file:
the_file.write('akkoma_hostname: \n')
print(f"adding parameter 'akkoma_hostname' to {self.config_filepath}")
@staticmethod
def read_config_line(self):
with open(self.config_filepath) as fp:
line = fp.readline()
self.modify_file(self, self.config_filepath, "akkoma_hostname: ", value=self.akkoma_hostname)
def get_data(self, notif):
id = notif.id
account_id = notif.account.id
acct = notif.account.acct
status_id = notif.status.id
text = notif.status.content
visibility = notif.status.visibility
reply, question = self.get_question(self, text)
mention_dict = {'reply': reply, 'question': question, 'id': id, 'account_id': account_id, 'acct': acct, 'status_id': status_id, 'text': text, 'visibility': visibility}
mention = self.__json_allow_dict_attrs(mention_dict)
return mention
@staticmethod
def get_question(self, text):
reply = False
keyword = ''
content = self.cleanhtml(self, text)
content = self.unescape(self, content)
try:
start = content.index("@")
end = content.index(" ")
if len(content) > end:
content = content[0: start:] + content[end +1::]
cleanit = content.count('@')
i = 0
while i < cleanit :
start = content.rfind("@")
end = len(content)
content = content[0: start:] + content[end +1::]
i += 1
content = content.lower()
question = content
#keyword_length = 8
keyword = question
if keyword == 'registre' or keyword == 'baixa' or keyword == 'info':
keyword_length = len(keyword)
if unidecode.unidecode(question)[0:keyword_length] == keyword:
reply = True
except:
pass
return (reply, question)
@staticmethod
def cleanhtml(self, raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
@staticmethod
def unescape(self, s):
s = s.replace("&apos;", "'")
return s
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object

187
ejabberdapi.py Normal file
Veure arxiu

@ -0,0 +1,187 @@
import os
import os.path
import requests
import string
import getpass
import secrets
from collections import OrderedDict
import pdb
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Ejabberd:
name = 'Ejabberd API wrapper'
def __init__(self, api_base_url=None, local_vhost=None, admin_account=None, admin_pass=None):
self.ejabberd_config_path = "secrets/ejabberd_secrets.txt"
is_setup = self.check_setup(self)
if is_setup:
self.api_base_url = self.get_parameter("api_base_url", self.ejabberd_config_path)
self.local_vhost = self.get_parameter("local_vhost", self.ejabberd_config_path)
self.admin_account = self.get_parameter("admin_account", self.ejabberd_config_path)
self.admin_pass = self.get_parameter("admin_pass", self.ejabberd_config_path)
else:
self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass = self.setup(self)
def generate_pass(self):
alphabet = string.ascii_letters + string.digits
while True:
password = ''.join(secrets.choice(alphabet) for i in range(10))
if (any(c.islower() for c in password)
and any(c.isupper() for c in password)
and sum(c.isdigit() for c in password) >= 3):
break
return password
def register(self, username, host, user_password):
data = {'user':username,
'host':self.local_vhost,
'password':user_password,
}
API_ENDPOINT = self.api_base_url + '/api/register?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_registered = response.ok
if is_registered:
response_text = response.json()
else:
response_text = f"{response.json()['status']}: {response.json()['message']}"
return (is_registered, response_text)
def unregister(self, username, host):
is_unregistered = False
if username+'@'+host == self.admin_account:
message = "ets l'admin, no puc esborrar el teu compte!"
return (is_unregistered, message)
data = {'user':username,
'host':self.local_vhost,
}
API_ENDPOINT = self.api_base_url + '/api/unregister?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_unregistered = response.ok
message = "eliminat amb èxit!"
return (is_unregistered, message)
def stats(self):
names_temp = ["registeredusers","onlineusers","onlineusersnode","uptimeseconds","processes"]
names = OrderedDict.fromkeys(names_temp).keys()
stats_dict = {}
for name in names:
data = {
"name": name
}
API_ENDPOINT = self.api_base_url + '/api/stats?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
result = response.json()['stat']
stats_dict[name] = result
stats = self.__json_allow_dict_attrs(stats_dict)
return stats
@staticmethod
def check_setup(self):
is_setup = False
if not os.path.isfile(self.ejabberd_config_path):
print(f"File {self.ejabberd_config_path} not found, running setup.")
else:
is_setup = True
return is_setup
@staticmethod
def setup(self):
if not os.path.exists('secrets'):
os.makedirs('secrets')
self.api_base_url = input("api_base_url, in ex. 'http://127.0.0.1:5280': ")
self.local_vhost = input("local_vhost, in ex. 'ejabberd.server': ")
self.admin_account = input("admin_account, in ex. 'admin@ejabberd.server': ")
self.admin_pass = getpass.getpass("admin_pass, in ex. 'my_very_hard_secret_pass': ")
if not os.path.exists(self.ejabberd_config_path):
with open(self.ejabberd_config_path, 'w'): pass
print(f"{self.ejabberd_config_path} created!")
with open(self.ejabberd_config_path, 'a') as the_file:
print("Writing ejabberd secrets parameters to " + self.ejabberd_config_path)
the_file.write(f'api_base_url: {self.api_base_url}\n'+f'local_vhost: {self.local_vhost}\n'+f'admin_account: {self.admin_account}\n'+f'admin_pass: {self.admin_pass}\n')
return (self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass)
@staticmethod
def get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object

5
requirements.txt Normal file
Veure arxiu

@ -0,0 +1,5 @@
pytz
requests
python-dateutil
decorator
unidecode

86
xmpp.py Normal file
Veure arxiu

@ -0,0 +1,86 @@
from akkomabot import Akkomabot
from ejabberdapi import Ejabberd
import pdb
# main
if __name__ == '__main__':
bot = Akkomabot()
ejabberd = Ejabberd()
notifications = bot.akkoma.notifications()
for notif in notifications:
if notif.type != 'mention':
print(f"Dismissing notification id {notif.id}")
bot.akkoma.notifications_dismiss(notif.id)
else:
mention = bot.get_data(notif)
if mention.reply and '@' not in mention.acct:
if mention.question == 'registre':
password = ejabberd.generate_pass()
is_registered, text = ejabberd.register(mention.acct, bot.akkoma_hostname, password)
if is_registered:
post = f"@{mention.acct} compte xmpp registrat amb èxit!\n\nusuari: {mention.acct}@{bot.akkoma_hostname}\n"
post += f"contrasenya: {password}\nservidor: {bot.akkoma_hostname}"
bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility='direct')
else:
bot.akkoma.status_post(f'@{mention.acct}, {text}', in_reply_to_id=mention.status_id, visibility='direct')
elif mention.question == 'baixa':
is_unregistered, message = ejabberd.unregister(mention.acct, bot.akkoma_hostname)
if is_unregistered:
bot.akkoma.status_post(f"@{mention.acct}, compte xmpp {mention.acct}@{bot.akkoma_hostname}: {message}", in_reply_to_id=mention.status_id, visibility='direct')
else:
bot.akkoma.status_post(f'@{mention.acct}, {message}', in_reply_to_id=mention.status_id, visibility='direct')
elif mention.question == 'info':
stats = ejabberd.stats()
post = f'@{mention.acct}, estadístiques del node #xmpp a {bot.akkoma_hostname}:\n\n'
post += f'usuaris registrats: {stats.registeredusers}\n'
post += f'usuaris en línia: {stats.onlineusers}\n'
post += f'usuaris del node: {stats.onlineusersnode}\n'
post += f'temps en línia (uptime): {stats.uptimeseconds}\n'
post += f'processos: {stats.processes}\n'
bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility=mention.visibility)
print(f"Dismissing notification id {mention.id}")
bot.akkoma.notifications_dismiss(mention.id)
else:
print(f"Dismissing notification id {mention.id}")
bot.akkoma.notifications_dismiss(mention.id)