2276 lines
66 KiB
Python
2276 lines
66 KiB
Python
import sys
|
|
import os
|
|
import os.path
|
|
import re
|
|
import unidecode
|
|
from datetime import datetime, timedelta
|
|
import time
|
|
from mastodon import Mastodon
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
import smtplib
|
|
from smtplib import SMTPException, SMTPAuthenticationError, SMTPConnectError, SMTPRecipientsRefused
|
|
import socket
|
|
from socket import gaierror
|
|
import psycopg2
|
|
import chess
|
|
import chess.svg
|
|
from cairosvg import svg2png
|
|
import chess.pgn
|
|
from PIL import Image, ImageFont, ImageDraw
|
|
import math
|
|
import pdb
|
|
|
|
def cleanhtml(raw_html):
    """Return *raw_html* with every ``<...>`` tag removed.

    Tags are matched non-greedily, so the text between tags is preserved.
    """
    tag_pattern = re.compile('<.*?>')
    return tag_pattern.sub('', raw_html)
|
|
|
|
def unescape(s):
    """Return *s* with HTML apostrophe entities replaced by a literal ``'``.

    Both the named (``&apos;``) and the numeric (``&#39;``) form are handled.
    Replacing ``'`` with itself, as the previous body did, changed nothing.
    """
    for entity in ("&apos;", "&#39;"):
        s = s.replace(entity, "'")
    return s
|
|
|
|
# Function to calculate the Probability
|
|
def Probability(rating1, rating2):
    """Return the Elo expected score of the player rated *rating2*
    when facing a player rated *rating1*.

    The value is ``1 / (1 + 10 ** ((rating1 - rating2) / 400))``.
    """
    exponent = 1.0 * (rating1 - rating2) / 400
    return 1.0 / (1 + math.pow(10, exponent))
|
|
|
|
# Function to calculate Elo rating
|
|
# K is a constant.
|
|
# d determines whether
|
|
# Player A wins or Player B.
|
|
def EloRating(Ra, Rb, K, d):
    """Return the updated ``(Ra, Rb)`` ratings after one game.

    Ra, Rb -- current ratings of player A and player B.
    K      -- the Elo K constant (maximum rating change per game).
    d      -- 1 when player A won; any other value is scored as a
              win for player B (there is no draw case).

    The new ratings are rounded to six decimals and also printed.
    """
    # Expected scores are taken from the ratings before either is updated.
    expected_b = Probability(Ra, Rb)
    expected_a = Probability(Rb, Ra)

    # Actual scores: (1, 0) when A wins, otherwise (0, 1).
    score_a, score_b = (1, 0) if d == 1 else (0, 1)

    Ra = round(Ra + K * (score_a - expected_a), 6)
    Rb = round(Rb + K * (score_b - expected_b), 6)

    print("Updated Ratings:-")
    print("Ra =", Ra," Rb =", Rb)

    return (Ra, Rb)
|
|
|
|
# This code is contributed by
|
|
# Smitha Dinesh Semwal
|
|
|
|
def create_panel(username, rating, played_games, wins):
    """Render the statistics panel image for *username*.

    The background ``app/panel/fons.jpg`` gets the chess icon and the logo
    pasted on it and is saved as ``app/panel/panel.png``; the localized
    title, the games / wins / ratio / Elo lines and the footer are then
    drawn on a transparent layer and the composite is saved as
    ``app/panel/<username>_panel.png``.

    Reads the module-level ``player_lang``, ``bot_username`` and
    ``mastodon_hostname``.
    """
    # Win percentage with two decimals; 0 when nothing was played or won.
    ratio = round((wins * 100) / played_games, 2) if played_games > 0 and wins > 0 else 0

    margin_x = 10
    margin_y = 10
    font_file = 'app/fonts/DroidSans.ttf'
    text_fill = (255,255,255,220)

    fons = Image.open('app/panel/fons.jpg')
    print(fons.size)

    large, _high = fons.size

    panel_title_str = get_locale("panel_title_str", player_lang)
    title_text = panel_title_str + ' ' + username
    title_length = len(title_text)

    # add chess icon (the image doubles as its own paste mask)
    icon_img = Image.open('app/panel/chess.png')
    fons.paste(icon_img, (margin_y+300, margin_x+50), icon_img)

    logo_img = Image.open('app/panel/logo.png')
    fons.paste(logo_img, (15, 320), logo_img)

    fons.save('app/panel/panel.png',"PNG")

    base = Image.open('app/panel/panel.png').convert('RGBA')
    txt = Image.new('RGBA', base.size, (255,255,255,0))
    title_font = ImageFont.truetype(font_file, 40, layout_engine=ImageFont.LAYOUT_BASIC)
    # get a drawing context
    draw = ImageDraw.Draw(txt)

    title_left = (large / 2) - (title_length * 2)
    draw.text((title_left, margin_x+20), title_text, font=title_font, fill=text_fill)

    stats_font = ImageFont.truetype(font_file, 35, layout_engine=ImageFont.LAYOUT_BASIC)

    # All four labels are resolved before any statistics line is drawn.
    labels = [
        get_locale(key, player_lang)
        for key in ("panel_games_str", "panel_wins_str", "panel_ratio_str", "panel_elo_rating_str")
    ]
    values = [str(played_games), str(wins), str(ratio) + '%', str(round(rating))]

    for offset, label, value in zip((80, 130, 180, 230), labels, values):
        draw.text((margin_y+70, margin_x+offset), label + ': ' + value, font=stats_font, fill=text_fill)

    footer_font = ImageFont.truetype(font_file, 15, layout_engine=ImageFont.LAYOUT_BASIC)
    draw.text((60,330), bot_username + '@' + mastodon_hostname + ' - 2020', font=footer_font, fill=(255,255,255,200))

    out = Image.alpha_composite(base, txt)
    out.save('app/panel/' + username + '_panel.png')
|
|
|
|
def get_piece_name(captured_piece):
    """Return the localized name of the piece type *captured_piece*.

    captured_piece -- integer 1..6; these are the values python-chess uses
    for PAWN, KNIGHT, BISHOP, ROOK, QUEEN and KING.

    The name is looked up with ``get_locale`` for the module-level
    ``player_lang``.

    Raises ValueError for any other value (previously this surfaced as an
    ``UnboundLocalError`` on the return statement).
    """
    locale_keys = {
        1: "pawn_piece",
        2: "knight_piece",
        3: "bishop_piece",
        4: "rook_piece",
        5: "queen_piece",
        6: "king_piece",
    }

    locale_key = locale_keys.get(captured_piece)

    if locale_key is None:
        raise ValueError(f"unknown piece type: {captured_piece!r}")

    return get_locale(locale_key, player_lang)
|
|
|
|
def get_player_langs(account_id):
    """Return ``(lang_changed, player_lang)`` for the player *account_id*.

    The language is read from the ``players`` table.  When the player has
    no row yet, ``set_lang`` is called with the module-level ``username``
    and ``bot_lang`` and its result is returned instead.

    Any exception is printed and the function then returns ``None``.
    """
    lang_changed = False
    conn = None

    try:
        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        cur.execute("select lang from players where player_id = (%s)", (str(account_id),))
        row = cur.fetchone()

        if row is None:
            # Unknown player: register them with the bot's default language.
            lang_changed, player_lang = set_lang (account_id, username, bot_lang)
        else:
            player_lang = row[0]

        cur.close()

        return (lang_changed, player_lang)

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def get_lang(player):
    """Return the language stored for the player named *player*.

    Returns ``None`` when the player has no row in ``players`` or when the
    query fails (the error is printed in that case).
    """
    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        select_query = "select lang from players where player_name = (%s)"

        cur.execute(select_query, (player,))

        row = cur.fetchone()

        cur.close()

        # An unknown player is a normal outcome, not an error: report it as
        # None instead of tripping over an unassigned local variable.
        if row is None:
            return None

        return row[0]

    except (Exception, psycopg2.DatabaseError) as error:

        print(error)

        return None

    finally:

        if conn is not None:

            conn.close()
