xmpp/akkomabot.py

394 líneas
12 KiB
Python
Original Vista normal Històric

from akkoma import Akkoma
from akkoma import AkkomaMalformedEventError, AkkomaNetworkError, AkkomaReadTimeout, AkkomaAPIError, AkkomaIllegalArgumentError
import getpass
import unidecode
import fileinput,re
import os
import sys
import os.path
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
    """Dictionary whose existing keys can also be read as attributes.

    ``d.key`` returns ``d["key"]``. Assigning to an attribute whose name is
    already a key is rejected; any other attribute assignment is stored as a
    regular instance attribute and does not create a dictionary entry.
    """

    def __getattr__(self, attr):
        """Return the value stored under the key ``attr``.

        Raises:
            AttributeError: if ``attr`` is not a key of this dictionary.
        """
        if attr not in self:
            raise AttributeError("Attribute not found: " + str(attr))
        return self[attr]

    def __setattr__(self, attr, val):
        """Set a regular attribute, refusing names that are dictionary keys.

        Raises:
            AttributeError: if ``attr`` is already a key of this dictionary.
        """
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super().__setattr__(attr, val)
class Akkomabot:
name = 'Akkomabot'
def __init__(self, akkoma=None, akkoma_hostname=None):
    """Log the bot in, running the interactive setup first when needed.

    If ``secrets/secrets.txt`` exists, the stored credentials are read and
    used to log in. Otherwise the interactive setup is repeated until it
    reports a successful log in. In both cases the reply language is then
    read from ``config/config.txt`` and the localized strings are loaded.

    Args:
        akkoma: Not read by this constructor.
        akkoma_hostname: Not read by this constructor.
    """
    secrets_path = "secrets/secrets.txt"
    config_path = "config/config.txt"

    if self.check_setup(secrets_path):
        for key in ("uc_client_id", "uc_client_secret", "uc_access_token"):
            setattr(self, key, self.get_parameter(key, secrets_path))
        self.akkoma, self.akkoma_hostname = self.log_in(self)
    else:
        # No secrets yet: keep asking until the setup manages to log in.
        while True:
            logged_in, self.akkoma, self.akkoma_hostname = self.setup()
            if logged_in:
                break
            print("\nLog in failed! Try again.\n")

    self.bot_lang = self.get_parameter("bot_lang", config_path)
    self.load_strings(self)
@staticmethod
def log_in(self):
    """Build the API client from the stored credentials.

    Reads the client id, client secret and access token from
    ``secrets/secrets.txt`` and the host name from ``config/config.txt``,
    then constructs an ``Akkoma`` object pointing at ``https://<hostname>``.
    Both the client and the host name are also stored on ``self``.

    Args:
        self: The bot instance (passed explicitly; this is a static method).

    Returns:
        tuple: ``(akkoma, akkoma_hostname)``.
    """
    secrets_path = "secrets/secrets.txt"
    uc_client_id = self.get_parameter("uc_client_id", secrets_path)
    uc_client_secret = self.get_parameter("uc_client_secret", secrets_path)
    uc_access_token = self.get_parameter("uc_access_token", secrets_path)

    config_path = "config/config.txt"
    self.akkoma_hostname = self.get_parameter("akkoma_hostname", config_path)

    self.akkoma = Akkoma(
        client_id=uc_client_id,
        client_secret=uc_client_secret,
        access_token=uc_access_token,
        api_base_url='https://' + self.akkoma_hostname,
    )
    return (self.akkoma, self.akkoma_hostname)
@staticmethod
def check_setup(file_path):
    """Report whether the file at ``file_path`` exists.

    Prints a notice that the setup will run when the file is missing.

    Args:
        file_path: Path of the file to look for.

    Returns:
        bool: ``True`` if ``file_path`` is an existing regular file,
        ``False`` otherwise.
    """
    if os.path.isfile(file_path):
        return True
    print(f"File {file_path} not found, running setup.")
    return False
