Comparar commits

...

5 commits

Autor SHA1 Mensaje Fecha
spla a0b7b6a367 Bot for register xmpp ejabberd accounts from Akkoma instance 2022-08-08 22:36:12 +02:00
spla 1354d25b4b Added notificactions, me and account methods 2022-08-07 16:50:15 +02:00
spla 4d19b21940 Update README.md 2022-07-24 21:54:08 +02:00
spla 4fced75d90 Updated 2022-07-24 21:18:19 +02:00
spla 05ebc69894 Add LICENSE and README.md 2022-07-24 21:14:42 +02:00
S'han modificat 7 arxius amb 822 adicions i 24 eliminacions

21
LICENSE Normal file
Veure arxiu

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Lorenz Diener / Mastodon.py contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

32
README.md Normal file
Veure arxiu

@ -0,0 +1,32 @@
# xmpp Akkoma bot
bot to manage an xmpp ejabberd node by posting keywords to it from your Akkoma account.
The bot only listen keywords from your Akkoma instance local users. They can register themselves to your xmpp server, unregister and also get your xmpp node stats.
The keywords that Akkoma instance local users can use are:
@your_bot register
@your_bot unregister
@your_bot stats
The bot will process the keyword thanks to the wrapper for ejabberd (included) and the wrapper for Akkoma (also included) but first time you run `python xmpp.py` it will ask for the needed parameters like:
- api_base_url: http://127.0.0.1:5280
- local_vhost: your local ejabberd vhost
- admin_account: the ejabberd admin account, in exemple admin@ejabberd.server
- admin_pass: ejabberd admin account password
- Akkoma hostname: in ex. akkoma.host
Before running `python xmpp.py`:
1. git clone https://git.mastodont.cat/spla/xmpp.git target_dir.
2. `cd target_dir`
3. create the Python Virtual Environment with `python3.x -m venv .`
4. activate it with `source bin/activate`
5. run `pip install -r requirements.txt` to install required libraries.
6. set up your contrab to run `python xmpp.py` every minute.
Enjoy!

158
akkoma.py
Veure arxiu