|
|
|
|
def set_lang(account_id, username, new_lang):
    """Store *new_lang* as the language of the player *account_id*.

    A missing player row is inserted (with *username*); an existing one is
    updated.  Only 'ca', 'es', 'fr' and 'en' are accepted.

    Returns ``(lang_changed, player_lang)``:
      * ``(True, new_lang)`` after a successful insert or update;
      * ``(False, current_lang)`` when *new_lang* is not supported, where
        ``current_lang`` is ``None`` for a player without a row.

    Any exception is printed and the function then returns ``None``.
    """
    lang_changed = False

    # Bound up front so the "unsupported language" return below is valid
    # even for a player that has no row yet.
    player_lang = None

    conn = None

    try:

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        select_query = "select lang from players where player_id = (%s)"

        cur.execute(select_query, (str(account_id),))

        row = cur.fetchone()

        if row is not None:

            player_lang = row[0]

        if new_lang not in ['ca', 'es', 'fr', 'en']:

            return lang_changed, player_lang

        if row is None:

            insert_sql = "insert into players(player_id, player_name, lang) values(%s, %s, %s) ON CONFLICT DO NOTHING"

            cur.execute(insert_sql, (account_id, username, new_lang))

        else:

            update_sql = "update players set lang=(%s) where player_id=(%s)"

            cur.execute(update_sql, (new_lang, account_id))

        lang_changed = True

        conn.commit()

        cur.close()

        player_lang = new_lang

        return (lang_changed, player_lang)

    except (Exception, psycopg2.DatabaseError) as error:

        print(error)

    finally:

        if conn is not None:

            conn.close()
|
|
|
|
def get_locale( parameter, player_lang):
    """Return the localized string stored under *parameter*.

    The value is read from ``app/locales/<player_lang>.txt``, whose lines
    have the form ``key: value``.  Only the first line whose key is exactly
    *parameter* is used; surrounding whitespace of the value is stripped.

    Prints a message and exits the program when *player_lang* is not one of
    'ca', 'es', 'fr', 'en', when the file is missing, or when the key is not
    present.
    """
    if player_lang not in ['ca','es','fr','en']:
        print("lang must be 'ca', 'es', 'fr' or 'en'")
        sys.exit(0)

    language_filepath = f"app/locales/{player_lang}.txt"

    if not os.path.isfile(language_filepath):
        print("File %s not found, exiting."%language_filepath)
        sys.exit(0)

    # Match "key:" rather than the bare key, so that a key which is a prefix
    # of another one (e.g. "search_move" / "search_move_slicing") cannot pick
    # up the wrong line.
    key_prefix = parameter + ":"

    with open( language_filepath ) as f:
        for line in f:
            if line.startswith(key_prefix):
                # Slice off the key only; the value may itself contain "key:".
                return line[len(key_prefix):].strip()

    print(language_filepath + " Missing parameter %s "%parameter)
    sys.exit(0)
|
|
|
|
def current_games():
    """Return the unfinished games as five parallel lists.

    The tuple is ``(white players, black players, statuses, links,
    next movers)``, one entry per row of ``games`` where ``not finished``.
    A game without a black player gets ``" "`` in the second list.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        cur.execute("select white_user, black_user, chess_status, chess_link, next_move from games where not finished")
        rows = cur.fetchall()

        player1_name_lst = [row[0] for row in rows]
        # A game still waiting for an opponent has no black player yet.
        player2_name_lst = [row[1] if row[1] != None else " " for row in rows]
        game_status_lst = [row[2] for row in rows]
        game_link_lst = [row[3] for row in rows]
        next_move_lst = [row[4] for row in rows]

        cur.close()

        return (player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def check_games():
    """Look up the unfinished game of the module-level ``username``.

    The player is searched first as white, then as black.

    Returns ``(is_playing, game_id, white_user, black_user, on_going_game,
    waiting, playing_user)``.  When no game is found ``is_playing`` is
    False, ``waiting`` is False and the remaining fields are ``''``.
    A NULL ``black_user`` is reported as ``''``.

    Any exception terminates the program through ``sys.exit``.
    """
    game_id = ''

    white_user = ''

    black_user = ''

    on_going_game = ''

    waiting = False

    playing_user = ''

    is_playing = False

    conn = None

    try:

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        ### check if there is an ongoing game, as white first and then as black

        queries = (
            "SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and white_user=(%s) ",
            "SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and black_user=(%s)",
        )

        row = None

        for query in queries:

            cur.execute(query, (username,))

            row = cur.fetchone()

            if row is not None:

                break

        if row is not None:

            is_playing = True

            game_id = row[0]

            white_user = row[1]

            # Both lookups decode the row the same way; the black-side
            # lookup used to test row[1] (white_user) here by mistake.
            black_user = row[2] if row[2] is not None else ''

            on_going_game = row[3]

            waiting = row[4]

            playing_user = row[5]

        cur.close()

        return (is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user)

    except (Exception, psycopg2.DatabaseError) as error:

        sys.exit(error)

    finally:

        if conn is not None:

            conn.close()
|
|
|
|
def new_game(toot_url):
    """Insert a new waiting game started by the module-level ``username``.

    A row is added to ``games`` (position taken from the module-level
    ``board`` as FEN, status 'waiting', link *toot_url*) and to ``stats``,
    both stamped with the module-level ``now``.

    Any exception terminates the program through ``sys.exit``.
    """
    # Bound before the try block: the finally clause reads it, and a failure
    # before the connection is opened must not be masked by an
    # UnboundLocalError raised from there.
    conn = None

    try:

        game_status = 'waiting'

        waiting = True

        board_game = board.fen()

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        insert_query = 'INSERT INTO games(created_at, white_user, chess_game, chess_status, waiting, updated_at, chess_link) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING'

        cur.execute(insert_query, (now, username, board_game, game_status, waiting, now, toot_url))

        insert_query = 'INSERT INTO stats(created_at, white_user) VALUES (%s,%s) ON CONFLICT DO NOTHING'

        cur.execute(insert_query, (now, username,))

        conn.commit()

        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:

        sys.exit(error)

    finally:

        if conn is not None:

            conn.close()
|
|
|
|
def update_game(board_game, toot_url):
    """Store a new position and toot link for the current game.

    Updates ``chess_game``, ``chess_link`` and ``updated_at`` of the row in
    ``games`` identified by the module-level ``game_id``.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        timestamp = datetime.now()

        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        update_sql = "update games set chess_game=(%s), chess_link=(%s), updated_at=(%s) where game_id=(%s)"
        cur.execute(update_sql, (board_game, toot_url, timestamp, game_id,))

        conn.commit()
        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def write_result(filename, old_string, new_string):
    """Replace every *old_string* in the file *filename* with *new_string*.

    When *old_string* does not occur, a notice is printed and the file is
    left untouched.  Otherwise the change is announced and the file is
    rewritten in full.
    """
    with open(filename) as source:
        contents = source.read()

    if old_string not in contents:
        print(f'"{old_string}" not found in {filename}.')
        return

    with open(filename, 'w') as target:
        print(f'Changing "{old_string}" to "{new_string}" in {filename}')
        target.write(contents.replace(old_string, new_string))