def setup(self):
    """Interactively register the bot application and store its credentials.

    Prompts for the host name, bot user, password, application name and
    reply language, registers the application and logs in. When the log in
    produced ``app_usercred.txt``, the client id, client secret and access
    token are written to ``secrets/secrets.txt``, the temporary credential
    files are removed and ``config/config.txt`` is created and filled in.

    The listed Akkoma errors are printed and swallowed; in that case the
    method returns with ``logged_in`` still ``False``. Entering ``q`` as the
    host name exits the program.

    Returns:
        tuple: ``(logged_in, akkoma, akkoma_hostname)``. ``akkoma`` is
        ``None`` when an error was caught before the client was created.
    """
    logged_in = False
    # Bound up front so the final return cannot raise UnboundLocalError when
    # an API error is caught before the client object exists.
    akkoma = None
    try:
        self.akkoma_hostname = input("Enter Akkoma hostname (or 'q' to exit): ")
        if self.akkoma_hostname == 'q':
            sys.exit("Bye")
        user_name = input("Bot user name, ex. john? ")
        user_password = getpass.getpass("Bot password? ")
        app_name = input("Bot App name? ")
        self.bot_lang = input("Bot replies lang (ca or en)? ")

        client_id, client_secret = Akkoma.create_app(
            app_name,
            to_file="app_clientcred.txt",
            api_base_url=self.akkoma_hostname
        )
        akkoma = Akkoma(client_id="app_clientcred.txt", api_base_url=self.akkoma_hostname)
        grant_type = 'password'
        akkoma.log_in(
            client_id,
            client_secret,
            grant_type,
            user_name,
            user_password,
            scopes=["read", "write"],
            to_file="app_usercred.txt"
        )

        # Credentials are only persisted when the token file was produced.
        if os.path.isfile("app_usercred.txt"):
            print("Log in succesful!")
            logged_in = True

            if not os.path.exists('secrets'):
                os.makedirs('secrets')
            secrets_filepath = 'secrets/secrets.txt'
            if not os.path.exists(secrets_filepath):
                with open(secrets_filepath, 'w'):
                    pass
                print(f"{secrets_filepath} created!")

            # Write the empty "name: " lines first; the values are filled in
            # afterwards by modify_file().
            with open(secrets_filepath, 'a') as the_file:
                print("Writing secrets parameter names to " + secrets_filepath)
                the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')

            # First line of the client file is the id, second is the secret.
            client_path = 'app_clientcred.txt'
            with open(client_path) as fp:
                for cnt, line in enumerate(fp, start=1):
                    if cnt == 1:
                        print("Writing client id to " + secrets_filepath)
                        self.modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip())
                    elif cnt == 2:
                        print("Writing client secret to " + secrets_filepath)
                        self.modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip())

            token_path = 'app_usercred.txt'
            with open(token_path) as fp:
                line = fp.readline()
            print("Writing access token to " + secrets_filepath)
            self.modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())

            for temp_file in ("app_clientcred.txt", "app_usercred.txt"):
                if os.path.exists(temp_file):
                    print(f"Removing {temp_file} temp file..")
                    os.remove(temp_file)

            self.config_filepath = 'config/config.txt'
            self.create_config(self)
            self.write_config(self)
            self.read_config_line(self)
            print("Secrets setup done!\n")
    except (AkkomaIllegalArgumentError, AkkomaNetworkError,
            AkkomaReadTimeout, AkkomaAPIError) as error:
        sys.stdout.write(f'\n{str(error)}\n')
    return (logged_in, akkoma, self.akkoma_hostname)
@staticmethod
def get_parameter(parameter, file_path):
    """Return the value of ``parameter`` from a ``name: value`` text file.

    The first line that starts with ``parameter`` immediately followed by a
    colon is used, so a longer name that merely shares the prefix is not
    mistaken for it. Only that leading ``name:`` is removed; the rest of
    the line is returned with surrounding whitespace stripped.

    Args:
        parameter: Name of the parameter to look up.
        file_path: Path of the file to read.

    Returns:
        str: The parameter's value.

    Raises:
        SystemExit: with status 0, after printing a message, when the
            parameter is not present in the file.
    """
    prefix = parameter + ":"
    with open(file_path) as f:
        for line in f:
            if line.startswith(prefix):
                return line[len(prefix):].strip()
    print(f'{file_path} Missing parameter {parameter}')
    sys.exit(0)
@staticmethod
def load_strings(self):
    """Load the localized strings for the configured reply language.

    Each name below is looked up in ``app/locales/<bot_lang>.txt`` through
    ``self.get_parameter`` and stored on ``self`` under the same name, in
    the order listed.

    Args:
        self: The bot instance (passed explicitly; this is a static method).
    """
    lang_file_path = f"app/locales/{self.bot_lang}.txt"
    string_names = (
        "register_str",
        "unregister_str",
        "stats_str",
        "registerok_str",
        "user_str",
        "password_str",
        "server_str",
        "xmpp_account_str",
        "notdeleteadmin_str",
        "deleted_str",
        "stats_title_str",
        "registered_users_str",
        "users_online_str",
        "node_users_str",
        "uptime_str",
        "processes_str",
    )
    for name in string_names:
        setattr(self, name, self.get_parameter(name, lang_file_path))
@staticmethod
def modify_file(self, file_name, pattern, value=""):
    """Insert ``value`` right after every occurrence of ``pattern`` in a file.

    ``pattern`` is matched as literal text and ``value`` is inserted
    verbatim, so backslashes or other regex metacharacters in either one
    are written unchanged. The file is rewritten in place.

    Args:
        self: Unused; kept because callers pass the bot instance explicitly.
        file_name: Path of the file to rewrite.
        pattern: Literal marker text to look for (for example ``"key: "``).
        value: Text to insert after each marker.
    """
    with open(file_name) as source:
        lines = source.readlines()
    replacement = pattern + value
    with open(file_name, "w") as target:
        target.writelines(line.replace(pattern, replacement) for line in lines)
