corpus/mastodonbot.py

347 líneas
9,6 KiB
Python
Original Vista normal Històric

2022-12-01 17:34:13 +01:00
from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
import getpass
import unidecode
import fileinput,re
import os
import sys
import os.path
2022-12-02 17:46:07 +01:00
import pdb
2022-12-01 17:34:13 +01:00
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
    """Dictionary whose keys can also be read as attributes.

    Reading ``d.key`` returns ``d["key"]``.  Assigning to an attribute that
    is already a dictionary key is refused; any other attribute assignment
    is stored as a regular instance attribute (not as a dictionary key).
    """

    def __getattr__(self, attr):
        """Return the value stored under key *attr*.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: if *attr* is not a key of the dictionary.
        """
        if attr not in self:
            raise AttributeError("Attribute not found: " + str(attr))
        return self[attr]

    def __setattr__(self, attr, val):
        """Set a plain instance attribute, refusing names that are dict keys.

        Raises:
            AttributeError: if *attr* is already a key of the dictionary.
        """
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super().__setattr__(attr, val)
class Mastodonbot:
    """Mastodon client wrapper for a bot driven by keyword mentions.

    On construction the bot either logs in with credentials stored in
    ``secrets/secrets.txt`` and ``config/config.txt`` or, when the secrets
    file is missing, runs an interactive setup that creates both files.

    Note: the private helpers are declared ``@staticmethod`` yet take an
    explicit ``self`` argument and are called as ``self.__helper(self, ...)``.
    That calling convention is kept unchanged for backward compatibility.
    """

    name = 'Mastodonbot'

    def __init__(self, mastodon=None, mastodon_hostname=None):
        """Log in with stored credentials, or run the setup until it succeeds.

        Args:
            mastodon: accepted for compatibility; not used by the constructor.
            mastodon_hostname: accepted for compatibility; not used by the
                constructor.
        """
        file_path = "secrets/secrets.txt"

        if self.__check_setup(file_path):
            self.__uc_client_id = self.__get_parameter("uc_client_id", file_path)
            self.__uc_client_secret = self.__get_parameter("uc_client_secret", file_path)
            self.__uc_access_token = self.__get_parameter("uc_access_token", file_path)
            self.mastodon, self.mastodon_hostname = self.__log_in(self)
            return

        # No secrets file yet: keep asking until a log in succeeds.
        while True:
            logged_in, self.mastodon, self.mastodon_hostname = self.setup()
            if logged_in:
                break
            print("\nLog in failed! Try again.\n")

    @staticmethod
    def __log_in(self):
        """Build a ``Mastodon`` client from the stored secrets and config.

        Reads the client id, client secret and access token from
        ``secrets/secrets.txt`` and the hostname from ``config/config.txt``,
        stores the client and hostname on *self* and returns them.

        Returns:
            tuple: ``(mastodon, mastodon_hostname)``.
        """
        secrets_path = "secrets/secrets.txt"
        uc_client_id = self.__get_parameter("uc_client_id", secrets_path)
        uc_client_secret = self.__get_parameter("uc_client_secret", secrets_path)
        uc_access_token = self.__get_parameter("uc_access_token", secrets_path)

        self.mastodon_hostname = self.__get_parameter("mastodon_hostname", "config/config.txt")

        # setup() stores the hostname exactly as typed, so it may already
        # carry a scheme; only prepend one when it is missing.
        hostname = self.mastodon_hostname
        if hostname.startswith(("http://", "https://")):
            api_base_url = hostname
        else:
            api_base_url = 'https://' + hostname

        self.mastodon = Mastodon(
            client_id=uc_client_id,
            client_secret=uc_client_secret,
            access_token=uc_access_token,
            api_base_url=api_base_url,
        )
        return (self.mastodon, self.mastodon_hostname)

    @staticmethod
    def __check_setup(file_path):
        """Return True when *file_path* exists as a file, otherwise False.

        Prints a notice when the file is missing.
        """
        if os.path.isfile(file_path):
            return True
        print(f"File {file_path} not found, running setup.")
        return False

    def setup(self):
        """Interactively register the app, log in and persist the credentials.

        Prompts for hostname, user name, password and app name, registers the
        application, logs in, copies the resulting credentials from the
        temporary ``app_clientcred.txt`` / ``app_usercred.txt`` files into
        ``secrets/secrets.txt``, removes the temporary files and writes the
        hostname to ``config/config.txt``.

        Entering ``q`` as hostname exits the program.

        Returns:
            tuple: ``(logged_in, mastodon, mastodon_hostname)``.  ``mastodon``
            is ``None`` when one of the handled Mastodon errors occurred
            before the client was created.
        """
        logged_in = False
        # Must exist before the try block: the return statement below is also
        # reached when an exception is handled before the client is created.
        mastodon = None

        try:
            self.mastodon_hostname = input("Enter Mastodon hostname (or 'q' to exit): ")
            if self.mastodon_hostname == 'q':
                sys.exit("Bye")

            user_name = input("Bot user name, ex. john? ")
            user_password = getpass.getpass("Bot password? ")
            app_name = input("Bot App name? ")

            Mastodon.create_app(
                app_name,
                to_file="app_clientcred.txt",
                api_base_url=self.mastodon_hostname
            )

            mastodon = Mastodon(client_id="app_clientcred.txt", api_base_url=self.mastodon_hostname)
            mastodon.log_in(
                user_name,
                user_password,
                scopes=["read", "write"],
                to_file="app_usercred.txt"
            )

            if os.path.isfile("app_usercred.txt"):
                print("Log in succesful!")
                logged_in = True

            os.makedirs('secrets', exist_ok=True)
            secrets_filepath = 'secrets/secrets.txt'

            if not os.path.exists(secrets_filepath):
                with open(secrets_filepath, 'w'):
                    pass
                print(f"{secrets_filepath} created!")

            with open(secrets_filepath, 'a') as the_file:
                print("Writing secrets parameter names to " + secrets_filepath)
                the_file.write('uc_client_id: \n' + 'uc_client_secret: \n' + 'uc_access_token: \n')

            # The first two lines of the client credentials file are copied
            # in as client id and client secret; further lines are ignored.
            client_fields = (
                ("uc_client_id: ", "client id"),
                ("uc_client_secret: ", "client secret"),
            )
            with open('app_clientcred.txt') as fp:
                for (pattern, label), line in zip(client_fields, fp):
                    print(f"Writing {label} to " + secrets_filepath)
                    self.__modify_file(self, secrets_filepath, pattern, value=line.rstrip())

            # The first line of the user credentials file is the access token.
            with open('app_usercred.txt') as fp:
                line = fp.readline()
                print("Writing access token to " + secrets_filepath)
                self.__modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())

            for temp_file in ("app_clientcred.txt", "app_usercred.txt"):
                if os.path.exists(temp_file):
                    print(f"Removing {temp_file} temp file..")
                    os.remove(temp_file)

            self.config_filepath = 'config/config.txt'
            self.__create_config(self)
            self.__write_config(self)

            print("Secrets setup done!\n")

        except MastodonIllegalArgumentError as i_error:
            sys.stdout.write(f'\n{str(i_error)}\n')
        except MastodonNetworkError as n_error:
            sys.stdout.write(f'\n{str(n_error)}\n')
        except MastodonReadTimeout as r_error:
            sys.stdout.write(f'\n{str(r_error)}\n')
        except MastodonAPIError as a_error:
            sys.stdout.write(f'\n{str(a_error)}\n')

        return (logged_in, mastodon, self.mastodon_hostname)

    @staticmethod
    def __get_parameter(parameter, file_path):
        """Return the value of ``parameter: value`` from *file_path*.

        The first line starting with ``parameter:`` wins; the text after the
        colon is returned with surrounding whitespace removed.  Matching on
        the name plus colon keeps a parameter from matching a longer name
        that merely shares its prefix.

        When the parameter is missing a message is printed and the program
        exits.
        """
        prefix = parameter + ":"
        with open(file_path) as f:
            for line in f:
                if line.startswith(prefix):
                    return line[len(prefix):].strip()

        print(f'{file_path} Missing parameter {parameter}')
        # NOTE(review): exit status 0 on an error path; kept as-is because
        # callers of the script may rely on it.
        sys.exit(0)

    @staticmethod
    def __modify_file(self, file_name, pattern, value=""):
        """Append *value* after every occurrence of *pattern* in *file_name*.

        Both *pattern* and *value* are treated as literal text, so values
        containing backslashes or regex metacharacters are written verbatim.

        Args:
            self: unused; kept for the existing calling convention.
            file_name: path of the file to rewrite in place.
            pattern: literal text to look for (e.g. ``"uc_client_id: "``).
            value: text inserted directly after each occurrence of *pattern*.
        """
        replacement = pattern + value
        with open(file_name) as source:
            lines = source.readlines()
        with open(file_name, 'w') as target:
            target.writelines(line.replace(pattern, replacement) for line in lines)

    @staticmethod
    def __create_config(self):
        """Create the directory and empty file for ``self.config_filepath``.

        An existing config file is left untouched.
        """
        config_dir = os.path.dirname(self.config_filepath)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        if not os.path.exists(self.config_filepath):
            with open(self.config_filepath, 'w'):
                pass
            print(self.config_filepath + " created!")

    @staticmethod
    def __write_config(self):
        """Append the ``mastodon_hostname`` parameter to the config file."""
        with open(self.config_filepath, 'a') as the_file:
            the_file.write(f'mastodon_hostname: {self.mastodon_hostname}\n')
            print(f"adding parameter 'mastodon_hostname' to {self.config_filepath}\n")

    def get_data(self, notif):
        """Extract the fields the bot needs from a notification.

        Args:
            notif: object exposing ``id``, ``account.id``, ``account.acct``,
                ``status.id``, ``status.content`` and ``status.visibility``.

        Returns:
            A dict (with attribute access) holding ``reply``, ``question``,
            ``id``, ``account_id``, ``acct``, ``status_id``, ``text`` and
            ``visibility``.
        """
        text = notif.status.content
        reply, question = self.__get_question(self, text)

        mention_dict = {
            'reply': reply,
            'question': question,
            'id': notif.id,
            'account_id': notif.account.id,
            'acct': notif.account.acct,
            'status_id': notif.status.id,
            'text': text,
            'visibility': notif.status.visibility,
        }
        return self.__json_allow_dict_attrs(mention_dict)

    @staticmethod
    def __get_question(self, text):
        """Reduce a status body to the bot command it contains.

        HTML tags are stripped and ``&apos;`` is unescaped.  The span from the
        first ``@`` up to and including the first space is removed, anything
        from the next remaining ``@`` onwards is dropped, and the rest is
        lower-cased.

        Returns:
            tuple: ``(reply, question)``.  ``reply`` is True when the question
            is exactly one of ``alta``, ``baixa``, ``esborra`` or
            ``confirmo``.  When the content has no ``@`` or no space the
            result is ``(False, '')``.
        """
        content = self.__cleanhtml(self, text)
        content = self.__unescape(self, content)

        try:
            start = content.index("@")
            end = content.index(" ")
        except ValueError:
            # Nothing that looks like "@mention command".
            return (False, '')

        # Cut out the leading mention together with the space that follows it.
        content = content[:start] + content[end + 1:]
        # Any further mention, and everything after it, is discarded.
        content = content.partition("@")[0]

        question = content.lower()
        reply = question in ('alta', 'baixa', 'esborra', 'confirmo')
        return (reply, question)

    @staticmethod
    def __cleanhtml(self, raw_html):
        """Return *raw_html* with every ``<...>`` tag removed."""
        cleanr = re.compile('<.*?>')
        cleantext = re.sub(cleanr, '', raw_html)
        return cleantext

    @staticmethod
    def __unescape(self, s):
        """Return *s* with ``&apos;`` entities replaced by ``'``."""
        s = s.replace("&apos;", "'")
        return s

    @staticmethod
    def __json_allow_dict_attrs(json_object):
        """
        Makes it possible to use attribute notation to access a dicts
        elements, while still allowing the dict to act as a dict.
        """
        if isinstance(json_object, dict):
            return AttribAccessDict(json_object)
        return json_object