407 lines
13 KiB
Python
407 lines
13 KiB
Python
from akkoma import Akkoma
|
|
from akkoma import AkkomaMalformedEventError, AkkomaNetworkError, AkkomaReadTimeout, AkkomaAPIError, AkkomaIllegalArgumentError
|
|
import getpass
|
|
import unidecode
|
|
import fileinput,re
|
|
import os
|
|
import sys
|
|
import os.path
|
|
|
|
###
|
|
# Dict helper class.
|
|
# Defined at top level so it can be pickled.
|
|
###
|
|
class AttribAccessDict(dict):
    """Dictionary whose keys can also be read as attributes.

    Reading ``d.key`` returns ``d["key"]``. Assigning to an attribute whose
    name is an existing key is rejected, so attribute-style access to the
    stored items is read only.
    """

    def __getattr__(self, attr):
        """Return the item stored under *attr*.

        Only invoked when normal attribute lookup fails. Raises
        ``AttributeError`` when *attr* is not a key of the dict.
        """
        if attr not in self:
            raise AttributeError("Attribute not found: " + str(attr))
        return self[attr]

    def __setattr__(self, attr, val):
        """Set a regular attribute, refusing names that are dict keys.

        Raises ``AttributeError`` when *attr* is already a key of the dict.
        """
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super().__setattr__(attr, val)
|
|
|
|
class Akkomabot:
    """Bot helper that logs in to an Akkoma instance and parses mentions.

    On construction the bot either logs in with the credentials stored in
    ``secrets/secrets.txt`` or, when that file does not exist, runs the
    interactive :meth:`setup` until a log in succeeds. It then loads the
    localized strings for the configured language.

    Parameter files are plain text with one ``name: value`` entry per line.
    """

    name = 'Akkomabot'

    # Persistent parameter files.
    _SECRETS_PATH = "secrets/secrets.txt"
    _CONFIG_PATH = "config/config.txt"

    # Temporary credential files written by the Akkoma client during setup.
    _CLIENT_CRED_PATH = "app_clientcred.txt"
    _USER_CRED_PATH = "app_usercred.txt"

    # Matches any HTML tag (non-greedy) so it can be stripped from a status.
    _HTML_TAG = re.compile('<.*?>')

    # Locale entries that hold the command words the bot reacts to.
    _COMMAND_KEYS = (
        "register_str",
        "unregister_str",
        "stats_str",
        "status_str",
        "user_sessions_info_str",
    )

    # Every entry read from app/locales/<lang>.txt; each one becomes an
    # instance attribute of the same name.
    _LOCALE_KEYS = (
        "register_str",
        "unregister_str",
        "stats_str",
        "status_str",
        "registerok_str",
        "user_str",
        "password_str",
        "server_str",
        "xmpp_account_str",
        "notdeleteadmin_str",
        "deleted_str",
        "stats_title_str",
        "registered_users_str",
        "users_online_str",
        "node_users_str",
        "uptime_str",
        "processes_str",
        "account_exists_str",
        "user_sessions_info_str",
        "current_sessions_str",
        "sessions_connection_str",
        "sessions_ip_str",
        "sessions_port_str",
        "sessions_priority_str",
        "sessions_node_str",
        "sessions_uptime_str",
        "sessions_status_str",
        "sessions_resource_str",
        "sessions_statustext_str",
    )

    def __init__(self, akkoma=None, akkoma_hostname=None):
        """Log in (running the interactive setup first if needed).

        The *akkoma* and *akkoma_hostname* arguments are accepted for
        compatibility but are not used: both values are always taken from
        the stored configuration or from :meth:`setup`.
        """
        if self.__check_setup(self._SECRETS_PATH):
            self.akkoma, self.akkoma_hostname = self.__log_in()
        else:
            # Keep asking until a log in succeeds (setup() exits on 'q').
            while True:
                logged_in, self.akkoma, self.akkoma_hostname = self.setup()
                if logged_in:
                    break
                print("\nLog in failed! Try again.\n")

        self.__bot_lang = self.__get_parameter("bot_lang", self._CONFIG_PATH)
        self.__load_strings()

    def __log_in(self):
        """Create the API client from the stored secrets and configuration.

        Returns:
            Tuple ``(akkoma, akkoma_hostname)``; both are also stored on
            the instance.
        """
        secrets_path = self._SECRETS_PATH
        self.__uc_client_id = self.__get_parameter("uc_client_id", secrets_path)
        self.__uc_client_secret = self.__get_parameter("uc_client_secret", secrets_path)
        self.__uc_access_token = self.__get_parameter("uc_access_token", secrets_path)

        self.akkoma_hostname = self.__get_parameter("akkoma_hostname", self._CONFIG_PATH)

        self.akkoma = Akkoma(
            client_id=self.__uc_client_id,
            client_secret=self.__uc_client_secret,
            access_token=self.__uc_access_token,
            api_base_url='https://' + self.akkoma_hostname,
        )

        return (self.akkoma, self.akkoma_hostname)

    @staticmethod
    def __check_setup(file_path):
        """Return True when *file_path* exists, i.e. setup was already run."""
        if os.path.isfile(file_path):
            return True
        print(f"File {file_path} not found, running setup.")
        return False

    def setup(self):
        """Interactively register the app, log in and store the parameters.

        Prompts for hostname, bot user, password, app name and reply
        language. On a successful log in the credentials are written to the
        secrets file and the hostname/language to the config file.
        Entering ``q`` as hostname exits the program.

        Returns:
            Tuple ``(logged_in, akkoma, akkoma_hostname)``. ``akkoma`` is
            ``None`` when the attempt failed before the client was created.
        """
        logged_in = False
        # Bound up front so the return below is safe when an API error is
        # raised before the client exists.
        akkoma = None

        try:
            self.akkoma_hostname = input("Enter Akkoma hostname (or 'q' to exit): ")

            if self.akkoma_hostname == 'q':
                sys.exit("Bye")

            user_name = input("Bot user name, ex. xmpp? ")
            user_password = getpass.getpass("Bot password? ")
            app_name = input("Bot App name? ")
            self.__bot_lang = input("Bot replies lang (ca or en)? ")

            client_id, client_secret = Akkoma.create_app(
                app_name,
                to_file=self._CLIENT_CRED_PATH,
                api_base_url=self.akkoma_hostname
            )

            akkoma = Akkoma(client_id=self._CLIENT_CRED_PATH, api_base_url=self.akkoma_hostname)

            akkoma.log_in(
                client_id,
                client_secret,
                'password',
                user_name,
                user_password,
                scopes=["read", "write"],
                to_file=self._USER_CRED_PATH
            )

            # The token file is the evidence that the log in worked; only
            # then is there anything to persist.
            if os.path.isfile(self._USER_CRED_PATH):
                print("Log in succesful!")
                logged_in = True

                self.__save_secrets()
                self.__remove_temp_files()

                self.config_filepath = self._CONFIG_PATH
                self.__create_config()
                self.__write_config()
                self.__read_config_line()

                print("Secrets setup done!\n")

        except (
            AkkomaIllegalArgumentError,
            AkkomaNetworkError,
            AkkomaReadTimeout,
            AkkomaAPIError,
        ) as error:
            sys.stdout.write(f'\n{str(error)}\n')

        finally:
            # Never leave credential temp files behind, even on failure.
            self.__remove_temp_files()

        return (logged_in, akkoma, self.akkoma_hostname)

    def __save_secrets(self):
        """Copy the credentials from the temp files into the secrets file."""
        secrets_filepath = self._SECRETS_PATH

        os.makedirs(os.path.dirname(secrets_filepath), exist_ok=True)

        if not os.path.exists(secrets_filepath):
            with open(secrets_filepath, 'w'):
                pass
            print(f"{secrets_filepath} created!")

        print("Writing secrets parameter names to " + secrets_filepath)
        self.__ensure_parameters(
            secrets_filepath,
            ("uc_client_id", "uc_client_secret", "uc_access_token"),
        )

        # First line of the client credentials file is the client id, the
        # second one the client secret; any further lines are ignored.
        client_fields = (
            ("client id", "uc_client_id: "),
            ("client secret", "uc_client_secret: "),
        )
        with open(self._CLIENT_CRED_PATH) as fp:
            client_values = [line.rstrip() for _, line in zip(client_fields, fp)]

        for (label, prefix), value in zip(client_fields, client_values):
            print(f"Writing {label} to " + secrets_filepath)
            self.__modify_file(secrets_filepath, prefix, value=value)

        # The access token is the first line of the user credentials file.
        with open(self._USER_CRED_PATH) as fp:
            token = fp.readline().rstrip()

        print("Writing access token to " + secrets_filepath)
        self.__modify_file(secrets_filepath, "uc_access_token: ", value=token)

    def __remove_temp_files(self):
        """Delete the temporary credential files if they exist."""
        for temp_path in (self._CLIENT_CRED_PATH, self._USER_CRED_PATH):
            if os.path.exists(temp_path):
                print(f"Removing {temp_path} temp file..")
                os.remove(temp_path)

    @staticmethod
    def __split_parameter(line):
        """Split a ``name: value`` line at its first colon.

        Returns:
            ``(name, value)`` with surrounding whitespace removed, or
            ``None`` when the line contains no colon.
        """
        key, separator, value = line.partition(":")
        if not separator:
            return None
        return (key.strip(), value.strip())

    @staticmethod
    def __get_parameter(parameter, file_path):
        """Return the value of *parameter* read from *file_path*.

        The name must match exactly, so a longer name that merely starts
        with *parameter* is not picked up by mistake. When the parameter is
        missing a message is printed and the program exits.
        """
        with open(file_path) as f:
            for line in f:
                entry = Akkomabot.__split_parameter(line)
                if entry is not None and entry[0] == parameter:
                    return entry[1]

        print(f'{file_path} Missing parameter {parameter}')
        sys.exit(0)

    @staticmethod
    def __ensure_parameters(file_path, names):
        """Append an empty ``name: `` line for every name not yet in the file.

        Only missing names are added, so running setup again does not
        create duplicate entries.

        Returns:
            List of the names that were added.
        """
        with open(file_path) as f:
            existing_text = f.read()

        present = set()
        for line in existing_text.splitlines():
            entry = Akkomabot.__split_parameter(line)
            if entry is not None:
                present.add(entry[0])

        missing = [name for name in names if name not in present]

        if missing:
            with open(file_path, 'a') as f:
                # Do not glue the first new entry onto an unterminated line.
                if existing_text and not existing_text.endswith("\n"):
                    f.write("\n")
                for name in missing:
                    f.write(f"{name}: \n")

        return missing

    def __load_strings(self):
        """Load the localized strings for the configured bot language.

        Every name in ``_LOCALE_KEYS`` is read from
        ``app/locales/<lang>.txt`` and stored as an attribute of the same
        name.
        """
        lang_file_path = f"app/locales/{self.__bot_lang}.txt"

        for key in self._LOCALE_KEYS:
            setattr(self, key, self.__get_parameter(key, lang_file_path))

    def __modify_file(self, file_name, pattern, value=""):
        """Set the value of a parameter inside *file_name*, in place.

        Args:
            file_name: Parameter file to rewrite.
            pattern: Line prefix of the parameter, e.g. ``"bot_lang: "``.
            value: New value; it replaces whatever the line held before.

        The line is rebuilt as ``pattern + value`` instead of using a regex
        substitution, so values containing backslashes are written verbatim
        and an existing value is overwritten rather than prefixed.
        """
        target = pattern.partition(":")[0].strip()

        # With inplace=True, fileinput redirects sys.stdout into the file.
        with fileinput.input(file_name, inplace=True) as fh:
            for line in fh:
                entry = self.__split_parameter(line)
                if entry is not None and entry[0] == target:
                    line = pattern + value + "\n"
                sys.stdout.write(line)

    def __create_config(self):
        """Create the config file (and its directory) when missing."""
        config_dir = os.path.dirname(self.config_filepath)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        if not os.path.exists(self.config_filepath):
            with open(self.config_filepath, 'w'):
                pass
            print(self.config_filepath + " created!")

    def __write_config(self):
        """Add the config parameter names that the file does not have yet."""
        added = self.__ensure_parameters(
            self.config_filepath,
            ("akkoma_hostname", "bot_lang"),
        )
        for parameter in added:
            print(f"adding parameter '{parameter}' to {self.config_filepath}")

    def __read_config_line(self):
        """Write the hostname and reply language into the config file."""
        self.__modify_file(self.config_filepath, "akkoma_hostname: ", value=self.akkoma_hostname)
        self.__modify_file(self.config_filepath, "bot_lang: ", value=self.__bot_lang)

    def get_data(self, notif):
        """Extract the fields the bot needs from a mention notification.

        Args:
            notif: Notification object exposing ``id``, ``account.id``,
                ``account.acct``, ``status.id``, ``status.content`` and
                ``status.visibility``.

        Returns:
            AttribAccessDict with keys ``reply``, ``question``, ``id``,
            ``account_id``, ``acct``, ``status_id``, ``text`` and
            ``visibility``.
        """
        text = notif.status.content
        reply, question = self.__get_question(text)

        mention_dict = {
            'reply': reply,
            'question': question,
            'id': notif.id,
            'account_id': notif.account.id,
            'acct': notif.account.acct,
            'status_id': notif.status.id,
            'text': text,
            'visibility': notif.status.visibility,
        }

        return self.__json_allow_dict_attrs(mention_dict)

    def __get_question(self, text):
        """Extract the command text from a status and decide whether to reply.

        The HTML is stripped, the leading ``@mention`` (everything from the
        first ``@`` up to the first space) is removed, anything from a
        further ``@`` onwards is dropped and the rest is lowercased.

        Returns:
            Tuple ``(reply, question)``. ``reply`` is True when the ASCII
            transliteration of ``question`` starts with one of the command
            words loaded from the locale file.
        """
        content = self.__unescape(self.__cleanhtml(text))

        start = content.find("@")
        end = content.find(" ")
        if start == -1 or end == -1:
            # No "@mention <command>" shape: nothing to answer.
            return (False, content.lower())

        # Drop the leading mention, then cut at any remaining mention.
        content = content[:start] + content[end + 1:]
        question = content.partition("@")[0].lower()

        # Command words may not be loaded yet; skip missing or empty ones
        # (an empty prefix would match every question).
        commands = [getattr(self, key, None) for key in self._COMMAND_KEYS]
        normalized = unidecode.unidecode(question)
        reply = any(normalized.startswith(command) for command in commands if command)

        return (reply, question)

    def __cleanhtml(self, raw_html):
        """Return *raw_html* with every HTML tag removed."""
        return self._HTML_TAG.sub('', raw_html)

    def __unescape(self, s):
        """Return *s* with HTML character references decoded.

        Covers apostrophes (``&#39;`` / ``&apos;``) as well as any other
        entity left in the status text.
        """
        import html  # local import: not part of the file-level imports

        return html.unescape(s)

    @staticmethod
    def __json_allow_dict_attrs(json_object):
        """
        Makes it possible to use attribute notation to access a dicts
        elements, while still allowing the dict to act as a dict.
        """
        if isinstance(json_object, dict):
            return AttribAccessDict(json_object)
        return json_object
|