total refactor, adding realtime replies thanks to Mastodon's Streaming API
This commit is contained in:
pare
1a0186b0de
commit
e203b92c5a
S'han modificat 7 arxius amb 434 adicions i 458 eliminacions
|
@ -35,8 +35,7 @@ Before running `python xmpp.py`:
|
||||||
5. run `pip install -r requirements.txt` to install required libraries.
|
5. run `pip install -r requirements.txt` to install required libraries.
|
||||||
6. set up your contrab to run `python xmpp.py` every minute.
|
6. set up your contrab to run `python xmpp.py` every minute.
|
||||||
|
|
||||||
Enjoy!
|
Enjoy!
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
3.4.2024 - total refactor, adding realtime replies thanks to Mastodon's Streaming API
|
||||||
|
|
||||||
|
|
|
@ -147,7 +147,7 @@ class Ejabberd:
|
||||||
|
|
||||||
response = self.__api_request(endpoint, data)
|
response = self.__api_request(endpoint, data)
|
||||||
|
|
||||||
result = response.json()['stat']
|
result = response.json()
|
||||||
|
|
||||||
stats_dict[name] = result
|
stats_dict[name] = result
|
||||||
|
|
88
app/libraries/mentions.py
Normal file
88
app/libraries/mentions.py
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
import unidecode
|
||||||
|
import re
|
||||||
|
import pdb
|
||||||
|
|
||||||
|
class Mentions:
|
||||||
|
|
||||||
|
name = 'Mentions'
|
||||||
|
|
||||||
|
def __init__(self, mastodon=None):
|
||||||
|
|
||||||
|
self.mastodon = mastodon
|
||||||
|
|
||||||
|
def get_data(self, mention):
|
||||||
|
|
||||||
|
notification_id = mention.id
|
||||||
|
|
||||||
|
account_id = mention.account.id
|
||||||
|
|
||||||
|
username = mention.account.acct
|
||||||
|
|
||||||
|
status_id = mention.status.id
|
||||||
|
|
||||||
|
text = mention.status.content
|
||||||
|
|
||||||
|
visibility = mention.status.visibility
|
||||||
|
|
||||||
|
reply, query_word = self.get_question(username, text)
|
||||||
|
|
||||||
|
return reply, query_word, username, status_id, visibility
|
||||||
|
|
||||||
|
def get_question(self, username, text):
|
||||||
|
|
||||||
|
reply = False
|
||||||
|
|
||||||
|
keyword = ''
|
||||||
|
|
||||||
|
if '@' in username:
|
||||||
|
|
||||||
|
return reply, keyword
|
||||||
|
|
||||||
|
content = self.cleanhtml(text)
|
||||||
|
|
||||||
|
content = self.unescape(content)
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
start = content.index("@")
|
||||||
|
end = content.index(" ")
|
||||||
|
if len(content) > end:
|
||||||
|
|
||||||
|
content = content[0: start:] + content[end +1::]
|
||||||
|
|
||||||
|
cleanit = content.count('@')
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
while i < cleanit :
|
||||||
|
|
||||||
|
start = content.rfind("@")
|
||||||
|
end = len(content)
|
||||||
|
content = content[0: start:] + content[end +1::]
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
question = content.lower()
|
||||||
|
|
||||||
|
query_word = question
|
||||||
|
|
||||||
|
if query_word == 'registre' or query_word == 'baixa' or query_word == 'info' or query_word == 'estat' or query_word == 'sessions':
|
||||||
|
|
||||||
|
reply = True
|
||||||
|
|
||||||
|
return (reply, query_word)
|
||||||
|
|
||||||
|
except:
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def cleanhtml(raw_html):
|
||||||
|
cleanr = re.compile('<.*?>')
|
||||||
|
cleantext = re.sub(cleanr, '', raw_html)
|
||||||
|
return cleantext
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def unescape(s):
|
||||||
|
s = s.replace("'", "'")
|
||||||
|
s = s.replace("'", "'")
|
||||||
|
s = s.replace(""", '"')
|
||||||
|
return s
|
155
app/libraries/setup.py
Normal file
155
app/libraries/setup.py
Normal file
|
@ -0,0 +1,155 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
import pytz
|
||||||
|
from mastodon import Mastodon
|
||||||
|
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
|
||||||
|
import pdb
|
||||||
|
|
||||||
|
class Setup():
|
||||||
|
|
||||||
|
name = 'Mastodon account setup'
|
||||||
|
|
||||||
|
def __init__(self, config_file=None, mastodon_hostname=None, secrets_filepath=None, mastodon_app_token=None):
|
||||||
|
|
||||||
|
self.config_file = "config/config.txt"
|
||||||
|
self.mastodon_hostname = self.__get_parameter("mastodon_hostname", self.config_file)
|
||||||
|
|
||||||
|
self.secrets_filepath = 'secrets/secrets.txt'
|
||||||
|
|
||||||
|
is_setup = self.__check_mastodon_setup(self)
|
||||||
|
|
||||||
|
if is_setup:
|
||||||
|
|
||||||
|
self.mastodon_app_token = self.__get_mastodon_parameter("mastodon_app_token", self.secrets_filepath)
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
self.mastodon_app_token = self.mastodon_setup(self)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __check_mastodon_setup(self):
|
||||||
|
|
||||||
|
is_setup = False
|
||||||
|
|
||||||
|
if not os.path.isfile(self.secrets_filepath):
|
||||||
|
print(f"File {self.secrets_filepath} not found, running setup.")
|
||||||
|
else:
|
||||||
|
is_setup = True
|
||||||
|
|
||||||
|
return is_setup
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def mastodon_setup(self):
|
||||||
|
|
||||||
|
if not os.path.exists('secrets'):
|
||||||
|
os.makedirs('secrets')
|
||||||
|
|
||||||
|
self.mastodon_user = input("Mastodon user login? ")
|
||||||
|
self.mastodon_password = input("Mastodon user password? ")
|
||||||
|
self.app_name = 'fediverse'
|
||||||
|
|
||||||
|
self.mastodon_app_token = self.mastodon_log_in()
|
||||||
|
|
||||||
|
if not os.path.exists(self.secrets_filepath):
|
||||||
|
with open(self.secrets_filepath, 'w'): pass
|
||||||
|
print(f"{self.secrets_filepath} created!")
|
||||||
|
|
||||||
|
with open(self.secrets_filepath, 'a') as the_file:
|
||||||
|
print("Writing Mastodon parameters to " + self.secrets_filepath)
|
||||||
|
the_file.write(f'mastodon_app_token: {self.mastodon_app_token}')
|
||||||
|
|
||||||
|
return self.mastodon_app_token
|
||||||
|
|
||||||
|
def mastodon_log_in(self):
|
||||||
|
|
||||||
|
token = ''
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
response = Mastodon.create_app(
|
||||||
|
self.app_name,
|
||||||
|
scopes=["read","write"],
|
||||||
|
to_file=None,
|
||||||
|
api_base_url=self.mastodon_hostname
|
||||||
|
)
|
||||||
|
client_id = response[0]
|
||||||
|
client_secret = response[1]
|
||||||
|
|
||||||
|
mastodon = Mastodon(client_id = client_id, client_secret = client_secret, api_base_url = self.mastodon_hostname)
|
||||||
|
|
||||||
|
token = mastodon.log_in(
|
||||||
|
self.mastodon_user,
|
||||||
|
self.mastodon_password,
|
||||||
|
scopes = ["read", "write"],
|
||||||
|
to_file = None
|
||||||
|
)
|
||||||
|
|
||||||
|
print('Log in succesful!')
|
||||||
|
|
||||||
|
except MastodonIllegalArgumentError as i_error:
|
||||||
|
|
||||||
|
sys.stdout.write(f'\n{str(i_error)}\n')
|
||||||
|
|
||||||
|
except MastodonNetworkError as n_error:
|
||||||
|
|
||||||
|
sys.stdout.write(f'\n{str(n_error)}\n')
|
||||||
|
|
||||||
|
except MastodonReadTimeout as r_error:
|
||||||
|
|
||||||
|
sys.stdout.write(f'\n{str(r_error)}\n')
|
||||||
|
|
||||||
|
except MastodonAPIError as a_error:
|
||||||
|
|
||||||
|
sys.stdout.write(f'\n{str(a_error)}\n')
|
||||||
|
|
||||||
|
finally:
|
||||||
|
|
||||||
|
return token
|
||||||
|
|
||||||
|
def __get_parameter(self, parameter, config_file):
|
||||||
|
|
||||||
|
if not os.path.isfile(config_file):
|
||||||
|
print(f"File {config_file} not found..")
|
||||||
|
|
||||||
|
self.mastodon_hostname = input("\nMastodon hostname: ")
|
||||||
|
|
||||||
|
self.__create_config(self)
|
||||||
|
self.__write_config(self)
|
||||||
|
|
||||||
|
with open( self.config_file ) as f:
|
||||||
|
for line in f:
|
||||||
|
if line.startswith( parameter ):
|
||||||
|
return line.replace(parameter + ":", "").strip()
|
||||||
|
|
||||||
|
def __get_mastodon_parameter(self, parameter, secrets_filepath):
|
||||||
|
|
||||||
|
if not os.path.isfile(secrets_filepath):
|
||||||
|
print(f"File {secrets_filepath} not found..")
|
||||||
|
|
||||||
|
self.mastodon_log_in()
|
||||||
|
|
||||||
|
with open( self.secrets_filepath ) as f:
|
||||||
|
for line in f:
|
||||||
|
if line.startswith( parameter ):
|
||||||
|
return line.replace(parameter + ":", "").strip()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __create_config(self):
|
||||||
|
|
||||||
|
if not os.path.exists('config'):
|
||||||
|
|
||||||
|
os.makedirs('config')
|
||||||
|
|
||||||
|
if not os.path.exists(self.config_file):
|
||||||
|
|
||||||
|
print(self.config_file + " created!")
|
||||||
|
with open(self.config_file, 'w'): pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __write_config(self):
|
||||||
|
|
||||||
|
with open(self.config_file, 'a') as the_file:
|
||||||
|
|
||||||
|
the_file.write(f'mastodon_hostname: {self.mastodon_hostname}')
|
||||||
|
print(f"adding parameters to {self.config_file}\n")
|
97
app/libraries/strings.py
Normal file
97
app/libraries/strings.py
Normal file
|
@ -0,0 +1,97 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import os.path
|
||||||
|
import pdb
|
||||||
|
|
||||||
|
class Strings:
|
||||||
|
|
||||||
|
name = 'xmppbot strings'
|
||||||
|
|
||||||
|
def __init__(self, bot_lang_path=None, bot_lang=None):
|
||||||
|
|
||||||
|
self.bot_lang_path = "config/lang.txt"
|
||||||
|
|
||||||
|
is_setup = self.check_setup(self)
|
||||||
|
|
||||||
|
if is_setup:
|
||||||
|
|
||||||
|
self.bot_lang = self.get_parameter("bot_lang", self.bot_lang_path)
|
||||||
|
self.load_strings(self)
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
self.bot_lang = self.setup(self)
|
||||||
|
|
||||||
|
self.load_strings(self)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def check_setup(self):
|
||||||
|
|
||||||
|
is_setup = False
|
||||||
|
|
||||||
|
if not os.path.isfile(self.bot_lang_path):
|
||||||
|
print(f"File {self.bot_lang_path} not found, running setup.")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
is_setup = True
|
||||||
|
return is_setup
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def setup(self):
|
||||||
|
|
||||||
|
self.bot_lang = input("Bot replies lang (ca or en)? ")
|
||||||
|
|
||||||
|
if not os.path.exists(self.bot_lang_path):
|
||||||
|
with open(self.bot_lang_path, 'w'): pass
|
||||||
|
print(f"{self.bot_lang_path} created!")
|
||||||
|
|
||||||
|
with open(self.bot_lang_path, 'a') as the_file:
|
||||||
|
print(f"Writing bot lang parameters to {self.bot_lang_path}")
|
||||||
|
the_file.write(f'bot_lang: {self.bot_lang}')
|
||||||
|
|
||||||
|
return self.bot_lang
|
||||||
|
|
||||||
|
def get_parameter(self, parameter, file):
|
||||||
|
|
||||||
|
with open( file ) as f:
|
||||||
|
for line in f:
|
||||||
|
if line.startswith( parameter ):
|
||||||
|
return line.replace(parameter + ":", "").strip()
|
||||||
|
|
||||||
|
print(f'{self.bot_path} Missing parameter {parameter}')
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def load_strings(self):
|
||||||
|
|
||||||
|
lang_file_path = f"app/locales/{self.bot_lang}.txt"
|
||||||
|
|
||||||
|
self.register_str = self.get_parameter("register_str", lang_file_path)
|
||||||
|
self.unregister_str = self.get_parameter("unregister_str", lang_file_path)
|
||||||
|
self.stats_str = self.get_parameter("stats_str", lang_file_path)
|
||||||
|
self.status_str = self.get_parameter("status_str", lang_file_path)
|
||||||
|
self.registerok_str = self.get_parameter("registerok_str", lang_file_path)
|
||||||
|
self.user_str = self.get_parameter("user_str", lang_file_path)
|
||||||
|
self.password_str = self.get_parameter("password_str", lang_file_path)
|
||||||
|
self.server_str = self.get_parameter("server_str", lang_file_path)
|
||||||
|
self.xmpp_account_str = self.get_parameter("xmpp_account_str", lang_file_path)
|
||||||
|
self.notdeleteadmin_str = self.get_parameter("notdeleteadmin_str", lang_file_path)
|
||||||
|
self.deleted_str = self.get_parameter("deleted_str", lang_file_path)
|
||||||
|
self.stats_title_str = self.get_parameter("stats_title_str", lang_file_path)
|
||||||
|
self.registered_users_str = self.get_parameter("registered_users_str", lang_file_path)
|
||||||
|
self.users_online_str = self.get_parameter("users_online_str", lang_file_path)
|
||||||
|
self.node_users_str = self.get_parameter("node_users_str", lang_file_path)
|
||||||
|
self.uptime_str = self.get_parameter("uptime_str", lang_file_path)
|
||||||
|
self.processes_str = self.get_parameter("processes_str", lang_file_path)
|
||||||
|
self.account_exists_str = self.get_parameter("account_exists_str", lang_file_path)
|
||||||
|
self.user_sessions_info_str = self.get_parameter("user_sessions_info_str", lang_file_path)
|
||||||
|
self.current_sessions_str = self.get_parameter("current_sessions_str", lang_file_path)
|
||||||
|
self.sessions_connection_str = self.get_parameter("sessions_connection_str", lang_file_path)
|
||||||
|
self.sessions_ip_str = self.get_parameter("sessions_ip_str", lang_file_path)
|
||||||
|
self.sessions_port_str = self.get_parameter("sessions_port_str", lang_file_path)
|
||||||
|
self.sessions_priority_str = self.get_parameter("sessions_priority_str", lang_file_path)
|
||||||
|
self.sessions_node_str = self.get_parameter("sessions_node_str", lang_file_path)
|
||||||
|
self.sessions_uptime_str = self.get_parameter("sessions_uptime_str", lang_file_path)
|
||||||
|
self.sessions_status_str = self.get_parameter("sessions_status_str", lang_file_path)
|
||||||
|
self.sessions_resource_str = self.get_parameter("sessions_resource_str", lang_file_path)
|
||||||
|
self.sessions_statustext_str = self.get_parameter("sessions_statustext_str", lang_file_path)
|
392
mastodonbot.py
392
mastodonbot.py
|
@ -1,392 +0,0 @@
|
||||||
from mastodon import Mastodon
|
|
||||||
from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError, MastodonReadTimeout, MastodonAPIError, MastodonIllegalArgumentError
|
|
||||||
import getpass
|
|
||||||
import unidecode
|
|
||||||
import fileinput,re
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import os.path
|
|
||||||
|
|
||||||
###
|
|
||||||
# Dict helper class.
|
|
||||||
# Defined at top level so it can be pickled.
|
|
||||||
###
|
|
||||||
class AttribAccessDict(dict):
|
|
||||||
def __getattr__(self, attr):
|
|
||||||
if attr in self:
|
|
||||||
return self[attr]
|
|
||||||
else:
|
|
||||||
raise AttributeError("Attribute not found: " + str(attr))
|
|
||||||
|
|
||||||
def __setattr__(self, attr, val):
|
|
||||||
if attr in self:
|
|
||||||
raise AttributeError("Attribute-style access is read only")
|
|
||||||
super(AttribAccessDict, self).__setattr__(attr, val)
|
|
||||||
|
|
||||||
class Mastodonbot:
|
|
||||||
|
|
||||||
name = 'Mastodonbot'
|
|
||||||
|
|
||||||
def __init__(self, mastodon=None, mastodon_hostname=None):
|
|
||||||
|
|
||||||
file_path = "secrets/secrets.txt"
|
|
||||||
|
|
||||||
is_setup = self.__check_setup(file_path)
|
|
||||||
|
|
||||||
if is_setup:
|
|
||||||
|
|
||||||
self.__uc_client_id = self.__get_parameter("uc_client_id", file_path)
|
|
||||||
self.__uc_client_secret = self.__get_parameter("uc_client_secret", file_path)
|
|
||||||
self.__uc_access_token = self.__get_parameter("uc_access_token", file_path)
|
|
||||||
|
|
||||||
self.mastodon, self.mastodon_hostname = self.__log_in(self)
|
|
||||||
|
|
||||||
bot_file_path = "config/config.txt"
|
|
||||||
|
|
||||||
self.__bot_lang = self.__get_parameter("bot_lang" , bot_file_path)
|
|
||||||
|
|
||||||
self.__load_strings(self)
|
|
||||||
|
|
||||||
else:
|
|
||||||
|
|
||||||
while(True):
|
|
||||||
|
|
||||||
logged_in, self.mastodon, self.mastodon_hostname = self.setup()
|
|
||||||
|
|
||||||
if not logged_in:
|
|
||||||
|
|
||||||
print("\nLog in failed! Try again.\n")
|
|
||||||
|
|
||||||
else:
|
|
||||||
|
|
||||||
bot_file_path = "config/config.txt"
|
|
||||||
|
|
||||||
self.__bot_lang = self.__get_parameter("bot_lang" , bot_file_path)
|
|
||||||
|
|
||||||
self.__load_strings(self)
|
|
||||||
|
|
||||||
break
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __log_in(self):
|
|
||||||
|
|
||||||
file_path = "secrets/secrets.txt"
|
|
||||||
uc_client_id = self.__get_parameter("uc_client_id", file_path)
|
|
||||||
uc_client_secret = self.__get_parameter("uc_client_secret", file_path)
|
|
||||||
uc_access_token = self.__get_parameter("uc_access_token", file_path)
|
|
||||||
|
|
||||||
file_path = "config/config.txt"
|
|
||||||
self.mastodon_hostname = self.__get_parameter("mastodon_hostname", file_path)
|
|
||||||
|
|
||||||
self.mastodon = Mastodon(
|
|
||||||
client_id = uc_client_id,
|
|
||||||
client_secret = uc_client_secret,
|
|
||||||
access_token = uc_access_token,
|
|
||||||
api_base_url = 'https://' + self.mastodon_hostname,
|
|
||||||
)
|
|
||||||
|
|
||||||
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
|
|
||||||
|
|
||||||
return (self.mastodon, self.mastodon_hostname)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __check_setup(file_path):
|
|
||||||
|
|
||||||
is_setup = False
|
|
||||||
|
|
||||||
if not os.path.isfile(file_path):
|
|
||||||
print(f"File {file_path} not found, running setup.")
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
is_setup = True
|
|
||||||
return is_setup
|
|
||||||
|
|
||||||
def setup(self):
|
|
||||||
|
|
||||||
logged_in = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
|
|
||||||
self.mastodon_hostname = input("Enter Mastodon hostname (or 'q' to exit): ")
|
|
||||||
|
|
||||||
if self.mastodon_hostname == 'q':
|
|
||||||
|
|
||||||
sys.exit("Bye")
|
|
||||||
|
|
||||||
user_name = input("Bot user name, ex. john? ")
|
|
||||||
user_password = getpass.getpass("Bot password? ")
|
|
||||||
app_name = input("Bot App name? ")
|
|
||||||
self.__bot_lang = input("Bot replies lang (ca or en)? ")
|
|
||||||
|
|
||||||
client_id, client_secret = Mastodon.create_app(
|
|
||||||
app_name,
|
|
||||||
to_file="app_clientcred.txt",
|
|
||||||
api_base_url=self.mastodon_hostname
|
|
||||||
)
|
|
||||||
|
|
||||||
mastodon = Mastodon(client_id = "app_clientcred.txt", api_base_url = self.mastodon_hostname)
|
|
||||||
|
|
||||||
mastodon.log_in(
|
|
||||||
user_name,
|
|
||||||
user_password,
|
|
||||||
scopes = ["read", "write"],
|
|
||||||
to_file = "app_usercred.txt"
|
|
||||||
)
|
|
||||||
|
|
||||||
if os.path.isfile("app_usercred.txt"):
|
|
||||||
|
|
||||||
print(f"Log in succesful!")
|
|
||||||
logged_in = True
|
|
||||||
|
|
||||||
if not os.path.exists('secrets'):
|
|
||||||
os.makedirs('secrets')
|
|
||||||
|
|
||||||
secrets_filepath = 'secrets/secrets.txt'
|
|
||||||
|
|
||||||
if not os.path.exists(secrets_filepath):
|
|
||||||
with open(secrets_filepath, 'w'): pass
|
|
||||||
print(f"{secrets_filepath} created!")
|
|
||||||
|
|
||||||
with open(secrets_filepath, 'a') as the_file:
|
|
||||||
print("Writing secrets parameter names to " + secrets_filepath)
|
|
||||||
the_file.write('uc_client_id: \n'+'uc_client_secret: \n'+'uc_access_token: \n')
|
|
||||||
|
|
||||||
client_path = 'app_clientcred.txt'
|
|
||||||
|
|
||||||
with open(client_path) as fp:
|
|
||||||
|
|
||||||
line = fp.readline()
|
|
||||||
cnt = 1
|
|
||||||
|
|
||||||
while line:
|
|
||||||
|
|
||||||
if cnt == 1:
|
|
||||||
|
|
||||||
print("Writing client id to " + secrets_filepath)
|
|
||||||
self.__modify_file(self, secrets_filepath, "uc_client_id: ", value=line.rstrip())
|
|
||||||
|
|
||||||
elif cnt == 2:
|
|
||||||
|
|
||||||
print("Writing client secret to " + secrets_filepath)
|
|
||||||
self.__modify_file(self, secrets_filepath, "uc_client_secret: ", value=line.rstrip())
|
|
||||||
|
|
||||||
line = fp.readline()
|
|
||||||
cnt += 1
|
|
||||||
|
|
||||||
token_path = 'app_usercred.txt'
|
|
||||||
|
|
||||||
with open(token_path) as fp:
|
|
||||||
|
|
||||||
line = fp.readline()
|
|
||||||
print("Writing access token to " + secrets_filepath)
|
|
||||||
self.__modify_file(self, secrets_filepath, "uc_access_token: ", value=line.rstrip())
|
|
||||||
|
|
||||||
if os.path.exists("app_clientcred.txt"):
|
|
||||||
|
|
||||||
print("Removing app_clientcred.txt temp file..")
|
|
||||||
os.remove("app_clientcred.txt")
|
|
||||||
|
|
||||||
if os.path.exists("app_usercred.txt"):
|
|
||||||
|
|
||||||
print("Removing app_usercred.txt temp file..")
|
|
||||||
os.remove("app_usercred.txt")
|
|
||||||
|
|
||||||
self.config_filepath = 'config/config.txt'
|
|
||||||
|
|
||||||
self.__create_config(self)
|
|
||||||
self.__write_config(self)
|
|
||||||
|
|
||||||
print("Secrets setup done!\n")
|
|
||||||
|
|
||||||
except MastodonIllegalArgumentError as i_error:
|
|
||||||
|
|
||||||
sys.stdout.write(f'\n{str(i_error)}\n')
|
|
||||||
|
|
||||||
except MastodonNetworkError as n_error:
|
|
||||||
|
|
||||||
sys.stdout.write(f'\n{str(n_error)}\n')
|
|
||||||
|
|
||||||
except MastodonReadTimeout as r_error:
|
|
||||||
|
|
||||||
sys.stdout.write(f'\n{str(r_error)}\n')
|
|
||||||
|
|
||||||
except MastodonAPIError as a_error:
|
|
||||||
|
|
||||||
sys.stdout.write(f'\n{str(a_error)}\n')
|
|
||||||
|
|
||||||
return (logged_in, mastodon, self.mastodon_hostname)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __get_parameter(parameter, file_path ):
|
|
||||||
|
|
||||||
with open( file_path ) as f:
|
|
||||||
for line in f:
|
|
||||||
if line.startswith( parameter ):
|
|
||||||
return line.replace(parameter + ":", "").strip()
|
|
||||||
|
|
||||||
print(f'{file_path} Missing parameter {parameter}')
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __load_strings(self):
|
|
||||||
|
|
||||||
lang_file_path = f"app/locales/{self.__bot_lang}.txt"
|
|
||||||
|
|
||||||
self.register_str = self.__get_parameter("register_str", lang_file_path)
|
|
||||||
self.unregister_str = self.__get_parameter("unregister_str", lang_file_path)
|
|
||||||
self.stats_str = self.__get_parameter("stats_str", lang_file_path)
|
|
||||||
self.status_str = self.__get_parameter("status_str", lang_file_path)
|
|
||||||
self.registerok_str = self.__get_parameter("registerok_str", lang_file_path)
|
|
||||||
self.user_str = self.__get_parameter("user_str", lang_file_path)
|
|
||||||
self.password_str = self.__get_parameter("password_str", lang_file_path)
|
|
||||||
self.server_str = self.__get_parameter("server_str", lang_file_path)
|
|
||||||
self.xmpp_account_str = self.__get_parameter("xmpp_account_str", lang_file_path)
|
|
||||||
self.notdeleteadmin_str = self.__get_parameter("notdeleteadmin_str", lang_file_path)
|
|
||||||
self.deleted_str = self.__get_parameter("deleted_str", lang_file_path)
|
|
||||||
self.stats_title_str = self.__get_parameter("stats_title_str", lang_file_path)
|
|
||||||
self.registered_users_str = self.__get_parameter("registered_users_str", lang_file_path)
|
|
||||||
self.users_online_str = self.__get_parameter("users_online_str", lang_file_path)
|
|
||||||
self.node_users_str = self.__get_parameter("node_users_str", lang_file_path)
|
|
||||||
self.uptime_str = self.__get_parameter("uptime_str", lang_file_path)
|
|
||||||
self.processes_str = self.__get_parameter("processes_str", lang_file_path)
|
|
||||||
self.account_exists_str = self.__get_parameter("account_exists_str", lang_file_path)
|
|
||||||
self.user_sessions_info_str = self.__get_parameter("user_sessions_info_str", lang_file_path)
|
|
||||||
self.current_sessions_str = self.__get_parameter("current_sessions_str", lang_file_path)
|
|
||||||
self.sessions_connection_str = self.__get_parameter("sessions_connection_str", lang_file_path)
|
|
||||||
self.sessions_ip_str = self.__get_parameter("sessions_ip_str", lang_file_path)
|
|
||||||
self.sessions_port_str = self.__get_parameter("sessions_port_str", lang_file_path)
|
|
||||||
self.sessions_priority_str = self.__get_parameter("sessions_priority_str", lang_file_path)
|
|
||||||
self.sessions_node_str = self.__get_parameter("sessions_node_str", lang_file_path)
|
|
||||||
self.sessions_uptime_str = self.__get_parameter("sessions_uptime_str", lang_file_path)
|
|
||||||
self.sessions_status_str = self.__get_parameter("sessions_status_str", lang_file_path)
|
|
||||||
self.sessions_resource_str = self.__get_parameter("sessions_resource_str", lang_file_path)
|
|
||||||
self.sessions_statustext_str = self.__get_parameter("sessions_statustext_str", lang_file_path)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __modify_file(self, file_name, pattern,value=""):
|
|
||||||
|
|
||||||
fh=fileinput.input(file_name,inplace=True)
|
|
||||||
|
|
||||||
for line in fh:
|
|
||||||
|
|
||||||
replacement=pattern + value
|
|
||||||
line=re.sub(pattern,replacement,line)
|
|
||||||
sys.stdout.write(line)
|
|
||||||
|
|
||||||
fh.close()
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __create_config(self):
|
|
||||||
|
|
||||||
if not os.path.exists('config'):
|
|
||||||
|
|
||||||
os.makedirs('config')
|
|
||||||
|
|
||||||
if not os.path.exists(self.config_filepath):
|
|
||||||
|
|
||||||
print(self.config_filepath + " created!")
|
|
||||||
with open('config/config.txt', 'w'): pass
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __write_config(self):
|
|
||||||
|
|
||||||
with open(self.config_filepath, 'a') as the_file:
|
|
||||||
|
|
||||||
the_file.write(f'mastodon_hostname: {self.mastodon_hostname}\n')
|
|
||||||
the_file.write(f'bot_lang: {self.__bot_lang}\n')
|
|
||||||
print(f"adding parameter 'mastodon_hostname' to {self.config_filepath}\n")
|
|
||||||
print(f"adding parameter 'bot_lang' to {self.config_filepath}\n")
|
|
||||||
|
|
||||||
def get_data(self, notif):
|
|
||||||
|
|
||||||
id = notif.id
|
|
||||||
|
|
||||||
account_id = notif.account.id
|
|
||||||
|
|
||||||
acct = notif.account.acct
|
|
||||||
|
|
||||||
status_id = notif.status.id
|
|
||||||
|
|
||||||
text = notif.status.content
|
|
||||||
|
|
||||||
visibility = notif.status.visibility
|
|
||||||
|
|
||||||
reply, question = self.__get_question(self, text)
|
|
||||||
|
|
||||||
mention_dict = {'reply': reply, 'question': question, 'id': id, 'account_id': account_id, 'acct': acct, 'status_id': status_id, 'text': text, 'visibility': visibility}
|
|
||||||
|
|
||||||
mention = self.__json_allow_dict_attrs(mention_dict)
|
|
||||||
|
|
||||||
return mention
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __get_question(self, text):
|
|
||||||
|
|
||||||
reply = False
|
|
||||||
|
|
||||||
keyword = ''
|
|
||||||
|
|
||||||
content = self.__cleanhtml(self, text)
|
|
||||||
|
|
||||||
content = self.__unescape(self, content)
|
|
||||||
|
|
||||||
try:
|
|
||||||
|
|
||||||
start = content.index("@")
|
|
||||||
end = content.index(" ")
|
|
||||||
if len(content) > end:
|
|
||||||
|
|
||||||
content = content[0: start:] + content[end +1::]
|
|
||||||
|
|
||||||
cleanit = content.count('@')
|
|
||||||
|
|
||||||
i = 0
|
|
||||||
while i < cleanit :
|
|
||||||
|
|
||||||
start = content.rfind("@")
|
|
||||||
end = len(content)
|
|
||||||
content = content[0: start:] + content[end +1::]
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
content = content.lower()
|
|
||||||
question = content
|
|
||||||
|
|
||||||
#keyword_length = 8
|
|
||||||
keyword = question
|
|
||||||
|
|
||||||
if keyword == self.register_str or keyword == self.unregister_str or keyword == self.stats_str or self.status_str or user_sessions_info_str:
|
|
||||||
|
|
||||||
keyword_length = len(keyword)
|
|
||||||
|
|
||||||
if unidecode.unidecode(question)[0:keyword_length] == keyword:
|
|
||||||
|
|
||||||
reply = True
|
|
||||||
|
|
||||||
except:
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
return (reply, question)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __cleanhtml(self, raw_html):
|
|
||||||
|
|
||||||
cleanr = re.compile('<.*?>')
|
|
||||||
cleantext = re.sub(cleanr, '', raw_html)
|
|
||||||
return cleantext
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __unescape(self, s):
|
|
||||||
|
|
||||||
s = s.replace("'", "'")
|
|
||||||
return s
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __json_allow_dict_attrs(json_object):
|
|
||||||
"""
|
|
||||||
Makes it possible to use attribute notation to access a dicts
|
|
||||||
elements, while still allowing the dict to act as a dict.
|
|
||||||
"""
|
|
||||||
if isinstance(json_object, dict):
|
|
||||||
return AttribAccessDict(json_object)
|
|
||||||
return json_object
|
|
153
xmpp.py
153
xmpp.py
|
@ -1,138 +1,167 @@
|
||||||
from mastodonbot import Mastodonbot
|
from app.libraries.setup import Setup
|
||||||
from ejabberdapi import Ejabberd
|
from app.libraries.mentions import Mentions
|
||||||
from mastodon import MastodonNetworkError
|
from app.libraries.ejabberdapi import Ejabberd
|
||||||
|
from app.libraries.strings import Strings
|
||||||
|
from mastodon import Mastodon, StreamListener
|
||||||
import sys
|
import sys
|
||||||
import humanize
|
import humanize
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
|
import pdb
|
||||||
|
|
||||||
# main
|
def check_activity(username):
|
||||||
|
|
||||||
if __name__ == '__main__':
|
active = False
|
||||||
|
|
||||||
bot = Mastodonbot()
|
account_id = mastodon.account_lookup(username).id
|
||||||
|
|
||||||
ejabberd = Ejabberd()
|
act_st = mastodon.account_statuses(account_id)
|
||||||
|
|
||||||
try:
|
active_post = 0
|
||||||
|
|
||||||
notifications = bot.mastodon.notifications()
|
for status in act_st:
|
||||||
|
|
||||||
except MastodonNetworkError as net_error:
|
if status.created_at.replace(tzinfo=None) > datetime.now() - timedelta(days=30):
|
||||||
|
|
||||||
sys.exit(net_error)
|
active_post += 1
|
||||||
|
|
||||||
for notif in notifications:
|
if active_post >= 3:
|
||||||
|
|
||||||
if notif.type != 'mention':
|
active = True
|
||||||
|
|
||||||
print(f"Dismissing notification id {notif.id}")
|
return active
|
||||||
|
|
||||||
bot.mastodon.notifications_dismiss(notif.id)
|
class Listener(StreamListener):
|
||||||
|
|
||||||
|
def on_notification(self, notification):
|
||||||
|
|
||||||
|
if notification.type != 'mention':
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
mention = bot.get_data(notif)
|
qry = Mentions(mastodon)
|
||||||
|
|
||||||
if mention.reply and '@' not in mention.acct:
|
reply, query_word, username, status_id, visibility = qry.get_data(notification)
|
||||||
|
|
||||||
if mention.question == bot.register_str:
|
if reply:
|
||||||
|
|
||||||
account_exists = ejabberd.check_account(mention.acct, ejabberd.local_vhost)
|
if query_word == strings.register_str:
|
||||||
|
|
||||||
if not account_exists:
|
active = check_activity(username)
|
||||||
|
|
||||||
password = ejabberd.generate_pass()
|
if active:
|
||||||
|
|
||||||
is_registered, text = ejabberd.register(mention.acct, ejabberd.local_vhost, password)
|
account_exists = ejabberd.check_account(username, ejabberd.local_vhost)
|
||||||
|
|
||||||
if is_registered:
|
if not account_exists:
|
||||||
|
|
||||||
post = f"@{mention.acct} {bot.registerok_str}\n\n{bot.user_str} {mention.acct}@{ejabberd.local_vhost}\n"
|
password = ejabberd.generate_pass()
|
||||||
|
|
||||||
post += f"{bot.password_str} {password}\n{bot.server_str} {ejabberd.local_vhost}"
|
is_registered, text = ejabberd.register(username, ejabberd.local_vhost, password)
|
||||||
|
|
||||||
bot.mastodon.status_post(post, in_reply_to_id=mention.status_id, visibility='direct')
|
if is_registered:
|
||||||
|
|
||||||
|
post = f"@{username} {strings.registerok_str}\n\n{strings.user_str} {username}@{ejabberd.local_vhost}\n"
|
||||||
|
|
||||||
|
post += f"{strings.password_str} {password}\n{strings.server_str} {ejabberd.local_vhost}"
|
||||||
|
|
||||||
|
mastodon.status_post(post, in_reply_to_id=status_id, visibility='direct')
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
mastodon.status_post(f'@{username}, {text}', in_reply_to_id=status_id, visibility='direct')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
bot.mastodon.status_post(f'@{mention.acct}, {text}', in_reply_to_id=mention.status_id, visibility='direct')
|
mastodon.status_post(f'@{username}, {strings.account_exists_str}', in_reply_to_id=status_id, visibility='direct')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
bot.mastodon.status_post(f'@{mention.acct}, {bot.account_exists_str}', in_reply_to_id=mention.status_id, visibility='direct')
|
toot_text = f'@{username},\n'
|
||||||
|
|
||||||
elif mention.question == bot.unregister_str:
|
toot_text += 'no hi ha activitat en el darrer mes, el registre no és pot dur a terme.'
|
||||||
|
|
||||||
is_unregistered, is_admin = ejabberd.unregister(mention.acct, ejabberd.local_vhost)
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
||||||
|
|
||||||
|
elif query_word == strings.unregister_str:
|
||||||
|
|
||||||
|
is_unregistered, is_admin = ejabberd.unregister(username, ejabberd.local_vhost)
|
||||||
|
|
||||||
if is_unregistered:
|
if is_unregistered:
|
||||||
|
|
||||||
bot.mastodon.status_post(f"@{mention.acct}, {bot.xmpp_account_str} {mention.acct}@{ejabberd.local_vhost}: {bot.deleted_str}", in_reply_to_id=mention.status_id, visibility='direct')
|
mastodon.status_post(f"@{username}, {strings.xmpp_account_str} {username}@{ejabberd.local_vhost}: {strings.deleted_str}", in_reply_to_id=status_id, visibility='direct')
|
||||||
|
|
||||||
elif not is_unregistered:
|
elif not is_unregistered:
|
||||||
|
|
||||||
if is_admin:
|
if is_admin:
|
||||||
|
|
||||||
bot.mastodon.status_post(f'@{mention.acct}, {bot.notdeleteadmin_str}', in_reply_to_id=mention.status_id, visibility='direct')
|
mastodon.status_post(f'@{username}, {strings.notdeleteadmin_str}', in_reply_to_id=status_id, visibility='direct')
|
||||||
|
|
||||||
elif mention.question == bot.stats_str:
|
elif query_word == strings.stats_str:
|
||||||
|
|
||||||
stats = ejabberd.stats()
|
stats = ejabberd.stats()
|
||||||
|
|
||||||
post = f'@{mention.acct}, {bot.stats_title_str} {ejabberd.local_vhost}:\n\n'
|
post = f'@{username}, {strings.stats_title_str} {ejabberd.local_vhost}:\n\n'
|
||||||
|
|
||||||
post += f'{bot.registered_users_str} {stats.registeredusers}\n'
|
post += f'{strings.registered_users_str} {stats.registeredusers}\n'
|
||||||
|
|
||||||
post += f'{bot.users_online_str} {stats.onlineusers}\n'
|
post += f'{strings.users_online_str} {stats.onlineusers}\n'
|
||||||
|
|
||||||
post += f'{bot.node_users_str} {stats.onlineusersnode}\n'
|
post += f'{strings.node_users_str} {stats.onlineusersnode}\n'
|
||||||
|
|
||||||
post += f'{bot.uptime_str} {humanize.naturaldelta(dt.timedelta(seconds=stats.uptimeseconds))}\n'
|
post += f'{strings.uptime_str} {humanize.naturaldelta(dt.timedelta(seconds=stats.uptimeseconds))}\n'
|
||||||
|
|
||||||
post += f'{bot.processes_str} {stats.processes}\n'
|
post += f'{strings.processes_str} {stats.processes}\n'
|
||||||
|
|
||||||
bot.mastodon.status_post(post, in_reply_to_id=mention.status_id, visibility=mention.visibility)
|
mastodon.status_post(post, in_reply_to_id=status_id, visibility=visibility)
|
||||||
|
|
||||||
elif mention.question == bot.status_str:
|
elif query_word == strings.status_str:
|
||||||
|
|
||||||
status = ejabberd.status()
|
status = ejabberd.status()
|
||||||
|
|
||||||
post = f'@{mention.acct}, {status}\n'
|
post = f'@{username}, {status}\n'
|
||||||
|
|
||||||
bot.mastodon.status_post(post, in_reply_to_id=mention.status_id, visibility=mention.visibility)
|
mastodon.status_post(post, in_reply_to_id=status_id, visibility=visibility)
|
||||||
|
|
||||||
elif mention.question == bot.user_sessions_info_str:
|
elif query_word == strings.user_sessions_info_str:
|
||||||
|
|
||||||
sessions = ejabberd.user_sessions_info(mention.acct, ejabberd.local_vhost)
|
sessions = ejabberd.user_sessions_info(username, ejabberd.local_vhost)
|
||||||
|
|
||||||
post = f'@{mention.acct}, {bot.current_sessions_str}\n\n'
|
post = f'@{username}, {strings.current_sessions_str}\n\n'
|
||||||
|
|
||||||
if len(sessions) != 0:
|
if len(sessions) != 0:
|
||||||
|
|
||||||
post += f'{bot.sessions_connection_str} {sessions.connection}\n'
|
post += f'{strings.sessions_connection_str} {sessions.connection}\n'
|
||||||
post += f'{bot.sessions_ip_str} {sessions.ip}\n'
|
post += f'{strings.sessions_ip_str} {sessions.ip}\n'
|
||||||
post += f'{bot.sessions_port_str} {sessions.port}\n'
|
post += f'{strings.sessions_port_str} {sessions.port}\n'
|
||||||
post += f'{bot.sessions_priority_str} {sessions.priority}\n'
|
post += f'{strings.sessions_priority_str} {sessions.priority}\n'
|
||||||
post += f'{bot.sessions_node_str} {sessions.node}\n'
|
post += f'{strings.sessions_node_str} {sessions.node}\n'
|
||||||
post += f'{bot.sessions_uptime_str} {sessions.uptime}\n'
|
post += f'{strings.sessions_uptime_str} {sessions.uptime}\n'
|
||||||
post += f'{bot.sessions_status_str} {sessions.status}\n'
|
post += f'{strings.sessions_status_str} {sessions.status}\n'
|
||||||
post += f'{bot.sessions_resource_str} {sessions.resource}\n'
|
post += f'{strings.sessions_resource_str} {sessions.resource}\n'
|
||||||
post += f'{bot.sessions_statustext_str} {sessions.statustext}\n'
|
post += f'{strings.sessions_statustext_str} {sessions.statustext}\n'
|
||||||
|
|
||||||
bot.mastodon.status_post(post, in_reply_to_id=mention.status_id, visibility='direct')
|
mastodon.status_post(post, in_reply_to_id=status_id, visibility='direct')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
post += '0'
|
post += '0'
|
||||||
|
|
||||||
bot.mastodon.status_post(post, in_reply_to_id=mention.status_id, visibility=mention.visibility)
|
mastodon.status_post(post, in_reply_to_id=status_id, visibility=visibility)
|
||||||
|
|
||||||
print(f"Dismissing notification id {mention.id}")
|
# main
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
bot.mastodon.notifications_dismiss(mention.id)
|
setup = Setup()
|
||||||
|
|
||||||
else:
|
mastodon = Mastodon(
|
||||||
|
access_token = setup.mastodon_app_token,
|
||||||
|
api_base_url= setup.mastodon_hostname
|
||||||
|
)
|
||||||
|
|
||||||
print(f"Dismissing notification id {mention.id}")
|
ejabberd = Ejabberd()
|
||||||
|
|
||||||
bot.mastodon.notifications_dismiss(mention.id)
|
strings = Strings()
|
||||||
|
|
||||||
|
mastodon.stream_user(Listener())
|
||||||
|
|
Loading…
Referencia en una nova incidència