xmpp/akkomabot.py

408 líneas
13 KiB
Python
Original Vista normal Històric

from akkoma import Akkoma
from akkoma import AkkomaMalformedEventError, AkkomaNetworkError, AkkomaReadTimeout, AkkomaAPIError, AkkomaIllegalArgumentError
import getpass
import unidecode
import fileinput,re
import os
import sys
import os.path
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
class Akkomabot:
name = 'Akkomabot'
def __init__(self, akkoma=None, akkoma_hostname=None):
file_path = "secrets/secrets.txt"
is_setup = self.__check_setup(file_path)
if is_setup:
self.__uc_client_id = self.__get_parameter("uc_client_id", file_path)
self.__uc_client_secret = self.__get_parameter("uc_client_secret", file_path)
self.__uc_access_token = self.__get_parameter("uc_access_token", file_path)
self.akkoma, self.akkoma_hostname = self.__log_in(self)
bot_file_path = "config/config.txt"
self.__bot_lang = self.__get_parameter("bot_lang" , bot_file_path)
self.__load_strings(self)
else:
while(True):
logged_in, self.akkoma, self.akkoma_hostname = self.setup()
if not logged_in:
print("\nLog in failed! Try again.\n")
else:
bot_file_path = "config/config.txt"
self.__bot_lang = self.__get_parameter("bot_lang" , bot_file_path)
self.__load_strings(self)
break
@staticmethod
def __log_in(self):
file_path = "secrets/secrets.txt"
uc_client_id = self.__get_parameter("uc_client_id", file_path)
uc_client_secret = self.__get_parameter("uc_client_secret", file_path)
uc_access_token = self.__get_parameter("uc_access_token", file_path)
file_path = "config/config.txt"
self.akkoma_hostname = self.__get_parameter("akkoma_hostname", file_path)
self.akkoma = Akkoma(
client_id = uc_client_id,
client_secret = uc_client_secret,
access_token = uc_access_token,
api_base_url = 'https://' + self.akkoma_hostname,
)
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
return (self.akkoma, self.akkoma_hostname)
@staticmethod
def __check_setup(file_path):
is_setup = False
if not os.path.isfile(file_path):
print(f"File {file_path} not found, running setup.")
return
else:
is_setup = True
return is_setup
def setup(self):
logged_in = False
try:
self.akkoma_hostname = input("Enter Akkoma hostname (or 'q' to exit): ")
if self.akkoma_hostname == 'q':
sys.exit("Bye")
2022-08-29 11:27:47 +02:00
user_name = input("Bot user name, ex. xmpp? ")
user_password = getpass.getpass("Bot password? ")
app_name = input("Bot App name? ")
self.__bot_lang = input("Bot replies lang (ca or en)? ")
client_id, client_secret = Akkoma.create_app(
app_name,
to_file="app_clientcred.txt",
api_base_url=self.akkoma_hostname
)
akkoma = Akkoma(client_id = "app_clientcred.txt", api_base_url = self.akkoma_hostname)
grant_type = 'password'
2022-08-29 11:27:47 +02:00
akkoma.log_in(
client_id,
client_secret,
grant_type,
user_name,
user_password,
scopes = ["read", "write"],
to_file = "app_usercred.txt"
)
if os.path.isfile("app_usercred.txt"):
print(f"Log in succesful!")
logged_in = True
if not os.path.exists('secrets'):
os.makedirs('secrets')
secrets_filepath = 'secrets/secrets.txt'
if not os.path.exists(secrets_filepath):
with open(secrets_filepath, 'w'): pass
print(f"{secrets_filepath} created!")
with open(secrets_filepath, 'a') as the_file:
print("Writing secrets parameter names to " + secrets_filepath)
the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')
client_path = 'app_clientcred.txt'
with open(client_path) as fp:
line = fp.readline()
cnt = 1
while line:
if cnt == 1:
print("Writing client id to " + secrets_filepath)
self.__modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip())
elif cnt == 2:
print("Writing client secret to " + secrets_filepath)
self.__modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip())
line = fp.readline()
cnt += 1
token_path = 'app_usercred.txt'
with open(token_path) as fp:
line = fp.readline()
print("Writing access token to " + secrets_filepath)
self.__modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())
if os.path.exists("app_clientcred.txt"):
print("Removing app_clientcred.txt temp file..")
os.remove("app_clientcred.txt")
if os.path.exists("app_usercred.txt"):
print("Removing app_usercred.txt temp file..")
os.remove("app_usercred.txt")
self.config_filepath = 'config/config.txt'
self.__create_config(self)
self.__write_config(self)
self.__read_config_line(self)
print("Secrets setup done!\n")
except AkkomaIllegalArgumentError as i_error:
sys.stdout.write(f'\n{str(i_error)}\n')
except AkkomaNetworkError as n_error:
sys.stdout.write(f'\n{str(n_error)}\n')
except AkkomaReadTimeout as r_error:
sys.stdout.write(f'\n{str(r_error)}\n')
except AkkomaAPIError as a_error:
sys.stdout.write(f'\n{str(a_error)}\n')
return (logged_in, akkoma, self.akkoma_hostname)
@staticmethod
def __get_parameter(parameter, file_path ):
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(f'{file_path} Missing parameter {parameter}')
sys.exit(0)
@staticmethod
def __load_strings(self):
lang_file_path = f"app/locales/{self.__bot_lang}.txt"
self.register_str = self.__get_parameter("register_str", lang_file_path)
self.unregister_str = self.__get_parameter("unregister_str", lang_file_path)
self.stats_str = self.__get_parameter("stats_str", lang_file_path)
self.status_str = self.__get_parameter("status_str", lang_file_path)
self.registerok_str = self.__get_parameter("registerok_str", lang_file_path)
self.user_str = self.__get_parameter("user_str", lang_file_path)
self.password_str = self.__get_parameter("password_str", lang_file_path)
self.server_str = self.__get_parameter("server_str", lang_file_path)
self.xmpp_account_str = self.__get_parameter("xmpp_account_str", lang_file_path)
self.notdeleteadmin_str = self.__get_parameter("notdeleteadmin_str", lang_file_path)
self.deleted_str = self.__get_parameter("deleted_str", lang_file_path)
self.stats_title_str = self.__get_parameter("stats_title_str", lang_file_path)
self.registered_users_str = self.__get_parameter("registered_users_str", lang_file_path)
self.users_online_str = self.__get_parameter("users_online_str", lang_file_path)
self.node_users_str = self.__get_parameter("node_users_str", lang_file_path)
self.uptime_str = self.__get_parameter("uptime_str", lang_file_path)
self.processes_str = self.__get_parameter("processes_str", lang_file_path)
self.account_exists_str = self.__get_parameter("account_exists_str", lang_file_path)
self.user_sessions_info_str = self.__get_parameter("user_sessions_info_str", lang_file_path)
self.current_sessions_str = self.__get_parameter("current_sessions_str", lang_file_path)
self.sessions_connection_str = self.__get_parameter("sessions_connection_str", lang_file_path)
self.sessions_ip_str = self.__get_parameter("sessions_ip_str", lang_file_path)
self.sessions_port_str = self.__get_parameter("sessions_port_str", lang_file_path)
self.sessions_priority_str = self.__get_parameter("sessions_priority_str", lang_file_path)
self.sessions_node_str = self.__get_parameter("sessions_node_str", lang_file_path)
self.sessions_uptime_str = self.__get_parameter("sessions_uptime_str", lang_file_path)
self.sessions_status_str = self.__get_parameter("sessions_status_str", lang_file_path)
self.sessions_resource_str = self.__get_parameter("sessions_resource_str", lang_file_path)
self.sessions_statustext_str = self.__get_parameter("sessions_statustext_str", lang_file_path)
@staticmethod
def __modify_file(self, file_name, pattern,value=""):
fh=fileinput.input(file_name,inplace=True)
for line in fh:
replacement=pattern + value
line=re.sub(pattern,replacement,line)
sys.stdout.write(line)
fh.close()
@staticmethod
def __create_config(self):
if not os.path.exists('config'):
os.makedirs('config')
if not os.path.exists(self.config_filepath):
print(self.config_filepath + " created!")
with open('config/config.txt', 'w'): pass
@staticmethod
def __write_config(self):
with open(self.config_filepath, 'a') as the_file:
the_file.write('akkoma_hostname: \n')
2022-08-29 11:51:53 +02:00
the_file.write('bot_lang: \n')
print(f"adding parameter 'akkoma_hostname' to {self.config_filepath}")
print(f"adding parameter 'bot_lang' to {self.config_filepath}")
@staticmethod
def __read_config_line(self):
with open(self.config_filepath) as fp:
line = fp.readline()
self.__modify_file(self, self.config_filepath, "akkoma_hostname: ", value=self.akkoma_hostname)
self.__modify_file(self, self.config_filepath, "bot_lang: ", value=self.__bot_lang)
def get_data(self, notif):
id = notif.id
account_id = notif.account.id
acct = notif.account.acct
status_id = notif.status.id
text = notif.status.content
visibility = notif.status.visibility
reply, question = self.__get_question(self, text)
mention_dict = {'reply': reply, 'question': question, 'id': id, 'account_id': account_id, 'acct': acct, 'status_id': status_id, 'text': text, 'visibility': visibility}
mention = self.__json_allow_dict_attrs(mention_dict)
return mention
@staticmethod
def __get_question(self, text):
reply = False
keyword = ''
content = self.__cleanhtml(self, text)
content = self.__unescape(self, content)
try:
start = content.index("@")
end = content.index(" ")
if len(content) > end:
content = content[0: start:] + content[end +1::]
cleanit = content.count('@')
i = 0
while i < cleanit :
start = content.rfind("@")
end = len(content)
content = content[0: start:] + content[end +1::]
i += 1
content = content.lower()
question = content
#keyword_length = 8
keyword = question
if keyword == self.register_str or keyword == self.unregister_str or keyword == self.stats_str or self.status_str or user_sessions_info_str:
keyword_length = len(keyword)
if unidecode.unidecode(question)[0:keyword_length] == keyword:
reply = True
except:
pass
return (reply, question)
@staticmethod
def __cleanhtml(self, raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
@staticmethod
def __unescape(self, s):
s = s.replace("&apos;", "'")
return s
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object