@staticmethod
def create_config(self):
    """Create an empty file at ``self.config_filepath`` if it is missing.

    The parent directory of the configured path is created first when
    needed. An existing file is left untouched.

    Args:
        self: The bot instance (passed explicitly; this is a static method).
    """
    config_dir = os.path.dirname(self.config_filepath)
    if config_dir:
        os.makedirs(config_dir, exist_ok=True)
    if not os.path.exists(self.config_filepath):
        # Create the same path that was just tested, not a hard-coded one.
        with open(self.config_filepath, 'w'):
            pass
        print(self.config_filepath + " created!")
@staticmethod
def write_config(self):
    """Append the empty configuration parameter lines to the config file.

    Writes one ``name: `` line for ``akkoma_hostname`` and one for
    ``bot_lang`` to ``self.config_filepath``. The values are meant to be
    filled in afterwards by substituting after those markers, so both
    lines must actually be present.

    Args:
        self: The bot instance (passed explicitly; this is a static method).
    """
    with open(self.config_filepath, 'a') as the_file:
        the_file.write('akkoma_hostname: \n')
        print(f"adding parameter 'akkoma_hostname' to {self.config_filepath}")
        the_file.write('bot_lang: \n')
        print(f"adding parameter 'bot_lang' to {self.config_filepath}")
@staticmethod
def read_config_line(self):
    """Fill in the host name and reply language in the config file.

    Calls ``self.modify_file`` once for the ``akkoma_hostname: `` marker
    and once for the ``bot_lang: `` marker, inserting the values currently
    stored on ``self``.

    Args:
        self: The bot instance (passed explicitly; this is a static method).
    """
    self.modify_file(self, self.config_filepath, "akkoma_hostname: ", value=self.akkoma_hostname)
    self.modify_file(self, self.config_filepath, "bot_lang: ", value=self.bot_lang)
def get_data(self, notif):
    """Collect the fields of a notification that the bot works with.

    Reads the notification id, the account id and handle, and the status
    id, content and visibility from ``notif``, and runs the status content
    through ``get_question``.

    Args:
        notif: Notification object exposing ``id``, ``account`` and
            ``status`` attributes.

    Returns:
        The gathered fields (``reply``, ``question``, ``id``,
        ``account_id``, ``acct``, ``status_id``, ``text``, ``visibility``)
        as returned by ``__json_allow_dict_attrs``.
    """
    notif_id = notif.id
    account = notif.account
    account_id = account.id
    acct = account.acct
    status = notif.status
    status_id = status.id
    text = status.content
    visibility = status.visibility

    reply, question = self.get_question(self, text)

    mention_dict = {
        'reply': reply,
        'question': question,
        'id': notif_id,
        'account_id': account_id,
        'acct': acct,
        'status_id': status_id,
        'text': text,
        'visibility': visibility,
    }
    return self.__json_allow_dict_attrs(mention_dict)
@staticmethod
def get_question(self, text):
    """Extract the command text from a mention and decide whether to reply.

    The HTML is stripped and unescaped, the span from the first ``@`` up to
    the first space is removed, everything from the next remaining ``@``
    onwards is discarded, and the result is lower-cased. The bot replies
    only when that text equals one of the register / unregister / stats
    keywords and is unchanged by ``unidecode``.

    Args:
        self: The bot instance (passed explicitly; this is a static method).
        text: HTML content of the status.

    Returns:
        tuple: ``(reply, question)``. When the content has no ``@`` or no
        space, ``(False, '')`` is returned.
    """
    reply = False
    # Bound up front so the return below is always valid.
    question = ''
    content = self.cleanhtml(self, text)
    content = self.unescape(self, content)

    try:
        start = content.index("@")
        end = content.index(" ")
    except ValueError:
        # No mention, or nothing following it: there is no command to read.
        return (reply, question)

    # Cut out the text between the first "@" and the first space.
    content = content[:start] + content[end + 1:]
    # Drop every remaining mention together with whatever follows it.
    content = content.partition("@")[0]

    question = content.lower()
    if question in (self.register_str, self.unregister_str, self.stats_str):
        if unidecode.unidecode(question)[:len(question)] == question:
            reply = True
    return (reply, question)
@staticmethod
def cleanhtml(self, raw_html):
    """Return ``raw_html`` with every ``<...>`` span removed.

    Args:
        self: Unused; kept because callers pass the bot instance explicitly.
        raw_html: Text that may contain HTML tags.

    Returns:
        str: The text without the tags.
    """
    tag_pattern = re.compile('<.*?>')
    return tag_pattern.sub('', raw_html)
@staticmethod
def unescape(self, s):
    """Decode HTML character references in ``s``.

    Handles ``&apos;`` as before and additionally every other named or
    numeric reference (``&#39;``, ``&amp;``, ``&quot;``, ...).

    Args:
        self: Unused; kept because callers pass the bot instance explicitly.
        s: Text that may contain HTML character references.

    Returns:
        str: The decoded text.
    """
    # Imported here because the module-level imports do not include it.
    import html

    return html.unescape(s)
@staticmethod
def __json_allow_dict_attrs(json_object):
    """
    Wrap a dict in AttribAccessDict so its elements can be read with
    attribute notation while it still behaves as a dict. Any other
    object is returned unchanged.
    """
    if not isinstance(json_object, dict):
        return json_object
    return AttribAccessDict(json_object)