from mastodon import Mastodon import unidecode import re import os import time import sys import os.path import wikipedia import ray import pdb ray.init(num_cpus = 8) # Specify this system CPUs. def cleanhtml(raw_html): cleanr = re.compile('<.*?>') cleantext = re.sub(cleanr, '', raw_html) return cleantext def unescape(s): s = s.replace("'", "'") return s class Notifications: name = 'Notifications' def __init__(self, type=None, id=None, account_id=None, username=None, status_id=None, text='', visibility=None): self.type = type self.id = id self.account_id = account_id self.username = username self.status_id = status_id self.text = text self.visibility = visibility @ray.remote def get_data(self): notification_id = self.id if self.type != 'mention': print(f'dismissing notification {notification_id}') mastodon.notifications_dismiss(notification_id) return account_id = self.account.id username = self.account.acct status_id = self.status.id text = self.status.content visibility = self.status.visibility reply, question = get_question(text) if reply: try: wiki_result = wikipedia.page(question) except wikipedia.exceptions.DisambiguationError as error: wiki_result = wikipedia.search(question, results=10) ambiguous_reply = "\n- ".join(str(x) for x in wiki_result) toot_text = f"@{username} '{question}' és una consulta ambigua. Potser et referies a:\n\n{ambiguous_reply}" toot_text = (toot_text[:400] + '... ') if len(toot_text) > 400 else toot_text except wikipedia.exceptions.PageError as page_error: toot_text = f"@{username} l'article '{question}' no existeix. Varia una mica la consulta o si comprobes que l'article no existeix t'animem a crear-lo. Som l'enciclopèdia lliure que tu també pots editar.\n" toot_text += ":viqui:" else: toot_text = f'@{username}\n' toot_text += wikipedia.summary(question, sentences=2) + "\n" toot_text = (toot_text[:400] + '... ') if len(toot_text) > 400 else toot_text toot_text += f'{str(wiki_result.url)}\n' toot_text += ":viqui:" mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) print(f'Replied notification {notification_id}') mastodon.notifications_dismiss(notification_id) else: print(f'dismissing notification {notification_id}') mastodon.notifications_dismiss(notification_id) def get_question(text): reply = False keyword = '' content = cleanhtml(text) content = unescape(content) try: start = content.index("@") end = content.index(" ") if len(content) > end: content = content[0: start:] + content[end +1::] cleanit = content.count('@') i = 0 while i < cleanit : start = content.rfind("@") end = len(content) content = content[0: start:] + content[end +1::] i += 1 content = content.lower() question = content wiki_lang = 'ca' if wiki_lang == 'ca': keyword_length = 8 keyword = 'consulta' if unidecode.unidecode(question)[0:keyword_length] == keyword: reply = True wikipedia.set_lang(wiki_lang) start = 0 end = unidecode.unidecode(question).index(keyword, 0) if len(question) > end: question = question[0: start:] + question[end + keyword_length+1::] except: pass return (reply, question) def log_in(): # Load secrets from secrets file secrets_filepath = "secrets/secrets.txt" uc_client_id = get_parameter("uc_client_id", secrets_filepath) uc_client_secret = get_parameter("uc_client_secret", secrets_filepath) uc_access_token = get_parameter("uc_access_token", secrets_filepath) # Load configuration from config file config_filepath = "config/config.txt" mastodon_hostname = get_parameter("mastodon_hostname", config_filepath) # Initialise Mastodon API mastodon = Mastodon( client_id = uc_client_id, client_secret = uc_client_secret, access_token = uc_access_token, api_base_url = 'https://' + mastodon_hostname, ) # Initialise access headers headers={ 'Authorization': 'Bearer %s'%uc_access_token } return mastodon def get_parameter( parameter, file_path ): # Check if secrets file exists if not os.path.isfile(file_path): print("File %s not found, exiting."%file_path) sys.exit(0) # Find parameter in file with open( file_path ) as f: for line in f: if line.startswith( parameter ): return line.replace(parameter + ":", "").strip() # Cannot find parameter, exit print(file_path + " Missing parameter %s "%parameter) sys.exit(0) ############################################################################### # main if __name__ == '__main__': mastodon = log_in() bot_notifications = mastodon.notifications() notifications = Notifications() ray_start = time.time() results = ray.get([notifications.get_data.remote(mention) for mention in bot_notifications]) print(f"duration = {time.time() - ray_start}.\nprocessed queries: {len(results)}")