import pdb import sys import os import os.path import re import unidecode from datetime import datetime, timedelta from mastodon import Mastodon from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.base import MIMEBase from email import encoders import smtplib from smtplib import SMTPException, SMTPAuthenticationError, SMTPConnectError, SMTPRecipientsRefused import socket from socket import gaierror import psycopg2 import chess import chess.svg from cairosvg import svg2png def cleanhtml(raw_html): cleanr = re.compile('<.*?>') cleantext = re.sub(cleanr, '', raw_html) return cleantext def unescape(s): s = s.replace("'", "'") return s def get_bot_id(): ################################################################################################################################### # get bot_id from bot's username try: conn = None conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select id from accounts where username = (%s) and domain is null", (bot_username,)) row = cur.fetchone() if row != None: bot_id = row[0] cur.close() return bot_id except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def get_user_domain(account_id): try: conn = None conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select username, domain from accounts where id=(%s)", (account_id,)) row = cur.fetchone() if row != None: username = row[0] domain = row[1] cur.close() return (username, domain) except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def get_piece_name(captured_piece): if captured_piece == 1: piece_name = pawn_piece if captured_piece == 2: piece_name = knight_piece if captured_piece == 3: piece_name = bishop_piece if captured_piece == 4: piece_name = rook_piece if captured_piece == 5: piece_name = queen_piece if captured_piece == 6: piece_name = king_piece return piece_name def get_moved_piece_name(moved_piece): if moved_piece == 1: moved_piece_name = pawn_piece_letter if moved_piece == 2: moved_piece_name = knight_piece_letter if moved_piece == 3: moved_piece_name = bishop_piece_letter if moved_piece == 4: moved_piece_name = rook_piece_letter if moved_piece == 5: moved_piece_name = queen_piece_letter if moved_piece == 6: moved_piece_name = king_piece_letter return moved_piece_name def get_notification_data(): try: account_id_lst = [] status_id_lst = [] text_lst = [] visibility_lst = [] url_lst = [] search_text = [search_end, search_move, search_new, search_games, search_send, search_help] conn = None conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() i=0 while i < len(search_text): like_text = "%"+search_text[i]+"%" select_query = "select account_id, id, text, visibility, url, created_at from statuses where text like (%s) and created_at + interval '60 minutes' > now() - interval '5 minutes'" select_query += " and id=any (select status_id from mentions where account_id=(%s)) order by created_at asc" cur.execute(select_query, (like_text, str(bot_id))) rows = cur.fetchall() for row in rows: account_id_lst.append(row[0]) status_id_lst.append(row[1]) text_lst.append(row[2]) if row[3] == 0: visibility_lst.append('public') elif row[3] == 1: visibility_lst.append('unlisted') elif row[3] == 2: visibility_lst.append('private') elif row[3] == 3: visibility_lst.append('direct') url_lst.append(row[4]) i += 1 cur.close() return (account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst) except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def update_replies(status_id, username, now): post_id = status_id try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() insert_sql = "insert into botreplies(status_id, query_user, status_created_at) values(%s, %s, %s) ON CONFLICT DO NOTHING" cur.execute(insert_sql, (post_id, username, now)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def check_replies(status_id): post_id = status_id replied = False try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select status_id from botreplies where status_id=(%s)", (post_id,)) row = cur.fetchone() if row != None: replied = True else: replied = False cur.close() return replied except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def current_games(): player1_name_lst = [] player2_name_lst = [] game_status_lst = [] game_link_lst = [] next_move_lst = [] try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select white_user, black_user, chess_status, chess_link, next_move from games where not finished") rows = cur.fetchall() for row in rows: player1_name_lst.append(row[0]) if row[1] != None: player2_name_lst.append(row[1]) else: player2_name_lst.append(" ") game_status_lst.append(row[2]) game_link_lst.append(row[3]) next_move_lst.append(row[4]) cur.close() return (player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def check_games(): game_id = '' white_user = '' black_user = '' on_going_game = '' waiting = False playing_user = '' try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() ### check if there is an ongoing game cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and white_user=(%s) ", (username,)) row = cur.fetchone() if row == None: cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and black_user=(%s)", (username,)) row = cur.fetchone() if row == None: is_playing = False else: is_playing = True game_id = row[0] white_user = row[1] if row[1] != None: black_user = row[2] else: black_user = '' on_going_game = row[3] waiting = row[4] playing_user = row[5] else: is_playing = True game_id = row[0] white_user = row[1] if row[2] != None: black_user = row[2] else: black_user = '' on_going_game = row[3] waiting = row[4] playing_user = row[5] cur.close() return (is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def new_game(toot_url): try: game_status = 'waiting' waiting = True board_game = board.fen() conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() insert_query = 'INSERT INTO games(created_at, white_user, chess_game, chess_status, waiting, updated_at, chess_link) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING' cur.execute(insert_query, (now, username, board_game, game_status, waiting, now, toot_url)) insert_query = 'INSERT INTO stats(created_at, white_user) VALUES (%s,%s) ON CONFLICT DO NOTHING' cur.execute(insert_query, (now, username,)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def update_game(board_game, toot_url): try: now = datetime.now() conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set chess_game=(%s), chess_link=(%s), updated_at=(%s) where game_id=(%s)", (board_game, toot_url, now, game_id,)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def save_anotation(moving): if moving_piece != 1: square_index = chess.SQUARE_NAMES.index(moving[2:]) moved_piece = board.piece_type_at(square_index) moved_piece_name = get_moved_piece_name(moved_piece) else: moved_piece_name = 'P' if promoted == True: moving = moving + 'q' game_file = "app/anotations/" + str(game_id) + ".txt" if moved_piece_name == 'P': moved_piece_name = moved_piece_name.replace('P','') if capture == True: moved_piece_name = moved_piece_name + "X" if check == True: moved_piece_name = moved_piece_name + moving[2:] + "+" if checkmate == True: moved_piece_name = moved_piece_name + "+" if bool(board.turn == chess.BLACK) == True: if check != True and checkmate != True: if promoted != True: line_data = str(board.fullmove_number) + ". " + moved_piece_name + moving[2:] else: line_data = str(board.fullmove_number) + ". " + moved_piece_name + "=D" else: line_data = str(board.fullmove_number) + ". " + moved_piece_name else: moved_piece_name = moved_piece_name.lower() if check != True and checkmate != True: if promoted != True: line_data = " - " + moved_piece_name + moving[2:] + "\n" else: line_data = " - " + moved_piece_name + "=D" else: line_data = " - " + moved_piece_name + "\n" if not os.path.isfile(game_file): file_header = game_name + ': ' + str(game_id) + "\n" + white_user + " / " + black_user + "\n\n" with open(game_file, 'w+') as f: f.write(file_header) with open(game_file, 'a') as f: f.write(line_data) else: with open(game_file, 'a') as f: f.write(line_data) def get_email_address(username): try: conn = None conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select email from users where account_id = (select id from accounts where username = (%s) and domain is null)", (username,)) row = cur.fetchone() if row != None: username_email = row[0] else: username_email = None cur.close() return username_email except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def send_anotation(game_id): emailed = False game_found = False username_email = get_email_address(username) if username_email == None: return (emailed, game_id, game_found) # Create message object instance msg = MIMEMultipart() # Declare message elements msg['From'] = smtp_user_login msg['To'] = username_email msg['Subject'] = email_subject + game_id # Attach the game anotation file_to_attach = "app/anotations/" + game_id + ".txt" try: attachment = open(file_to_attach, 'rb') game_found = True except FileNotFoundError as not_found_error: print(not_found_error) return (emailed, game_id, game_found) obj = MIMEBase('application', 'octet-stream') obj.set_payload(attachment.read()) encoders.encode_base64(obj) obj.add_header('Content-Disposition', "attachment; filename= "+file_to_attach) # Add the message body to the object instance msg.attach(obj) try: # Create the server connection server = smtplib.SMTP(smtp_host) # Switch the connection over to TLS encryption server.starttls() # Authenticate with the server server.login(smtp_user_login, smtp_user_password) # Send the message server.sendmail(msg['From'], msg['To'], msg.as_string()) # Disconnect server.quit() print("Successfully sent game anotations to %s" % msg['To']) emailed = True return (emailed, game_id, game_found) except SMTPAuthenticationError as auth_error: print(auth_error) pass return emailed except socket.gaierror as socket_error: print(socket_error) pass return emailed except SMTPRecipientsRefused as recip_error: print(recip_error) pass return emailed def close_game(): now = datetime.now() waiting = False finished = True try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set waiting=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (waiting, finished, now, game_id)) cur.execute("update stats set winner=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (username, finished, now, game_id)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def get_stats(player): played_games = 0 wins = 0 try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select count(*) from stats where white_user = (%s) or black_user = (%s) and finished", (player, player)) row = cur.fetchone() if row != None: played_games = row[0] cur.execute("select count(*) from stats where winner = (%s) and finished", (player,)) row = cur.fetchone() if row != None: wins = row[0] cur.close() return (played_games, wins) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def waiting_games(): try: game_id = '' game_waiting = False conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select game_id from games where waiting order by game_id desc limit 1") row = cur.fetchone() if row != None: game_id = row[0] game_waiting = True cur.close() return (game_id, game_waiting) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def join_player(): try: now = datetime.now() game_status = 'waiting' waiting = True moves = 0 conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set black_user=(%s), chess_status='playing', waiting='f', updated_at=(%s), moves=(%s) where game_id=(%s)", (username, now, moves, game_id,)) cur.execute("update stats set black_user=(%s), updated_at=(%s) where game_id=(%s)", (username, now, game_id,)) conn.commit() cur.execute("select white_user, chess_game from games where game_id=(%s)", (game_id,)) row = cur.fetchone() if row != None: white_user = row[0] chess_game = row[1] cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (white_user, now, game_id,)) conn.commit() cur.close() game_status = 'playing' return (game_status, white_user, chess_game) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def update_moves(username, game_moves): try: now = datetime.now() conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set next_move=(%s), last_move=(%s), moves=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, username, game_moves, now, game_id,)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def next_move(playing_user): try: now = datetime.now() waiting = True conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select white_user, black_user, last_move, moves from games where game_id=(%s)", (game_id,)) row = cur.fetchone() if row != None: white_user = row[0] black_user = row[1] last_move = row[2] moves = row[3] if last_move != None: if playing_user == white_user: playing_user = black_user elif playing_user == black_user: playing_user = white_user else: last_move = white_user cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, now, game_id,)) conn.commit() cur.close() return playing_user except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def toot_help(): help_text = '@'+username + ' ' + search_help + ':\n' help_text += '\n' help_text += '@'+bot_username + ' ' + start_or_join_a_new_game + '\n' help_text += '\n' help_text += '@'+bot_username + ' ' + move_a_piece + '\n' help_text += '\n' help_text += '@'+bot_username + ' ' + leave_a_game + '\n' help_text += '\n' help_text += '@'+bot_username + ' ' + list_games + '\n' help_text += '\n' help_text += '@'+bot_username + ' ' + get_a_game_anotation + '\n' help_text += '\n' help_text += '@'+bot_username + ' ' + show_help + '\n' return help_text def replying(): reply = False moving = '' content = cleanhtml(text) content = unescape(content) try: start = content.index("@") end = content.index(" ") if len(content) > end: content = content[0: start:] + content[end +1::] neteja = content.count('@') i = 0 while i < neteja : start = content.rfind("@") end = len(content) content = content[0: start:] + content[end +1::] i += 1 question = content.lower() query_word = question query_word_length = len(query_word) if unidecode.unidecode(question)[0:query_word_length] == query_word: if query_word == search_new: reply = True elif query_word[:search_move_slicing] == search_move: moving = query_word[moving_slicing:query_word_length].replace(" ","") reply = True elif query_word == search_end: reply = True elif query_word == search_games: reply = True elif query_word[:search_send_slicing] == search_send: reply = True elif query_word == search_help: reply = True else: reply = False return (reply, query_word, moving) except ValueError as v_error: print(v_error) def load_strings(bot_lang): search_end = get_parameter("search_end", language_filepath) search_move = get_parameter("search_move", language_filepath) search_new = get_parameter("search_new", language_filepath) search_games = get_parameter("search_games", language_filepath) search_send = get_parameter("search_send", language_filepath) search_help = get_parameter("search_help", language_filepath) new_game_started = get_parameter("new_game_started", language_filepath) playing_with = get_parameter("playing_with", language_filepath) your_turn = get_parameter("your_turn", language_filepath) game_name = get_parameter("game_name", language_filepath) chess_hashtag = get_parameter("chess_hashtag", language_filepath) send_error = get_parameter("send_error", language_filepath) return (search_end, search_move, search_new, search_games, search_send, search_help, new_game_started, playing_with, your_turn, game_name, chess_hashtag, send_error) def load_strings1(bot_lang): game_number_anotations = get_parameter("game_number_anotations", language_filepath) anotations_sent = get_parameter("anotations_sent", language_filepath) game_no_exists = get_parameter("game_no_exists", language_filepath) it_not_exists = get_parameter("it_not_exists", language_filepath) game_already_started = get_parameter("game_already_started", language_filepath) cant_send_to_fediverse_account = get_parameter("cant_send_to_fediverse_account", language_filepath) wait_other_player = get_parameter("wait_other_player", language_filepath) is_not_legal_move = get_parameter("is_not_legal_move", language_filepath) check_done = get_parameter("check_done", language_filepath) check_mate = get_parameter("check_mate", language_filepath) return (game_number_anotations, anotations_sent, game_no_exists, cant_send_to_fediverse_account, it_not_exists, game_already_started, wait_other_player, is_not_legal_move, check_done, check_mate) def load_strings2(bot_lang): check_mate_movements = get_parameter("check_mate_movements", language_filepath) the_winner_is = get_parameter("the_winner_is", language_filepath) well_done = get_parameter("well_done", language_filepath) winned_games = get_parameter("winned_games", language_filepath) wins_of_many = get_parameter("wins_of_many", language_filepath) lost_piece = get_parameter("lost_piece", language_filepath) not_legal_move_str = get_parameter("not_legal_move_str", language_filepath) player_leave_game = get_parameter("player_leave_game", language_filepath) return (check_mate_movements, the_winner_is, well_done, winned_games, wins_of_many, lost_piece, not_legal_move_str, player_leave_game) def load_strings3(bot_lang): leave_waiting_game = get_parameter("leave_waiting_game", language_filepath) started_games = get_parameter("started_games", language_filepath) game_is_waiting = get_parameter("game_is_waiting", language_filepath) game_is_on_going = get_parameter("game_is_on_going", language_filepath) no_on_going_games = get_parameter("no_on_going_games", language_filepath) is_not_your_turn = get_parameter("is_not_your_turn", language_filepath) is_the_turn_of = get_parameter("is_the_turn_of", language_filepath) return (leave_waiting_game, started_games, game_is_waiting, game_is_on_going, no_on_going_games, is_not_your_turn, is_the_turn_of) def load_strings4(bot_lang): pawn_piece = get_parameter("pawn_piece", language_filepath) knight_piece = get_parameter("knight_piece", language_filepath) bishop_piece = get_parameter("bishop_piece", language_filepath) rook_piece = get_parameter("rook_piece", language_filepath) queen_piece = get_parameter("queen_piece", language_filepath) king_piece = get_parameter("king_piece", language_filepath) return (pawn_piece, knight_piece, bishop_piece, rook_piece, queen_piece, king_piece) def load_strings5(bot_lang): pawn_piece_letter = get_parameter("pawn_piece_letter", language_filepath) knight_piece_letter = get_parameter("knight_piece_letter", language_filepath) bishop_piece_letter = get_parameter("bishop_piece_letter", language_filepath) rook_piece_letter = get_parameter("rook_piece_letter", language_filepath) queen_piece_letter = get_parameter("queen_piece_letter", language_filepath) king_piece_letter = get_parameter("king_piece_letter", language_filepath) email_subject = get_parameter("email_subject", language_filepath) return (pawn_piece_letter, knight_piece_letter, bishop_piece_letter, rook_piece_letter, queen_piece_letter, king_piece_letter, email_subject) def load_strings6(bot_lang): start_or_join_a_new_game = get_parameter("start_or_join_a_new_game", language_filepath) move_a_piece = get_parameter("move_a_piece", language_filepath) leave_a_game = get_parameter("leave_a_game", language_filepath) list_games = get_parameter("list_games", language_filepath) get_a_game_anotation = get_parameter("get_a_game_anotation", language_filepath) show_help = get_parameter("show_help", language_filepath) return (start_or_join_a_new_game, move_a_piece, leave_a_game, list_games, get_a_game_anotation, show_help) def mastodon(): # Load secrets from secrets file secrets_filepath = "secrets/secrets.txt" uc_client_id = get_parameter("uc_client_id", secrets_filepath) uc_client_secret = get_parameter("uc_client_secret", secrets_filepath) uc_access_token = get_parameter("uc_access_token", secrets_filepath) # Load configuration from config file config_filepath = "config/config.txt" mastodon_hostname = get_parameter("mastodon_hostname", config_filepath) bot_username = get_parameter("bot_username", config_filepath) # Initialise Mastodon API mastodon = Mastodon( client_id = uc_client_id, client_secret = uc_client_secret, access_token = uc_access_token, api_base_url = 'https://' + mastodon_hostname, ) # Initialise access headers headers={ 'Authorization': 'Bearer %s'%uc_access_token } return (mastodon, mastodon_hostname, bot_username) def db_config(): # Load db configuration from config file config_filepath = "config/db_config.txt" mastodon_db = get_parameter("mastodon_db", config_filepath) mastodon_db_user = get_parameter("mastodon_db_user", config_filepath) chess_db = get_parameter("chess_db", config_filepath) chess_db_user = get_parameter("chess_db_user", config_filepath) return (mastodon_db, mastodon_db_user, chess_db, chess_db_user) def smtp_config(): smtp_filepath = "config/smtp_config.txt" smtp_host = get_parameter("smtp_host", smtp_filepath) smtp_user_login = get_parameter("smtp_user_login", smtp_filepath) smtp_user_password = get_parameter("smtp_user_password", smtp_filepath) return (smtp_host, smtp_user_login, smtp_user_password) def get_parameter( parameter, file_path ): if not os.path.isfile(file_path): print("File %s not found, exiting."%file_path) sys.exit(0) with open( file_path ) as f: for line in f: if line.startswith( parameter ): return line.replace(parameter + ":", "").strip() print(file_path + " Missing parameter %s "%parameter) sys.exit(0) def usage(): print('usage: python ' + sys.argv[0] + ' --play' + ' --en') ############################################################################### # main if __name__ == '__main__': # usage modes if len(sys.argv) == 1: usage() elif len(sys.argv) >= 2: if sys.argv[1] == '--play': if len(sys.argv) == 3: if sys.argv[2] == '--en': bot_lang = 'en' elif sys.argv[2] == '--es': bot_lang = 'es' else: bot_lang = 'ca' if bot_lang == 'ca': language_filepath = 'app/locales/ca.txt' elif bot_lang == 'en': language_filepath = 'app/locales/en.txt' elif bot_lang == 'es': language_filepath = 'app/locales/es.txt' else: print("\nOnly 'ca', 'es' and 'en' languages are supported.\n") sys.exit(0) if bot_lang == 'ca': search_move_slicing = 3 moving_slicing = 3 search_send_slicing = 5 send_game_slicing = 6 elif bot_lang == 'en': search_move_slicing = 4 moving_slicing = 4 search_send_slicing = 4 send_game_slicing = 5 elif bot_lang == 'es': search_move_slicing = 5 moving_slicing = 5 search_send_slicing = 5 send_game_slicing = 6 search_end, search_move, search_new, search_games, search_send, search_help, new_game_started, playing_with, your_turn, game_name, chess_hashtag, send_error = load_strings(bot_lang) game_number_anotations, anotations_sent, game_no_exists, cant_send_to_fediverse_account, it_not_exists, game_already_started, wait_other_player, is_not_legal_move, check_done, check_mate = load_strings1(bot_lang) check_mate_movements, the_winner_is, well_done, winned_games, wins_of_many, lost_piece, not_legal_move_str, player_leave_game = load_strings2(bot_lang) leave_waiting_game, started_games, game_is_waiting, game_is_on_going, no_on_going_games, is_not_your_turn, is_the_turn_of = load_strings3(bot_lang) pawn_piece, knight_piece, bishop_piece, rook_piece, queen_piece, king_piece = load_strings4(bot_lang) pawn_piece_letter, knight_piece_letter, bishop_piece_letter, rook_piece_letter, queen_piece_letter, king_piece_letter, email_subject = load_strings5(bot_lang) start_or_join_a_new_game, move_a_piece, leave_a_game, list_games, get_a_game_anotation, show_help = load_strings6(bot_lang) mastodon, mastodon_hostname, bot_username = mastodon() mastodon_db, mastodon_db_user, chess_db, chess_db_user = db_config() smtp_host, smtp_user_login, smtp_user_password = smtp_config() now = datetime.now() bot_id = get_bot_id() account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst = get_notification_data() i = 0 while i < len(account_id_lst): account_id = account_id_lst[i] username, domain = get_user_domain(account_id) if domain != None: username = username + '@' + domain status_id = status_id_lst[i] replied = check_replies(status_id) if replied == True: i += 1 continue # listen them or not text = text_lst[i] reply, query_word, moving = replying() visibility = visibility_lst[i] status_url = url_lst[i] if query_word != search_games: is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user = check_games() if game_id == '': game_id, game_waiting = waiting_games() else: is_playing = True if reply == True and is_playing == False: if query_word == search_new and not game_waiting: board = chess.Board() svgfile = chess.svg.board(board=board) board_file = 'app/games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) toot_text = '@'+username + ' ' + new_game_started + '\n' toot_text += '\n' image_id = mastodon.media_post(board_file, "image/png").id toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) toot_url = toot_id.uri new_game(toot_url) update_replies(status_id, username, now) elif query_word == search_new and game_waiting: game_status, white_user, chess_game = join_player() playing_user = white_user next_move(username) board = chess.Board(chess_game) svgfile = chess.svg.board(board=board) board_file = 'app/games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) toot_text = '@'+username + ' ' + playing_with + ' ' + white_user + "\n" toot_text += '\n' toot_text += '@'+white_user + ': ' + your_turn + "\n" toot_text += '\n' toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n' image_id = mastodon.media_post(board_file, "image/png").id mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) game_moves = board.ply() update_moves(username, game_moves) update_replies(status_id, username, now) elif query_word[:search_send_slicing] == search_send: query_word_length = len(query_word) send_game = query_word[send_game_slicing:query_word_length].replace(' ', '') emailed, game_id, game_found = send_anotation(send_game) if emailed == False and game_found == True: toot_text = '@'+username + send_error elif emailed == True and game_found == True: toot_text = '@'+username + ' ' + game_number_anotations + str(game_id) + ' ' + anotations_sent elif emailed == False and game_found == False: if domain != None: toot_text = '@'+username + ' ' + cant_send_to_fediverse_account else: toot_text = '@'+username + ' ' + game_no_exists + str(game_id) + ' ' + it_not_exists mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) elif query_word == search_help: help_text = toot_help() mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) else: update_replies(status_id, username, now) elif reply and is_playing: if query_word == search_new: toot_text = '@'+username + ' ' + game_already_started + '\n' if black_user != '': toot_text += '@'+white_user + ' / ' + '@'+black_user + '\n' else: toot_text += wait_other_player + '\n' toot_text += '\n' toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n' board = chess.Board(on_going_game) svgfile = chess.svg.board(board=board) board_file = 'app/games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) image_id = mastodon.media_post(board_file, "image/png").id mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) update_replies(status_id, username, now) elif query_word[:search_move_slicing] == search_move and playing_user == username: board = chess.Board(on_going_game) promoted = False try: piece_square_index = chess.SQUARE_NAMES.index(moving[:2]) moving_piece = board.piece_type_at(piece_square_index) if moving_piece == 1: square_index = chess.SQUARE_NAMES.index(moving[2:4]) if bool(board.turn == chess.WHITE) == True: square_rank_trigger = 7 elif bool(board.turn == chess.BLACK) == True: square_rank_trigger = 0 if chess.square_rank(square_index) == square_rank_trigger: promoted = True if len(moving) == 4: moving = moving + 'q' not_legal_move = chess.Move.from_uci(moving) not in board.legal_moves else: not_legal_move = chess.Move.from_uci(moving) not in board.legal_moves if not_legal_move: toot_text = '@'+username + ': ' + moving + ' ' + is_not_legal_move + '\n' mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) else: check = False playing_user = next_move(username) if bool(board.is_capture(chess.Move.from_uci(moving))): capture = True square_capture_index = chess.SQUARE_NAMES.index(moving[2:4]) captured_piece = board.piece_type_at(square_capture_index) piece_name = get_piece_name(captured_piece) else: capture = False board.push(chess.Move.from_uci(moving)) if bool(board.is_check()): if username == white_user: king_square = board.king(chess.BLACK) check = True else: king_square = board.king(chess.WHITE) check = True if board.is_game_over() == True: game_moves = board.ply() close_game() checkmate = True else: checkmate = False if check == True and checkmate == False: toot_text = "@"+playing_user + " " + username + ' ' + check_done + '\n' elif check == True and checkmate == True: toot_text = '\n' + check_mate + ' ' + str(game_moves) + ' ' + check_mate_movements + '\n\n' + the_winner_is + ' ' + "@"+username + '\n' toot_text += "\n@"+playing_user + ': ' + well_done + "\n" toot_text += '\n' + winned_games + "\n" played_games, wins = get_stats(username) toot_text += username + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n" played_games, wins = get_stats(playing_user) toot_text += playing_user + ': ' + str(wins) + ' ' + wins_of_many + ' ' + str(played_games) + "\n" else: toot_text = '@'+playing_user + ' ' + your_turn + '\n' if capture == True and checkmate == False: toot_text += '\n' + lost_piece + ' ' + piece_name + '!\n' toot_text += '\n' + game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n' if username == white_user: if check == True: svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving), check=board.king(chess.BLACK)) else: svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving)) else: if check == True: svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving), check=board.king(chess.WHITE)) else: svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving)) board_file = 'app/games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) image_id = mastodon.media_post(board_file, "image/png").id toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) toot_url = toot_id.uri board_game = board.fen() update_game(board_game, toot_url) game_moves = board.ply() save_anotation(moving) update_moves(username, game_moves) update_replies(status_id, username, now) except ValueError as v_error: print(v_error) toot_text = '@'+username + ' ' + not_legal_move_str + moving + '!?)\n' mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) pass except AssertionError as a_error: print(a_error) toot_text = '@'+username + ' ' + not_legal_move_str + moving + '!?)\n' mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) pass elif query_word == search_end: if black_user != '': if username == white_user: toot_text = '@'+username + ' ' + player_leave_game + ' ' + '@'+black_user mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) close_game() update_replies(status_id, username, now) i += 1 continue else: toot_text = '@'+username + ' ' + player_leave_game + ' ' + white_user mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) close_game() update_replies(status_id, username, now) i += 1 continue else: toot_text = '@'+username + ' ' + leave_waiting_game mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility) close_game() update_replies(status_id, username, now) i += 1 continue elif query_word == search_games: player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst = current_games() if len(player1_name_lst) > 0: toot_text = "@"+username + ' ' + started_games + "\n" i = 0 while i < len(player1_name_lst): if game_status_lst[i] == 'waiting': toot_text += '\n' + player1_name_lst[i] + ' / ' + player2_name_lst[i] + ' ' + game_is_waiting + "\n" else: if next_move_lst[i] == player1_name_lst[i]: toot_text += '\n*' + player1_name_lst[i] + ' / ' + player2_name_lst[i] + ' ' + game_is_on_going + '\n' else: toot_text += '\n' + player1_name_lst[i] + ' / *' + player2_name_lst[i] + ' ' + game_is_on_going + '\n' if game_link_lst[i] != None: toot_text += str(game_link_lst[i]) + "\n" i += 1 mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) else: toot_text = '@'+username + ' ' + no_on_going_games + '\n' mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) elif query_word[:search_send_slicing] == search_send: query_word_length = len(query_word) send_game = query_word[search_send_slicing:query_word_length].replace(' ', '') emailed, game_id, game_found = send_anotation(send_game) if emailed == False and game_found == True: toot_text = '@'+username + ' ' + send_error elif emailed == True and game_found == True: toot_text = '@'+username + ' ' + game_number_anotations + str(game_id) + ' ' + anotations_sent elif emailed == False and game_found == False: toot_text = '@'+username + ' ' + game_no_exists + str(game_id) + ' ' + it_not_exists mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) elif query_word == search_help: help_text = toot_help() mastodon.status_post(help_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) else: if playing_user == None: toot_text = '@'+username + ' ' + is_not_your_turn + '\n' else: toot_text = '@'+username + ' ' + is_the_turn_of + ' ' + playing_user + "\n" toot_text += '\n' toot_text += game_name + ': ' + str(game_id) + ' ' + chess_hashtag + '\n' board = chess.Board(on_going_game) if username == white_user: svgfile = chess.svg.board(board=board, orientation=chess.BLACK) else: svgfile = chess.svg.board(board=board, orientation=chess.WHITE) board_file = 'app/games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) image_id = mastodon.media_post(board_file, "image/png").id mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) update_replies(status_id, username, now) i += 1 else: usage()