2054 líneas
59 KiB
Python
2054 líneas
59 KiB
Python
import pdb
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import re
|
|
import unidecode
|
|
from datetime import datetime, timedelta
|
|
from mastodon import Mastodon
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
import smtplib
|
|
from smtplib import SMTPException, SMTPAuthenticationError, SMTPConnectError, SMTPRecipientsRefused
|
|
import socket
|
|
from socket import gaierror
|
|
import psycopg2
|
|
import chess
|
|
import chess.svg
|
|
from cairosvg import svg2png
|
|
import chess.pgn
|
|
from PIL import Image, ImageFont, ImageDraw
|
|
|
|
def cleanhtml(raw_html):
    """Return ``raw_html`` with every ``<...>`` tag removed.

    Tags are matched non-greedily with the pattern ``<.*?>``, so each tag is
    stripped on its own and the text between tags is kept.
    """
    tag_pattern = re.compile('<.*?>')
    return tag_pattern.sub('', raw_html)
|
|
|
|
def unescape(s):
    """Decode HTML character references in ``s`` and return the result.

    The previous body replaced an apostrophe with an apostrophe, which left
    the text untouched.  Entities such as ``&apos;``, ``&#39;`` or ``&amp;``
    are now turned back into the characters they stand for.

    Args:
        s: text that may contain HTML entities.

    Returns:
        str: ``s`` with the entities decoded.
    """
    # Imported locally so the block does not depend on the file's import list.
    import html

    return html.unescape(s)
|
|
|
|
def create_panel(username, played_games, wins):
    """Render the statistics panel image for ``username``.

    The background ``app/panel/fons.jpg`` gets the chess icon and the logo
    pasted on it and is saved as ``app/panel/panel.png``.  That file is then
    reopened, the title, played games, wins and win ratio are drawn on a
    transparent overlay, and the composite is written to
    ``app/panel/<username>_panel.png``.

    Args:
        username: player name shown in the title and used in the file name.
        played_games: number of games played.
        wins: number of games won.
    """
    # A player with no games would otherwise raise ZeroDivisionError.
    if played_games:
        ratio = round((wins * 100) / played_games, 2)
    else:
        ratio = 0.0

    x = 10
    y = 10
    font_path = 'app/fonts/DroidSans.ttf'
    # NOTE(review): newer Pillow releases expose this constant as
    # ImageFont.Layout.BASIC -- confirm against the installed version.
    layout = ImageFont.LAYOUT_BASIC

    fons = Image.open('app/panel/fons.jpg')
    print(fons.size)

    # add chess icon (the image doubles as its own paste mask)
    icon_path = 'app/panel/chess.png'
    icon_img = Image.open(icon_path)
    fons.paste(icon_img, (y+350, x+50), icon_img)

    logo_img = Image.open('app/panel/logo.png')
    fons.paste(logo_img, (15, 320), logo_img)

    fons.save('app/panel/panel.png', "PNG")

    base = Image.open('app/panel/panel.png').convert('RGBA')
    # Fully transparent layer that receives the text.
    txt = Image.new('RGBA', base.size, (255,255,255,0))
    draw = ImageDraw.Draw(txt)

    fnt = ImageFont.truetype(font_path, 40, layout_engine=layout)
    draw.text((y+200,x+20), panel_title_str + ' ' + username, font=fnt, fill=(255,255,255,220))

    fnt = ImageFont.truetype(font_path, 25, layout_engine=layout)
    draw.text((y+70,x+120), panel_games_str + ': ' + str(played_games), font=fnt, fill=(255,255,255,220))
    draw.text((y+70,x+170), panel_wins_str + ': ' + str(wins), font=fnt, fill=(255,255,255,220))
    draw.text((y+70,x+220), panel_ratio_str + ': ' + str(ratio) + '%', font=fnt, fill=(255,255,255,220))

    fnt = ImageFont.truetype(font_path, 15, layout_engine=layout)
    draw.text((60,330), bot_username + '@' + mastodon_hostname + ' - 2020', font=fnt, fill=(255,255,255,200))

    out = Image.alpha_composite(base, txt)
    out.save('app/panel/' + username + '_panel.png')
|
|
|
|
def get_bot_id():
    """Return the account id of the bot from the Mastodon database.

    Looks up the ``accounts`` row whose ``username`` equals the module-level
    ``bot_username`` and whose ``domain`` is null.

    Returns:
        The ``id`` column of that row, or ``None`` when there is no such row
        or the query fails (the error is printed).
    """
    conn = None
    # Bound up front so a missing row no longer depends on a caught
    # UnboundLocalError to produce the ``None`` result.
    bot_id = None

    try:
        conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        cur.execute("select id from accounts where username = (%s) and domain is null", (bot_username,))
        row = cur.fetchone()
        if row is not None:
            bot_id = row[0]

        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()

    return bot_id
|
|
|
|
def get_user_domain(account_id):
    """Return ``(username, domain)`` for an account of the Mastodon database.

    Args:
        account_id: value matched against ``accounts.id``.

    Returns:
        tuple: ``(username, domain)`` of the matching row, or ``None`` when
        no row matches or the query fails (the error is printed).
    """
    conn = None
    # Explicit "not found" value; before, this case only produced None
    # because the resulting UnboundLocalError was caught and printed.
    result = None

    try:
        conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        cur.execute("select username, domain from accounts where id=(%s)", (account_id,))
        row = cur.fetchone()
        if row is not None:
            result = (row[0], row[1])

        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()

    return result
|
|
|
|
def get_piece_name(captured_piece):
    """Translate a piece code (1 to 6) into its localized name.

    The names come from the module-level ``pawn_piece`` ... ``king_piece``
    strings.  Any other code leaves ``piece_name`` unassigned, so the final
    ``return`` raises ``UnboundLocalError``.
    """
    if captured_piece == 1:
        piece_name = pawn_piece
    elif captured_piece == 2:
        piece_name = knight_piece
    elif captured_piece == 3:
        piece_name = bishop_piece
    elif captured_piece == 4:
        piece_name = rook_piece
    elif captured_piece == 5:
        piece_name = queen_piece
    elif captured_piece == 6:
        piece_name = king_piece

    return piece_name
