info/mastobot.py

401 líneas
11 KiB
Python
Original Vista normal Històric

from mastodon import Mastodon
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
import getpass
import unidecode
import fileinput,re
import os
import sys
import os.path
2022-08-06 10:49:37 +02:00
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
    """
    Dictionary whose keys can also be read with attribute notation.

    Reading ``d.key`` returns ``d['key']``. Attribute assignment is refused
    for names that are already keys, so entries cannot be changed through
    attribute syntax; any other name is stored as an ordinary instance
    attribute and does not become a key.
    """

    def __getattr__(self, attr):
        # Only invoked when normal attribute lookup fails, so real
        # attributes and dict methods are never shadowed by keys.
        if attr not in self:
            raise AttributeError("Attribute not found: " + str(attr))
        return self[attr]

    def __setattr__(self, attr, val):
        # Existing keys are read-only through attribute access.
        if attr in self:
            raise AttributeError("Attribute-style access is read only")
        super().__setattr__(attr, val)
class Mastobot:
    """
    Mastodon bot that replies to mentions with statistics of its instance.

    Credentials are read from ``secrets/secrets.txt`` and the instance
    hostname from ``config/config.txt``. When the secrets file is missing,
    an interactive setup asks for them and writes both files.

    Several helpers are declared ``@staticmethod`` yet take an explicit
    ``self`` argument; they are called as ``self.helper(self, ...)``. That
    calling convention is kept so existing call sites continue to work.
    """

    name = 'Mastobot'

    def __init__(self, mastodon=None, mastodon_hostname=None):
        """
        Log in with the stored credentials, or run the interactive setup.

        The ``mastodon`` and ``mastodon_hostname`` arguments are accepted
        but not used: both attributes are always filled from the
        secrets/config files or from the setup dialogue.
        """
        file_path = "secrets/secrets.txt"
        if self.check_setup(file_path):
            self.uc_client_id = self.get_parameter("uc_client_id", file_path)
            self.uc_client_secret = self.get_parameter("uc_client_secret", file_path)
            self.uc_access_token = self.get_parameter("uc_access_token", file_path)
            self.mastodon, self.mastodon_hostname = self.log_in(self)
        else:
            # Keep asking until a login succeeds (the user can quit with 'q').
            while True:
                logged_in, self.mastodon, self.mastodon_hostname = self.setup()
                if logged_in:
                    break
                print("\nLog in failed! Try again.\n")

    def get_data(self, notif):
        """
        Collect the fields of a mention notification the bot works with.

        Returns an ``AttribAccessDict`` with the keys ``reply``,
        ``question``, ``id``, ``account_id``, ``username``, ``status_id``,
        ``text`` and ``visibility``.
        """
        text = notif.status.content
        reply, question = self.get_question(self, text)
        mention_dict = {
            'reply': reply,
            'question': question,
            'id': notif.id,
            'account_id': notif.account.id,
            'username': notif.account.acct,
            'status_id': notif.status.id,
            'text': text,
            'visibility': notif.status.visibility,
        }
        return self.__json_allow_dict_attrs(mention_dict)

    def post(self, mention):
        """
        Reply to ``mention`` with the instance statistics and dismiss it.

        ``mention`` is expected to expose ``username``, ``status_id``,
        ``visibility`` and ``id`` (the object built by ``get_data``).
        """
        # Query each endpoint once instead of once per field.
        nodeinfo = self.mastodon.instance_nodeinfo()
        instance = self.mastodon.instance()

        mau = self._format_count(nodeinfo.usage.users.activeMonth)
        registers = self._format_count(instance.stats.user_count)
        posts = self._format_count(instance.stats.status_count)
        peers = self._format_count(instance.stats.domain_count)
        version = instance.version
        opened = 'obert' if nodeinfo.openRegistrations else 'tancat'

        post_text = (
            f"@{mention.username}, dades de {self.mastodon_hostname}:\n\n"
            f"Usuaris registrats: {registers}\n"
            f"Usuaris actius (mes): {mau}\n"
            f"Apunts: {posts}\n"
            f"Servidors federats: {peers}\n\n"
            f"Versió Mastodon: v{version}\n"
            f"Registre: {opened}"
        )
        # Cap the length of the reply text.
        post_text = (post_text[:400] + '... ') if len(post_text) > 400 else post_text

        self.mastodon.status_post(
            post_text,
            in_reply_to_id=mention.status_id,
            visibility=mention.visibility,
        )
        print(f'Replied notification {mention.id}')
        self.mastodon.notifications_dismiss(mention.id)

    @staticmethod
    def _format_count(value):
        """Format a number with '.' as the thousands separator."""
        return '{:,}'.format(value).replace(',', '.')

    @staticmethod
    def __json_allow_dict_attrs(json_object):
        """
        Makes it possible to use attribute notation to access a dicts
        elements, while still allowing the dict to act as a dict.
        """
        if isinstance(json_object, dict):
            return AttribAccessDict(json_object)
        return json_object

    @staticmethod
    def get_question(self, text):
        """
        Extract the question from the HTML content of a mention.

        The leading mention (from the first "@" up to the first space) is
        removed, everything from the next "@" onwards is cut off, and the
        remainder is lower-cased.

        Returns a tuple ``(reply, question)``. ``reply`` is True when the
        ASCII-folded question starts with the keyword "info". When the
        content has no "@" or no space there is nothing to extract and
        ``(False, '')`` is returned.
        """
        reply = False
        question = ''
        keyword = 'info'

        content = self.cleanhtml(self, text)
        content = self.unescape(self, content)

        try:
            start = content.index("@")
            end = content.index(" ")
        except ValueError:
            # No mention or no text after it: return the defaults rather
            # than failing on an unassigned result.
            return (reply, question)

        # Remove the leading mention.
        content = content[:start] + content[end + 1:]
        # Drop any remaining mentions together with whatever follows them.
        content = content.partition('@')[0]

        question = content.lower()
        if unidecode.unidecode(question).startswith(keyword):
            reply = True

        return (reply, question)

    @staticmethod
    def log_in(self):
        """
        Create the API client from the stored secrets and configuration.

        Sets ``self.mastodon`` and ``self.mastodon_hostname`` and returns
        them as a tuple ``(mastodon, mastodon_hostname)``.
        """
        file_path = "secrets/secrets.txt"
        uc_client_id = self.get_parameter("uc_client_id", file_path)
        uc_client_secret = self.get_parameter("uc_client_secret", file_path)
        uc_access_token = self.get_parameter("uc_access_token", file_path)

        file_path = "config/config.txt"
        self.mastodon_hostname = self.get_parameter("mastodon_hostname", file_path)

        self.mastodon = Mastodon(
            client_id=uc_client_id,
            client_secret=uc_client_secret,
            access_token=uc_access_token,
            api_base_url='https://' + self.mastodon_hostname,
        )
        return (self.mastodon, self.mastodon_hostname)

    @staticmethod
    def check_setup(file_path):
        """Return True when ``file_path`` is an existing file, else False."""
        if os.path.isfile(file_path):
            return True
        print(f"File {file_path} not found, running setup.")
        return False

    @staticmethod
    def get_parameter(parameter, file_path):
        """
        Return the value stored for ``parameter`` in ``file_path``.

        The file holds one ``name: value`` pair per line; the first line
        starting with ``parameter`` wins. When no line matches, a message
        is printed and the program exits.
        """
        with open(file_path) as f:
            for line in f:
                if line.startswith(parameter):
                    return line.replace(parameter + ":", "").strip()

        print(f'{file_path} Missing parameter {parameter}')
        sys.exit(0)

    @staticmethod
    def cleanhtml(self, raw_html):
        """Return ``raw_html`` with every ``<...>`` tag removed."""
        cleanr = re.compile(r'<.*?>')
        return re.sub(cleanr, '', raw_html)

    @staticmethod
    def unescape(self, s):
        """Replace the ``&apos;`` entity in ``s`` with an apostrophe."""
        return s.replace("&apos;", "'")

    @staticmethod
    def modify_file(self, file_name, pattern, value=""):
        """
        Rewrite ``file_name`` in place, appending ``value`` after every
        match of the regular expression ``pattern``.
        """
        replacement = pattern + value
        # The context manager restores stdout and closes the file even if
        # rewriting fails half-way.
        with fileinput.input(file_name, inplace=True) as fh:
            for line in fh:
                # A callable replacement is inserted literally, so values
                # containing backslashes are written unchanged.
                sys.stdout.write(re.sub(pattern, lambda _match: replacement, line))

    @staticmethod
    def create_config(self):
        """Create an empty config file at ``self.config_filepath`` if missing."""
        config_dir = os.path.dirname(self.config_filepath)
        if config_dir and not os.path.exists(config_dir):
            os.makedirs(config_dir)
        if not os.path.exists(self.config_filepath):
            # Create the file that was checked, then report it.
            with open(self.config_filepath, 'w'):
                pass
            print(self.config_filepath + " created!")

    @staticmethod
    def write_config(self):
        """Append the empty ``mastodon_hostname`` entry to the config file."""
        with open(self.config_filepath, 'a') as the_file:
            the_file.write('mastodon_hostname: \n')
            print(f"adding parameter 'mastodon_hostname' to {self.config_filepath}")

    @staticmethod
    def read_config_line(self):
        """Fill the ``mastodon_hostname`` entry of the config file."""
        self.modify_file(
            self,
            self.config_filepath,
            "mastodon_hostname: ",
            value=self.mastodon_hostname,
        )

    def _store_secrets(self, client_path, token_path):
        """Copy the credentials in the two temp files to secrets/secrets.txt."""
        if not os.path.exists('secrets'):
            os.makedirs('secrets')

        secrets_filepath = 'secrets/secrets.txt'
        if not os.path.exists(secrets_filepath):
            with open(secrets_filepath, 'w'):
                pass
            print(f"{secrets_filepath} created!")

        with open(secrets_filepath, 'a') as the_file:
            print("Writing secrets parameter names to " + secrets_filepath)
            the_file.write('uc_client_id: \n' + 'uc_client_secret: \n' + 'uc_access_token: \n')

        # First line of the client file is used as the id, second as the secret.
        with open(client_path) as fp:
            for cnt, line in enumerate(fp, start=1):
                if cnt == 1:
                    print("Writing client id to " + secrets_filepath)
                    self.modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip())
                elif cnt == 2:
                    print("Writing client secret to " + secrets_filepath)
                    self.modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip())
                else:
                    break

        # First line of the user credential file is used as the access token.
        with open(token_path) as fp:
            line = fp.readline()
        print("Writing access token to " + secrets_filepath)
        self.modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())

    def setup(self):
        """
        Interactively register the app, log in and store the credentials.

        Returns a tuple ``(logged_in, mastodon, mastodon_hostname)``.
        ``mastodon`` is None when the attempt failed before a client was
        created. Entering 'q' as hostname exits the program.
        """
        logged_in = False
        # Bound up front so the final return also works when registration
        # or login raises before the client exists.
        mastodon = None
        client_path = "app_clientcred.txt"
        token_path = "app_usercred.txt"

        try:
            self.mastodon_hostname = input("Enter Mastodon hostname (or 'q' to exit): ")
            if self.mastodon_hostname == 'q':
                sys.exit("Bye")
            user_name = input("User email, ex. user@" + self.mastodon_hostname + "? ")
            user_password = getpass.getpass("User password? ")
            app_name = input("App name? ")

            Mastodon.create_app(
                app_name,
                scopes=["read", "write"],
                to_file=client_path,
                api_base_url=self.mastodon_hostname,
            )
            mastodon = Mastodon(client_id=client_path, api_base_url=self.mastodon_hostname)
            mastodon.log_in(
                user_name,
                user_password,
                scopes=["read", "write"],
                to_file=token_path,
            )

            # Only persist anything once the user credential file exists.
            if os.path.isfile(token_path):
                print(f"Log in succesful!")
                logged_in = True

                self._store_secrets(client_path, token_path)

                self.config_filepath = 'config/config.txt'
                self.create_config(self)
                self.write_config(self)
                self.read_config_line(self)

                print("Secrets setup done!\n")

        except (
            MastodonIllegalArgumentError,
            MastodonNetworkError,
            MastodonReadTimeout,
            MastodonAPIError,
        ) as error:
            sys.stdout.write(f'\n{str(error)}\n')
        finally:
            # Remove the temporary credential files on failure as well, so
            # they are never left behind in the working directory.
            for temp_path in (client_path, token_path):
                if os.path.exists(temp_path):
                    print(f"Removing {temp_path} temp file..")
                    os.remove(temp_path)

        return (logged_in, mastodon, self.mastodon_hostname)