|
|
|
|
def save_anotation(moving, san_move):
    """Append the move *san_move* to the game's annotation file.

    The file is ``app/anotations/<game_id>.pgn`` (``game_id`` is a
    module-level name).  When it is black's turn on the module-level
    ``board`` the move is written as ``"<fullmove number>. <san>"``,
    otherwise as ``" <san> "``.  When the module-level ``checkmate`` or
    ``stalemate`` is set, the result is appended to the move and written
    into the ``[Result ]`` header through ``write_result``.  A header block
    is created first if the file does not exist yet.

    *moving* is not used.
    """
    pgn_file = "app/anotations/" + str(game_id) + ".pgn"

    if board.turn == chess.BLACK:
        line_data = str(board.fullmove_number) + ". " + san_move
    else:
        line_data = " " + san_move + " "

    if checkmate or stalemate:
        line_data = line_data + " " + board.result()
        write_result(pgn_file, '[Result ]', '[Result ' + board.result() + ' ]')

    if not os.path.isfile(pgn_file):
        # First move of the game: start the file with its header block.
        header_parts = [
            '[Event ' + game_name + ': ' + str(game_id) + ']\n',
            '[Site ' + mastodon_hostname + ']' + '\n',
            '[Date ' + str(datetime.today().strftime('%d-%m-%Y')) + ']' + '\n',
            '[Round ' + str(game_id) + ']' + '\n',
            '[White ' + white_user + ' ]' + '\n',
            '[Black ' + black_user + ' ]' + '\n',
            '[Result ]' + '\n',
            '[Time ' + str(datetime.now().strftime('%H:%M:%S')) + ']' + '\n\n',
        ]

        with open(pgn_file, 'w+') as header_out:
            header_out.write(''.join(header_parts))

    with open(pgn_file, 'a') as moves_out:
        moves_out.write(line_data)