@ -77,7 +77,7 @@ class AttribAccessDict(dict):
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
@ -145,7 +145,8 @@ class Akkoma:
__DICT_VERSION_POLL = "2.8.0"
__DICT_VERSION_STATUS = bigger_version(bigger_version(bigger_version(bigger_version(bigger_version("3.1.0",
__DICT_VERSION_MEDIA), __DICT_VERSION_ACCOUNT), __DICT_VERSION_APPLICATION), __DICT_VERSION_MENTION), __DICT_VERSION_POLL)
__DICT_VERSION_NOTIFICATION = bigger_version(bigger_version("1.0.0", __DICT_VERSION_ACCOUNT), __DICT_VERSION_STATUS)
@staticmethod
def create_app(app_name, scopes=__DEFAULT_SCOPES, redirect_uris=None, website=None, to_file=None, api_base_url=__DEFAULT_BASE_URL,
request_timeout=__DEFAULT_TIMEOUT, session=None):
@ -233,13 +234,13 @@ class Akkoma:
expect to be installed on the server. The function will throw an error if an unparseable
Version is specified. If no version is specified, Akkoma.py will set `akkoma_version` to the
detected version.
The version check mode can be set to "created" (the default behaviour), "changed" or "none". If set to
"created", Akkoma.py will throw an error if the version of Akkoma it is connected to is too old
to have an endpoint. If it is set to "changed", it will throw an error if the endpoints behaviour has
changed after the version of Akkoma that is connected has been released. If it is set to "none",
version checking is disabled.
`feature_set` can be used to enable behaviour specific to non-mainline Akkoma API implementations.
Details are documented in the functions that provide such functionality. Currently supported feature
sets are `mainline`, `fedibird` and `pleroma`.
@ -247,7 +248,7 @@ class Akkoma:
self.api_base_url = None
if not api_base_url is None:
self.api_base_url = Akkoma.__protocolize(api_base_url)
self.client_id = client_id
self.client_secret = client_secret
self.access_token = access_token
@ -255,9 +256,9 @@ class Akkoma:
self.ratelimit_method = ratelimit_method
self._token_expired = datetime.datetime.now()
self._refresh_token = None
self.__logged_in_id = None
self.ratelimit_limit = 300
self.ratelimit_reset = time.time()
self.ratelimit_remaining = 300
@ -274,14 +275,14 @@ class Akkoma:
self.feature_set = feature_set
if not self.feature_set in ["mainline", "fedibird", "pleroma"]:
raise AkkomaIllegalArgumentError('Requested invalid feature set')
# Token loading
if self.client_id is not None:
if os.path.isfile(self.client_id):
with open(self.client_id, 'r') as secret_file:
self.client_id = secret_file.readline().rstrip()
self.client_secret = secret_file.readline().rstrip()
try_base_url = secret_file.readline().rstrip()
if (not try_base_url is None) and len(try_base_url) != 0:
try_base_url = Akkoma.__protocolize(try_base_url)
@ -295,14 +296,14 @@ class Akkoma:
if self.access_token is not None and os.path.isfile(self.access_token):
with open(self.access_token, 'r') as token_file:
self.access_token = token_file.readline().rstrip()
try_base_url = token_file.readline().rstrip()
if (not try_base_url is None) and len(try_base_url) != 0:
try_base_url = Akkoma.__protocolize(try_base_url)
if not (self.api_base_url is None or try_base_url == self.api_base_url):
raise AkkomaIllegalArgumentError('Mismatch in base URLs between files and/or specified')
self.api_base_url = try_base_url
# Versioning
if akkoma_version == None:
self.retrieve_akkoma_version()
@ -311,11 +312,11 @@ class Akkoma:
self.akkoma_major, self.akkoma_minor, self.akkoma_patch = parse_version_string(akkoma_version)
except:
raise AkkomaVersionError("Bad version specified")
if not version_check_mode in ["created", "changed", "none"]:
raise AkkomaIllegalArgumentError("Invalid version check method.")
self.version_check_mode = version_check_mode
# Ratelimiting parameter check
if ratelimit_method not in ["throw", "wait", "pace"]:
raise AkkomaIllegalArgumentError("Invalid ratelimit method.")
@ -335,16 +336,35 @@ class Akkoma:
self.akkoma_major, self.akkoma_minor, self.akkoma_patch = parse_version_string(version_str)
return version_str
def verify_minimum_version(self, version_str, cached=False):
"""
Update version info from server and verify that at least the specified version is present.
If you specify "cached", the version info update part is skipped.
Returns True if version requirement is satisfied, False if not.
"""
if not cached:
self.retrieve_akkoma_version()
major, minor, patch = parse_version_string(version_str)
if major > self.akkoma_major:
return False
elif major == self.akkoma_major and minor > self.akkoma_minor:
return False
elif major == self.akkoma_major and minor == self.akkoma_minor and patch > self.akkoma_patch:
return False
return True
def log_in(self, client_id=None, client_secret=None, grant_type=None, username=None, password=None, code=None, redirect_uri="urn:ietf:wg:oauth:2.0:oob", refresh_token=None, scopes=__DEFAULT_SCOPES, to_file=None):
"""
Get the access token for a user.
The username is the e-mail used to log in into akkoma.
Can persist access token to file `to_file`, to be used in the constructor.
Handles password and OAuth-based authorization.
Will throw a `AkkomaIllegalArgumentError` if the OAuth or the
username / password credentials given are incorrect, and
`AkkomaAPIError` if all of the requested scopes were not granted.
@ -391,7 +411,7 @@ class Akkoma:
for scope_set in self.__SCOPE_SETS.keys():
if scope_set in received_scopes:
received_scopes += self.__SCOPE_SETS[scope_set]
if not set(scopes) <= set(received_scopes):
raise AkkomaAPIError(
'Granted scopes "' + " ".join(received_scopes) + '" do not contain all of the requested scopes "' + " ".join(scopes) + '".')
@ -400,14 +420,80 @@ class Akkoma:
with open(to_file, 'w') as token_file:
token_file.write(response['access_token'] + "\n")
token_file.write(self.api_base_url + "\n")
self.__logged_in_id = None
return response['access_token']
###
# Reading data: Notifications
###
#@api_version("1.0.0", "2.9.0", __DICT_VERSION_NOTIFICATION)
def notifications(self, id=None, account_id=None, max_id=None, min_id=None, since_id=None, limit=None, mentions_only=None):
"""
Fetch notifications (mentions, favourites, reblogs, follows) for the logged-in
user. Pass `account_id` to get only notifications originating from the given account.
Can be passed an `id` to fetch a single notification.
Returns a list of `notification dicts`_.
"""
if max_id != None:
max_id = self.__unpack_id(max_id)
if min_id != None:
min_id = self.__unpack_id(min_id)
if since_id != None:
since_id = self.__unpack_id(since_id)
if account_id != None:
account_id = self.__unpack_id(account_id)
if id is None:
params = self.__generate_params(locals(), ['id'])
return self.__api_request('GET', '/api/v1/notifications', params)
else:
id = self.__unpack_id(id)
url = '/api/v1/notifications/{0}'.format(str(id))
return self.__api_request('GET', url)
###
# Reading data: Accounts
###
@api_version("1.0.0", "1.0.0", __DICT_VERSION_ACCOUNT)
def account(self, id):
"""
Fetch account information by user `id`.
Does not require authentication for publicly visible accounts.
Returns a `user dict`_.
"""
id = self.__unpack_id(id)
url = '/api/v1/accounts/{0}'.format(str(id))
return self.__api_request('GET', url)
@api_version("1.0.0", "2.1.0", __DICT_VERSION_ACCOUNT)
def account_verify_credentials(self):
"""
Fetch logged-in user's account information.
Returns a `user dict`_ (Starting from 2.1.0, with an additional "source" field).
"""
return self.__api_request('GET', '/api/v1/accounts/verify_credentials')
@api_version("1.0.0", "2.1.0", __DICT_VERSION_ACCOUNT)
def me(self):
"""
Get this users account. Symonym for `account_verify_credentials()`, does exactly
the same thing, just exists becase `account_verify_credentials()` has a confusing
name.
"""
return self.account_verify_credentials()
###
# Internal helpers, dragons probably
###
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
@ -435,7 +521,7 @@ class Akkoma:
except:
raise AkkomaAPIError('Encountered invalid date.')
return json_object
@staticmethod
def __json_truefalse_parse(json_object):
"""
@ -887,7 +973,27 @@ class Akkoma:
params = self.__generate_params(params_initial, ['idempotency_key'])
return self.__api_request('POST', '/api/v1/statuses', params, headers = headers, use_json = use_json)
###
# Writing data: Notifications
###
#@api_version("1.0.0", "1.0.0", "1.0.0")
def notifications_clear(self):
"""
Clear out a users notifications
"""
self.__api_request('POST', '/api/v1/notifications/clear')
#@api_version("1.3.0", "2.9.2", "2.9.2")
def notifications_dismiss(self, id):
"""
Deletes a single notification
"""
id = self.__unpack_id(id)
url = '/api/v1/notifications/{0}/dismiss'.format(str(id))
self.__api_request('POST', url)
###
# Writing data: Media
###
@ -924,7 +1030,7 @@ class Akkoma:
if focus != None:
focus = str(focus[0]) + "," + str(focus[1])
media_file_description = (file_name, media_file, mime_type)
return self.__api_request('POST', '/api/v1/media',
files={'file': media_file_description},
@ -933,7 +1039,7 @@ class Akkoma:
def __unpack_id(self, id):
"""
Internal object-to-id converter
Checks if id is a dict that contains id and
returns the id inside, otherwise just returns
the id straight.
@ -952,7 +1058,7 @@ class Akkoma:
"""Internal helper for oauth code"""
self._refresh_token = value
return
@staticmethod
def __protocolize(base_url):
"""Internal add-protocol-to-url helper"""
@ -992,6 +1098,10 @@ class AkkomaAPIError(AkkomaError):
"""Raised when the akkoma API generates a response that cannot be handled"""
pass
class AkkomaNotFoundError(AkkomaAPIError):
"""Raised when the akkoma API returns a 404 Not Found error"""
pass
class AkkomaMalformedEventError(AkkomaError):
"""Raised when the server-sent event stream is malformed"""
pass

357
akkomabot.py Normal file
Veure arxiu

@ -0,0 +1,357 @@
from akkoma import Akkoma
from akkoma import AkkomaMalformedEventError, AkkomaNetworkError, AkkomaReadTimeout, AkkomaAPIError, AkkomaIllegalArgumentError
import getpass
import unidecode
import fileinput,re
import os
import sys
import os.path
import pdb
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Akkomabot:
name = 'Akkomabot'
def __init__(self, akkoma=None, akkoma_hostname=None):
file_path = "secrets/secrets.txt"
is_setup = self.check_setup(file_path)
if is_setup:
self.uc_client_id = self.get_parameter("uc_client_id", file_path)
self.uc_client_secret = self.get_parameter("uc_client_secret", file_path)
self.uc_access_token = self.get_parameter("uc_access_token", file_path)
self.akkoma, self.akkoma_hostname = self.log_in(self)
else:
while(True):
logged_in, self.akkoma, self.akkoma_hostname = self.setup()
if not logged_in:
print("\nLog in failed! Try again.\n")
else:
break
@staticmethod
def log_in(self):
file_path = "secrets/secrets.txt"
uc_client_id = self.get_parameter("uc_client_id", file_path)
uc_client_secret = self.get_parameter("uc_client_secret", file_path)
uc_access_token = self.get_parameter("uc_access_token", file_path)
file_path = "config/config.txt"
self.akkoma_hostname = self.get_parameter("akkoma_hostname", file_path)
self.akkoma = Akkoma(
client_id = uc_client_id,
client_secret = uc_client_secret,
access_token = uc_access_token,
api_base_url = 'https://' + self.akkoma_hostname,
)
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
return (self.akkoma, self.akkoma_hostname)
@staticmethod
def check_setup(file_path):
is_setup = False
if not os.path.isfile(file_path):
print(f"File {file_path} not found, running setup.")
return
else:
is_setup = True
return is_setup
def setup(self):
logged_in = False
try:
self.akkoma_hostname = input("Enter Akkoma hostname (or 'q' to exit): ")
if self.akkoma_hostname == 'q':
sys.exit("Bye")
user_name = input("User name, ex. john? ")
user_password = getpass.getpass("User password? ")
app_name = input("App name? ")
client_id, client_secret = Akkoma.create_app(
app_name,
to_file="app_clientcred.txt",
api_base_url=self.akkoma_hostname
)
akkoma = Akkoma(client_id = "app_clientcred.txt", api_base_url = self.akkoma_hostname)
grant_type = 'password'
akkoma.log_in(
client_id,
client_secret,
grant_type,
user_name,
user_password,
scopes = ["read", "write"],
to_file = "app_usercred.txt"
)
if os.path.isfile("app_usercred.txt"):
print(f"Log in succesful!")
logged_in = True
if not os.path.exists('secrets'):
os.makedirs('secrets')
secrets_filepath = 'secrets/secrets.txt'
if not os.path.exists(secrets_filepath):
with open(secrets_filepath, 'w'): pass
print(f"{secrets_filepath} created!")
with open(secrets_filepath, 'a') as the_file:
print("Writing secrets parameter names to " + secrets_filepath)
the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')
client_path = 'app_clientcred.txt'
with open(client_path) as fp:
line = fp.readline()
cnt = 1
while line:
if cnt == 1:
print("Writing client id to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip())
elif cnt == 2:
print("Writing client secret to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip())
line = fp.readline()
cnt += 1
token_path = 'app_usercred.txt'
with open(token_path) as fp:
line = fp.readline()
print("Writing access token to " + secrets_filepath)
self.modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt temp file..")
os.remove("app_clientcred.txt")
if os.path.exists("app_usercred.txt"):
print("Removing app_usercred.txt temp file..")
os.remove("app_usercred.txt")
self.config_filepath = 'config/config.txt'
self.create_config(self)
self.write_config(self)
self.read_config_line(self)
print("Secrets setup done!\n")
except AkkomaIllegalArgumentError as i_error:
sys.stdout.write(f'\n{str(i_error)}\n')
except AkkomaNetworkError as n_error:
sys.stdout.write(f'\n{str(n_error)}\n')
except AkkomaReadTimeout as r_error:
sys.stdout.write(f'\n{str(r_error)}\n')
except AkkomaAPIError as a_error:
sys.stdout.write(f'\n{str(a_error)}\n')
return (logged_in, akkoma, self.akkoma_hostname)
@staticmethod
def get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def modify_file(self, file_name, pattern,value=""):
fh=fileinput.input(file_name,inplace=True)
for line in fh:
replacement=pattern + value
line=re.sub(pattern,replacement,line)
sys.stdout.write(line)
fh.close()
@staticmethod
def create_config(self):
if not os.path.exists('config'):
os.makedirs('config')
if not os.path.exists(self.config_filepath):
print(self.config_filepath + " created!")
with open('config/config.txt', 'w'): pass
@staticmethod
def write_config(self):
with open(self.config_filepath, 'a') as the_file:
the_file.write('akkoma_hostname: \n')
print(f"adding parameter 'akkoma_hostname' to {self.config_filepath}")
@staticmethod
def read_config_line(self):
with open(self.config_filepath) as fp:
line = fp.readline()
self.modify_file(self, self.config_filepath, "akkoma_hostname: ", value=self.akkoma_hostname)
def get_data(self, notif):
id = notif.id
account_id = notif.account.id
acct = notif.account.acct
status_id = notif.status.id
text = notif.status.content
visibility = notif.status.visibility
reply, question = self.get_question(self, text)
mention_dict = {'reply': reply, 'question': question, 'id': id, 'account_id': account_id, 'acct': acct, 'status_id': status_id, 'text': text, 'visibility': visibility}
mention = self.__json_allow_dict_attrs(mention_dict)
return mention
@staticmethod
def get_question(self, text):
reply = False
keyword = ''
content = self.cleanhtml(self, text)
content = self.unescape(self, content)
try:
start = content.index("@")
end = content.index(" ")
if len(content) > end:
content = content[0: start:] + content[end +1::]
cleanit = content.count('@')
i = 0
while i < cleanit :
start = content.rfind("@")
end = len(content)
content = content[0: start:] + content[end +1::]
i += 1
content = content.lower()
question = content
#keyword_length = 8
keyword = question
if keyword == 'registre' or keyword == 'baixa' or keyword == 'info':
keyword_length = len(keyword)
if unidecode.unidecode(question)[0:keyword_length] == keyword:
reply = True
except:
pass
return (reply, question)
@staticmethod
def cleanhtml(self, raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
@staticmethod
def unescape(self, s):
s = s.replace("&apos;", "'")
return s
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object

187
ejabberdapi.py Normal file
Veure arxiu

@ -0,0 +1,187 @@
import os
import os.path
import requests
import string
import getpass
import secrets
from collections import OrderedDict
import pdb
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Ejabberd:
name = 'Ejabberd API wrapper'
def __init__(self, api_base_url=None, local_vhost=None, admin_account=None, admin_pass=None):
self.ejabberd_config_path = "secrets/ejabberd_secrets.txt"
is_setup = self.check_setup(self)
if is_setup:
self.api_base_url = self.get_parameter("api_base_url", self.ejabberd_config_path)
self.local_vhost = self.get_parameter("local_vhost", self.ejabberd_config_path)
self.admin_account = self.get_parameter("admin_account", self.ejabberd_config_path)
self.admin_pass = self.get_parameter("admin_pass", self.ejabberd_config_path)
else:
self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass = self.setup(self)
def generate_pass(self):
alphabet = string.ascii_letters + string.digits
while True:
password = ''.join(secrets.choice(alphabet) for i in range(10))
if (any(c.islower() for c in password)
and any(c.isupper() for c in password)
and sum(c.isdigit() for c in password) >= 3):
break
return password
def register(self, username, host, user_password):
data = {'user':username,
'host':self.local_vhost,
'password':user_password,
}
API_ENDPOINT = self.api_base_url + '/api/register?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_registered = response.ok
if is_registered:
response_text = response.json()
else:
response_text = f"{response.json()['status']}: {response.json()['message']}"
return (is_registered, response_text)
def unregister(self, username, host):
is_unregistered = False
if username+'@'+host == self.admin_account:
message = "ets l'admin, no puc esborrar el teu compte!"
return (is_unregistered, message)
data = {'user':username,
'host':self.local_vhost,
}
API_ENDPOINT = self.api_base_url + '/api/unregister?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
is_unregistered = response.ok
message = "eliminat amb èxit!"
return (is_unregistered, message)
def stats(self):
names_temp = ["registeredusers","onlineusers","onlineusersnode","uptimeseconds","processes"]
names = OrderedDict.fromkeys(names_temp).keys()
stats_dict = {}
for name in names:
data = {
"name": name
}
API_ENDPOINT = self.api_base_url + '/api/stats?'
response = requests.post(url = API_ENDPOINT, json = data, auth=(self.admin_account, self.admin_pass))
result = response.json()['stat']
stats_dict[name] = result
stats = self.__json_allow_dict_attrs(stats_dict)
return stats
@staticmethod
def check_setup(self):
is_setup = False
if not os.path.isfile(self.ejabberd_config_path):
print(f"File {self.ejabberd_config_path} not found, running setup.")
else:
is_setup = True
return is_setup
@staticmethod
def setup(self):
if not os.path.exists('secrets'):
os.makedirs('secrets')
self.api_base_url = input("api_base_url, in ex. 'http://127.0.0.1:5280': ")
self.local_vhost = input("local_vhost, in ex. 'ejabberd.server': ")
self.admin_account = input("admin_account, in ex. 'admin@ejabberd.server': ")
self.admin_pass = getpass.getpass("admin_pass, in ex. 'my_very_hard_secret_pass': ")
if not os.path.exists(self.ejabberd_config_path):
with open(self.ejabberd_config_path, 'w'): pass
print(f"{self.ejabberd_config_path} created!")
with open(self.ejabberd_config_path, 'a') as the_file:
print("Writing ejabberd secrets parameters to " + self.ejabberd_config_path)
the_file.write(f'api_base_url: {self.api_base_url}\n'+f'local_vhost: {self.local_vhost}\n'+f'admin_account: {self.admin_account}\n'+f'admin_pass: {self.admin_pass}\n')
return (self.api_base_url, self.local_vhost, self.admin_account, self.admin_pass)
@staticmethod
def get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object

5
requirements.txt Normal file
Veure arxiu

@ -0,0 +1,5 @@
pytz
requests
python-dateutil
decorator
unidecode

86
xmpp.py Normal file
Veure arxiu

@ -0,0 +1,86 @@
from akkomabot import Akkomabot
from ejabberdapi import Ejabberd
import pdb
# main
if __name__ == '__main__':
bot = Akkomabot()
ejabberd = Ejabberd()
notifications = bot.akkoma.notifications()
for notif in notifications:
if notif.type != 'mention':
print(f"Dismissing notification id {notif.id}")
bot.akkoma.notifications_dismiss(notif.id)
else:
mention = bot.get_data(notif)
if mention.reply and '@' not in mention.acct:
if mention.question == 'registre':
password = ejabberd.generate_pass()
is_registered, text = ejabberd.register(mention.acct, bot.akkoma_hostname, password)
if is_registered:
post = f"@{mention.acct} compte xmpp registrat amb èxit!\n\nusuari: {mention.acct}@{bot.akkoma_hostname}\n"
post += f"contrasenya: {password}\nservidor: {bot.akkoma_hostname}"
bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility='direct')
else:
bot.akkoma.status_post(f'@{mention.acct}, {text}', in_reply_to_id=mention.status_id, visibility='direct')
elif mention.question == 'baixa':
is_unregistered, message = ejabberd.unregister(mention.acct, bot.akkoma_hostname)
if is_unregistered:
bot.akkoma.status_post(f"@{mention.acct}, compte xmpp {mention.acct}@{bot.akkoma_hostname}: {message}", in_reply_to_id=mention.status_id, visibility='direct')
else:
bot.akkoma.status_post(f'@{mention.acct}, {message}', in_reply_to_id=mention.status_id, visibility='direct')
elif mention.question == 'info':
stats = ejabberd.stats()
post = f'@{mention.acct}, estadístiques del node #xmpp a {bot.akkoma_hostname}:\n\n'
post += f'usuaris registrats: {stats.registeredusers}\n'
post += f'usuaris en línia: {stats.onlineusers}\n'
post += f'usuaris del node: {stats.onlineusersnode}\n'
post += f'temps en línia (uptime): {stats.uptimeseconds}\n'
post += f'processos: {stats.processes}\n'
bot.akkoma.status_post(post, in_reply_to_id=mention.status_id, visibility=mention.visibility)
print(f"Dismissing notification id {mention.id}")
bot.akkoma.notifications_dismiss(mention.id)
else:
print(f"Dismissing notification id {mention.id}")
bot.akkoma.notifications_dismiss(mention.id)