from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
import getpass
import unidecode
import fileinput
import re
import os
import sys
import os.path

# Compiled once: non-greedy match of anything that looks like an HTML tag.
_HTML_TAG_PATTERN = re.compile('<.*?>')


###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
    """A dict whose keys can also be read (but not written) as attributes."""

    def __getattr__(self, attr):
        """Return ``self[attr]``; raise AttributeError if the key is absent."""
        if attr in self:
            return self[attr]
        raise AttributeError("Attribute not found: " + str(attr))

    def __setattr__(self, attr, val):
        """Refuse attribute assignment for names that are existing keys."""
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super(AttribAccessDict, self).__setattr__(attr, val)


class Mastodonbot:
    """Mastodon client wrapper: handles first-run setup, login and mention parsing.

    On construction the bot either logs in with the credentials stored in
    ``secrets/secrets.txt`` or, when that file does not exist, runs the
    interactive ``setup()`` until a login succeeds. In both cases the reply
    language is then read from ``config/config.txt`` and the locale strings
    are loaded from ``app/locales/<lang>.txt`` as instance attributes.
    """

    name = 'Mastodonbot'

    # Names of the parameters read from the locale file; each one becomes an
    # instance attribute of the same name.
    _LOCALE_KEYS = (
        "register_str",
        "unregister_str",
        "stats_str",
        "status_str",
        "registerok_str",
        "user_str",
        "password_str",
        "server_str",
        "xmpp_account_str",
        "notdeleteadmin_str",
        "deleted_str",
        "stats_title_str",
        "registered_users_str",
        "users_online_str",
        "node_users_str",
        "uptime_str",
        "processes_str",
        "account_exists_str",
        "user_sessions_info_str",
        "current_sessions_str",
        "sessions_connection_str",
        "sessions_ip_str",
        "sessions_port_str",
        "sessions_priority_str",
        "sessions_node_str",
        "sessions_uptime_str",
        "sessions_status_str",
        "sessions_resource_str",
        "sessions_statustext_str",
    )

    def __init__(self, mastodon=None, mastodon_hostname=None):
        """Log in (running interactive setup first if needed) and load strings.

        The ``mastodon`` and ``mastodon_hostname`` arguments are accepted for
        interface compatibility but are not read; both values are always
        obtained from the stored configuration or from ``setup()``.
        """
        file_path = "secrets/secrets.txt"

        if self.__check_setup(file_path):
            self.__uc_client_id = self.__get_parameter("uc_client_id", file_path)
            self.__uc_client_secret = self.__get_parameter("uc_client_secret", file_path)
            self.__uc_access_token = self.__get_parameter("uc_access_token", file_path)

            self.mastodon, self.mastodon_hostname = self.__log_in()
        else:
            # Keep prompting until a login succeeds ('q' exits from setup()).
            while True:
                logged_in, self.mastodon, self.mastodon_hostname = self.setup()
                if logged_in:
                    break
                print("\nLog in failed! Try again.\n")

        bot_file_path = "config/config.txt"
        self.__bot_lang = self.__get_parameter("bot_lang", bot_file_path)
        self.__load_strings()

    def __log_in(self):
        """Build a Mastodon client from the stored secrets and config.

        Sets ``self.mastodon`` and ``self.mastodon_hostname`` and returns them
        as a ``(mastodon, mastodon_hostname)`` tuple.
        """
        file_path = "secrets/secrets.txt"
        uc_client_id = self.__get_parameter("uc_client_id", file_path)
        uc_client_secret = self.__get_parameter("uc_client_secret", file_path)
        uc_access_token = self.__get_parameter("uc_access_token", file_path)

        file_path = "config/config.txt"
        self.mastodon_hostname = self.__get_parameter("mastodon_hostname", file_path)

        self.mastodon = Mastodon(
            client_id=uc_client_id,
            client_secret=uc_client_secret,
            access_token=uc_access_token,
            api_base_url='https://' + self.mastodon_hostname,
        )

        return (self.mastodon, self.mastodon_hostname)

    @staticmethod
    def __check_setup(file_path):
        """Return True if ``file_path`` exists, else announce setup and return False."""
        if not os.path.isfile(file_path):
            print(f"File {file_path} not found, running setup.")
            return False
        return True

    def setup(self):
        """Interactively register the app, log in and persist secrets/config.

        Prompts for hostname, credentials, app name and reply language, then
        creates the app and logs in through Mastodon.py. On success the client
        id, client secret and access token are written to
        ``secrets/secrets.txt``, the temporary credential files are removed and
        ``config/config.txt`` is written.

        Returns:
            ``(logged_in, mastodon, mastodon_hostname)``. ``mastodon`` is
            ``None`` when a Mastodon error occurred before the client could be
            created.

        Raises:
            SystemExit: if the user enters ``q`` as the hostname.
        """
        logged_in = False
        # Pre-bind so the final return cannot hit an unbound local when a
        # Mastodon error is raised before the client object exists.
        mastodon = None

        try:
            self.mastodon_hostname = input("Enter Mastodon hostname (or 'q' to exit): ")
            if self.mastodon_hostname == 'q':
                sys.exit("Bye")
            user_name = input("Bot user name, ex. john? ")
            user_password = getpass.getpass("Bot password? ")
            app_name = input("Bot App name? ")
            self.__bot_lang = input("Bot replies lang (ca or en)? ")

            Mastodon.create_app(
                app_name,
                to_file="app_clientcred.txt",
                api_base_url=self.mastodon_hostname
            )

            mastodon = Mastodon(client_id="app_clientcred.txt", api_base_url=self.mastodon_hostname)

            mastodon.log_in(
                user_name,
                user_password,
                scopes=["read", "write"],
                to_file="app_usercred.txt"
            )

            if not os.path.isfile("app_usercred.txt"):
                # Without the token file there is nothing to persist; report
                # the failed login instead of crashing on the open() below.
                return (logged_in, mastodon, self.mastodon_hostname)

            print("Log in succesful!")
            logged_in = True

            self.__store_secrets('secrets/secrets.txt')
            self.__remove_temp_credentials()

            self.config_filepath = 'config/config.txt'
            self.__create_config()
            self.__write_config()

            print("Secrets setup done!\n")

        except (MastodonIllegalArgumentError, MastodonNetworkError,
                MastodonReadTimeout, MastodonAPIError) as error:
            sys.stdout.write(f'\n{str(error)}\n')

        return (logged_in, mastodon, self.mastodon_hostname)

    def __store_secrets(self, secrets_filepath):
        """Copy client id/secret and access token from the temp files into ``secrets_filepath``."""
        if not os.path.exists('secrets'):
            os.makedirs('secrets')

        if not os.path.exists(secrets_filepath):
            with open(secrets_filepath, 'w'):
                pass
            print(f"{secrets_filepath} created!")

        with open(secrets_filepath, 'a') as the_file:
            print("Writing secrets parameter names to " + secrets_filepath)
            the_file.write('uc_client_id: \n' + 'uc_client_secret: \n' + 'uc_access_token: \n')

        # The first two lines of the client credential file are the client id
        # and the client secret, in that order.
        client_fields = (
            ("uc_client_id: ", "Writing client id to "),
            ("uc_client_secret: ", "Writing client secret to "),
        )
        with open('app_clientcred.txt') as fp:
            for (pattern, message), line in zip(client_fields, fp):
                print(message + secrets_filepath)
                self.__modify_file(secrets_filepath, pattern, value=line.rstrip())

        with open('app_usercred.txt') as fp:
            line = fp.readline()
            print("Writing access token to " + secrets_filepath)
            self.__modify_file(secrets_filepath, "uc_access_token: ", value=line.rstrip())

    @staticmethod
    def __remove_temp_credentials():
        """Delete the temporary credential files written during setup."""
        for temp_file in ("app_clientcred.txt", "app_usercred.txt"):
            if os.path.exists(temp_file):
                print(f"Removing {temp_file} temp file..")
                os.remove(temp_file)

    @staticmethod
    def __get_parameter(parameter, file_path):
        """Return the value of ``parameter`` from a ``name: value`` file.

        The first line starting with ``parameter + ":"`` wins; the value is the
        rest of that line with surrounding whitespace stripped. Matching on the
        colon prevents a parameter from matching a longer name that merely
        shares its prefix.

        Raises:
            SystemExit: (exit status 0) when the parameter is not in the file.
        """
        prefix = parameter + ":"
        with open(file_path) as f:
            for line in f:
                if line.startswith(prefix):
                    return line[len(prefix):].strip()

        print(f'{file_path} Missing parameter {parameter}')
        sys.exit(0)

    def __load_strings(self):
        """Load every locale string for the configured language as an attribute."""
        lang_file_path = f"app/locales/{self.__bot_lang}.txt"
        for key in self._LOCALE_KEYS:
            setattr(self, key, self.__get_parameter(key, lang_file_path))

    def __modify_file(self, file_name, pattern, value=""):
        """Rewrite ``file_name`` in place, appending ``value`` after each ``pattern`` match.

        ``pattern`` is used as a regular expression. The replacement text is
        inserted literally, so backslashes in ``value`` are not interpreted as
        regex escapes.
        """
        replacement = pattern + value
        # With inplace=True, fileinput redirects stdout into the file.
        with fileinput.input(file_name, inplace=True) as fh:
            for line in fh:
                sys.stdout.write(re.sub(pattern, lambda _match: replacement, line))

    def __create_config(self):
        """Create the ``config`` directory and an empty config file if missing."""
        if not os.path.exists('config'):
            os.makedirs('config')
        if not os.path.exists(self.config_filepath):
            print(self.config_filepath + " created!")
            # Create the same path that was just checked.
            with open(self.config_filepath, 'w'):
                pass

    def __write_config(self):
        """Append the hostname and reply language to the config file."""
        with open(self.config_filepath, 'a') as the_file:
            the_file.write(f'mastodon_hostname: {self.mastodon_hostname}\n')
            the_file.write(f'bot_lang: {self.__bot_lang}\n')
            print(f"adding parameter 'mastodon_hostname' to {self.config_filepath}\n")
            print(f"adding parameter 'bot_lang' to {self.config_filepath}\n")

    def get_data(self, notif):
        """Extract the fields of a mention notification into an AttribAccessDict.

        Reads ``notif.id``, ``notif.account.id``, ``notif.account.acct``,
        ``notif.status.id``, ``notif.status.content`` and
        ``notif.status.visibility``, and adds the parsed ``reply`` flag and
        ``question`` text obtained from the status content.
        """
        text = notif.status.content
        reply, question = self.__get_question(text)

        mention_dict = {
            'reply': reply,
            'question': question,
            'id': notif.id,
            'account_id': notif.account.id,
            'acct': notif.account.acct,
            'status_id': notif.status.id,
            'text': text,
            'visibility': notif.status.visibility,
        }

        return self.__json_allow_dict_attrs(mention_dict)

    def __get_question(self, text):
        """Parse a status body into ``(reply, question)``.

        HTML tags are stripped, the leading mention is removed, anything from
        a remaining ``@`` onwards is dropped and the rest is lower-cased to
        form ``question``. ``reply`` is True only when the question starts with
        one of the command keywords (register, unregister, stats, status, user
        sessions info) and is unchanged by ``unidecode``.

        When the text contains no ``@`` or no space, no question can be
        extracted and ``(False, '')`` is returned.
        """
        reply = False
        question = ''

        content = self.__cleanhtml(text)
        content = self.__unescape(content)

        try:
            start = content.index("@")
            end = content.index(" ")
        except ValueError:
            return (reply, question)

        # Cut out the span from the first '@' through the first space.
        content = content[0:start] + content[end + 1:]

        # Drop any further mention together with everything after it.
        for _ in range(content.count('@')):
            content = content[0:content.rfind("@")]

        question = content.lower()

        commands = (
            self.register_str,
            self.unregister_str,
            self.stats_str,
            self.status_str,
            self.user_sessions_info_str,
        )
        is_command = any(cmd and question.startswith(cmd) for cmd in commands)

        if is_command and unidecode.unidecode(question)[0:len(question)] == question:
            reply = True

        return (reply, question)

    def __cleanhtml(self, raw_html):
        """Return ``raw_html`` with every ``<...>`` tag removed."""
        return _HTML_TAG_PATTERN.sub('', raw_html)

    def __unescape(self, s):
        """Replace HTML apostrophe entities (``&apos;``, ``&#39;``) with ``'``."""
        return s.replace("&apos;", "'").replace("&#39;", "'")

    @staticmethod
    def __json_allow_dict_attrs(json_object):
        """
        Makes it possible to use attribute notation to access a dicts
        elements, while still allowing the dict to act as a dict.
        """
        if isinstance(json_object, dict):
            return AttribAccessDict(json_object)
        return json_object