|
|
|
|
def get_email_address(username):
    """Return the e-mail address of the account named *username*.

    The address is read from the Mastodon database (``users`` joined to the
    ``accounts`` row with that username and a NULL ``domain``).  Returns
    ``None`` when no row matches.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database=mastodon_db, user=mastodon_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        cur.execute("select email from users where account_id = (select id from accounts where username = (%s) and domain is null)", (username,))
        row = cur.fetchone()

        username_email = None if row is None else row[0]

        cur.close()

        return username_email

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def send_anotation(game_id):
    """E-mail the annotation file of game *game_id* to the current user.

    The recipient is the e-mail address of the module-level ``username``;
    the attachment is ``app/anotations/<game_id>.pgn``.

    Returns ``(emailed, game_id, game_found)``:
      * ``emailed``    -- True only when the message was sent;
      * ``game_found`` -- True when the annotation file could be read.

    Nothing is sent when the user has no e-mail address or the file is
    missing.  Authentication errors, host resolution errors and refused
    recipients are printed and reported as ``emailed == False``.
    """
    emailed = False

    game_found = False

    username_email = get_email_address(username)

    if username_email is None:

        return (emailed, game_id, game_found)

    # Create message object instance
    msg = MIMEMultipart()

    # Declare message elements
    msg['From'] = smtp_user_login
    msg['To'] = username_email

    player_lang = get_lang(username)
    email_subject = get_locale("email_subject", player_lang)

    msg['Subject'] = email_subject + game_id

    # Attach the game anotation
    file_to_attach = "app/anotations/" + game_id + ".pgn"

    try:

        # Read through a context manager so the file handle is always closed.
        with open(file_to_attach, 'rb') as attachment:

            payload = attachment.read()

    except FileNotFoundError as not_found_error:

        print(not_found_error)
        return (emailed, game_id, game_found)

    game_found = True

    obj = MIMEBase('application', 'octet-stream')
    obj.set_payload(payload)
    encoders.encode_base64(obj)
    obj.add_header('Content-Disposition', "attachment; filename= "+file_to_attach)

    # Add the attachment to the message
    msg.attach(obj)

    server = None

    try:

        # Create the server connection
        server = smtplib.SMTP(smtp_host)
        # Switch the connection over to TLS encryption
        server.starttls()
        # Authenticate with the server
        server.login(smtp_user_login, smtp_user_password)
        # Send the message
        server.sendmail(msg['From'], msg['To'], msg.as_string())
        # Disconnect
        server.quit()

        print("Successfully sent game anotations to %s" % msg['To'])
        emailed = True

    except (SMTPAuthenticationError, socket.gaierror, SMTPRecipientsRefused) as send_error:

        print(send_error)

    finally:

        # Release the socket on every path; a no-op after a clean quit().
        if server is not None:

            server.close()

    return (emailed, game_id, game_found)
|
|
|
|
|
|
def claim_draw(username):
    """Record that *username* claims a draw in the current game.

    The game is the row of ``games`` identified by the module-level
    ``game_id``.  The claimant's flag (``white_stalemate`` when they play
    white, ``black_stalemate`` otherwise) is set, and a toot text
    mentioning both players is built in the claimant's language.

    Returns ``(white_player, black_player, toot_text, stalemate)`` where
    ``stalemate`` is True only when both flags are now set.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        cur.execute("select white_user, black_user from games where game_id=(%s)", (game_id,))
        players_row = cur.fetchone()

        if players_row is not None:
            white_player = players_row[0]
            black_player = players_row[1]

        # Pick the opponent to mention and the flag that belongs to the claimant.
        if white_player == username:
            opponent = black_player
            flag_sql = "update games set white_stalemate = 't' where game_id=(%s)"
        else:
            opponent = white_player
            flag_sql = "update games set black_stalemate = 't' where game_id=(%s)"

        username_lang = get_lang(username)
        claim_draw_str = get_locale("claim_draw_str", username_lang)

        toot_text = '@'+username + ' ' + claim_draw_str + ' @'+opponent + '\n'

        cur.execute(flag_sql, (game_id,))
        conn.commit()

        cur.execute("select white_stalemate, black_stalemate from games where game_id=(%s)", (game_id,))
        flags_row = cur.fetchone()

        if flags_row is not None:
            white_stalemate = flags_row[0]
            black_stalemate = flags_row[1]

        cur.close()

        # The draw only stands once both players have claimed it.
        stalemate = white_stalemate == True and black_stalemate == True

        return (white_player, black_player, toot_text, stalemate)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def close_game(username, checkmate):
    """Mark the current game as finished and update both Elo ratings.

    username  -- the player whose action closes the game.
    checkmate -- truthy when *username* delivered checkmate.

    Besides its parameters the function reads the module-level names
    ``game_id``, ``stalemate``, ``white_user``, ``black_user``,
    ``query_word`` and ``search_end``.

    It writes the ``games`` and ``stats`` rows of ``game_id`` and the
    ``elo_rating`` of both players.  Any database exception terminates the
    program through ``sys.exit``.
    """

    # Outcome passed to EloRating: 1 = white wins, 2 = black wins,
    # 0 = nothing decided below.
    d = 0

    try:

        conn = None

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        cur.execute("select white_user, black_user from games where game_id=(%s)", (game_id,))

        row = cur.fetchone()

        if row != None:

            white_player = row[0]

            black_player = row[1]

        # Current rating of white; 1500 when the player has no row or a
        # NULL rating.
        cur.execute("select elo_rating from players where player_name = (%s)", (white_player,))

        row = cur.fetchone()

        if row != None:

            if row[0] != None:

                white_rating = row[0]

            else:

                white_rating = 1500

        else:

            white_rating = 1500

        # Same lookup and default for black.
        cur.execute("select elo_rating from players where player_name = (%s)", (black_player,))

        row = cur.fetchone()

        if row != None:

            if row[0] != None:

                black_rating = row[0]

            else:

                black_rating = 1500

        else:

            black_rating = 1500

        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:

        sys.exit(error)

    finally:

        if conn is not None:

            conn.close()

    now = datetime.now()

    winner = ''

    waiting = False

    finished = True

    # The checks below run in order, so a later one overrides an earlier one.
    if stalemate == True:

        winner = "stalemate"

    if black_user == '':

        winner = 'none'

    if checkmate:

        winner = username

        # d is only set for a white win here; a black checkmate leaves
        # d == 0, which EloRating scores as a win for its second player.
        if winner == white_player:

            d = 1

    else:

        # The player who sent the search_end command loses the game
        # (presumably the "leave game" command - confirm against caller).
        if query_word == search_end and username == white_user and stalemate == False:

            winner = black_user

            d = 2

        elif query_word == search_end and username == black_user and stalemate == False:

            winner = white_user

            d = 1

    K = 30

    # NOTE(review): EloRating treats every d other than 1 as a black win,
    # so a stalemate or a game without opponent (d == 0) is rated as a loss
    # for white - confirm whether that is intended.
    new_white_rating, new_black_rating = EloRating(white_rating, black_rating, K, d)

    try:

        conn = None

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        cur.execute("update games set waiting=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (waiting, finished, now, game_id))

        cur.execute("update stats set winner=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (winner, finished, now, game_id))

        # NOTE(review): the ratings were read for white_player/black_player
        # (from the games row) but are written for the module-level
        # white_user/black_user - verify they always name the same players.
        cur.execute("update players set elo_rating=(%s) where player_name=(%s)", (new_white_rating, white_user))

        cur.execute("update players set elo_rating=(%s) where player_name=(%s)", (new_black_rating, black_user))

        conn.commit()

        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:

        sys.exit(error)

    finally:

        if conn is not None:

            conn.close()
|
|
|
|
def get_stats(player):
    """Return ``(rating, played_games, wins)`` for *player*.

    played_games -- finished games with at least one move in which the
                    player took part (as white or black).
    wins         -- finished ``stats`` rows whose winner is the player.
    rating       -- the stored ``elo_rating``; 1500 when the player has no
                    row in ``players`` or the rating is NULL.

    Any exception terminates the program through ``sys.exit``.
    """
    played_games = 0

    wins = 0

    # Default used by close_game as well for players without a rating.
    rating = 1500

    conn = None

    try:

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")

        cur = conn.cursor()

        cur.execute("select count(*) from games where (white_user = (%s) or black_user = (%s)) and finished and moves > 0", (player, player))

        row = cur.fetchone()

        if row is not None:

            played_games = row[0]

        cur.execute("select count(*) from stats where winner = (%s) and finished", (player,))

        row = cur.fetchone()

        if row is not None:

            wins = row[0]

        cur.execute("select elo_rating from players where player_name = (%s)", (player,))

        row = cur.fetchone()

        # A player without a players row used to abort the whole program
        # here (indexing None); fall back to the default rating instead.
        if row is not None and row[0] is not None:

            rating = row[0]

        cur.close()

        return (rating, played_games, wins)

    except (Exception, psycopg2.DatabaseError) as error:

        sys.exit(error)

    finally:

        if conn is not None:

            conn.close()
|
|
|
|
def waiting_games():
    """Return ``(game_id, game_waiting)`` for the newest waiting game.

    The game with the highest ``game_id`` among the rows of ``games`` where
    ``waiting`` is set is reported with ``game_waiting == True``.  When
    there is none the result is ``('', False)``.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        cur.execute("select game_id from games where waiting order by game_id desc limit 1")
        row = cur.fetchone()

        if row is None:
            found = ('', False)
        else:
            found = (row[0], True)

        cur.close()

        return found

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def join_player():
    """Add the module-level ``username`` as black to the waiting game.

    The game is the row of ``games`` identified by the module-level
    ``game_id``: it is switched to status 'playing' with zero moves, the
    matching ``stats`` row gets the black player, and white is stored as
    the player to move next.

    Returns ``('playing', white_user, chess_game)``.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        joined_at = datetime.now()

        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        cur.execute("update games set black_user=(%s), chess_status='playing', waiting='f', updated_at=(%s), moves=(%s) where game_id=(%s)", (username, joined_at, 0, game_id,))
        cur.execute("update stats set black_user=(%s), updated_at=(%s) where game_id=(%s)", (username, joined_at, game_id,))
        conn.commit()

        cur.execute("select white_user, chess_game from games where game_id=(%s)", (game_id,))
        row = cur.fetchone()

        if row is not None:
            white_user = row[0]
            chess_game = row[1]

        # White always makes the first move.
        cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (white_user, joined_at, game_id,))
        conn.commit()

        cur.close()

        return ('playing', white_user, chess_game)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def update_moves(username, game_moves):
    """Record that *username* has just moved in the current game.

    For the row of ``games`` identified by the module-level ``game_id`` the
    function stores the module-level ``playing_user`` as ``next_move``,
    *username* as ``last_move`` and *game_moves* as ``moves``.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        timestamp = datetime.now()

        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        update_sql = "update games set next_move=(%s), last_move=(%s), moves=(%s), updated_at=(%s) where game_id=(%s)"
        cur.execute(update_sql, (playing_user, username, game_moves, timestamp, game_id,))

        conn.commit()
        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def next_move(playing_user):
    """Return the player who moves next and store it in the game row.

    The game is the row of ``games`` identified by the module-level
    ``game_id``.  When a move has already been made (``last_move`` is not
    NULL) the turn passes from *playing_user* to the other player;
    otherwise *playing_user* is kept.  The result is written to
    ``next_move``.

    Any exception terminates the program through ``sys.exit``.
    """
    conn = None

    try:
        timestamp = datetime.now()

        conn = psycopg2.connect(database=chess_db, user=chess_db_user, password="", host="/var/run/postgresql", port="5432")
        cur = conn.cursor()

        cur.execute("select white_user, black_user, last_move, moves from games where game_id=(%s)", (game_id,))
        row = cur.fetchone()

        if row is not None:
            white_user, black_user, last_move = row[0], row[1], row[2]

        if last_move is not None:
            # Hand the turn over to the opponent of the current player.
            if playing_user == white_user:
                playing_user = black_user
            elif playing_user == black_user:
                playing_user = white_user

        cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, timestamp, game_id,))
        conn.commit()

        cur.close()

        return playing_user

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def toot_help():
    """Render the help panel image ``app/panel/help_panel.png``.

    The background ``app/panel/fons.jpg`` gets the logo pasted on it and is
    saved as ``app/panel/panel.png``; the help title, one line per bot
    command and the footer are then drawn on a transparent layer.

    Reads the module-level ``search_help``, ``player_lang``,
    ``bot_username`` and ``mastodon_hostname``.
    """
    margin_x = 10
    margin_y = 10
    font_file = 'app/fonts/DroidSans.ttf'
    text_fill = (255,255,255,220)

    fons = Image.open('app/panel/fons.jpg')
    print(fons.size)

    logo_img = Image.open('app/panel/logo.png')
    fons.paste(logo_img, (15, 320), logo_img)

    fons.save('app/panel/panel.png',"PNG")

    base = Image.open('app/panel/panel.png').convert('RGBA')
    txt = Image.new('RGBA', base.size, (255,255,255,0))
    title_font = ImageFont.truetype(font_file, 40, layout_engine=ImageFont.LAYOUT_BASIC)
    # get a drawing context
    draw = ImageDraw.Draw(txt)

    draw.text((margin_y+270, margin_x+10), search_help, font=title_font, fill=text_fill)

    command_font = ImageFont.truetype(font_file, 18, layout_engine=ImageFont.LAYOUT_BASIC)

    # Every command description is resolved before the first one is drawn.
    command_keys = (
        "start_or_join_a_new_game",
        "move_a_piece",
        "leave_a_game",
        "list_games",
        "get_a_game_anotation",
        "claim_a_draw",
        "post_my_panel_str",
        "change_lang_str",
    )
    command_texts = [get_locale(key, player_lang) for key in command_keys]

    # One line per command, 30 pixels apart, starting 70 pixels below the margin.
    for position, command_text in enumerate(command_texts):
        draw.text((margin_y+80, margin_x+70 + 30 * position), '@'+bot_username + ' ' + command_text, font=command_font, fill=text_fill)

    footer_font = ImageFont.truetype(font_file, 15, layout_engine=ImageFont.LAYOUT_BASIC)
    draw.text((60,330), bot_username + '@' + mastodon_hostname + ' - 2020', font=footer_font, fill=(255,255,255,200))

    out = Image.alpha_composite(base, txt)
    out.save('app/panel/help_panel.png')
|
|
|
|
def replying():
    """Parse the module-level ``text`` into a bot command.

    HTML tags and escaped apostrophes are removed, the span from the first
    ``@`` up to the first space is cut out, everything from any remaining
    ``@`` onwards is dropped, and the rest is lower-cased.

    Returns ``(reply, query_word, moving)``: ``reply`` tells whether the
    text matches one of the module-level ``search_*`` commands, and
    ``moving`` holds the move (spaces removed) for a move command, ``''``
    otherwise.

    When the text has no ``@`` or no space, the ValueError is printed and
    the function returns ``None``.
    """
    reply = False
    moving = ''

    content = unescape(cleanhtml(text))

    try:
        first_at = content.index("@")
        first_space = content.index(" ")

        if len(content) > first_space:
            content = content[:first_at] + content[first_space + 1:]

        # Cut the text at the last "@" once per "@" still present.
        for _ in range(content.count('@')):
            content = content[:content.rfind("@")]

        query_word = content.lower()

        if query_word == search_new:
            reply = True

        elif query_word[:search_move_slicing] == search_move:
            moving = query_word[moving_slicing:].replace(" ","")
            reply = True

        else:
            # The remaining commands carry no payload; tested in this order.
            reply = (
                query_word == search_end
                or query_word == search_games
                or query_word[:search_send_slicing] == search_send
                or query_word == search_help
                or query_word == search_draw
                or query_word == search_panel
                or query_word[:4] == search_config
            )

        return (reply, query_word, moving)

    except ValueError as v_error:
        print(v_error)
|
|
|
|
def mastodon():
    """Build the Mastodon API client from the secrets and config files.

    Credentials come from ``secrets/secrets.txt``; the host name and the
    bot's user name come from ``config/config.txt``.

    Returns ``(client, mastodon_hostname, bot_username)``.
    """
    # Load secrets from secrets file
    secrets_filepath = "secrets/secrets.txt"
    client_id = get_parameter("uc_client_id", secrets_filepath)
    client_secret = get_parameter("uc_client_secret", secrets_filepath)
    access_token = get_parameter("uc_access_token", secrets_filepath)

    # Load configuration from config file
    config_filepath = "config/config.txt"
    mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
    bot_username = get_parameter("bot_username", config_filepath)

    # Initialise Mastodon API
    client = Mastodon(
        client_id = client_id,
        client_secret = client_secret,
        access_token = access_token,
        api_base_url = 'https://' + mastodon_hostname,
    )

    return (client, mastodon_hostname, bot_username)
|
|
|
|
def db_config():
    """Return ``(mastodon_db, mastodon_db_user, chess_db, chess_db_user)``
    as read from ``config/db_config.txt``.
    """
    # Load db configuration from config file
    config_filepath = "config/db_config.txt"
    setting_names = ("mastodon_db", "mastodon_db_user", "chess_db", "chess_db_user")

    return tuple([get_parameter(name, config_filepath) for name in setting_names])
|
|
|
|
def smtp_config():
    """Return ``(smtp_host, smtp_user_login, smtp_user_password)`` as read
    from ``config/smtp_config.txt``.
    """
    smtp_filepath = "config/smtp_config.txt"
    setting_names = ("smtp_host", "smtp_user_login", "smtp_user_password")

    return tuple([get_parameter(name, smtp_filepath) for name in setting_names])
|
|
|
|
def get_parameter(parameter, file_path):
    """Return the value stored for ``parameter`` in a ``key: value`` file.

    The file is scanned line by line and the first line that starts with
    ``"<parameter>:"`` wins; the text after the colon is returned with
    surrounding whitespace removed.

    Matching on the key *including* its colon keeps a short key from
    matching a longer key that shares its prefix (``mastodon_db`` vs.
    ``mastodon_db_user``), and only the leading key is stripped, so a value
    that happens to contain ``"<parameter>:"`` is returned intact.

    Args:
        parameter: Name of the key to look up.
        file_path: Path of the configuration file.

    Returns:
        The stripped value as a string.

    Exits:
        Prints a message and calls ``sys.exit(0)`` when the file does not
        exist or the key is not present.
    """
    if not os.path.isfile(file_path):
        print("File %s not found, exiting." % file_path)
        sys.exit(0)

    key_prefix = parameter + ":"

    with open(file_path) as f:
        for line in f:
            if line.startswith(key_prefix):
                return line[len(key_prefix):].strip()

    print(file_path + " Missing parameter %s " % parameter)
    sys.exit(0)
|
|
|
|
def usage():
    """Print the command-line synopsis of this script to stdout."""
    script_name = sys.argv[0]
    print(f'usage: python {script_name} --play --en')
|
|
|
|
###############################################################################
|
|
# main
|
|
|
|
if __name__ == '__main__':

    # Script entry point: `--play [--ca|--es|--fr|--en]` fetches the bot's
    # pending notifications and answers every mention (new game, move, end,
    # games list, send annotations, draw, panel, config, help).
    #
    # NOTE: every name assigned below is a module-level global. Helper
    # functions in this file are called without arguments, so they presumably
    # read these globals (username, game_id, text, ...) - do not rename them
    # without checking the helpers.

    # usage modes

    if len(sys.argv) == 1:

        usage()

    elif len(sys.argv) >= 2:

        if sys.argv[1] == '--play':

            bot_lang = ''

            if len(sys.argv) == 3:

                # Unknown flags leave bot_lang empty and are rejected below.
                bot_lang = {'--ca': 'ca', '--es': 'es', '--fr': 'fr', '--en': 'en'}.get(sys.argv[2], '')

            elif len(sys.argv) == 2:

                # No language flag: default to Catalan.
                bot_lang = 'ca'

            if bot_lang not in ['ca', 'es', 'fr', 'en']:

                print("\nOnly 'ca', 'es', 'fr' and 'en' languages are supported.\n")
                sys.exit(0)

            # From here on `mastodon` is the API client, no longer the function.
            mastodon, mastodon_hostname, bot_username = mastodon()

            mastodon_db, mastodon_db_user, chess_db, chess_db_user = db_config()

            smtp_host, smtp_user_login, smtp_user_password = smtp_config()

            now = datetime.now()

            ####################################################################
            # get notifications

            notifications = mastodon.notifications()

            if len(notifications) == 0:

                print('No mentions')

                sys.exit(0)

            i = 0

            while i < len(notifications):

                notification_id = notifications[i].id

                # Anything that is not a mention is dismissed unprocessed.
                if notifications[i].type != 'mention':

                    i += 1

                    print(f'dismissing notification {notification_id}')

                    mastodon.notifications_dismiss(notification_id)

                    continue

                account_id = notifications[i].account.id

                username = notifications[i].account.acct

                status_id = notifications[i].status.id

                text = notifications[i].status.content

                visibility = notifications[i].status.visibility

                url = notifications[i].status.uri

                lang_changed, player_lang = get_player_langs(account_id)

                # Per-language slice lengths:
                # (search_move_slicing, moving_slicing, search_send_slicing, send_game_slicing)
                slicing_by_lang = {
                    'ca': (3, 3, 5, 6),
                    'en': (4, 4, 4, 5),
                    'es': (5, 5, 5, 6),
                    'fr': (7, 7, 7, 7),
                }

                if player_lang not in slicing_by_lang:

                    # Unsupported player language: stop the whole run.
                    sys.exit(0)

                (search_move_slicing, moving_slicing,
                 search_send_slicing, send_game_slicing) = slicing_by_lang[player_lang]

                # Load the localized command keywords, then decide whether
                # this mention is a command the bot should act on.

                search_new = get_locale("search_new", player_lang)
                search_move = get_locale("search_move", player_lang)
                search_end = get_locale("search_end", player_lang)
                search_games = get_locale("search_games", player_lang)
                search_send = get_locale("search_send", player_lang)
                search_help = get_locale("search_help", player_lang)
                search_draw = get_locale("search_draw", player_lang)
                search_panel = get_locale("search_panel", player_lang)
                search_config = get_locale("search_config", player_lang)

                reply, query_word, moving = replying()

                status_url = url

                if query_word != search_games:

                    is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user = check_games()

                    if game_id == '':

                        game_id, game_waiting = waiting_games()

                else:

                    # The games listing is handled in the "playing" chain below.
                    is_playing = True

                if reply == True and is_playing == False:

                    if query_word == search_new and not game_waiting:

                        # Nobody is waiting: open a new game and post the initial board.

                        board = chess.Board()

                        svgfile = chess.svg.board(board=board)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        new_game_started = get_locale("new_game_started", player_lang)

                        toot_text = f'@{username} {new_game_started} \n'

                        toot_text += '\n'

                        image_id = mastodon.media_post(board_file, "image/png").id

                        toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        toot_url = toot_id.uri

                        new_game(toot_url)

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word == search_new and game_waiting:

                        # A game is waiting for an opponent: join it; white moves first.

                        game_status, white_user, chess_game = join_player()

                        playing_user = white_user

                        next_move(username)

                        board = chess.Board(chess_game)

                        svgfile = chess.svg.board(board=board)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        player_lang1 = get_lang(username)

                        playing_with = get_locale("playing_with", player_lang1)

                        toot_text = f'@{username} {playing_with} {white_user} \n\n'

                        player_lang2 = get_lang(white_user)

                        your_turn = get_locale("your_turn", player_lang2)

                        toot_text += f'@{white_user}: {your_turn}\n\n'

                        game_name = get_locale("game_name", player_lang2)
                        chess_hashtag = get_locale("chess_hashtag", player_lang)

                        toot_text += f"{game_name}: {str(game_id)} {chess_hashtag} \n"

                        image_id = mastodon.media_post(board_file, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        game_moves = board.ply()

                        update_moves(username, game_moves)

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word[:search_send_slicing] == search_send:

                        # Email the annotations of the requested game.

                        query_word_length = len(query_word)

                        send_game = query_word[send_game_slicing:query_word_length].replace(' ', '')

                        emailed, game_id, game_found = send_anotation(send_game)

                        username_lang = get_lang(username)

                        if emailed == False and game_found == True:

                            send_error = get_locale("send_error", username_lang)

                            toot_text = f'@{username} {send_error}'

                        elif emailed == True and game_found == True:

                            game_number_anotations = get_locale("game_number_anotations", username_lang)
                            anotations_sent = get_locale("anotations_sent", username_lang)

                            toot_text = f'@{username} {game_number_anotations} {str(game_id)} {anotations_sent}'

                        elif emailed == False and game_found == False:

                            # NOTE(review): `domain` is not assigned anywhere in this
                            # block; presumably a global set by a helper - confirm.
                            if domain is not None:

                                cant_send_to_fediverse_account = get_locale("cant_send_to_fediverse_account", username_lang)

                                toot_text = f'@{username} {cant_send_to_fediverse_account}'

                            else:

                                game_no_exists = get_locale("game_no_exists", username_lang)
                                it_not_exists = get_locale("it_not_exists", username_lang)

                                toot_text = f'@{username} {game_no_exists} {str(game_id)} {it_not_exists}'

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word == search_panel:

                        # Render and post the player's stats panel.

                        rating, played_games, wins = get_stats(username)

                        create_panel(username, rating, played_games, wins)

                        toot_text = f'@{username}'

                        saved_panel = 'app/panel/' + username + '_panel.png'

                        image_id = mastodon.media_post(saved_panel, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word[:4] == search_config:

                        # Change the player's language; the two characters
                        # after the keyword are the language code.

                        new_lang = query_word[5:7]

                        lang_changed, player_lang = set_lang(account_id, username, new_lang)

                        mastodon.notifications_dismiss(notification_id)

                        if lang_changed:

                            locale_change_successfully = get_locale("locale_change_successfully", new_lang)

                            toot_text = f'@{username} {locale_change_successfully} {new_lang}'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                        else:

                            locale_not_changed = get_locale("locale_not_changed", player_lang)

                            toot_text = f'@{username} {new_lang} {locale_not_changed}'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                    elif query_word == search_help:

                        toot_help()

                        help_text = f'@{username}'

                        help_panel = 'app/panel/help_panel.png'

                        image_id = mastodon.media_post(help_panel, "image/png").id

                        mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        mastodon.notifications_dismiss(notification_id)

                    else:

                        mastodon.notifications_dismiss(notification_id)

                elif reply and is_playing:

                    if query_word == search_new:

                        # The user already has a game: remind them and show the board.

                        player_lang1 = get_lang(username)

                        game_already_started = get_locale("game_already_started", player_lang1)

                        toot_text = f'@{username} {game_already_started} \n'

                        if black_user != '':

                            toot_text += f'@{white_user} / @{black_user}\n'

                        else:

                            wait_other_player = get_locale("wait_other_player", player_lang1)

                            toot_text += wait_other_player + '\n'

                        toot_text += '\n'

                        game_name = get_locale("game_name", player_lang1)
                        chess_hashtag = get_locale("chess_hashtag", player_lang1)

                        toot_text += f'{game_name}: {str(game_id)} {chess_hashtag} \n'

                        board = chess.Board(on_going_game)

                        svgfile = chess.svg.board(board=board)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        image_id = mastodon.media_post(board_file, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word[:search_move_slicing] == search_move and playing_user == username:

                        # A move from the player whose turn it is.

                        board = chess.Board(on_going_game)

                        promoted = False

                        stalemate = False

                        checkmate = False

                        try:

                            piece_square_index = chess.SQUARE_NAMES.index(moving[:2])

                            moving_piece = board.piece_type_at(piece_square_index)

                            # Piece type 1 is a pawn: a pawn reaching the last
                            # rank promotes, defaulting to a queen when the
                            # move names no promotion piece.
                            if moving_piece == 1:

                                square_index = chess.SQUARE_NAMES.index(moving[2:4])

                                if bool(board.turn == chess.WHITE) == True:

                                    square_rank_trigger = 7

                                elif bool(board.turn == chess.BLACK) == True:

                                    square_rank_trigger = 0

                                if chess.square_rank(square_index) == square_rank_trigger:

                                    promoted = True

                                    if len(moving) == 4:

                                        moving = moving + 'q'

                            # Same legality test for pawns and other pieces.
                            not_legal_move = chess.Move.from_uci(moving) not in board.legal_moves

                            if not_legal_move:

                                is_not_legal_move = get_locale("is_not_legal_move", player_lang)

                                toot_text = f'@{username}: {moving} {is_not_legal_move} \n'

                                mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                                mastodon.notifications_dismiss(notification_id)

                            else:

                                # SAN must be computed before the move is pushed.
                                san_move = board.san(chess.Move.from_uci(moving))

                                check = False

                                # From here on playing_user is the opponent (next to move).
                                playing_user = next_move(username)

                                if bool(board.is_capture(chess.Move.from_uci(moving))):

                                    capture = True

                                    square_capture_index = chess.SQUARE_NAMES.index(moving[2:4])

                                    captured_piece = board.piece_type_at(square_capture_index)

                                    piece_name = get_piece_name(captured_piece)

                                else:

                                    capture = False

                                board.push(chess.Move.from_uci(moving))

                                if bool(board.is_check()):

                                    if username == white_user:

                                        king_square = board.king(chess.BLACK)
                                        check = True

                                    else:

                                        king_square = board.king(chess.WHITE)
                                        check = True

                                if board.is_stalemate() == True:

                                    stalemate = True

                                if board.is_game_over() == True:

                                    game_moves = board.ply()

                                    # Any game-over state that is not stalemate is
                                    # treated as checkmate here.
                                    if stalemate == False:

                                        checkmate = True

                                    # NOTE(review): assumed to run for every finished
                                    # game (stalemate included) - confirm nesting.
                                    close_game(username, checkmate)

                                else:

                                    checkmate = False

                                if check == True and checkmate == False:

                                    player_lang = get_lang(playing_user)
                                    check_done = get_locale("check_done", player_lang)

                                    toot_text = f"@{playing_user} {username} {check_done}\n"

                                elif check == True and checkmate == True:

                                    # Winner's summary in the winner's language,
                                    # then the loser's in theirs.

                                    player_lang1 = get_lang(username)

                                    check_mate = get_locale("check_mate", player_lang1)
                                    check_mate_movements = get_locale("check_mate_movements", player_lang1)
                                    the_winner_is = get_locale("the_winner_is", player_lang1)

                                    toot_text = f'\n{check_mate} {str(game_moves)} {check_mate_movements}\n\n{the_winner_is} @{username}\n'

                                    winned_games = get_locale("winned_games", player_lang1)

                                    toot_text += f'\n{winned_games}\n'

                                    rating, played_games, wins = get_stats(username)

                                    wins_of_many = get_locale("wins_of_many", player_lang1)

                                    toot_text += f'{username}: {str(wins)} {wins_of_many} {str(played_games)} (Elo: {str(round(rating))})\n'

                                    player_lang2 = get_lang(playing_user)

                                    well_done = get_locale("well_done", player_lang2)

                                    toot_text += f"\n@{playing_user}: {well_done}\n"

                                    rating, played_games, wins = get_stats(playing_user)

                                    winned_games = get_locale("winned_games", player_lang2)

                                    wins_of_many = get_locale("wins_of_many", player_lang2)

                                    toot_text += f'\n\n{winned_games}\n'

                                    toot_text += f'{playing_user}: {str(wins)} {wins_of_many} {str(played_games)} (Elo: {str(round(rating))})\n'

                                elif check == False and stalemate == True:

                                    # These two strings were previously read without
                                    # being fetched in this branch, so they were only
                                    # bound if an earlier notification had set them.
                                    winned_games = get_locale("winned_games", player_lang)
                                    wins_of_many = get_locale("wins_of_many", player_lang)

                                    # NOTE(review): `stalemate_str` is not assigned in
                                    # this block; presumably a global defined elsewhere
                                    # in the file - confirm, or fetch it via get_locale.
                                    toot_text = stalemate_str + ' (' + str(game_moves) + ')' + '\n'

                                    toot_text += f'\n@{playing_user}, @{username}\n'

                                    toot_text += f'\n{winned_games}\n'

                                    rating, played_games, wins = get_stats(username)

                                    toot_text += f'{username}: {str(wins)} {wins_of_many} {str(played_games)}\n'

                                    rating, played_games, wins = get_stats(playing_user)

                                    toot_text += f'{playing_user}: {str(wins)} {wins_of_many} {str(played_games)}\n'

                                else:

                                    player_lang = get_lang(playing_user)

                                    your_turn = get_locale("your_turn", player_lang)

                                    toot_text = f'@{playing_user} {your_turn}\n'

                                if capture == True and checkmate == False:

                                    player_lang = get_lang(playing_user)
                                    lost_piece = get_locale("lost_piece", player_lang)

                                    toot_text += f'\n{lost_piece} {piece_name}!\n'

                                game_name = get_locale("game_name", player_lang)

                                chess_hashtag = get_locale("chess_hashtag", player_lang)

                                toot_text += f'\n{game_name}: {str(game_id)} {chess_hashtag}\n'

                                # Draw the board from the point of view of the player
                                # who moves next, highlighting a checked king.
                                if username == white_user:

                                    if check == True:

                                        svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving), check=board.king(chess.BLACK))

                                    else:

                                        svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving))

                                else:

                                    if check == True:

                                        svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving), check=board.king(chess.WHITE))

                                    else:

                                        svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving))

                                board_file = 'app/games/' + str(game_id) + '_board.png'

                                svg2png(bytestring=svgfile,write_to=board_file)

                                image_id = mastodon.media_post(board_file, "image/png").id

                                toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                                toot_url = toot_id.uri

                                board_game = board.fen()

                                update_game(board_game, toot_url)

                                game_moves = board.ply()

                                save_anotation(moving, san_move)

                                update_moves(username, game_moves)

                                mastodon.notifications_dismiss(notification_id)

                        except (ValueError, AssertionError) as move_error:

                            # The move text could not be parsed (unknown square
                            # name or malformed UCI string): tell the player.

                            print(move_error)

                            username_lang = get_lang(username)
                            not_legal_move_str = get_locale("not_legal_move_str", username_lang)

                            toot_text = f'@{username} {not_legal_move_str} {moving}!?)\n'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                            mastodon.notifications_dismiss(notification_id)

                    elif query_word == search_end:

                        # The player leaves the game (or cancels a waiting one).

                        stalemate = False

                        checkmate = False

                        if black_user != '':

                            if username == white_user:

                                player_leave_game = get_locale("player_leave_game", player_lang)

                                toot_text = f'@{username} {player_leave_game} @{black_user}'

                                mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                                close_game(username, checkmate)

                                mastodon.notifications_dismiss(notification_id)

                                i += 1

                                continue

                            else:

                                player_leave_game = get_locale("player_leave_game", player_lang)

                                toot_text = f'@{username} {player_leave_game} @{white_user}'

                                mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                                close_game(username, checkmate)

                                mastodon.notifications_dismiss(notification_id)

                                i += 1

                                continue

                        else:

                            leave_waiting_game = get_locale("leave_waiting_game", player_lang)

                            toot_text = f'@{username} {leave_waiting_game}'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                            close_game(username, checkmate)

                            mastodon.notifications_dismiss(notification_id)

                            i += 1

                            continue

                    elif query_word == search_games:

                        # List current games; '*' marks the player to move.

                        player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst = current_games()

                        if len(player1_name_lst) > 0:

                            started_games = get_locale("started_games", player_lang)

                            toot_text = f"@{username} {started_games}\n"

                            # Uses its own index: reusing `i` here used to clobber
                            # the notification index of the enclosing while loop.
                            for game_idx in range(len(player1_name_lst)):

                                if game_status_lst[game_idx] == 'waiting':

                                    game_is_waiting = get_locale("game_is_waiting", player_lang)

                                    toot_text += f'\n{player1_name_lst[game_idx]} / {player2_name_lst[game_idx]} {game_is_waiting}\n'

                                else:

                                    game_is_on_going = get_locale("game_is_on_going", player_lang)

                                    if next_move_lst[game_idx] == player1_name_lst[game_idx]:

                                        toot_text += f'\n*{player1_name_lst[game_idx]} / {player2_name_lst[game_idx]} {game_is_on_going}\n'

                                    else:

                                        toot_text += f'\n{player1_name_lst[game_idx]} / *{player2_name_lst[game_idx]} {game_is_on_going}\n'

                                # NOTE(review): assumed to apply to waiting and
                                # on-going games alike - confirm nesting.
                                if game_link_lst[game_idx] is not None:

                                    toot_text += f'{str(game_link_lst[game_idx])}\n'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        else:

                            no_on_going_games = get_locale("no_on_going_games", player_lang)

                            toot_text = f'@{username} {no_on_going_games}\n'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word[:search_send_slicing] == search_send:

                        # Email the annotations of the requested game.

                        query_word_length = len(query_word)

                        send_game = query_word[search_send_slicing:query_word_length].replace(' ', '')

                        emailed, game_id, game_found = send_anotation(send_game)

                        username_lang = get_lang(username)

                        if emailed == False and game_found == True:

                            send_error = get_locale("send_error", username_lang)

                            toot_text = f'@{username} {send_error}'

                        elif emailed == True and game_found == True:

                            game_number_anotations = get_locale("game_number_anotations", username_lang)
                            anotations_sent = get_locale("anotations_sent", username_lang)

                            toot_text = f'@{username} {game_number_anotations} {str(game_id)} {anotations_sent}'

                        elif emailed == False and game_found == False:

                            game_no_exists = get_locale("game_no_exists", username_lang)
                            it_not_exists = get_locale("it_not_exists", username_lang)

                            toot_text = f'@{username} {game_no_exists} {str(game_id)} {it_not_exists}'

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word == search_draw:

                        # Draw claim: claim_draw reports whether both sides agreed.

                        white_player, black_player, toot_text, stalemate = claim_draw(username)

                        if stalemate == True:

                            checkmate = False

                            close_game(username, checkmate)

                            player_lang1 = get_lang(white_player)
                            draw_and_str = get_locale("draw_and_str", player_lang1)
                            agreed_draw_str = get_locale("agreed_draw_str", player_lang1)
                            winned_games = get_locale("winned_games", player_lang1)
                            wins_of_many = get_locale("wins_of_many", player_lang1)

                            toot_text = f'@{white_player} {draw_and_str} @{black_player} {agreed_draw_str}\n\n'

                            toot_text += f'\n{winned_games}\n'

                            rating, played_games, wins = get_stats(white_player)

                            toot_text += f'{white_player}: {str(wins)} {wins_of_many} {str(played_games)}\n'

                            player_lang2 = get_lang(black_player)
                            draw_and_str = get_locale("draw_and_str", player_lang2)
                            agreed_draw_str = get_locale("agreed_draw_str", player_lang2)
                            winned_games = get_locale("winned_games", player_lang2)
                            wins_of_many = get_locale("wins_of_many", player_lang2)

                            # Repeat the announcement in black's language only
                            # when it differs from white's.
                            # NOTE(review): assumed that only the announcement and
                            # its header are conditional, and that black's stats
                            # line is always appended - confirm nesting.
                            if player_lang1 != player_lang2:

                                toot_text += f'\n@{white_player} {draw_and_str} @{black_player} {agreed_draw_str}\n\n'

                                toot_text += f'\n{winned_games}\n'

                            rating, played_games, wins = get_stats(black_player)

                            toot_text += f'{black_player}: {str(wins)} {wins_of_many} {str(played_games)}\n'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        else:

                            # Not agreed yet: post the text returned by claim_draw.
                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word == search_panel:

                        rating, played_games, wins = get_stats(username)

                        create_panel(username, rating, played_games, wins)

                        toot_text = f'@{username}'

                        saved_panel = 'app/panel/' + username + '_panel.png'

                        image_id = mastodon.media_post(saved_panel, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})

                        mastodon.notifications_dismiss(notification_id)

                    elif query_word[:4] == search_config:

                        new_lang = query_word[5:7]

                        lang_changed, player_lang = set_lang(account_id, username, new_lang)

                        mastodon.notifications_dismiss(notification_id)

                        if lang_changed:

                            locale_change_successfully = get_locale("locale_change_successfully", new_lang)

                            toot_text = f'@{username} {locale_change_successfully} {new_lang}'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                        else:

                            locale_not_changed = get_locale("locale_not_changed", player_lang)

                            toot_text = f'@{username} {new_lang} {locale_not_changed}'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                    elif query_word == search_help:

                        toot_help()

                        help_text = f'@{username}'

                        help_panel = 'app/panel/help_panel.png'

                        image_id = mastodon.media_post(help_panel, "image/png").id

                        mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        mastodon.notifications_dismiss(notification_id)

                    else:

                        # A move from the player who is not on turn (or an
                        # unrecognised command): say whose turn it is and
                        # show the current board.

                        if playing_user is None:

                            username_lang = get_lang(username)
                            is_not_your_turn = get_locale("is_not_your_turn", username_lang)

                            toot_text = f'@{username} {is_not_your_turn}\n'

                        else:

                            is_the_turn_of = get_locale("is_the_turn_of", player_lang)

                            toot_text = f'@{username} {is_the_turn_of} {playing_user}\n'

                        toot_text += '\n'

                        game_name = get_locale("game_name", player_lang)

                        chess_hashtag = get_locale("chess_hashtag", player_lang)

                        toot_text += f'{game_name}: {str(game_id)} {chess_hashtag}\n'

                        board = chess.Board(on_going_game)

                        if username == white_user:

                            svgfile = chess.svg.board(board=board, orientation=chess.BLACK)

                        else:

                            svgfile = chess.svg.board(board=board, orientation=chess.WHITE)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        image_id = mastodon.media_post(board_file, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        mastodon.notifications_dismiss(notification_id)

                else:

                    # Not a command for the bot: just dismiss the mention.
                    mastodon.notifications_dismiss(notification_id)

                i += 1

        else:

            usage()
|