1546 líneas
39 KiB
Python
1546 líneas
39 KiB
Python
import pdb
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import re
|
|
import unidecode
|
|
from datetime import datetime, timedelta
|
|
from mastodon import Mastodon
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
import smtplib
|
|
from smtplib import SMTPException, SMTPAuthenticationError, SMTPConnectError, SMTPRecipientsRefused
|
|
import socket
|
|
from socket import gaierror
|
|
import psycopg2
|
|
import chess
|
|
import chess.svg
|
|
from cairosvg import svg2png
|
|
|
|
def cleanhtml(raw_html):
|
|
cleanr = re.compile('<.*?>')
|
|
cleantext = re.sub(cleanr, '', raw_html)
|
|
return cleantext
|
|
|
|
def unescape(s):
|
|
s = s.replace("'", "'")
|
|
return s
|
|
|
|
def get_bot_id():
|
|
|
|
###################################################################################################################################
|
|
# get bot_id from bot's username
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select id from accounts where username = (%s) and domain is null", (bot_username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
bot_id = row[0]
|
|
|
|
cur.close()
|
|
|
|
return bot_id
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
print(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def get_user_domain(account_id):
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select username, domain from accounts where id=(%s)", (account_id,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
username = row[0]
|
|
|
|
domain = row[1]
|
|
|
|
cur.close()
|
|
|
|
return (username, domain)
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
print(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def get_piece_name(captured_piece):
|
|
|
|
if captured_piece == 1:
|
|
|
|
piece_name = "un Peó"
|
|
|
|
if captured_piece == 2:
|
|
|
|
piece_name = "un cavall"
|
|
|
|
if captured_piece == 3:
|
|
|
|
piece_name = "l'àlfil"
|
|
|
|
if captured_piece == 4:
|
|
|
|
piece_name = "una torre"
|
|
|
|
if captured_piece == 5:
|
|
|
|
piece_name = "la Reina"
|
|
|
|
if captured_piece == 6:
|
|
|
|
piece_name = "el Rei"
|
|
|
|
return piece_name
|
|
|
|
def get_moved_piece_name(moved_piece):
|
|
|
|
if moved_piece == 1:
|
|
|
|
moved_piece_name = "P"
|
|
|
|
if moved_piece == 2:
|
|
|
|
moved_piece_name = "C"
|
|
|
|
if moved_piece == 3:
|
|
|
|
moved_piece_name = "A"
|
|
|
|
if moved_piece == 4:
|
|
|
|
moved_piece_name = "T"
|
|
|
|
if moved_piece == 5:
|
|
|
|
moved_piece_name = "D"
|
|
|
|
if moved_piece == 6:
|
|
|
|
moved_piece_name = "R"
|
|
|
|
return moved_piece_name
|
|
|
|
def get_notification_data():
|
|
|
|
try:
|
|
|
|
account_id_lst = []
|
|
|
|
status_id_lst = []
|
|
|
|
text_lst = []
|
|
|
|
visibility_lst = []
|
|
|
|
url_lst = []
|
|
|
|
search_text = ['fi','mou','nova','jocs', 'envia']
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
i=0
|
|
|
|
while i < len(search_text):
|
|
|
|
like_text = "%"+search_text[i]+"%"
|
|
|
|
select_query = "select account_id, id, text, visibility, url, created_at from statuses where text like (%s) and created_at + interval '60 minutes' > now() - interval '5 minutes'"
|
|
select_query += " and id=any (select status_id from mentions where account_id=(%s)) order by created_at asc"
|
|
|
|
cur.execute(select_query, (like_text, str(bot_id)))
|
|
|
|
rows = cur.fetchall()
|
|
|
|
for row in rows:
|
|
|
|
account_id_lst.append(row[0])
|
|
|
|
status_id_lst.append(row[1])
|
|
|
|
text_lst.append(row[2])
|
|
|
|
if row[3] == 0:
|
|
visibility_lst.append('public')
|
|
elif row[3] == 1:
|
|
visibility_lst.append('unlisted')
|
|
elif row[3] == 2:
|
|
visibility_lst.append('private')
|
|
elif row[3] == 3:
|
|
visibility_lst.append('direct')
|
|
|
|
url_lst.append(row[4])
|
|
|
|
i += 1
|
|
|
|
cur.close()
|
|
|
|
return (account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst)
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
print(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def update_replies(status_id, username, now):
|
|
|
|
post_id = status_id
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
insert_sql = "insert into botreplies(status_id, query_user, status_created_at) values(%s, %s, %s) ON CONFLICT DO NOTHING"
|
|
|
|
cur.execute(insert_sql, (post_id, username, now))
|
|
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def check_replies(status_id):
|
|
|
|
post_id = status_id
|
|
|
|
replied = False
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select status_id from botreplies where status_id=(%s)", (post_id,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
replied = True
|
|
|
|
else:
|
|
|
|
replied = False
|
|
|
|
cur.close()
|
|
|
|
return replied
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def current_games():
|
|
|
|
player1_name_lst = []
|
|
|
|
player2_name_lst = []
|
|
|
|
game_status_lst = []
|
|
|
|
game_link_lst = []
|
|
|
|
next_move_lst = []
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select white_user, black_user, chess_status, chess_link, next_move from games where not finished")
|
|
|
|
rows = cur.fetchall()
|
|
|
|
for row in rows:
|
|
|
|
player1_name_lst.append(row[0])
|
|
|
|
if row[1] != None:
|
|
|
|
player2_name_lst.append(row[1])
|
|
|
|
else:
|
|
|
|
player2_name_lst.append(" ")
|
|
|
|
game_status_lst.append(row[2])
|
|
|
|
game_link_lst.append(row[3])
|
|
|
|
next_move_lst.append(row[4])
|
|
|
|
cur.close()
|
|
|
|
return (player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst)
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def check_games():
|
|
|
|
game_id = ''
|
|
|
|
white_user = ''
|
|
|
|
black_user = ''
|
|
|
|
on_going_game = ''
|
|
|
|
waiting = False
|
|
|
|
playing_user = ''
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
### check if there is an ongoing game
|
|
|
|
cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and white_user=(%s) ", (username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row == None:
|
|
|
|
cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and black_user=(%s)", (username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row == None:
|
|
|
|
is_playing = False
|
|
|
|
else:
|
|
|
|
is_playing = True
|
|
|
|
game_id = row[0]
|
|
|
|
white_user = row[1]
|
|
|
|
if row[1] != None:
|
|
|
|
black_user = row[2]
|
|
|
|
else:
|
|
|
|
black_user = ''
|
|
|
|
on_going_game = row[3]
|
|
|
|
waiting = row[4]
|
|
|
|
playing_user = row[5]
|
|
|
|
else:
|
|
|
|
is_playing = True
|
|
|
|
game_id = row[0]
|
|
|
|
white_user = row[1]
|
|
|
|
if row[2] != None:
|
|
|
|
black_user = row[2]
|
|
|
|
else:
|
|
|
|
black_user = ''
|
|
|
|
on_going_game = row[3]
|
|
|
|
waiting = row[4]
|
|
|
|
playing_user = row[5]
|
|
|
|
cur.close()
|
|
|
|
return (is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user)
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def new_game(toot_url):
|
|
|
|
try:
|
|
|
|
game_status = 'waiting'
|
|
|
|
waiting = True
|
|
|
|
board_game = board.fen()
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
insert_query = 'INSERT INTO games(created_at, white_user, chess_game, chess_status, waiting, updated_at, chess_link) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING'
|
|
|
|
cur.execute(insert_query, (now, username, board_game, game_status, waiting, now, toot_url))
|
|
|
|
insert_query = 'INSERT INTO stats(created_at, white_user) VALUES (%s,%s) ON CONFLICT DO NOTHING'
|
|
|
|
cur.execute(insert_query, (now, username,))
|
|
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def update_game(board_game, toot_url):
|
|
|
|
try:
|
|
|
|
now = datetime.now()
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("update games set chess_game=(%s), chess_link=(%s), updated_at=(%s) where game_id=(%s)", (board_game, toot_url, now, game_id,))
|
|
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def save_annotation():
|
|
|
|
square_index = chess.SQUARE_NAMES.index(moving[2:])
|
|
|
|
moved_piece = board.piece_type_at(square_index)
|
|
|
|
moved_piece_name = get_moved_piece_name(moved_piece)
|
|
|
|
game_file = "anotations/" + str(game_id) + ".txt"
|
|
|
|
if moved_piece_name == 'P':
|
|
|
|
moved_piece_name = moved_piece_name.replace('P','')
|
|
|
|
if capture == True:
|
|
|
|
moved_piece_name = moved_piece_name + "X"
|
|
|
|
if check == True:
|
|
|
|
moved_piece_name = moved_piece_name + moving[2:] + "+"
|
|
|
|
if checkmate == True:
|
|
|
|
moved_piece_name = moved_piece_name + "+"
|
|
|
|
if bool(board.turn == chess.BLACK) == True:
|
|
|
|
if check != True and checkmate != True:
|
|
|
|
line_data = str(board.fullmove_number) + ". " + moved_piece_name + moving[2:]
|
|
|
|
else:
|
|
|
|
line_data = str(board.fullmove_number) + ". " + moved_piece_name
|
|
|
|
else:
|
|
|
|
moved_piece_name = moved_piece_name.lower()
|
|
|
|
if check != True and checkmate != True:
|
|
|
|
line_data = " - " + moved_piece_name + moving[2:] + "\n"
|
|
|
|
else:
|
|
|
|
line_data = " - " + moved_piece_name + "\n"
|
|
|
|
if not os.path.isfile(game_file):
|
|
|
|
file_header = "Partida: " + str(game_id) + "\n" + white_user + " / " + black_user + "\n\n"
|
|
|
|
with open(game_file, 'w+') as f:
|
|
|
|
f.write(file_header)
|
|
|
|
with open(game_file, 'a') as f:
|
|
|
|
f.write(line_data)
|
|
|
|
else:
|
|
|
|
with open(game_file, 'a') as f:
|
|
|
|
f.write(line_data)
|
|
|
|
def get_email_address(username):
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select email from users where account_id = (select id from accounts where username = (%s) and domain is null)", (username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
username_email = row[0]
|
|
|
|
cur.close()
|
|
|
|
return username_email
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def send_anotation(game_id):
|
|
|
|
emailed = False
|
|
|
|
game_found = False
|
|
|
|
username_email = get_email_address(username)
|
|
|
|
# Create message object instance
|
|
msg = MIMEMultipart()
|
|
|
|
# Declare message elements
|
|
msg['From'] = smtp_user_login
|
|
msg['To'] = username_email
|
|
msg['Subject'] = "Anotaciones partida n." + game_id
|
|
|
|
# Attach the game anotation
|
|
file_to_attach = "anotations/" + game_id + ".txt"
|
|
try:
|
|
|
|
attachment = open(file_to_attach, 'rb')
|
|
|
|
game_found = True
|
|
|
|
except FileNotFoundError as not_found_error:
|
|
|
|
print(not_found_error)
|
|
return (emailed, game_id, game_found)
|
|
|
|
obj = MIMEBase('application', 'octet-stream')
|
|
obj.set_payload(attachment.read())
|
|
encoders.encode_base64(obj)
|
|
obj.add_header('Content-Disposition', "attachment; filename= "+file_to_attach)
|
|
|
|
# Add the message body to the object instance
|
|
msg.attach(obj)
|
|
|
|
try:
|
|
|
|
# Create the server connection
|
|
server = smtplib.SMTP(smtp_host)
|
|
# Switch the connection over to TLS encryption
|
|
server.starttls()
|
|
# Authenticate with the server
|
|
server.login(smtp_user_login, smtp_user_password)
|
|
# Send the message
|
|
server.sendmail(msg['From'], msg['To'], msg.as_string())
|
|
# Disconnect
|
|
server.quit()
|
|
|
|
print("Successfully sent game anotations to %s" % msg['To'])
|
|
emailed = True
|
|
return (emailed, game_id, game_found)
|
|
|
|
except SMTPAuthenticationError as auth_error:
|
|
|
|
print(auth_error)
|
|
pass
|
|
return emailed
|
|
|
|
except socket.gaierror as socket_error:
|
|
|
|
print(socket_error)
|
|
pass
|
|
return emailed
|
|
|
|
except SMTPRecipientsRefused as recip_error:
|
|
|
|
print(recip_error)
|
|
pass
|
|
return emailed
|
|
|
|
def close_game():
|
|
|
|
now = datetime.now()
|
|
|
|
waiting = False
|
|
|
|
finished = True
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("update games set waiting=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (waiting, finished, now, game_id))
|
|
|
|
cur.execute("update stats set winner=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (username, finished, now, game_id))
|
|
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def get_stats(player):
|
|
|
|
played_games = 0
|
|
|
|
wins = 0
|
|
|
|
try:
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select count(*) from stats where white_user = (%s) or black_user = (%s) and finished", (player, player))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
played_games = row[0]
|
|
|
|
cur.execute("select count(*) from stats where winner = (%s) and finished", (player,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
wins = row[0]
|
|
|
|
cur.close()
|
|
|
|
return (played_games, wins)
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def waiting_games():
|
|
|
|
try:
|
|
|
|
game_id = ''
|
|
|
|
game_waiting = False
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select game_id from games where waiting order by game_id desc limit 1")
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
game_id = row[0]
|
|
game_waiting = True
|
|
|
|
cur.close()
|
|
|
|
return (game_id, game_waiting)
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def join_player():
|
|
|
|
try:
|
|
|
|
now = datetime.now()
|
|
|
|
game_status = 'waiting'
|
|
|
|
waiting = True
|
|
|
|
moves = 0
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("update games set black_user=(%s), chess_status='playing', waiting='f', updated_at=(%s), moves=(%s) where game_id=(%s)", (username, now, moves, game_id,))
|
|
|
|
cur.execute("update stats set black_user=(%s), updated_at=(%s) where game_id=(%s)", (username, now, game_id,))
|
|
|
|
conn.commit()
|
|
|
|
cur.execute("select white_user, chess_game from games where game_id=(%s)", (game_id,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
white_user = row[0]
|
|
|
|
chess_game = row[1]
|
|
|
|
cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (white_user, now, game_id,))
|
|
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
|
|
game_status = 'playing'
|
|
|
|
return (game_status, white_user, chess_game)
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def update_moves(username, game_moves):
|
|
|
|
try:
|
|
|
|
now = datetime.now()
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("update games set next_move=(%s), last_move=(%s), moves=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, username, game_moves, now, game_id,))
|
|
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def next_move(playing_user):
|
|
|
|
try:
|
|
|
|
now = datetime.now()
|
|
|
|
waiting = True
|
|
|
|
conn = None
|
|
|
|
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("select white_user, black_user, last_move, moves from games where game_id=(%s)", (game_id,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if row != None:
|
|
|
|
white_user = row[0]
|
|
black_user = row[1]
|
|
last_move = row[2]
|
|
moves = row[3]
|
|
|
|
if last_move != None:
|
|
|
|
if playing_user == white_user:
|
|
|
|
playing_user = black_user
|
|
|
|
elif playing_user == black_user:
|
|
|
|
playing_user = white_user
|
|
|
|
else:
|
|
|
|
last_move = white_user
|
|
|
|
cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, now, game_id,))
|
|
|
|
conn.commit()
|
|
|
|
cur.close()
|
|
|
|
return playing_user
|
|
|
|
except (Exception, psycopg2.DatabaseError) as error:
|
|
|
|
sys.exit(error)
|
|
|
|
finally:
|
|
|
|
if conn is not None:
|
|
|
|
conn.close()
|
|
|
|
def replying():
|
|
|
|
reply = False
|
|
|
|
moving = ''
|
|
|
|
content = cleanhtml(text)
|
|
content = unescape(content)
|
|
|
|
try:
|
|
|
|
start = content.index("@")
|
|
end = content.index(" ")
|
|
if len(content) > end:
|
|
|
|
content = content[0: start:] + content[end +1::]
|
|
|
|
neteja = content.count('@')
|
|
|
|
i = 0
|
|
while i < neteja :
|
|
|
|
start = content.rfind("@")
|
|
end = len(content)
|
|
content = content[0: start:] + content[end +1::]
|
|
i += 1
|
|
|
|
question = content.lower()
|
|
|
|
query_word = question
|
|
query_word_length = len(query_word)
|
|
|
|
if unidecode.unidecode(question)[0:query_word_length] == query_word:
|
|
|
|
if query_word[0:4] == 'nova':
|
|
|
|
reply = True
|
|
|
|
elif query_word[0:3] == 'mou':
|
|
|
|
moving = query_word[4:query_word_length].replace(" ","")
|
|
reply = True
|
|
|
|
elif query_word[0:2] == 'fi':
|
|
|
|
reply = True
|
|
|
|
elif query_word[0:4] == 'jocs':
|
|
|
|
reply = True
|
|
|
|
elif query_word[0:5] == 'envia':
|
|
|
|
reply = True
|
|
|
|
else:
|
|
|
|
reply = False
|
|
|
|
return (reply, query_word, moving)
|
|
|
|
except ValueError as v_error:
|
|
|
|
print(v_error)
|
|
|
|
def mastodon():
|
|
|
|
# Load secrets from secrets file
|
|
secrets_filepath = "secrets/secrets.txt"
|
|
uc_client_id = get_parameter("uc_client_id", secrets_filepath)
|
|
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
|
|
uc_access_token = get_parameter("uc_access_token", secrets_filepath)
|
|
|
|
# Load configuration from config file
|
|
config_filepath = "config/config.txt"
|
|
mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
|
|
bot_username = get_parameter("bot_username", config_filepath)
|
|
|
|
# Initialise Mastodon API
|
|
mastodon = Mastodon(
|
|
client_id = uc_client_id,
|
|
client_secret = uc_client_secret,
|
|
access_token = uc_access_token,
|
|
api_base_url = 'https://' + mastodon_hostname,
|
|
)
|
|
|
|
# Initialise access headers
|
|
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
|
|
|
|
return (mastodon, mastodon_hostname, bot_username)
|
|
|
|
def db_config():
|
|
|
|
# Load db configuration from config file
|
|
config_filepath = "config/db_config.txt"
|
|
mastodon_db = get_parameter("mastodon_db", config_filepath)
|
|
mastodon_db_user = get_parameter("mastodon_db_user", config_filepath)
|
|
chess_db = get_parameter("chess_db", config_filepath)
|
|
chess_db_user = get_parameter("chess_db_user", config_filepath)
|
|
|
|
return (mastodon_db, mastodon_db_user, chess_db, chess_db_user)
|
|
|
|
def smtp_config():
|
|
|
|
smtp_filepath = "config/smtp_config.txt"
|
|
smtp_host = get_parameter("smtp_host", smtp_filepath)
|
|
smtp_user_login = get_parameter("smtp_user_login", smtp_filepath)
|
|
smtp_user_password = get_parameter("smtp_user_password", smtp_filepath)
|
|
|
|
return (smtp_host, smtp_user_login, smtp_user_password)
|
|
|
|
def get_parameter( parameter, file_path ):
|
|
|
|
if not os.path.isfile(file_path):
|
|
print("File %s not found, exiting."%file_path)
|
|
sys.exit(0)
|
|
|
|
with open( file_path ) as f:
|
|
for line in f:
|
|
if line.startswith( parameter ):
|
|
return line.replace(parameter + ":", "").strip()
|
|
|
|
print(file_path + " Missing parameter %s "%parameter)
|
|
sys.exit(0)
|
|
|
|
def create_dir():
|
|
if not os.path.exists('games'):
|
|
os.makedirs('games')
|
|
if not os.path.exists('anotations'):
|
|
os.makedirs('anotations')
|
|
|
|
def usage():
|
|
|
|
print('usage: python ' + sys.argv[0] + ' --play')
|
|
|
|
###############################################################################
|
|
# main
|
|
|
|
if __name__ == '__main__':
|
|
|
|
# usage modes
|
|
|
|
if len(sys.argv) == 1:
|
|
|
|
usage()
|
|
|
|
elif len(sys.argv) == 2:
|
|
|
|
if sys.argv[1] == '--play':
|
|
|
|
mastodon, mastodon_hostname, bot_username = mastodon()
|
|
|
|
mastodon_db, mastodon_db_user, chess_db, chess_db_user = db_config()
|
|
|
|
smtp_host, smtp_user_login, smtp_user_password = smtp_config()
|
|
|
|
now = datetime.now()
|
|
|
|
create_dir()
|
|
|
|
bot_id = get_bot_id()
|
|
|
|
account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst = get_notification_data()
|
|
|
|
i = 0
|
|
while i < len(account_id_lst):
|
|
|
|
account_id = account_id_lst[i]
|
|
|
|
username, domain = get_user_domain(account_id)
|
|
|
|
status_id = status_id_lst[i]
|
|
|
|
replied = check_replies(status_id)
|
|
|
|
if replied == True or domain != None:
|
|
|
|
i += 1
|
|
|
|
continue
|
|
|
|
if domain != None:
|
|
|
|
update_replies(username, status_id)
|
|
|
|
i += 1
|
|
|
|
# listen them or not
|
|
|
|
text = text_lst[i]
|
|
|
|
reply, query_word, moving = replying()
|
|
|
|
visibility = visibility_lst[i]
|
|
|
|
status_url = url_lst[i]
|
|
|
|
if query_word != "jocs":
|
|
|
|
is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user = check_games()
|
|
|
|
if game_id == '':
|
|
|
|
game_id, game_waiting = waiting_games()
|
|
|
|
else:
|
|
|
|
is_playing = True
|
|
|
|
if reply == True and is_playing == False:
|
|
|
|
if query_word == 'nova' and not game_waiting:
|
|
|
|
board = chess.Board()
|
|
|
|
svgfile = chess.svg.board(board=board)
|
|
|
|
board_file = 'games/' + str(game_id) + '_board.png'
|
|
|
|
svg2png(bytestring=svgfile,write_to=board_file)
|
|
|
|
toot_text = "@"+username+ " partida iniciada! Esperant jugador... " +"\n"
|
|
|
|
toot_text += '\n'
|
|
|
|
image_id = mastodon.media_post(board_file, "image/png").id
|
|
|
|
toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
|
|
|
|
toot_url = toot_id.uri
|
|
|
|
new_game(toot_url)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
elif query_word == 'nova' and game_waiting:
|
|
|
|
game_status, white_user, chess_game = join_player()
|
|
|
|
playing_user = white_user
|
|
|
|
next_move(username)
|
|
|
|
board = chess.Board(chess_game)
|
|
|
|
svgfile = chess.svg.board(board=board)
|
|
|
|
board_file = 'games/' + str(game_id) + '_board.png'
|
|
|
|
svg2png(bytestring=svgfile,write_to=board_file)
|
|
|
|
toot_text = "@"+username + " jugues amb " + white_user + "\n"
|
|
|
|
toot_text += '\n'
|
|
|
|
toot_text += "@"+white_user + ": el teu torn" + "\n"
|
|
|
|
toot_text += '\n'
|
|
|
|
toot_text += 'partida: ' + str(game_id) + ' ' + '#escacs' + '\n'
|
|
|
|
image_id = mastodon.media_post(board_file, "image/png").id
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
|
|
|
|
game_moves = board.ply()
|
|
|
|
update_moves(username, game_moves)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
elif query_word[0:5] == 'envia':
|
|
|
|
query_word_length = len(query_word)
|
|
|
|
send_game = query_word[6:query_word_length].replace(' ', '')
|
|
|
|
emailed, game_id, game_found = send_anotation(send_game)
|
|
|
|
if emailed == False and game_found == True:
|
|
|
|
toot_text = "@"+username + " error al enviar les anotacions :-("
|
|
|
|
elif emailed == True and game_found == True:
|
|
|
|
toot_text = "@"+username + " les anotaciones de la partida n." + str(game_id) + " enviades amb èxit!"
|
|
|
|
elif emailed == False and game_found == False:
|
|
|
|
toot_text = "@"+username + " la partida n." + str(game_id) + " no existeix..."
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
else:
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
elif reply and is_playing:
|
|
|
|
if query_word == 'nova':
|
|
|
|
toot_text = "@"+username + ' ja tenies iniciada una partida!' + '\n'
|
|
|
|
if black_user != '':
|
|
|
|
toot_text += '@'+white_user + ' / ' + '@'+black_user + '\n'
|
|
|
|
else:
|
|
|
|
toot_text += "espera l'altre jugador" + '\n'
|
|
|
|
toot_text += '\n'
|
|
|
|
toot_text += 'partida: ' + str(game_id) + ' ' + '#escacs' + '\n'
|
|
|
|
board = chess.Board(on_going_game)
|
|
|
|
svgfile = chess.svg.board(board=board)
|
|
|
|
board_file = 'games/' + str(game_id) + '_board.png'
|
|
|
|
svg2png(bytestring=svgfile,write_to=board_file)
|
|
|
|
image_id = mastodon.media_post(board_file, "image/png").id
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
elif query_word[0:3] == 'mou' and playing_user == username:
|
|
|
|
board = chess.Board(on_going_game)
|
|
|
|
try:
|
|
|
|
if chess.Move.from_uci(moving) not in board.legal_moves:
|
|
|
|
toot_text = "@"+username + ": " + moving + " és un moviment il·legal. Torna a tirar." + "\n"
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
else:
|
|
|
|
check = False
|
|
|
|
playing_user = next_move(username)
|
|
|
|
if bool(board.is_capture(chess.Move.from_uci(moving))):
|
|
|
|
capture = True
|
|
|
|
square_capture_index = chess.SQUARE_NAMES.index(moving[2:])
|
|
|
|
captured_piece = board.piece_type_at(square_capture_index)
|
|
|
|
piece_name = get_piece_name(captured_piece)
|
|
|
|
else:
|
|
|
|
capture = False
|
|
|
|
board.push(chess.Move.from_uci(moving))
|
|
|
|
if bool(board.is_check()):
|
|
|
|
if username == white_user:
|
|
|
|
king_square = board.king(chess.BLACK)
|
|
check = True
|
|
|
|
else:
|
|
|
|
king_square = board.king(chess.WHITE)
|
|
check = True
|
|
|
|
if board.is_game_over() == True:
|
|
|
|
game_moves = board.ply()
|
|
|
|
close_game()
|
|
|
|
checkmate = True
|
|
|
|
else:
|
|
|
|
checkmate = False
|
|
|
|
if check == True and checkmate == False:
|
|
|
|
toot_text = "@"+playing_user + " " + username + " t'ha fet escac!\n"
|
|
|
|
elif check == True and checkmate == True:
|
|
|
|
toot_text = "\nEscac i mat! (en " + str(game_moves) + " moviments)" + "\n\nEl guanyador és: " + "@"+username + '\n'
|
|
|
|
toot_text += "\n@"+playing_user + ": ben jugat!" + "\n"
|
|
|
|
toot_text += "\nPartides guanyades:" + "\n"
|
|
|
|
played_games, wins = get_stats(username)
|
|
|
|
toot_text += username + ": " + str(wins) + " de " + str(played_games) + "\n"
|
|
|
|
played_games, wins = get_stats(playing_user)
|
|
|
|
toot_text += playing_user + ": " + str(wins) + " de " + str(played_games) + "\n"
|
|
|
|
else:
|
|
|
|
toot_text = "@"+playing_user + ' el teu torn.'+ '\n'
|
|
|
|
if capture == True and checkmate == False:
|
|
|
|
toot_text += "\n* has perdut " + piece_name + "!\n"
|
|
|
|
toot_text += '\npartida: ' + str(game_id) + ' ' + '#escacs' + '\n'
|
|
|
|
if username == white_user:
|
|
|
|
if check == True:
|
|
|
|
svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving), check=board.king(chess.BLACK))
|
|
|
|
else:
|
|
|
|
svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving))
|
|
|
|
else:
|
|
|
|
if check == True:
|
|
|
|
svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving), check=board.king(chess.WHITE))
|
|
|
|
else:
|
|
|
|
svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving))
|
|
|
|
board_file = 'games/' + str(game_id) + '_board.png'
|
|
|
|
svg2png(bytestring=svgfile,write_to=board_file)
|
|
|
|
image_id = mastodon.media_post(board_file, "image/png").id
|
|
|
|
toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
|
|
|
|
toot_url = toot_id.uri
|
|
|
|
board_game = board.fen()
|
|
|
|
update_game(board_game, toot_url)
|
|
|
|
game_moves = board.ply()
|
|
|
|
save_annotation()
|
|
|
|
update_moves(username, game_moves)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
except ValueError as v_error:
|
|
|
|
print(v_error)
|
|
|
|
toot_text = "@"+username + ' moviment il·legal! (' + moving + '!?)\n'
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
pass
|
|
|
|
except AssertionError as a_error:
|
|
|
|
print(a_error)
|
|
|
|
toot_text = "@"+username + ' moviment il·legal! (' + moving + '!?)\n'
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
pass
|
|
|
|
elif query_word[0:2] == 'fi':
|
|
|
|
if black_user != '':
|
|
|
|
if username == white_user:
|
|
|
|
toot_text = "@"+username + " ha deixat la partida amb " + "@"+black_user
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
close_game()
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
i += 1
|
|
|
|
continue
|
|
|
|
else:
|
|
|
|
toot_text = "@"+username + " ha deixat la partida amb " + white_user
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
close_game()
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
i += 1
|
|
|
|
continue
|
|
|
|
else:
|
|
|
|
toot_text = "@"+username + " has abandonat la partida en espera."
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
|
|
|
|
close_game()
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
i += 1
|
|
|
|
continue
|
|
|
|
elif query_word == 'jocs':
|
|
|
|
player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst = current_games()
|
|
|
|
if len(player1_name_lst) > 0:
|
|
|
|
toot_text = "@"+username + " partides iniciades:" + "\n"
|
|
|
|
i = 0
|
|
while i < len(player1_name_lst):
|
|
|
|
if game_status_lst[i] == 'waiting':
|
|
|
|
toot_text += "\n" + player1_name_lst[i] + " / " + player2_name_lst[i] + " (en espera...)" + "\n"
|
|
|
|
else:
|
|
|
|
if next_move_lst[i] == player1_name_lst[i]:
|
|
|
|
toot_text += "\n*" + player1_name_lst[i] + " / " + player2_name_lst[i] + " (en joc)" + "\n"
|
|
|
|
else:
|
|
|
|
toot_text += "\n" + player1_name_lst[i] + " / *" + player2_name_lst[i] + " (en joc)" + "\n"
|
|
|
|
if game_link_lst[i] != None:
|
|
|
|
toot_text += str(game_link_lst[i]) + "\n"
|
|
|
|
i += 1
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
else:
|
|
|
|
toot_text = "@"+username + " cap partida en joc" + "\n"
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
else:
|
|
|
|
if playing_user == None:
|
|
|
|
toot_text = "@"+username + " no és el teu torn." + "\n"
|
|
|
|
else:
|
|
|
|
toot_text = "@"+username + " és el torn de " + playing_user + "\n"
|
|
|
|
toot_text += '\n'
|
|
|
|
toot_text += 'partida: ' + str(game_id) + ' ' + '#escacs' + '\n'
|
|
|
|
board = chess.Board(on_going_game)
|
|
|
|
if username == white_user:
|
|
|
|
svgfile = chess.svg.board(board=board, orientation=chess.BLACK)
|
|
|
|
else:
|
|
|
|
svgfile = chess.svg.board(board=board, orientation=chess.WHITE)
|
|
|
|
board_file = 'games/' + str(game_id) + '_board.png'
|
|
|
|
svg2png(bytestring=svgfile,write_to=board_file)
|
|
|
|
image_id = mastodon.media_post(board_file, "image/png").id
|
|
|
|
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
|
|
|
|
update_replies(status_id, username, now)
|
|
|
|
|
|
i += 1
|
|
|