mastochess/mastochess.py

1855 líneas
52 KiB
Python

import pdb
import sys
import os
import os.path
import re
import unidecode
from datetime import datetime, timedelta
from mastodon import Mastodon
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import smtplib
from smtplib import SMTPException, SMTPAuthenticationError, SMTPConnectError, SMTPRecipientsRefused
import socket
from socket import gaierror
import psycopg2
import chess
import chess.svg
from cairosvg import svg2png
import chess.pgn
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
def unescape(s):
s = s.replace("&apos;", "'")
return s
def get_bot_id():
###################################################################################################################################
# get bot_id from bot's username
try:
conn = None
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select id from accounts where username = (%s) and domain is null", (bot_username,))
row = cur.fetchone()
if row != None:
bot_id = row[0]
cur.close()
return bot_id
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_user_domain(account_id):
try:
conn = None
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select username, domain from accounts where id=(%s)", (account_id,))
row = cur.fetchone()
if row != None:
username = row[0]
domain = row[1]
cur.close()
return (username, domain)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_piece_name(captured_piece):
if captured_piece == 1:
piece_name = pawn_piece
if captured_piece == 2:
piece_name = knight_piece
if captured_piece == 3:
piece_name = bishop_piece
if captured_piece == 4:
piece_name = rook_piece
if captured_piece == 5:
piece_name = queen_piece
if captured_piece == 6:
piece_name = king_piece
return piece_name
def get_notification_data():
try:
account_id_lst = []
status_id_lst = []
text_lst = []
visibility_lst = []
url_lst = []
search_text = [search_end, search_move, search_new, search_games, search_send, search_help]
conn = None
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
i=0
while i < len(search_text):
like_text = "%"+search_text[i]+"%"
select_query = "select account_id, id, text, visibility, url, created_at from statuses where text like (%s) and created_at + interval '60 minutes' > now() - interval '5 minutes'"
select_query += " and id=any (select status_id from mentions where account_id=(%s)) order by created_at asc"
cur.execute(select_query, (like_text, str(bot_id)))
rows = cur.fetchall()
for row in rows:
account_id_lst.append(row[0])
status_id_lst.append(row[1])
text_lst.append(row[2])
if row[3] == 0:
visibility_lst.append('public')
elif row[3] == 1:
visibility_lst.append('unlisted')
elif row[3] == 2:
visibility_lst.append('private')
elif row[3] == 3:
visibility_lst.append('direct')
url_lst.append(row[4])
i += 1
cur.close()
return (account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def update_replies(status_id, username, now):
post_id = status_id
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
insert_sql = "insert into botreplies(status_id, query_user, status_created_at) values(%s, %s, %s) ON CONFLICT DO NOTHING"
cur.execute(insert_sql, (post_id, username, now))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def check_replies(status_id):
post_id = status_id
replied = False
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select status_id from botreplies where status_id=(%s)", (post_id,))
row = cur.fetchone()
if row != None:
replied = True
else:
replied = False
cur.close()
return replied
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def current_games():
player1_name_lst = []
player2_name_lst = []
game_status_lst = []
game_link_lst = []
next_move_lst = []
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select white_user, black_user, chess_status, chess_link, next_move from games where not finished")
rows = cur.fetchall()
for row in rows:
player1_name_lst.append(row[0])
if row[1] != None:
player2_name_lst.append(row[1])
else:
player2_name_lst.append(" ")
game_status_lst.append(row[2])
game_link_lst.append(row[3])
next_move_lst.append(row[4])
cur.close()
return (player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def check_games():
game_id = ''
white_user = ''
black_user = ''
on_going_game = ''
waiting = False
playing_user = ''
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
### check if there is an ongoing game
cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and white_user=(%s) ", (username,))
row = cur.fetchone()
if row == None:
cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and black_user=(%s)", (username,))
row = cur.fetchone()
if row == None:
is_playing = False
else:
is_playing = True
game_id = row[0]
white_user = row[1]
if row[1] != None:
black_user = row[2]
else:
black_user = ''
on_going_game = row[3]
waiting = row[4]
playing_user = row[5]
else:
is_playing = True
game_id = row[0]
white_user = row[1]
if row[2] != None:
black_user = row[2]
else:
black_user = ''
on_going_game = row[3]
waiting = row[4]
playing_user = row[5]
cur.close()
return (is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def new_game(toot_url):
try:
game_status = 'waiting'
waiting = True
board_game = board.fen()
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
insert_query = 'INSERT INTO games(created_at, white_user, chess_game, chess_status, waiting, updated_at, chess_link) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING'
cur.execute(insert_query, (now, username, board_game, game_status, waiting, now, toot_url))
insert_query = 'INSERT INTO stats(created_at, white_user) VALUES (%s,%s) ON CONFLICT DO NOTHING'
cur.execute(insert_query, (now, username,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def update_game(board_game, toot_url):
try:
now = datetime.now()
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set chess_game=(%s), chess_link=(%s), updated_at=(%s) where game_id=(%s)", (board_game, toot_url, now, game_id,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def write_result(filename, old_string, new_string):
with open(filename) as f:
s = f.read()
if old_string not in s:
print('"{old_string}" not found in {filename}.'.format(**locals()))
return
with open(filename, 'w') as f:
print('Changing "{old_string}" to "{new_string}" in {filename}'.format(**locals()))
s = s.replace(old_string, new_string)
f.write(s)
def save_anotation(moving, san_move):
pgn_file = "app/anotations/" + str(game_id) + ".pgn"
if bool(board.turn == chess.BLACK) == True:
line_data = str(board.fullmove_number) + ". " + san_move
else:
line_data = " " + san_move + " "
if checkmate or stalemate:
line_data = line_data + " " + board.result()
write_result(pgn_file, '[Result ]', '[Result ' + board.result() + ' ]')
if not os.path.isfile(pgn_file):
file_header = '[Event ' + game_name + ': ' + str(game_id) + ']\n'
file_header += '[Site ' + mastodon_hostname + ']' + '\n'
file_header += '[Date ' + str(datetime.today().strftime('%d-%m-%Y')) + ']' + '\n'
file_header += '[Round ' + str(game_id) + ']' + '\n'
file_header += '[White ' + white_user + ' ]' + '\n'
file_header += '[Black ' + black_user + ' ]' + '\n'
file_header += '[Result ]' + '\n'
file_header += '[Time ' + str(datetime.now().strftime('%H:%M:%S')) + ']' + '\n\n'
with open(pgn_file, 'w+') as f:
f.write(file_header)
with open(pgn_file, 'a') as f:
f.write(line_data)
else:
with open(pgn_file, 'a') as f:
f.write(line_data)
def get_email_address(username):
try:
conn = None
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select email from users where account_id = (select id from accounts where username = (%s) and domain is null)", (username,))
row = cur.fetchone()
if row != None:
username_email = row[0]
else:
username_email = None
cur.close()
return username_email
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def send_anotation(game_id):
emailed = False
game_found = False
username_email = get_email_address(username)
if username_email == None:
return (emailed, game_id, game_found)
# Create message object instance
msg = MIMEMultipart()
# Declare message elements
msg['From'] = smtp_user_login
msg['To'] = username_email
msg['Subject'] = email_subject + game_id
# Attach the game anotation
file_to_attach = "app/anotations/" + game_id + ".pgn"
try:
attachment = open(file_to_attach, 'rb')
game_found = True
except FileNotFoundError as not_found_error:
print(not_found_error)
return (emailed, game_id, game_found)
obj = MIMEBase('application', 'octet-stream')
obj.set_payload(attachment.read())
encoders.encode_base64(obj)
obj.add_header('Content-Disposition', "attachment; filename= "+file_to_attach)
# Add the message body to the object instance
msg.attach(obj)
try:
# Create the server connection
server = smtplib.SMTP(smtp_host)
# Switch the connection over to TLS encryption
server.starttls()
# Authenticate with the server
server.login(smtp_user_login, smtp_user_password)
# Send the message
server.sendmail(msg['From'], msg['To'], msg.as_string())
# Disconnect
server.quit()
print("Successfully sent game anotations to %s" % msg['To'])
emailed = True
return (emailed, game_id, game_found)
except SMTPAuthenticationError as auth_error:
print(auth_error)
pass
return (emailed, game_id, game_found)
except socket.gaierror as socket_error:
print(socket_error)
pass
return (emailed, game_id, game_found)
except SMTPRecipientsRefused as recip_error:
print(recip_error)
pass
return (emailed, game_id, game_found)
def close_game(username, checkmate):
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select white_user, black_user from games where game_id=(%s)", (game_id,))
row = cur.fetchone()
if row != None:
white_player = row[0]
black_player = row[1]
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
now = datetime.now()
winner = ''
waiting = False
finished = True
if stalemate == True:
winner = "stalemate"
if black_user == '':
winner = 'none'
if checkmate:
winner = username
else:
if query_word == search_end and username == white_user and stalemate == False:
winner = black_user
elif query_word == search_end and username == black_user and stalemate == False:
winner = white_user
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set waiting=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (waiting, finished, now, game_id))
cur.execute("update stats set winner=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (winner, finished, now, game_id))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def get_stats(player):
played_games = 0
wins = 0
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select count(*) from stats where white_user = (%s) or black_user = (%s) and finished", (player, player))
row = cur.fetchone()
if row != None:
played_games = row[0]
cur.execute("select count(*) from stats where winner = (%s) and finished", (player,))
row = cur.fetchone()
if row != None:
wins = row[0]
cur.close()
return (played_games, wins)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def waiting_games():
try:
game_id = ''
game_waiting = False
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select game_id from games where waiting order by game_id desc limit 1")
row = cur.fetchone()
if row != None:
game_id = row[0]
game_waiting = True
cur.close()
return (game_id, game_waiting)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def join_player():
try:
now = datetime.now()
game_status = 'waiting'
waiting = True
moves = 0
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set black_user=(%s), chess_status='playing', waiting='f', updated_at=(%s), moves=(%s) where game_id=(%s)", (username, now, moves, game_id,))
cur.execute("update stats set black_user=(%s), updated_at=(%s) where game_id=(%s)", (username, now, game_id,))
conn.commit()
cur.execute("select white_user, chess_game from games where game_id=(%s)", (game_id,))
row = cur.fetchone()
if row != None:
white_user = row[0]
chess_game = row[1]
cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (white_user, now, game_id,))
conn.commit()
cur.close()
game_status = 'playing'
return (game_status, white_user, chess_game)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def update_moves(username, game_moves):
try:
now = datetime.now()
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set next_move=(%s), last_move=(%s), moves=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, username, game_moves, now, game_id,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def next_move(playing_user):
try:
now = datetime.now()
waiting = True
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select white_user, black_user, last_move, moves from games where game_id=(%s)", (game_id,))
row = cur.fetchone()
if row != None:
white_user = row[0]
black_user = row[1]
last_move = row[2]
moves = row[3]
if last_move != None:
if playing_user == white_user:
playing_user = black_user
elif playing_user == black_user:
playing_user = white_user
else:
last_move = white_user
cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, now, game_id,))
conn.commit()
cur.close()
return playing_user
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def toot_help():
help_text = '@'+username + ' ' + search_help + ':\n'
help_text += '\n'
help_text += '@'+bot_username + ' ' + start_or_join_a_new_game + '\n'
help_text += '\n'
help_text += '@'+bot_username + ' ' + move_a_piece + '\n'
help_text += '\n'
help_text += '@'+bot_username + ' ' + leave_a_game + '\n'
help_text += '\n'
help_text += '@'+bot_username + ' ' + list_games + '\n'
help_text += '\n'
help_text += '@'+bot_username + ' ' + get_a_game_anotation + '\n'
help_text += '\n'
help_text += '@'+bot_username + ' ' + show_help + '\n'
return help_text
def replying():
reply = False
moving = ''
content = cleanhtml(text)
content = unescape(content)
try:
start = content.index("@")
end = content.index(" ")
if len(content) > end:
content = content[0: start:] + content[end +1::]
neteja = content.count('@')
i = 0
while i < neteja :
start = content.rfind("@")
end = len(content)
content = content[0: start:] + content[end +1::]
i += 1
question = content.lower()
query_word = question
query_word_length = len(query_word)
if unidecode.unidecode(question)[0:query_word_length] == query_word:
if query_word == search_new:
reply = True
elif query_word[:search_move_slicing] == search_move:
moving = query_word[moving_slicing:query_word_length].replace(" ","")
reply = True
elif query_word == search_end:
reply = True
elif query_word == search_games:
reply = True
elif query_word[:search_send_slicing] == search_send:
reply = True
elif query_word == search_help:
reply = True
else:
reply = False
return (reply, query_word, moving)
except ValueError as v_error:
print(v_error)
def load_strings(bot_lang):
search_end = get_parameter("search_end", language_filepath)
search_move = get_parameter("search_move", language_filepath)
search_new = get_parameter("search_new", language_filepath)
search_games = get_parameter("search_games", language_filepath)
search_send = get_parameter("search_send", language_filepath)
search_help = get_parameter("search_help", language_filepath)
new_game_started = get_parameter("new_game_started", language_filepath)
playing_with = get_parameter("playing_with", language_filepath)
your_turn = get_parameter("your_turn", language_filepath)
game_name = get_parameter("game_name", language_filepath)
chess_hashtag = get_parameter("chess_hashtag", language_filepath)
send_error = get_parameter("send_error", language_filepath)
return (search_end, search_move, search_new, search_games, search_send, search_help, new_game_started, playing_with, your_turn, game_name, chess_hashtag, send_error)
def load_strings1(bot_lang):
game_number_anotations = get_parameter("game_number_anotations", language_filepath)
anotations_sent = get_parameter("anotations_sent", language_filepath)
game_no_exists = get_parameter("game_no_exists", language_filepath)
it_not_exists = get_parameter("it_not_exists", language_filepath)
game_already_started = get_parameter("game_already_started", language_filepath)
cant_send_to_fediverse_account = get_parameter("cant_send_to_fediverse_account", language_filepath)
wait_other_player = get_parameter("wait_other_player", language_filepath)
is_not_legal_move = get_parameter("is_not_legal_move", language_filepath)
check_done = get_parameter("check_done", language_filepath)
check_mate = get_parameter("check_mate", language_filepath)
return (game_number_anotations, anotations_sent, game_no_exists, cant_send_to_fediverse_account, it_not_exists, game_already_started, wait_other_player, is_not_legal_move, check_done, check_mate)
def load_strings2(bot_lang):
check_mate_movements = get_parameter("check_mate_movements", language_filepath)
the_winner_is = get_parameter("the_winner_is", language_filepath)
well_done = get_parameter("well_done", language_filepath)
winned_games = get_parameter("winned_games", language_filepath)
wins_of_many = get_parameter("wins_of_many", language_filepath)
lost_piece = get_parameter("lost_piece", language_filepath)
not_legal_move_str = get_parameter("not_legal_move_str", language_filepath)
player_leave_game = get_parameter("player_leave_game", language_filepath)
return (check_mate_movements, the_winner_is, well_done, winned_games, wins_of_many, lost_piece, not_legal_move_str, player_leave_game)
def load_strings3(bot_lang):
leave_waiting_game = get_parameter("leave_waiting_game", language_filepath)
started_games = get_parameter("started_games", language_filepath)
game_is_waiting = get_parameter("game_is_waiting", language_filepath)
game_is_on_going = get_parameter("game_is_on_going", language_filepath)
no_on_going_games = get_parameter("no_on_going_games", language_filepath)
is_not_your_turn = get_parameter("is_not_your_turn", language_filepath)
is_the_turn_of = get_parameter("is_the_turn_of", language_filepath)
return (leave_waiting_game, started_games, game_is_waiting, game_is_on_going, no_on_going_games, is_not_your_turn, is_the_turn_of)
def load_strings4(bot_lang):
pawn_piece = get_parameter("pawn_piece", language_filepath)
knight_piece = get_parameter("knight_piece", language_filepath)
bishop_piece = get_parameter("bishop_piece", language_filepath)
rook_piece = get_parameter("rook_piece", language_filepath)
queen_piece = get_parameter("queen_piece", language_filepath)
king_piece = get_parameter("king_piece", language_filepath)
return (pawn_piece, knight_piece, bishop_piece, rook_piece, queen_piece, king_piece)
def load_strings5(bot_lang):
pawn_piece_letter = get_parameter("pawn_piece_letter", language_filepath)
knight_piece_letter = get_parameter("knight_piece_letter", language_filepath)
bishop_piece_letter = get_parameter("bishop_piece_letter", language_filepath)
rook_piece_letter = get_parameter("rook_piece_letter", language_filepath)
queen_piece_letter = get_parameter("queen_piece_letter", language_filepath)
king_piece_letter = get_parameter("king_piece_letter", language_filepath)
email_subject = get_parameter("email_subject", language_filepath)
return (pawn_piece_letter, knight_piece_letter, bishop_piece_letter, rook_piece_letter, queen_piece_letter, king_piece_letter, email_subject)
def load_strings6(bot_lang):
start_or_join_a_new_game = get_parameter("start_or_join_a_new_game", language_filepath)
move_a_piece = get_parameter("move_a_piece", language_filepath)
leave_a_game = get_parameter("leave_a_game", language_filepath)
list_games = get_parameter("list_games", language_filepath)
get_a_game_anotation = get_parameter("get_a_game_anotation", language_filepath)
show_help = get_parameter("show_help", language_filepath)
return (start_or_join_a_new_game, move_a_piece, leave_a_game, list_games, get_a_game_anotation, show_help)
def mastodon():
# Load secrets from secrets file
secrets_filepath = "secrets/secrets.txt"
uc_client_id = get_parameter("uc_client_id", secrets_filepath)
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
uc_access_token = get_parameter("uc_access_token", secrets_filepath)
# Load configuration from config file
config_filepath = "config/config.txt"
mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
bot_username = get_parameter("bot_username", config_filepath)
# Initialise Mastodon API
mastodon = Mastodon(
client_id = uc_client_id,
client_secret = uc_client_secret,
access_token = uc_access_token,
api_base_url = 'https://' + mastodon_hostname,
)
# Initialise access headers
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
return (mastodon, mastodon_hostname, bot_username)
def db_config():
# Load db configuration from config file
config_filepath = "config/db_config.txt"
mastodon_db = get_parameter("mastodon_db", config_filepath)
mastodon_db_user = get_parameter("mastodon_db_user", config_filepath)
chess_db = get_parameter("chess_db", config_filepath)
chess_db_user = get_parameter("chess_db_user", config_filepath)
return (mastodon_db, mastodon_db_user, chess_db, chess_db_user)
def smtp_config():
smtp_filepath = "config/smtp_config.txt"
smtp_host = get_parameter("smtp_host", smtp_filepath)
smtp_user_login = get_parameter("smtp_user_login", smtp_filepath)
smtp_user_password = get_parameter("smtp_user_password", smtp_filepath)
return (smtp_host, smtp_user_login, smtp_user_password)
def get_parameter( parameter, file_path ):
if not os.path.isfile(file_path):
print("File %s not found, exiting."%file_path)
sys.exit(0)
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
def usage():
print('usage: python ' + sys.argv[0] + ' --play' + ' --en')
###############################################################################
# main
if __name__ == '__main__':
# usage modes
if len(sys.argv) == 1:
usage()
elif len(sys.argv) >= 2:
if sys.argv[1] == '--play':
if len(sys.argv) == 3:
if sys.argv[2] == '--en':
bot_lang = 'en'
elif sys.argv[2] == '--es':
bot_lang = 'es'
else:
bot_lang = 'ca'
if bot_lang == 'ca':
language_filepath = 'app/locales/ca.txt'
elif bot_lang == 'en':
language_filepath = 'app/locales/en.txt'
elif bot_lang == 'es':
language_filepath = 'app/locales/es.txt'
else:
print("\nOnly 'ca', 'es' and 'en' languages are supported.\n")
sys.exit(0)
if bot_lang == 'ca':
search_move_slicing = 3
moving_slicing = 3
search_send_slicing = 5
send_game_slicing = 6
elif bot_lang == 'en':
search_move_slicing = 4
moving_slicing = 4
search_send_slicing = 4
send_game_slicing = 5
elif bot_lang == 'es':
search_move_slicing = 5
moving_slicing = 5
search_send_slicing = 5
send_game_slicing = 6
search_end, search_move, search_new, search_games, search_send, search_help, new_game_started, playing_with, your_turn, game_name, chess_hashtag, send_error = load_strings(bot_lang)
game_number_anotations, anotations_sent, game_no_exists, cant_send_to_fediverse_account, it_not_exists, game_already_started, wait_other_player, is_not_legal_move, check_done, check_mate = load_strings1(bot_lang)
check_mate_movements, the_winner_is, well_done, winned_games, wins_of_many, lost_piece, not_legal_move_str, player_leave_game = load_strings2(bot_lang)
leave_waiting_game, started_games, game_is_waiting, game_is_on_going, no_on_going_games, is_not_your_turn, is_the_turn_of = load_strings3(bot_lang)
pawn_piece, knight_piece, bishop_piece, rook_piece, queen_piece, king_piece = load_strings4(bot_lang)
pawn_piece_letter, knight_piece_letter, bishop_piece_letter, rook_piece_letter, queen_piece_letter, king_piece_letter, email_subject = load_strings5(bot_lang)
start_or_join_a_new_game, move_a_piece, leave_a_game, list_games, get_a_game_anotation, show_help = load_strings6(bot_lang)
mastodon, mastodon_hostname, bot_username = mastodon()
mastodon_db, mastodon_db_user, chess_db, chess_db_user = db_config()
smtp_host, smtp_user_login, smtp_user_password = smtp_config()
now = datetime.now()
bot_id = get_bot_id()
account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst = get_notification_data()
i = 0
while i < len(account_id_lst):
account_id = account_id_lst[i]
username, domain = get_user_domain(account_id)
if domain != None:
username = username + '@' + domain
status_id = status_id_lst[i]
replied = check_replies(status_id)
if replied == True:
i += 1
continue
# listen them or not
text = text_lst[i]
reply, query_word, moving = replying()
visibility = visibility_lst[i]
status_url = url_lst[i]
if query_word != search_games:
is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user = check_games()
if game_id == '':
game_id, game_waiting = waiting_games()
else:
is_playing = True
if reply == True and is_playing == False:
if query_word == search_new and not game_waiting:
board = chess.Board()
svgfile = chess.svg.board(board=board)
board_file = 'app/games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
toot_text = '@'+username + ' ' + new_game_started + '\n'
toot_text += '\n'
image_id = mastodon.media_post(board_file, "image/png").id
toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
toot_url = toot_id.uri
new_game(toot_url)
update_replies(status_id, username, now)
elif query_word == search_new and game_waiting:
game_status, white_user, chess_game = join_player()
playing_user = white_user
next_move(username)
board = chess.Board(chess_game)
svgfile = chess.svg.board(board=board)
board_file = 'app/games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
toot_text = '@'+username + ' ' + playing_with + ' ' + white_user + "\n"
toot_text += '\n'
toot_text += '@'+white_user + ': ' + your_turn + "\n"
toot_text += '\n'
toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
game_moves = board.ply()
update_moves(username, game_moves)
update_replies(status_id, username, now)
elif query_word[:search_send_slicing] == search_send:
query_word_length = len(query_word)
send_game = query_word[send_game_slicing:query_word_length].replace(' ', '')
emailed, game_id, game_found = send_anotation(send_game)
if emailed == False and game_found == True:
toot_text = '@'+username + ' ' + send_error
elif emailed == True and game_found == True:
toot_text = '@'+username + ' ' + game_number_anotations + str(game_id) + ' ' + anotations_sent
elif emailed == False and game_found == False:
if domain != None:
toot_text = '@'+username + ' ' + cant_send_to_fediverse_account
else:
toot_text = '@'+username + ' ' + game_no_exists + str(game_id) + ' ' + it_not_exists
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
elif query_word == search_help:
help_text = toot_help()
mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
else:
update_replies(status_id, username, now)
elif reply and is_playing:
if query_word == search_new:
toot_text = '@'+username + ' ' + game_already_started + '\n'
if black_user != '':
toot_text += '@'+white_user + ' / ' + '@'+black_user + '\n'
else:
toot_text += wait_other_player + '\n'
toot_text += '\n'
toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'
board = chess.Board(on_going_game)
svgfile = chess.svg.board(board=board)
board_file = 'app/games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
update_replies(status_id, username, now)
elif query_word[:search_move_slicing] == search_move and playing_user == username:
board = chess.Board(on_going_game)
promoted = False
stalemate = False
checkmate = False
try:
piece_square_index = chess.SQUARE_NAMES.index(moving[:2])
moving_piece = board.piece_type_at(piece_square_index)
if moving_piece == 1:
square_index = chess.SQUARE_NAMES.index(moving[2:4])
if bool(board.turn == chess.WHITE) == True:
square_rank_trigger = 7
elif bool(board.turn == chess.BLACK) == True:
square_rank_trigger = 0
if chess.square_rank(square_index) == square_rank_trigger:
promoted = True
if len(moving) == 4:
moving = moving + 'q'
not_legal_move = chess.Move.from_uci(moving) not in board.legal_moves
else:
not_legal_move = chess.Move.from_uci(moving) not in board.legal_moves
if not_legal_move:
toot_text = '@'+username + ': ' + moving + ' ' + is_not_legal_move + '\n'
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
else:
san_move = board.san(chess.Move.from_uci(moving))
check = False
playing_user = next_move(username)
if bool(board.is_capture(chess.Move.from_uci(moving))):
capture = True
square_capture_index = chess.SQUARE_NAMES.index(moving[2:4])
captured_piece = board.piece_type_at(square_capture_index)
piece_name = get_piece_name(captured_piece)
else:
capture = False
board.push(chess.Move.from_uci(moving))
if bool(board.is_check()):
if username == white_user:
king_square = board.king(chess.BLACK)
check = True
else:
king_square = board.king(chess.WHITE)
check = True
if board.is_stalemate() == True:
stalemate = True
if board.is_game_over() == True:
game_moves = board.ply()
if stalemate == False:
checkmate = True
close_game(username, checkmate)
else:
checkmate = False
if check == True and checkmate == False:
toot_text = "@"+playing_user + " " + username + ' ' + check_done + '\n'
elif check == True and checkmate == True:
toot_text = '\n' + check_mate + ' ' + str(game_moves) + ' ' + check_mate_movements + '\n\n' + the_winner_is + ' ' + "@"+username + '\n'
toot_text += "\n@"+playing_user + ': ' + well_done + "\n"
toot_text += '\n' + winned_games + "\n"
played_games, wins = get_stats(username)
toot_text += username + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"
played_games, wins = get_stats(playing_user)
toot_text += playing_user + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"
elif check == False and stalemate == True:
toot_text = stalemate_str + ' (' + str(game_moves) + ')' + '\n'
toot_text += '\n@'+playing_user + ', ' + '@'+username + "\n"
toot_text += '\n' + winned_games + "\n"
played_games, wins = get_stats(username)
toot_text += username + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"
played_games, wins = get_stats(playing_user)
toot_text += playing_user + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n"
else:
toot_text = '@'+playing_user + ' ' + your_turn + '\n'
if capture == True and checkmate == False:
toot_text += '\n' + lost_piece + ' ' + piece_name + '!\n'
toot_text += '\n' + game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'
if username == white_user:
if check == True:
svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving), check=board.king(chess.BLACK))
else:
svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving))
else:
if check == True:
svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving), check=board.king(chess.WHITE))
else:
svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving))
board_file = 'app/games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
image_id = mastodon.media_post(board_file, "image/png").id
toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
toot_url = toot_id.uri
board_game = board.fen()
update_game(board_game, toot_url)
game_moves = board.ply()
save_anotation(moving, san_move)
update_moves(username, game_moves)
update_replies(status_id, username, now)
except ValueError as v_error:
print(v_error)
toot_text = '@'+username + ' ' + not_legal_move_str + ' ' + moving + '!?)\n'
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
pass
except AssertionError as a_error:
print(a_error)
toot_text = '@'+username + ' ' + not_legal_move_str + ' ' + moving + '!?)\n'
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
pass
elif query_word == search_end:
stalemate = False
checkmate = False
if black_user != '':
if username == white_user:
toot_text = '@'+username + ' ' + player_leave_game + ' ' + '@'+black_user
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
close_game(username, checkmate)
update_replies(status_id, username, now)
i += 1
continue
else:
toot_text = '@'+username + ' ' + player_leave_game + ' ' + white_user
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
close_game(username, checkmate)
update_replies(status_id, username, now)
i += 1
continue
else:
toot_text = '@'+username + ' ' + leave_waiting_game
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
close_game(username, checkmate)
update_replies(status_id, username, now)
i += 1
continue
elif query_word == search_games:
player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst = current_games()
if len(player1_name_lst) > 0:
toot_text = "@"+username + ' ' + started_games + "\n"
i = 0
while i < len(player1_name_lst):
if game_status_lst[i] == 'waiting':
toot_text += '\n' + player1_name_lst[i] + ' / ' + player2_name_lst[i] + ' ' + game_is_waiting + "\n"
else:
if next_move_lst[i] == player1_name_lst[i]:
toot_text += '\n*' + player1_name_lst[i] + ' / ' + player2_name_lst[i] + ' ' + game_is_on_going + '\n'
else:
toot_text += '\n' + player1_name_lst[i] + ' / *' + player2_name_lst[i] + ' ' + game_is_on_going + '\n'
if game_link_lst[i] != None:
toot_text += str(game_link_lst[i]) + "\n"
i += 1
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
else:
toot_text = '@'+username + ' ' + no_on_going_games + '\n'
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
elif query_word[:search_send_slicing] == search_send:
query_word_length = len(query_word)
send_game = query_word[search_send_slicing:query_word_length].replace(' ', '')
emailed, game_id, game_found = send_anotation(send_game)
if emailed == False and game_found == True:
toot_text = '@'+username + ' ' + send_error
elif emailed == True and game_found == True:
toot_text = '@'+username + ' ' + game_number_anotations + str(game_id) + ' ' + anotations_sent
elif emailed == False and game_found == False:
toot_text = '@'+username + ' ' + game_no_exists + str(game_id) + ' ' + it_not_exists
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
elif query_word == search_help:
help_text = toot_help()
mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
else:
if playing_user == None:
toot_text = '@'+username + ' ' + is_not_your_turn + '\n'
else:
toot_text = '@'+username + ' ' + is_the_turn_of + ' ' + playing_user + "\n"
toot_text += '\n'
toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n'
board = chess.Board(on_going_game)
if username == white_user:
svgfile = chess.svg.board(board=board, orientation=chess.BLACK)
else:
svgfile = chess.svg.board(board=board, orientation=chess.WHITE)
board_file = 'app/games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
update_replies(status_id, username, now)
i += 1
else:
usage()