import pdb import sys import os import os.path import re import unidecode from datetime import datetime, timedelta from mastodon import Mastodon import psycopg2 import chess import chess.svg from cairosvg import svg2png def cleanhtml(raw_html): cleanr = re.compile('<.*?>') cleantext = re.sub(cleanr, '', raw_html) return cleantext def unescape(s): s = s.replace("'", "'") return s def get_bot_id(): ################################################################################################################################### # get bot_id from bot's username try: conn = None conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select id from accounts where username = (%s) and domain is null", (bot_username,)) row = cur.fetchone() if row != None: bot_id = row[0] cur.close() return bot_id except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def get_user_domain(account_id): try: conn = None conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select username, domain from accounts where id=(%s)", (account_id,)) row = cur.fetchone() if row != None: username = row[0] domain = row[1] cur.close() return (username, domain) except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def get_piece_name(captured_piece): if captured_piece == 1: piece_name = "un Peó" if captured_piece == 2: piece_name = "un cavall" if captured_piece == 3: piece_name = "l'àlfil" if captured_piece == 4: piece_name = "una torre" if captured_piece == 5: piece_name = "la Reina" if captured_piece == 6: piece_name = "el Rei" return piece_name def get_moved_piece_name(moved_piece): if moved_piece == 1: moved_piece_name = "P" if moved_piece == 2: moved_piece_name = "C" if moved_piece == 3: moved_piece_name = "A" if moved_piece == 4: moved_piece_name = "T" if moved_piece == 5: moved_piece_name = "D" if moved_piece == 6: moved_piece_name = "R" return moved_piece_name def get_notification_data(): try: account_id_lst = [] status_id_lst = [] text_lst = [] visibility_lst = [] url_lst = [] search_text = ['fi','mou','nova','jocs'] conn = None conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() i=0 while i < len(search_text): like_text = "%"+search_text[i]+"%" select_query = "select account_id, id, text, visibility, url, created_at from statuses where text like (%s) and created_at + interval '60 minutes' > now() - interval '5 minutes'" select_query += " and id=any (select status_id from mentions where account_id=(%s)) order by created_at asc" cur.execute(select_query, (like_text, str(bot_id))) rows = cur.fetchall() for row in rows: account_id_lst.append(row[0]) status_id_lst.append(row[1]) text_lst.append(row[2]) if row[3] == 0: visibility_lst.append('public') elif row[3] == 1: visibility_lst.append('unlisted') elif row[3] == 2: visibility_lst.append('private') elif row[3] == 3: visibility_lst.append('direct') url_lst.append(row[4]) i += 1 cur.close() return (account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst) except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() def update_replies(status_id, username, now): post_id = status_id try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() insert_sql = "insert into botreplies(status_id, query_user, status_created_at) values(%s, %s, %s) ON CONFLICT DO NOTHING" cur.execute(insert_sql, (post_id, username, now)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def check_replies(status_id): post_id = status_id replied = False try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select status_id from botreplies where status_id=(%s)", (post_id,)) row = cur.fetchone() if row != None: replied = True else: replied = False cur.close() return replied except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def current_games(): player1_name_lst = [] player2_name_lst = [] game_status_lst = [] game_link_lst = [] next_move_lst = [] try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select white_user, black_user, chess_status, chess_link, next_move from games where not finished") rows = cur.fetchall() for row in rows: player1_name_lst.append(row[0]) if row[1] != None: player2_name_lst.append(row[1]) else: player2_name_lst.append(" ") game_status_lst.append(row[2]) game_link_lst.append(row[3]) next_move_lst.append(row[4]) cur.close() return (player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def check_games(): game_id = '' white_user = '' black_user = '' on_going_game = '' waiting = False playing_user = '' try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() ### check if there is an ongoing game cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and white_user=(%s) ", (username,)) row = cur.fetchone() if row == None: cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and black_user=(%s)", (username,)) row = cur.fetchone() if row == None: is_playing = False else: is_playing = True game_id = row[0] white_user = row[1] if row[1] != None: black_user = row[2] else: black_user = '' on_going_game = row[3] waiting = row[4] playing_user = row[5] else: is_playing = True game_id = row[0] white_user = row[1] if row[2] != None: black_user = row[2] else: black_user = '' on_going_game = row[3] waiting = row[4] playing_user = row[5] cur.close() return (is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def new_game(toot_url): try: game_status = 'waiting' waiting = True board_game = board.fen() conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() insert_query = 'INSERT INTO games(created_at, white_user, chess_game, chess_status, waiting, updated_at, chess_link) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING' cur.execute(insert_query, (now, username, board_game, game_status, waiting, now, toot_url)) insert_query = 'INSERT INTO stats(created_at, white_user) VALUES (%s,%s) ON CONFLICT DO NOTHING' cur.execute(insert_query, (now, username,)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def update_game(board_game, toot_url): try: now = datetime.now() conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set chess_game=(%s), chess_link=(%s), updated_at=(%s) where game_id=(%s)", (board_game, toot_url, now, game_id,)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def save_annotation(): square_index = chess.SQUARE_NAMES.index(moving[2:]) moved_piece = board.piece_type_at(square_index) moved_piece_name = get_moved_piece_name(moved_piece) game_file = "anotations/" + str(game_id) if moved_piece_name == 'P': moved_piece_name = moved_piece_name.replace('P','') if capture == True: moved_piece_name = moved_piece_name + "X" if check == True: moved_piece_name = moved_piece_name + moving[2:] + "+" if checkmate == True: moved_piece_name = moved_piece_name + "+" if bool(board.turn == chess.BLACK) == True: if check != True and checkmate != True: line_data = str(board.fullmove_number) + ". " + moved_piece_name + moving[2:] else: line_data = str(board.fullmove_number) + ". " + moved_piece_name else: moved_piece_name = moved_piece_name.lower() if check != True and checkmate != True: line_data = " - " + moved_piece_name + moving[2:] + "\n" else: line_data = " - " + moved_piece_name + "\n" if not os.path.isfile(game_file): file_header = "Partida: " + str(game_id) + "\n" + white_user + " / " + black_user + "\n\n" with open(game_file, 'w+') as f: f.write(file_header) with open(game_file, 'a') as f: f.write(line_data) else: with open(game_file, 'a') as f: f.write(line_data) def close_game(): now = datetime.now() waiting = False finished = True try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set waiting=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (waiting, finished, now, game_id)) cur.execute("update stats set winner=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (username, finished, now, game_id)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def get_stats(player): played_games = 0 wins = 0 try: conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select count(*) from stats where white_user = (%s) or black_user = (%s) and finished", (player, player)) row = cur.fetchone() if row != None: played_games = row[0] cur.execute("select count(*) from stats where winner = (%s) and finished", (player,)) row = cur.fetchone() if row != None: wins = row[0] cur.close() return (played_games, wins) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def waiting_games(): try: game_id = '' game_waiting = False conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select game_id from games where waiting order by game_id desc limit 1") row = cur.fetchone() if row != None: game_id = row[0] game_waiting = True cur.close() return (game_id, game_waiting) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def join_player(): try: now = datetime.now() game_status = 'waiting' waiting = True moves = 0 conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set black_user=(%s), chess_status='playing', waiting='f', updated_at=(%s), moves=(%s) where game_id=(%s)", (username, now, moves, game_id,)) cur.execute("update stats set black_user=(%s), updated_at=(%s) where game_id=(%s)", (username, now, game_id,)) conn.commit() cur.execute("select white_user, chess_game from games where game_id=(%s)", (game_id,)) row = cur.fetchone() if row != None: white_user = row[0] chess_game = row[1] cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (white_user, now, game_id,)) conn.commit() cur.close() game_status = 'playing' return (game_status, white_user, chess_game) except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def update_moves(username, game_moves): try: now = datetime.now() conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("update games set next_move=(%s), last_move=(%s), moves=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, username, game_moves, now, game_id,)) conn.commit() cur.close() except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def next_move(playing_user): try: now = datetime.now() waiting = True conn = None conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432") cur = conn.cursor() cur.execute("select white_user, black_user, last_move, moves from games where game_id=(%s)", (game_id,)) row = cur.fetchone() if row != None: white_user = row[0] black_user = row[1] last_move = row[2] moves = row[3] if last_move != None: if playing_user == white_user: playing_user = black_user elif playing_user == black_user: playing_user = white_user else: last_move = white_user cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, now, game_id,)) conn.commit() cur.close() return playing_user except (Exception, psycopg2.DatabaseError) as error: sys.exit(error) finally: if conn is not None: conn.close() def replying(): reply = False moving = '' content = cleanhtml(text) content = unescape(content) try: start = content.index("@") end = content.index(" ") if len(content) > end: content = content[0: start:] + content[end +1::] neteja = content.count('@') i = 0 while i < neteja : start = content.rfind("@") end = len(content) content = content[0: start:] + content[end +1::] i += 1 question = content.lower() query_word = question query_word_length = len(query_word) if unidecode.unidecode(question)[0:query_word_length] == query_word: if query_word[0:4] == 'nova': reply = True elif query_word[0:3] == 'mou': moving = query_word[4:query_word_length].replace(" ","") reply = True elif query_word[0:2] == 'fi': reply = True elif query_word[0:4] == 'jocs': reply = True else: reply = False return (reply, query_word, moving) except ValueError as v_error: print(v_error) def mastodon(): # Load secrets from secrets file secrets_filepath = "secrets/secrets.txt" uc_client_id = get_parameter("uc_client_id", secrets_filepath) uc_client_secret = get_parameter("uc_client_secret", secrets_filepath) uc_access_token = get_parameter("uc_access_token", secrets_filepath) # Load configuration from config file config_filepath = "config/config.txt" mastodon_hostname = get_parameter("mastodon_hostname", config_filepath) bot_username = get_parameter("bot_username", config_filepath) # Initialise Mastodon API mastodon = Mastodon( client_id = uc_client_id, client_secret = uc_client_secret, access_token = uc_access_token, api_base_url = 'https://' + mastodon_hostname, ) # Initialise access headers headers={ 'Authorization': 'Bearer %s'%uc_access_token } return (mastodon, mastodon_hostname, bot_username) def db_config(): # Load db configuration from config file config_filepath = "config/db_config.txt" mastodon_db = get_parameter("mastodon_db", config_filepath) mastodon_db_user = get_parameter("mastodon_db_user", config_filepath) chess_db = get_parameter("chess_db", config_filepath) chess_db_user = get_parameter("chess_db_user", config_filepath) return (mastodon_db, mastodon_db_user, chess_db, chess_db_user) def get_parameter( parameter, file_path ): if not os.path.isfile(file_path): print("File %s not found, exiting."%file_path) sys.exit(0) with open( file_path ) as f: for line in f: if line.startswith( parameter ): return line.replace(parameter + ":", "").strip() print(file_path + " Missing parameter %s "%parameter) sys.exit(0) def create_dir(): if not os.path.exists('games'): os.makedirs('games') if not os.path.exists('anotations'): os.makedirs('anotations') def usage(): print('usage: python ' + sys.argv[0] + ' --play') ############################################################################### # main if __name__ == '__main__': # usage modes if len(sys.argv) == 1: usage() elif len(sys.argv) == 2: if sys.argv[1] == '--play': mastodon, mastodon_hostname, bot_username = mastodon() mastodon_db, mastodon_db_user, chess_db, chess_db_user = db_config() now = datetime.now() create_dir() bot_id = get_bot_id() account_id_lst, status_id_lst, text_lst, visibility_lst, url_lst = get_notification_data() i = 0 while i < len(account_id_lst): account_id = account_id_lst[i] username, domain = get_user_domain(account_id) status_id = status_id_lst[i] replied = check_replies(status_id) if replied == True or domain != None: i += 1 continue if domain != None: update_replies(username, status_id) i += 1 # listen them or not text = text_lst[i] reply, query_word, moving = replying() visibility = visibility_lst[i] status_url = url_lst[i] if query_word != "jocs": is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user = check_games() if game_id == '': game_id, game_waiting = waiting_games() else: is_playing = True if reply == True and is_playing == False: if query_word == 'nova' and not game_waiting: board = chess.Board() svgfile = chess.svg.board(board=board) board_file = 'games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) toot_text = "@"+username+ " partida iniciada! Esperant jugador... " +"\n" toot_text += '\n' image_id = mastodon.media_post(board_file, "image/png").id toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) toot_url = toot_id.uri new_game(toot_url) update_replies(status_id, username, now) elif query_word == 'nova' and game_waiting: game_status, white_user, chess_game = join_player() playing_user = white_user next_move(username) board = chess.Board(chess_game) svgfile = chess.svg.board(board=board) board_file = 'games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) toot_text = "@"+username + " jugues amb " + white_user + "\n" toot_text += '\n' toot_text += "@"+white_user + ": el teu torn" + "\n" toot_text += '\n' toot_text += '#escacs' + '\n' image_id = mastodon.media_post(board_file, "image/png").id mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) game_moves = board.ply() update_moves(username, game_moves) update_replies(status_id, username, now) else: update_replies(status_id, username, now) elif reply and is_playing: if query_word == 'nova': toot_text = "@"+username + ' ja tenies iniciada una partida!' + '\n' if black_user != '': toot_text += '@'+white_user + ' / ' + '@'+black_user + '\n' else: toot_text += "espera l'altre jugador" + '\n' toot_text += '\n' toot_text += '#escacs' + '\n' board = chess.Board(on_going_game) svgfile = chess.svg.board(board=board) board_file = 'games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) image_id = mastodon.media_post(board_file, "image/png").id mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) update_replies(status_id, username, now) elif query_word[0:3] == 'mou' and playing_user == username: board = chess.Board(on_going_game) try: if chess.Move.from_uci(moving) not in board.legal_moves: toot_text = "@"+username + ": " + moving + " és un moviment il·legal. Torna a tirar." + "\n" mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) else: check = False playing_user = next_move(username) if bool(board.is_capture(chess.Move.from_uci(moving))): capture = True square_capture_index = chess.SQUARE_NAMES.index(moving[2:]) captured_piece = board.piece_type_at(square_capture_index) piece_name = get_piece_name(captured_piece) else: capture = False board.push(chess.Move.from_uci(moving)) if bool(board.is_check()): if username == white_user: king_square = board.king(chess.BLACK) check = True else: king_square = board.king(chess.WHITE) check = True if board.is_game_over() == True: game_moves = board.ply() close_game() checkmate = True else: checkmate = False if check == True and checkmate == False: toot_text = "@"+playing_user + " " + username + " t'ha fet escac!\n" elif check == True and checkmate == True: toot_text = "\nEscac i mat! (en " + str(game_moves) + " moviments)" + "\n\nEl guanyador és: " + "@"+username + '\n' toot_text += "\n@"+playing_user + ": ben jugat!" + "\n" toot_text += "\nPartides guanyades:" + "\n" played_games, wins = get_stats(username) toot_text += username + ": " + str(wins) + " de " + str(played_games) + "\n" played_games, wins = get_stats(playing_user) toot_text += playing_user + ": " + str(wins) + " de " + str(played_games) + "\n" else: toot_text = "@"+playing_user + ' el teu torn.'+ '\n' if capture == True and checkmate == False: toot_text += "\n* has perdut " + piece_name + "!\n" toot_text += '\n#escacs' + '\n' if username == white_user: if check == True: svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving), check=board.king(chess.BLACK)) else: svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving)) else: if check == True: svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving), check=board.king(chess.WHITE)) else: svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving)) board_file = 'games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) image_id = mastodon.media_post(board_file, "image/png").id toot_id = mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) toot_url = toot_id.uri board_game = board.fen() update_game(board_game, toot_url) game_moves = board.ply() save_annotation() update_moves(username, game_moves) update_replies(status_id, username, now) except ValueError as v_error: print(v_error) toot_text = "@"+username + ' moviment il·legal! (' + moving + '!?)\n' mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) pass except AssertionError as a_error: print(a_error) toot_text = "@"+username + ' moviment il·legal! (' + moving + '!?)\n' mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) pass elif query_word[0:2] == 'fi': if black_user != '': if username == white_user: toot_text = "@"+username + " ha deixat la partida amb " + "@"+black_user mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) close_game() update_replies(status_id, username, now) i += 1 continue else: toot_text = "@"+username + " ha deixat la partida amb " + white_user mastodon.statud_post(toot_text, in_reply_to_id=status_id,visibility=visibility) close_game() update_replies(status_id, username, now) i += 1 continue else: toot_text = "@"+username + " has abandonat la partida en espera." mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility) close_game() update_replies(status_id, username, now) i += 1 continue elif query_word == 'jocs': player1_name_lst, player2_name_lst, game_status_lst, game_link_lst, next_move_lst = current_games() if len(player1_name_lst) > 0: toot_text = "@"+username + " partides iniciades:" + "\n" i = 0 while i < len(player1_name_lst): if game_status_lst[i] == 'waiting': toot_text += "\n" + player1_name_lst[i] + " / " + player2_name_lst[i] + " (en espera...)" + "\n" else: if next_move_lst[i] == player1_name_lst[i]: toot_text += "\n*" + player1_name_lst[i] + " / " + player2_name_lst[i] + " (en joc)" + "\n" else: toot_text += "\n" + player1_name_lst[i] + " / *" + player2_name_lst[i] + " (en joc)" + "\n" if game_link_lst[i] != None: toot_text += str(game_link_lst[i]) + "\n" i += 1 mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) else: toot_text = "@"+username + " cap partida en joc" + "\n" mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility) update_replies(status_id, username, now) else: if playing_user == None: toot_text = "@"+username + " no és el teu torn." + "\n" else: toot_text = "@"+username + " és el torn de " + playing_user + "\n" toot_text += '\n' toot_text += '#escacs' + '\n' board = chess.Board(on_going_game) if username == white_user: svgfile = chess.svg.board(board=board, orientation=chess.BLACK) else: svgfile = chess.svg.board(board=board, orientation=chess.WHITE) board_file = 'games/' + str(game_id) + '_board.png' svg2png(bytestring=svgfile,write_to=board_file) image_id = mastodon.media_post(board_file, "image/png").id mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id}) update_replies(status_id, username, now) i += 1