|
|
|
|
def get_notification_data():
    """Collect recent statuses that mention the bot and contain a command.

    For every search keyword the ``statuses`` table is queried for rows whose
    text contains the keyword, that satisfy
    ``created_at + 60 minutes > now() - 5 minutes`` and whose id appears in
    ``mentions`` for the module-level ``bot_id``, oldest first.

    Returns:
        tuple: five parallel lists ``(account_id_lst, status_id_lst,
        text_lst, visibility_lst, url_lst)``, or ``None`` when an error
        occurs (the error is printed).
    """
    # Numeric visibility codes handled by this function.
    visibility_names = {0: 'public', 1: 'unlisted', 2: 'private', 3: 'direct'}

    select_query = "select account_id, id, text, visibility, url, created_at from statuses where text like (%s) and created_at + interval '60 minutes' > now() - interval '5 minutes'"
    select_query += " and id=any (select status_id from mentions where account_id=(%s)) order by created_at asc"

    conn = None

    try:
        account_id_lst = []
        status_id_lst = []
        text_lst = []
        visibility_lst = []
        url_lst = []

        search_text = [search_end, search_move, search_new, search_games, search_send, search_help, search_draw, search_panel]

        conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        for keyword in search_text:
            like_text = "%" + keyword + "%"
            cur.execute(select_query, (like_text, str(bot_id)))

            for row in cur.fetchall():
                account_id_lst.append(row[0])
                status_id_lst.append(row[1])
                text_lst.append(row[2])
                # Always append exactly one entry so the five lists stay
                # index-aligned; an unrecognised code falls back to the most
                # restrictive visibility.
                visibility_lst.append(visibility_names.get(row[3], 'direct'))
                url_lst.append(row[4])

        cur.close()

        return (account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst)

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def update_replies(status_id, username, now):
    """Insert a row into ``botreplies`` for ``status_id``.

    The insert uses ``ON CONFLICT DO NOTHING``.  Any error terminates the
    process through ``sys.exit``.

    Args:
        status_id: stored in ``botreplies.status_id``.
        username: stored in ``botreplies.query_user``.
        now: stored in ``botreplies.status_created_at``.
    """
    insert_sql = "insert into botreplies(status_id, query_user, status_created_at) values(%s, %s, %s) ON CONFLICT DO NOTHING"
    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()
        cursor.execute(insert_sql, (status_id, username, now))
        conn.commit()
        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def check_replies(status_id):
    """Tell whether ``botreplies`` already holds a row for ``status_id``.

    Returns:
        bool: ``True`` when a matching row exists, ``False`` otherwise.
        Any error terminates the process through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()
        cursor.execute("select status_id from botreplies where status_id=(%s)", (status_id,))
        replied = cursor.fetchone() is not None
        cursor.close()
        return replied

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def current_games():
    """List every game of the ``games`` table that is not finished.

    Returns:
        tuple: five parallel lists ``(player1_name_lst, player2_name_lst,
        game_status_lst, game_link_lst, next_move_lst)``.  A null black
        player is reported as ``" "``.  Any error terminates the process
        through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()
        cursor.execute("select white_user, black_user, chess_status, chess_link, next_move from games where not finished")
        games = cursor.fetchall()

        player1_name_lst = [game[0] for game in games]
        player2_name_lst = [" " if game[1] is None else game[1] for game in games]
        game_status_lst = [game[2] for game in games]
        game_link_lst = [game[3] for game in games]
        next_move_lst = [game[4] for game in games]

        cursor.close()

        return (player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def check_games():
    """Find the unfinished game of the module-level ``username``, if any.

    The ``games`` table is searched first for an unfinished game where the
    user plays white and, when none is found, for one where the user plays
    black.

    Returns:
        tuple: ``(is_playing, game_id, white_user, black_user,
        on_going_game, waiting, playing_user)``.  When no game is found
        ``is_playing`` is ``False`` and the other items keep their defaults
        (empty strings, ``waiting`` is ``False``).  Any error terminates the
        process through ``sys.exit``.
    """
    game_id = ''
    white_user = ''
    black_user = ''
    on_going_game = ''
    waiting = False
    playing_user = ''

    # White is tried first, exactly as before; both lookups share the
    # row handling below instead of duplicating it.
    lookups = (
        "SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and white_user=(%s) ",
        "SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and black_user=(%s)",
    )

    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        ### check if there is an ongoing game
        row = None
        for lookup in lookups:
            cur.execute(lookup, (username,))
            row = cur.fetchone()
            if row is not None:
                break

        is_playing = row is not None

        if is_playing:
            game_id = row[0]
            white_user = row[1]
            # The null check is on the black_user column itself; the old
            # black-player branch tested white_user (row[1]) by mistake.
            black_user = row[2] if row[2] is not None else ''
            on_going_game = row[3]
            waiting = row[4]
            playing_user = row[5]

        cur.close()

        return (is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def new_game(toot_url):
    """Store a new waiting game and its stats row.

    Reads the module-level ``board``, ``now`` and ``username``.  The FEN of
    ``board`` is inserted into ``games`` with status ``'waiting'`` and
    ``toot_url`` as link, then a row is inserted into ``stats``.  Both
    inserts use ``ON CONFLICT DO NOTHING`` and are committed together.
    Any error terminates the process through ``sys.exit``.
    """
    games_insert = 'INSERT INTO games(created_at, white_user, chess_game, chess_status, waiting, updated_at, chess_link) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING'
    stats_insert = 'INSERT INTO stats(created_at, white_user) VALUES (%s,%s) ON CONFLICT DO NOTHING'
    conn = None

    try:
        board_game = board.fen()

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()

        cursor.execute(games_insert, (now, username, board_game, 'waiting', True, now, toot_url))
        cursor.execute(stats_insert, (now, username,))

        conn.commit()
        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def update_game(board_game, toot_url):
    """Save the board and link of the game named by module-level ``game_id``.

    Sets ``chess_game``, ``chess_link`` and ``updated_at`` (current time) on
    the matching ``games`` row.  Any error terminates the process through
    ``sys.exit``.
    """
    update_sql = "update games set chess_game=(%s), chess_link=(%s), updated_at=(%s) where game_id=(%s)"
    conn = None

    try:
        timestamp = datetime.now()

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()
        cursor.execute(update_sql, (board_game, toot_url, timestamp, game_id,))
        conn.commit()
        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def write_result(filename, old_string, new_string):
    """Replace ``old_string`` with ``new_string`` inside ``filename``.

    When ``old_string`` does not occur in the file a message is printed and
    the file is left untouched.  Otherwise the change is announced on
    standard output and the file is rewritten with every occurrence
    replaced.
    """
    with open(filename) as source:
        contents = source.read()

    if old_string not in contents:
        print('"{old_string}" not found in {filename}.'.format(old_string=old_string, filename=filename))
        return

    with open(filename, 'w') as target:
        print('Changing "{old_string}" to "{new_string}" in {filename}'.format(old_string=old_string, new_string=new_string, filename=filename))
        target.write(contents.replace(old_string, new_string))
|
|
|
|
def save_anotation(moving, san_move):
    """Append ``san_move`` to the PGN file of the current game.

    Reads the module-level ``game_id``, ``board``, ``checkmate``,
    ``stalemate``, ``game_name``, ``mastodon_hostname``, ``white_user`` and
    ``black_user``.  The file is ``app/anotations/<game_id>.pgn``; when it
    does not exist yet a header is written first.  When ``board.turn`` is
    black the move is written as ``<fullmove_number>. <san_move>``, otherwise
    as the move surrounded by spaces.  On checkmate or stalemate the result
    is appended to the line and the ``[Result ]`` header tag is filled in
    through ``write_result``.

    Args:
        moving: not used by this function.
        san_move: move text to append.
    """
    pgn_file = "app/anotations/" + str(game_id) + ".pgn"

    if board.turn == chess.BLACK:
        line_data = str(board.fullmove_number) + ". " + san_move
    else:
        line_data = " " + san_move + " "

    if checkmate or stalemate:
        line_data = line_data + " " + board.result()
        write_result(pgn_file, '[Result ]', '[Result ' + board.result() + ' ]')

    if not os.path.isfile(pgn_file):
        header_parts = [
            '[Event ' + game_name + ': ' + str(game_id) + ']\n',
            '[Site ' + mastodon_hostname + ']' + '\n',
            '[Date ' + str(datetime.today().strftime('%d-%m-%Y')) + ']' + '\n',
            '[Round ' + str(game_id) + ']' + '\n',
            '[White ' + white_user + ' ]' + '\n',
            '[Black ' + black_user + ' ]' + '\n',
            '[Result ]' + '\n',
            '[Time ' + str(datetime.now().strftime('%H:%M:%S')) + ']' + '\n\n',
        ]
        with open(pgn_file, 'w+') as pgn:
            pgn.write(''.join(header_parts))

    # The move is appended whether or not the header was just created.
    with open(pgn_file, 'a') as pgn:
        pgn.write(line_data)
|
|
|
|
def get_email_address(username):
    """Return the e-mail address of a local Mastodon account.

    Selects ``users.email`` for the account whose ``username`` matches and
    whose ``domain`` is null.

    Returns:
        The e-mail value, or ``None`` when no row matches.  Any error
        terminates the process through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()
        cursor.execute("select email from users where account_id = (select id from accounts where username = (%s) and domain is null)", (username,))
        found = cursor.fetchone()
        username_email = None if found is None else found[0]
        cursor.close()
        return username_email

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def send_anotation(game_id):
    """E-mail the PGN file of ``game_id`` to the module-level ``username``.

    The address is looked up with ``get_email_address``; the file
    ``app/anotations/<game_id>.pgn`` is attached to a multipart message and
    sent through the configured SMTP host over STARTTLS.

    Args:
        game_id: game identifier as a string (it is concatenated into the
            subject and the file name).

    Returns:
        tuple: ``(emailed, game_id, game_found)``.  ``game_found`` is
        ``True`` once the PGN file could be opened; ``emailed`` is ``True``
        only after the message was sent.  Authentication errors, host
        resolution errors and refused recipients are printed and reported as
        ``emailed == False``; other SMTP errors propagate.
    """
    emailed = False
    game_found = False

    username_email = get_email_address(username)
    if username_email is None:
        return (emailed, game_id, game_found)

    # Create message object instance and declare its elements
    msg = MIMEMultipart()
    msg['From'] = smtp_user_login
    msg['To'] = username_email
    msg['Subject'] = email_subject + game_id

    # Read the game anotation; the with-block closes the file handle, which
    # previously stayed open for the rest of the process.
    file_to_attach = "app/anotations/" + game_id + ".pgn"
    try:
        with open(file_to_attach, 'rb') as attachment:
            payload = attachment.read()
        game_found = True
    except FileNotFoundError as not_found_error:
        print(not_found_error)
        return (emailed, game_id, game_found)

    obj = MIMEBase('application', 'octet-stream')
    obj.set_payload(payload)
    encoders.encode_base64(obj)
    obj.add_header('Content-Disposition', "attachment; filename= "+file_to_attach)
    msg.attach(obj)

    try:
        # The context manager issues QUIT and closes the socket even when
        # login or delivery fails.
        with smtplib.SMTP(smtp_host) as server:
            # Switch the connection over to TLS encryption
            server.starttls()
            # Authenticate with the server
            server.login(smtp_user_login, smtp_user_password)
            # Send the message
            server.sendmail(msg['From'], msg['To'], msg.as_string())

        print("Successfully sent game anotations to %s" % msg['To'])
        emailed = True

    except (SMTPAuthenticationError, socket.gaierror, SMTPRecipientsRefused) as smtp_error:
        print(smtp_error)

    return (emailed, game_id, game_found)
|
|
|
|
|
|
def claim_draw(username):
    """Register a draw claim by ``username`` on the current game.

    Uses the module-level ``game_id``.  The claimant's flag
    (``white_stalemate`` or ``black_stalemate``) is set on the ``games`` row
    and both flags are then read back.

    Returns:
        tuple: ``(white_player, black_player, toot_text, stalemate)`` where
        ``toot_text`` announces the claim to the opponent and ``stalemate``
        is ``True`` when both flags are set.  Any error, including a missing
        game row, terminates the process through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        cur.execute("select white_user, black_user from games where game_id=(%s)", (game_id,))
        row = cur.fetchone()
        if row is None:
            # Previously this surfaced as an UnboundLocalError a few lines on.
            raise LookupError('game %s not found' % game_id)

        white_player = row[0]
        black_player = row[1]

        if white_player == username:
            opponent = black_player
            flag_update = "update games set white_stalemate = 't' where game_id=(%s)"
        else:
            opponent = white_player
            flag_update = "update games set black_stalemate = 't' where game_id=(%s)"

        # Both colours now use the localized claim_draw_str; the black branch
        # used to carry a hard-coded Catalan sentence.
        toot_text = '@'+username + ' ' + claim_draw_str + ' @'+opponent + '\n'

        cur.execute(flag_update, (game_id,))
        conn.commit()

        cur.execute("select white_stalemate, black_stalemate from games where game_id=(%s)", (game_id,))
        row = cur.fetchone()
        stalemate = row is not None and bool(row[0]) and bool(row[1])

        cur.close()

        return (white_player, black_player, toot_text, stalemate)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def close_game(username, checkmate):
    """Mark the current game as finished and record its winner.

    Uses the module-level ``game_id``, ``stalemate``, ``white_user``,
    ``black_user``, ``query_word`` and ``search_end``.  The winner stored in
    ``stats`` is decided as follows, later rules overriding earlier ones:
    ``"stalemate"`` when ``stalemate`` is set, ``'none'`` when there is no
    black player, ``username`` on checkmate; otherwise, when the command was
    ``search_end`` and there is no stalemate, the opponent of ``username``.

    Args:
        username: player who made the closing move or request.
        checkmate: truthy when the game ended by checkmate.

    Any database error terminates the process through ``sys.exit``.
    """
    # The earlier version first ran a SELECT of white_user/black_user on a
    # separate connection and never used the result; that round trip is gone.
    now = datetime.now()
    waiting = False
    finished = True

    winner = ''

    # Comparisons with True/False are kept literal: ``stalemate`` is a
    # module-level value whose type is not visible from this function.
    if stalemate == True:
        winner = "stalemate"

    if black_user == '':
        winner = 'none'

    if checkmate:
        winner = username
    elif query_word == search_end and username == white_user and stalemate == False:
        winner = black_user
    elif query_word == search_end and username == black_user and stalemate == False:
        winner = white_user

    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        cur.execute("update games set waiting=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (waiting, finished, now, game_id))
        cur.execute("update stats set winner=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (winner, finished, now, game_id))

        conn.commit()
        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def get_stats(player):
    """Count the finished games and the wins of ``player``.

    Args:
        player: user name matched against ``white_user``, ``black_user`` and
            ``winner`` in the ``stats`` table.

    Returns:
        tuple: ``(played_games, wins)``, both counting finished games only.
        Any error terminates the process through ``sys.exit``.
    """
    played_games = 0
    wins = 0
    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        # The parentheses matter: AND binds tighter than OR, so without them
        # every game played as white was counted, finished or not.
        cur.execute("select count(*) from stats where (white_user = (%s) or black_user = (%s)) and finished", (player, player))
        row = cur.fetchone()
        if row is not None:
            played_games = row[0]

        cur.execute("select count(*) from stats where winner = (%s) and finished", (player,))
        row = cur.fetchone()
        if row is not None:
            wins = row[0]

        cur.close()

        return (played_games, wins)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def waiting_games():
    """Return the most recent game that is flagged as waiting.

    Returns:
        tuple: ``(game_id, game_waiting)``.  ``game_id`` is the highest
        ``game_id`` among waiting games and ``game_waiting`` is ``True``;
        when there is none the result is ``('', False)``.  Any error
        terminates the process through ``sys.exit``.
    """
    conn = None

    try:
        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()
        cursor.execute("select game_id from games where waiting order by game_id desc limit 1")
        newest = cursor.fetchone()

        if newest is None:
            outcome = ('', False)
        else:
            outcome = (newest[0], True)

        cursor.close()
        return outcome

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def join_player():
    """Add the module-level ``username`` as black player of ``game_id``.

    The ``games`` row is switched to status ``'playing'`` with zero moves,
    the ``stats`` row gets the black player, and ``next_move`` is then set to
    the white player.

    Returns:
        tuple: ``('playing', white_user, chess_game)`` with the white player
        and the stored board of the game.  Any error, including a missing
        game row, terminates the process through ``sys.exit``.
    """
    conn = None

    try:
        now = datetime.now()
        moves = 0

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        cur.execute("update games set black_user=(%s), chess_status='playing', waiting='f', updated_at=(%s), moves=(%s) where game_id=(%s)", (username, now, moves, game_id,))
        cur.execute("update stats set black_user=(%s), updated_at=(%s) where game_id=(%s)", (username, now, game_id,))
        conn.commit()

        cur.execute("select white_user, chess_game from games where game_id=(%s)", (game_id,))
        row = cur.fetchone()
        if row is None:
            # Previously this surfaced as an UnboundLocalError on white_user.
            raise LookupError('game %s not found' % game_id)

        white_user = row[0]
        chess_game = row[1]

        # White always moves first in a freshly joined game.
        cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (white_user, now, game_id,))
        conn.commit()

        cur.close()

        game_status = 'playing'

        return (game_status, white_user, chess_game)

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def update_moves(username, game_moves):
    """Store who moved last and how many moves the current game has.

    Uses the module-level ``playing_user`` and ``game_id``: ``next_move`` is
    set to ``playing_user``, ``last_move`` to ``username`` and ``moves`` to
    ``game_moves``.  Any error terminates the process through ``sys.exit``.
    """
    update_sql = "update games set next_move=(%s), last_move=(%s), moves=(%s), updated_at=(%s) where game_id=(%s)"
    conn = None

    try:
        timestamp = datetime.now()

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cursor = conn.cursor()
        cursor.execute(update_sql, (playing_user, username, game_moves, timestamp, game_id,))
        conn.commit()
        cursor.close()

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def next_move(playing_user):
    """Work out and store which player moves next in the current game.

    Uses the module-level ``game_id``.  When the game already has a
    ``last_move``, the turn passes to the opponent of ``playing_user``;
    otherwise ``playing_user`` is kept.  The result is written to
    ``games.next_move``.

    Args:
        playing_user: player whose turn it currently is.

    Returns:
        The player who moves next.  Any error, including a missing game row,
        terminates the process through ``sys.exit``.
    """
    conn = None

    try:
        now = datetime.now()

        conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
        cur = conn.cursor()

        cur.execute("select white_user, black_user, last_move, moves from games where game_id=(%s)", (game_id,))
        row = cur.fetchone()
        if row is None:
            # Previously this surfaced as an UnboundLocalError on last_move.
            raise LookupError('game %s not found' % game_id)

        white_user = row[0]
        black_user = row[1]
        last_move = row[2]

        if last_move is not None:
            if playing_user == white_user:
                playing_user = black_user
            elif playing_user == black_user:
                playing_user = white_user

        cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, now, game_id,))
        conn.commit()

        cur.close()

        return playing_user

    except (Exception, psycopg2.DatabaseError) as error:
        sys.exit(error)

    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def toot_help():
    """Build the help message addressed to the module-level ``username``.

    The text starts with ``@<username>`` and then lists, each preceded by an
    empty line, the six command descriptions prefixed with
    ``@<bot_username>``.
    """
    command_descriptions = (
        start_or_join_a_new_game,
        move_a_piece,
        leave_a_game,
        list_games,
        get_a_game_anotation,
        claim_a_draw,
    )

    pieces = ['@'+username + '\n']
    for description in command_descriptions:
        pieces.append('\n')
        pieces.append('@'+bot_username + ' ' + description + '\n')

    return ''.join(pieces)
|
|
|
|
def replying():
    """Decide whether the module-level ``text`` holds a command to answer.

    The text is stripped of HTML and unescaped, the first mention (from the
    first ``@`` up to the first space) is cut out, and everything from any
    remaining ``@`` onwards is dropped.  The lower-cased remainder is the
    query word, which is compared with the configured command keywords.

    Returns:
        tuple: ``(reply, query_word, moving)``.  ``moving`` holds the move
        text (spaces removed) for a move command and is ``''`` otherwise.
        When the text has no ``@`` or no space, the ``ValueError`` is printed
        and ``None`` is returned.
    """
    reply = False
    moving = ''

    content = unescape(cleanhtml(text))

    try:
        start = content.index("@")
        end = content.index(" ")
        if len(content) > end:
            content = content[:start] + content[end + 1:]

        # Cut at the last "@" once per remaining mention, which leaves only
        # the text that precedes the first of them.
        for _ in range(content.count('@')):
            content = content[:content.rfind("@")]

        query_word = content.lower()
        query_word_length = len(query_word)

        # Commands are only recognised when transliteration to ASCII leaves
        # the query unchanged.
        if unidecode.unidecode(query_word)[0:query_word_length] == query_word:
            if query_word == search_new:
                reply = True
            elif query_word[:search_move_slicing] == search_move:
                moving = query_word[moving_slicing:query_word_length].replace(" ","")
                reply = True
            else:
                reply = (
                    query_word == search_end
                    or query_word == search_games
                    or query_word[:search_send_slicing] == search_send
                    or query_word == search_help
                    or query_word == search_draw
                    or query_word == search_panel
                )

        return (reply, query_word, moving)

    except ValueError as v_error:
        print(v_error)
|
|
|
|
def load_strings(bot_lang):
    """Read the first group of localized strings from ``language_filepath``.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(search_end, search_move, search_new, search_games,
        search_send, search_help, new_game_started, playing_with, your_turn,
        game_name, chess_hashtag, send_error)``.
    """
    keys = (
        "search_end",
        "search_move",
        "search_new",
        "search_games",
        "search_send",
        "search_help",
        "new_game_started",
        "playing_with",
        "your_turn",
        "game_name",
        "chess_hashtag",
        "send_error",
    )
    return tuple(get_parameter(key, language_filepath) for key in keys)
|
|
|
|
def load_strings1(bot_lang):
    """Read the second group of localized strings from ``language_filepath``.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(game_number_anotations, anotations_sent, game_no_exists,
        cant_send_to_fediverse_account, it_not_exists, game_already_started,
        wait_other_player, is_not_legal_move, check_done, check_mate)``.
    """
    # Order in which the parameters are read from the file.
    read_order = (
        "game_number_anotations",
        "anotations_sent",
        "game_no_exists",
        "it_not_exists",
        "game_already_started",
        "cant_send_to_fediverse_account",
        "wait_other_player",
        "is_not_legal_move",
        "check_done",
        "check_mate",
    )
    # Order in which they are handed back; it differs from the read order.
    return_order = (
        "game_number_anotations",
        "anotations_sent",
        "game_no_exists",
        "cant_send_to_fediverse_account",
        "it_not_exists",
        "game_already_started",
        "wait_other_player",
        "is_not_legal_move",
        "check_done",
        "check_mate",
    )
    values = {}
    for key in read_order:
        values[key] = get_parameter(key, language_filepath)
    return tuple(values[key] for key in return_order)
|
|
|
|
def load_strings2(bot_lang):
    """Read the third group of localized strings from ``language_filepath``.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(check_mate_movements, the_winner_is, well_done,
        winned_games, wins_of_many, lost_piece, not_legal_move_str,
        player_leave_game)``.
    """
    keys = (
        "check_mate_movements",
        "the_winner_is",
        "well_done",
        "winned_games",
        "wins_of_many",
        "lost_piece",
        "not_legal_move_str",
        "player_leave_game",
    )
    return tuple(get_parameter(key, language_filepath) for key in keys)
|
|
|
|
def load_strings3(bot_lang):
    """Read the fourth group of localized strings from ``language_filepath``.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(leave_waiting_game, started_games, game_is_waiting,
        game_is_on_going, no_on_going_games, is_not_your_turn,
        is_the_turn_of)``.
    """
    keys = (
        "leave_waiting_game",
        "started_games",
        "game_is_waiting",
        "game_is_on_going",
        "no_on_going_games",
        "is_not_your_turn",
        "is_the_turn_of",
    )
    return tuple(get_parameter(key, language_filepath) for key in keys)
|
|
|
|
def load_strings4(bot_lang):
    """Read the localized piece names from ``language_filepath``.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(pawn_piece, knight_piece, bishop_piece, rook_piece,
        queen_piece, king_piece)``.
    """
    keys = (
        "pawn_piece",
        "knight_piece",
        "bishop_piece",
        "rook_piece",
        "queen_piece",
        "king_piece",
    )
    return tuple(get_parameter(key, language_filepath) for key in keys)
|
|
|
|
def load_strings5(bot_lang):
    """Read the localized piece letters and the e-mail subject.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(pawn_piece_letter, knight_piece_letter,
        bishop_piece_letter, rook_piece_letter, queen_piece_letter,
        king_piece_letter, email_subject)``.
    """
    keys = (
        "pawn_piece_letter",
        "knight_piece_letter",
        "bishop_piece_letter",
        "rook_piece_letter",
        "queen_piece_letter",
        "king_piece_letter",
        "email_subject",
    )
    return tuple(get_parameter(key, language_filepath) for key in keys)
|
|
|
|
def load_strings6(bot_lang):
    """Read the help texts and draw keywords from ``language_filepath``.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(start_or_join_a_new_game, move_a_piece, leave_a_game,
        list_games, get_a_game_anotation, show_help, search_draw,
        ask_for_draw)``.
    """
    keys = (
        "start_or_join_a_new_game",
        "move_a_piece",
        "leave_a_game",
        "list_games",
        "get_a_game_anotation",
        "show_help",
        "search_draw",
        "ask_for_draw",
    )
    return tuple(get_parameter(key, language_filepath) for key in keys)
|
|
|
|
def load_strings7(bot_lang):
    """Read the draw and panel strings from ``language_filepath``.

    Args:
        bot_lang: not used by this function.

    Returns:
        tuple: ``(claim_draw_str, draw_and_str, agreed_draw_str,
        claim_a_draw, search_panel, panel_title_str, panel_games_str,
        panel_wins_str, panel_ratio_str)``.
    """
    keys = (
        "claim_draw_str",
        "draw_and_str",
        "agreed_draw_str",
        "claim_a_draw",
        "search_panel",
        "panel_title_str",
        "panel_games_str",
        "panel_wins_str",
        "panel_ratio_str",
    )
    return tuple(get_parameter(key, language_filepath) for key in keys)
|
|
|
|
def mastodon():
    """Create the Mastodon API client from the secrets and config files.

    Reads ``uc_client_id``, ``uc_client_secret`` and ``uc_access_token`` from
    ``secrets/secrets.txt`` and ``mastodon_hostname`` and ``bot_username``
    from ``config/config.txt`` (via get_parameter, which exits the process if
    a file or key is missing).

    Returns a tuple ``(client, mastodon_hostname, bot_username)`` where
    ``client`` is the initialised ``Mastodon`` instance.
    """
    # Load secrets from secrets file
    secrets_filepath = "secrets/secrets.txt"
    uc_client_id = get_parameter("uc_client_id", secrets_filepath)
    uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
    uc_access_token = get_parameter("uc_access_token", secrets_filepath)

    # Load configuration from config file
    config_filepath = "config/config.txt"
    mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
    bot_username = get_parameter("bot_username", config_filepath)

    # Initialise Mastodon API. The local is deliberately not called
    # `mastodon`, so it no longer shadows this function inside its own body.
    client = Mastodon(
        client_id = uc_client_id,
        client_secret = uc_client_secret,
        access_token = uc_access_token,
        api_base_url = 'https://' + mastodon_hostname,
    )

    # The former `headers` dict was built here but never used or returned,
    # so it has been dropped.

    return (client, mastodon_hostname, bot_username)
|
|
|
|
def db_config():
    """Read the database settings from ``config/db_config.txt``.

    Returns a tuple ``(mastodon_db, mastodon_db_user, chess_db,
    chess_db_user)`` with the values of the same-named keys in that file.
    """
    # Load db configuration from config file
    config_filepath = "config/db_config.txt"

    def read(key):
        return get_parameter(key, config_filepath)

    # Tuple elements are evaluated left to right, so the lookup order is
    # mastodon_db, mastodon_db_user, chess_db, chess_db_user.
    return (
        read("mastodon_db"),
        read("mastodon_db_user"),
        read("chess_db"),
        read("chess_db_user"),
    )
|
|
|
|
def smtp_config():
    """Read the SMTP settings from ``config/smtp_config.txt``.

    Returns a tuple ``(smtp_host, smtp_user_login, smtp_user_password)`` with
    the values of the same-named keys in that file.
    """
    smtp_filepath = "config/smtp_config.txt"
    setting_names = ("smtp_host", "smtp_user_login", "smtp_user_password")

    # Values are fetched one by one, in the order the names are listed.
    return tuple(get_parameter(name, smtp_filepath) for name in setting_names)
|
|
|
|
def get_parameter( parameter, file_path ):
    """Return the value stored under ``parameter`` in a ``key: value`` file.

    The file is scanned line by line; the first line whose text before the
    first ':' equals ``parameter`` (ignoring surrounding whitespace) wins and
    everything after that ':' is returned, stripped.

    The key must match exactly. A plain prefix test would let a request for
    ``check_mate`` be answered by a ``check_mate_movements: ...`` line, or
    ``pawn_piece`` by ``pawn_piece_letter: ...``, depending only on the order
    of lines in the file.

    Exits the process with a non-zero status (after printing a message) when
    the file does not exist or the key is not present.
    """
    # Check if the file exists
    if not os.path.isfile(file_path):
        print("File %s not found, exiting."%file_path)
        sys.exit(1)

    # Find the parameter and return its value
    with open( file_path ) as f:
        for line in f:
            key, separator, value = line.partition(":")
            # `separator` is empty for lines without a colon; skip those.
            if separator and key.strip() == parameter:
                return value.strip()

    # Cannot find the parameter: this is a fatal configuration error
    print(file_path + "  Missing parameter %s "%parameter)
    sys.exit(1)
|
|
|
|
def usage():
    """Print a one-line invocation hint for this script to standard output."""
    script_name = sys.argv[0]
    print('usage: python %s --play --en' % script_name)
|
|
|
|
###############################################################################
|
|
# main
|
|
|
|
if __name__ == '__main__':

    # Entry point: `python <script> --play [--ca|--en|--es]`.
    #
    # NOTE(review): many helpers used below (replying(), check_games(),
    # waiting_games(), join_player(), toot_help(), ...) are called without
    # arguments, so they presumably read the module-level names assigned in
    # this block. Those names are therefore kept exactly as they were.

    # usage modes

    if len(sys.argv) == 1:

        usage()

    elif len(sys.argv) >= 2:

        if sys.argv[1] == '--play':

            # Language selection. bot_lang is always bound now; an
            # unrecognised third argument is passed through and rejected by
            # the supported-language check below instead of raising NameError.
            if len(sys.argv) == 3:
                language_options = {'--ca': 'ca', '--en': 'en', '--es': 'es'}
                bot_lang = language_options.get(sys.argv[2], sys.argv[2])
            else:
                bot_lang = 'ca'

            language_files = {
                'ca': 'app/locales/ca.txt',
                'en': 'app/locales/en.txt',
                'es': 'app/locales/es.txt',
            }

            if bot_lang not in language_files:

                print("\nOnly 'ca', 'es' and 'en' languages are supported.\n")
                sys.exit(0)

            language_filepath = language_files[bot_lang]

            # Per-language slice lengths, in the order (search_move_slicing,
            # moving_slicing, search_send_slicing, send_game_slicing). They
            # are applied to query_word below; moving_slicing is not used in
            # this block.
            slicing_by_language = {
                'ca': (3, 3, 5, 6),
                'en': (4, 4, 4, 5),
                'es': (5, 5, 5, 6),
            }
            search_move_slicing, moving_slicing, search_send_slicing, send_game_slicing = slicing_by_language[bot_lang]

            search_end, search_move, search_new, search_games, search_send, search_help, new_game_started, playing_with, your_turn, game_name, chess_hashtag, send_error = load_strings(bot_lang)

            game_number_anotations, anotations_sent, game_no_exists, cant_send_to_fediverse_account, it_not_exists, game_already_started, wait_other_player, is_not_legal_move, check_done, check_mate = load_strings1(bot_lang)

            check_mate_movements, the_winner_is, well_done, winned_games, wins_of_many, lost_piece, not_legal_move_str, player_leave_game = load_strings2(bot_lang)

            leave_waiting_game, started_games, game_is_waiting, game_is_on_going, no_on_going_games, is_not_your_turn, is_the_turn_of = load_strings3(bot_lang)

            pawn_piece, knight_piece, bishop_piece, rook_piece, queen_piece, king_piece = load_strings4(bot_lang)

            pawn_piece_letter, knight_piece_letter, bishop_piece_letter, rook_piece_letter, queen_piece_letter, king_piece_letter, email_subject = load_strings5(bot_lang)

            start_or_join_a_new_game, move_a_piece, leave_a_game, list_games, get_a_game_anotation, show_help, search_draw, ask_for_draw = load_strings6(bot_lang)

            claim_draw_str, draw_and_str, agreed_draw_str, claim_a_draw, search_panel, panel_title_str, panel_games_str, panel_wins_str, panel_ratio_str = load_strings7(bot_lang)

            # From here on the name `mastodon` is the API client, not the
            # factory function.
            mastodon, mastodon_hostname, bot_username = mastodon()

            mastodon_db, mastodon_db_user, chess_db, chess_db_user = db_config()

            smtp_host, smtp_user_login, smtp_user_password = smtp_config()

            now = datetime.now()

            bot_id = get_bot_id()

            account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst = get_notification_data()

            # Process every notification once; `i` is the notification index
            # and must only be advanced by this loop.
            i = 0
            while i < len(account_id_lst):

                account_id = account_id_lst[i]

                username, domain = get_user_domain(account_id)

                if domain is not None:

                    username = username + '@' + domain

                status_id = status_id_lst[i]

                replied = check_replies(status_id)

                if replied == True:

                    i += 1
                    continue

                # listen them or not

                text = text_lst[i]

                reply, query_word, moving = replying()

                visibility = visibility_lst[i]

                status_url = url_lst[i]

                if query_word != search_games:

                    is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user = check_games()

                    if game_id == '':

                        game_id, game_waiting = waiting_games()

                else:

                    # Listing games is handled in the "playing" branch below.
                    is_playing = True

                if reply == True and is_playing == False:

                    if query_word == search_new and not game_waiting:

                        board = chess.Board()

                        svgfile = chess.svg.board(board=board)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        toot_text = '@'+username + ' ' + new_game_started + '\n'
                        toot_text += '\n'

                        image_id = mastodon.media_post(board_file, "image/png").id

                        toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        toot_url = toot_id.uri

                        new_game(toot_url)

                        update_replies(status_id, username, now)

                    elif query_word == search_new and game_waiting:

                        game_status, white_user, chess_game = join_player()

                        playing_user = white_user

                        next_move(username)

                        board = chess.Board(chess_game)

                        svgfile = chess.svg.board(board=board)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        toot_text = '@'+username + ' ' + playing_with + ' ' + white_user + "\n"
                        toot_text += '\n'
                        toot_text += '@'+white_user + ': ' + your_turn + "\n"
                        toot_text += '\n'
                        toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'

                        image_id = mastodon.media_post(board_file, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        game_moves = board.ply()

                        update_moves(username, game_moves)

                        update_replies(status_id, username, now)

                    elif query_word[:search_send_slicing] == search_send:

                        query_word_length = len(query_word)

                        send_game = query_word[send_game_slicing:query_word_length].replace(' ', '')

                        emailed, game_id, game_found = send_anotation(send_game)

                        if emailed == False and game_found == True:

                            toot_text = '@'+username + ' ' + send_error

                        elif emailed == True and game_found == True:

                            toot_text = '@'+username + ' ' + game_number_anotations + str(game_id) + ' ' + anotations_sent

                        elif emailed == False and game_found == False:

                            if domain is not None:

                                toot_text = '@'+username + ' ' + cant_send_to_fediverse_account

                            else:

                                toot_text = '@'+username + ' ' + game_no_exists + str(game_id) + ' ' + it_not_exists

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        update_replies(status_id, username, now)

                    elif query_word == search_panel:

                        played_games, wins = get_stats(username)

                        create_panel(username, played_games, wins)

                        toot_text = '@'+username

                        saved_panel = 'app/panel/' + username + '_panel.png'

                        image_id = mastodon.media_post(saved_panel, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})

                        update_replies(status_id, username, now)

                    elif query_word == search_help:

                        help_text = toot_help()

                        help_text = (help_text[:490] + '... ') if len(help_text) > 490 else help_text

                        mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility)

                        update_replies(status_id, username, now)

                    else:

                        update_replies(status_id, username, now)

                elif reply and is_playing:

                    if query_word == search_new:

                        toot_text = '@'+username + ' ' + game_already_started + '\n'

                        if black_user != '':

                            toot_text += '@'+white_user + ' / ' + '@'+black_user + '\n'

                        else:

                            toot_text += wait_other_player + '\n'

                        toot_text += '\n'
                        toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'

                        board = chess.Board(on_going_game)

                        svgfile = chess.svg.board(board=board)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        image_id = mastodon.media_post(board_file, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        update_replies(status_id, username, now)

                    elif query_word[:search_move_slicing] == search_move and playing_user == username:

                        board = chess.Board(on_going_game)

                        promoted = False

                        stalemate = False

                        checkmate = False

                        try:

                            piece_square_index = chess.SQUARE_NAMES.index(moving[:2])

                            moving_piece = board.piece_type_at(piece_square_index)

                            if moving_piece == 1:

                                # Piece type 1 is handled as a possible
                                # promotion: a move onto the last rank for
                                # the side to move gets 'q' appended when no
                                # promotion piece was given.
                                square_index = chess.SQUARE_NAMES.index(moving[2:4])

                                square_rank_trigger = 7 if board.turn == chess.WHITE else 0

                                if chess.square_rank(square_index) == square_rank_trigger:

                                    promoted = True

                                    if len(moving) == 4:

                                        moving = moving + 'q'

                            # Parsed after the optional 'q' suffix was added;
                            # raises ValueError on malformed input.
                            uci_move = chess.Move.from_uci(moving)

                            not_legal_move = uci_move not in board.legal_moves

                            if not_legal_move:

                                toot_text = '@'+username + ': ' + moving + ' ' + is_not_legal_move + '\n'

                                mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                                update_replies(status_id, username, now)

                            else:

                                san_move = board.san(uci_move)

                                check = False

                                playing_user = next_move(username)

                                # Capture info must be read before the move
                                # is pushed onto the board.
                                if board.is_capture(uci_move):

                                    capture = True

                                    square_capture_index = chess.SQUARE_NAMES.index(moving[2:4])

                                    captured_piece = board.piece_type_at(square_capture_index)

                                    piece_name = get_piece_name(captured_piece)

                                else:

                                    capture = False

                                board.push(uci_move)

                                if board.is_check():

                                    if username == white_user:

                                        king_square = board.king(chess.BLACK)
                                        check = True

                                    else:

                                        king_square = board.king(chess.WHITE)
                                        check = True

                                if board.is_stalemate() == True:

                                    stalemate = True

                                if board.is_game_over() == True:

                                    game_moves = board.ply()

                                    # NOTE(review): any game-over state that
                                    # is not a stalemate is treated as
                                    # checkmate here — confirm this is
                                    # intended for other draw conditions.
                                    if stalemate == False:

                                        checkmate = True

                                    close_game(username, checkmate)

                                else:

                                    checkmate = False

                                if check == True and checkmate == False:

                                    toot_text = "@"+playing_user + " " + username + ' ' + check_done + '\n'

                                elif check == True and checkmate == True:

                                    toot_text = '\n' + check_mate + ' ' + str(game_moves) + ' ' + check_mate_movements + '\n\n' + the_winner_is + ' ' + "@"+username + '\n'
                                    toot_text += "\n@"+playing_user + ': ' + well_done + "\n"
                                    toot_text += '\n' + winned_games + "\n"

                                    played_games, wins = get_stats(username)

                                    toot_text += username + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"

                                    played_games, wins = get_stats(playing_user)

                                    toot_text += playing_user + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"

                                elif check == False and stalemate == True:

                                    # NOTE(review): stalemate_str is not among
                                    # the strings unpacked from load_strings*
                                    # above — confirm it is defined elsewhere.
                                    toot_text = stalemate_str + ' (' + str(game_moves) + ')' + '\n'
                                    toot_text += '\n@'+playing_user + ', ' + '@'+username + "\n"
                                    toot_text += '\n' + winned_games + "\n"

                                    played_games, wins = get_stats(username)

                                    toot_text += username + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"

                                    played_games, wins = get_stats(playing_user)

                                    toot_text += playing_user + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"

                                else:

                                    toot_text = '@'+playing_user + ' ' + your_turn + '\n'

                                if capture == True and checkmate == False:

                                    toot_text += '\n' + lost_piece + ' ' + piece_name + '!\n'

                                toot_text += '\n' + game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'

                                # The board is drawn from the point of view of
                                # the player who moves next.
                                if username == white_user:

                                    if check == True:

                                        svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=uci_move, check=board.king(chess.BLACK))

                                    else:

                                        svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=uci_move)

                                else:

                                    if check == True:

                                        svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=uci_move, check=board.king(chess.WHITE))

                                    else:

                                        svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=uci_move)

                                board_file = 'app/games/' + str(game_id) + '_board.png'

                                svg2png(bytestring=svgfile,write_to=board_file)

                                image_id = mastodon.media_post(board_file, "image/png").id

                                toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                                toot_url = toot_id.uri

                                board_game = board.fen()

                                update_game(board_game, toot_url)

                                game_moves = board.ply()

                                save_anotation(moving, san_move)

                                update_moves(username, game_moves)

                                update_replies(status_id, username, now)

                        except (ValueError, AssertionError) as move_error:

                            # Malformed or unparsable move text: tell the
                            # player and mark the notification as handled.
                            print(move_error)

                            toot_text = '@'+username + ' ' + not_legal_move_str + ' ' + moving + '!?)\n'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                            update_replies(status_id, username, now)

                    elif query_word == search_end:

                        stalemate = False

                        checkmate = False

                        if black_user != '':

                            if username == white_user:

                                toot_text = '@'+username + ' ' + player_leave_game + ' ' + '@'+black_user

                                mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                                close_game(username, checkmate)

                                update_replies(status_id, username, now)

                                i += 1
                                continue

                            else:

                                toot_text = '@'+username + ' ' + player_leave_game + ' ' + white_user

                                mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                                close_game(username, checkmate)

                                update_replies(status_id, username, now)

                                i += 1
                                continue

                        else:

                            toot_text = '@'+username + ' ' + leave_waiting_game

                            mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)

                            close_game(username, checkmate)

                            update_replies(status_id, username, now)

                            i += 1
                            continue

                    elif query_word == search_games:

                        player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst = current_games()

                        if len(player1_name_lst) > 0:

                            toot_text = "@"+username + ' ' + started_games + "\n"

                            # Iterate the games without touching `i`: the old
                            # inner `i = 0 ... i += 1` loop overwrote the
                            # notification index of the enclosing loop.
                            for listed_player1, listed_player2, listed_status, listed_link, listed_next in zip(player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst):

                                if listed_status == 'waiting':

                                    toot_text += '\n' + listed_player1 + ' / ' + listed_player2 + ' ' + game_is_waiting + "\n"

                                else:

                                    # '*' marks the player whose turn it is.
                                    if listed_next == listed_player1:

                                        toot_text += '\n*' + listed_player1 + ' / ' + listed_player2 + ' ' + game_is_on_going + '\n'

                                    else:

                                        toot_text += '\n' + listed_player1 + ' / *' + listed_player2 + ' ' + game_is_on_going + '\n'

                                if listed_link is not None:

                                    toot_text += str(listed_link) + "\n"

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        else:

                            toot_text = '@'+username + ' ' + no_on_going_games + '\n'

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        update_replies(status_id, username, now)

                    elif query_word[:search_send_slicing] == search_send:

                        query_word_length = len(query_word)

                        send_game = query_word[search_send_slicing:query_word_length].replace(' ', '')

                        emailed, game_id, game_found = send_anotation(send_game)

                        if emailed == False and game_found == True:

                            toot_text = '@'+username + ' ' + send_error

                        elif emailed == True and game_found == True:

                            toot_text = '@'+username + ' ' + game_number_anotations + str(game_id) + ' ' + anotations_sent

                        elif emailed == False and game_found == False:

                            toot_text = '@'+username + ' ' + game_no_exists + str(game_id) + ' ' + it_not_exists

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        update_replies(status_id, username, now)

                    elif query_word == search_draw:

                        white_player, black_player, toot_text, stalemate = claim_draw(username)

                        if stalemate == True:

                            checkmate = False

                            close_game(username, checkmate)

                            toot_text = '@'+white_player + ' ' + draw_and_str + ' ' + '@'+black_player + ' ' + agreed_draw_str + '\n\n'
                            toot_text += '\n' + winned_games + "\n"

                            played_games, wins = get_stats(white_player)

                            toot_text += white_player + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"

                            played_games, wins = get_stats(black_player)

                            toot_text += black_player + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"

                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        else:

                            # Post the text returned by claim_draw() as is.
                            mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)

                        update_replies(status_id, username, now)

                    elif query_word == search_panel:

                        played_games, wins = get_stats(username)

                        create_panel(username, played_games, wins)

                        toot_text = '@'+username

                        saved_panel = 'app/panel/' + username + '_panel.png'

                        image_id = mastodon.media_post(saved_panel, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility, media_ids={image_id})

                        update_replies(status_id, username, now)

                    elif query_word == search_help:

                        help_text = toot_help()

                        # Same 490-character cap as the help reply sent to
                        # users who are not in a game.
                        help_text = (help_text[:490] + '... ') if len(help_text) > 490 else help_text

                        mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility)

                        update_replies(status_id, username, now)

                    else:

                        if playing_user is None:

                            toot_text = '@'+username + ' ' + is_not_your_turn + '\n'

                        else:

                            toot_text = '@'+username + ' ' + is_the_turn_of + ' ' + playing_user + "\n"

                        toot_text += '\n'
                        toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'

                        board = chess.Board(on_going_game)

                        if username == white_user:

                            svgfile = chess.svg.board(board=board, orientation=chess.BLACK)

                        else:

                            svgfile = chess.svg.board(board=board, orientation=chess.WHITE)

                        board_file = 'app/games/' + str(game_id) + '_board.png'

                        svg2png(bytestring=svgfile,write_to=board_file)

                        image_id = mastodon.media_post(board_file, "image/png").id

                        mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})

                        update_replies(status_id, username, now)

                i += 1

        else:

            usage()
|