mastochess/mastochess.py
2020-11-17 16:29:26 +01:00

1108 líneas
27 KiB
Python

import pdb
import sys
import os
import os.path
import re
import unidecode
from datetime import datetime, timedelta
from mastodon import Mastodon
import psycopg2
import chess
import chess.svg
from cairosvg import svg2png
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
def unescape(s):
s = s.replace("&apos;", "'")
return s
def last_notification():
###################################################################
# query status_created_at of last notification
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select status_created_at from botreplies order by status_created_at desc limit 1")
row = cur.fetchone()
if row != None:
last_posted = row[0]
last_posted = last_posted.strftime("%d/%m/%Y, %H:%M:%S")
else:
last_posted = ""
cur.close()
return last_posted
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_bot_id():
###################################################################################################################################
# get bot_id from bot's username
try:
conn = None
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select id from accounts where username = (%s) and domain is null", (bot_username,))
row = cur.fetchone()
if row != None:
bot_id = row[0]
cur.close()
return bot_id
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_new_notifications():
#############################################################################################################################
# check if any new notifications by comparing newest notification datetime with the last query datetime
last_notifications = [] # to store last 20 'Mention' type notitifications for our bot
try:
conn = None
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select * from notifications where activity_type = 'Mention' and account_id = (%s) order by created_at desc limit 1", (bot_id,))
row = cur.fetchone()
if row != None:
last_notif_created_at = row[3]
last_notif_created_at = last_notif_created_at + timedelta(hours=2)
last_notif_created_at = last_notif_created_at.strftime("%d/%m/%Y, %H:%M:%S")
if last_posted != "":
if last_notif_created_at <= last_posted:
cur.close()
conn.close()
print("No new notifications")
sys.exit(0)
cur.execute("select * from notifications where activity_type = 'Mention' and account_id = (%s) order by created_at desc limit 20", (bot_id,))
rows = cur.fetchall()
if rows != None:
for row in rows:
last_notifications.append(row)
cur.close()
return last_notifications
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def get_notification_data():
try:
conn = None
conn = psycopg2.connect(database = mastodon_db, user = mastodon_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select username, domain from accounts where id=(%s)", (user_id,))
row = cur.fetchone()
if row != None:
username = row[0]
domain = row[1]
cur.execute("select status_id from mentions where id = (%s)", (activity_id,))
row = cur.fetchone()
if row != None:
status_id = row[0]
cur.execute("select text, visibility from statuses where id = (%s)", (status_id,))
row = cur.fetchone()
if row != None:
text = row[0]
visibility = row[1]
cur.close()
if visibility == 0:
visibility = 'public'
elif visibility == 1:
visibility = 'unlisted'
elif visibility == 2:
visibility = 'private'
elif visibility == 3:
visibility = 'direct'
cur.close()
return (username, domain, status_id, text, visibility)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
def update_replies(status_id, username, now):
post_id = status_id
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
insert_sql = "insert into botreplies(status_id, query_user, status_created_at) values(%s, %s, %s) ON CONFLICT DO NOTHING"
cur.execute(insert_sql, (post_id, username, now))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def check_replies(status_id):
post_id = status_id
replied = False
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select status_id from botreplies where status_id=(%s)", (post_id,))
row = cur.fetchone()
if row != None:
replied = True
else:
replied = False
cur.close()
return replied
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def check_games():
game_id = ''
white_user = ''
black_user = ''
on_going_game = ''
waiting = False
playing_user = ''
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
### check if there is an ongoing game
cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and white_user=(%s) ", (username,))
row = cur.fetchone()
if row == None:
cur.execute("SELECT game_id, white_user, black_user, chess_game, waiting, next_move FROM games where not finished and black_user=(%s)", (username,))
row = cur.fetchone()
if row == None:
is_playing = False
else:
is_playing = True
game_id = row[0]
white_user = row[1]
if row[1] != None:
black_user = row[2]
else:
black_user = ''
on_going_game = row[3]
waiting = row[4]
playing_user = row[5]
else:
is_playing = True
game_id = row[0]
white_user = row[1]
if row[2] != None:
black_user = row[2]
else:
black_user = ''
on_going_game = row[3]
waiting = row[4]
playing_user = row[5]
cur.close()
return (is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def new_game():
try:
game_status = 'waiting'
waiting = True
board_game = board.fen()
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
insert_query = 'INSERT INTO games(created_at, white_user, chess_game, chess_status, waiting, updated_at) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING'
cur.execute(insert_query, (now, username, board_game, game_status, waiting, now))
insert_query = 'INSERT INTO stats(created_at, white_user) VALUES (%s,%s) ON CONFLICT DO NOTHING'
cur.execute(insert_query, (now, username,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def update_game(board_game):
try:
now = datetime.now()
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set chess_game=(%s), updated_at=(%s) where game_id=(%s)", (board_game, now, game_id,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def close_game():
now = datetime.now()
finished = True
try:
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set finished=(%s), updated_at=(%s) where game_id=(%s)", (finished, now, game_id))
cur.execute("update stats set winner=(%s), finished=(%s), updated_at=(%s) where game_id=(%s)", (username, finished, now, game_id))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def waiting_games():
try:
game_id = ''
game_waiting = False
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select game_id from games where waiting order by game_id desc limit 1")
row = cur.fetchone()
if row != None:
game_id = row[0]
game_waiting = True
cur.close()
return (game_id, game_waiting)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def join_player():
try:
now = datetime.now()
game_status = 'waiting'
waiting = True
moves = 0
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set black_user=(%s), chess_status='playing', waiting='f', updated_at=(%s), moves=(%s) where game_id=(%s)", (username, now, moves, game_id,))
cur.execute("update stats set black_user=(%s), updated_at=(%s) where game_id=(%s)", (username, now, game_id,))
conn.commit()
cur.execute("select white_user, chess_game from games where game_id=(%s)", (game_id,))
row = cur.fetchone()
if row != None:
white_user = row[0]
chess_game = row[1]
cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (white_user, now, game_id,))
conn.commit()
cur.close()
game_status = 'playing'
return (game_status, white_user, chess_game)
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def update_moves(username, game_moves):
try:
now = datetime.now()
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("update games set next_move=(%s), last_move=(%s), moves=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, username, game_moves, now, game_id,))
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def next_move(playing_user):
try:
now = datetime.now()
waiting = True
conn = None
conn = psycopg2.connect(database = chess_db, user = chess_db_user, password = "", host = "/var/run/postgresql", port = "5432")
cur = conn.cursor()
cur.execute("select white_user, black_user, last_move, moves from games where game_id=(%s)", (game_id,))
row = cur.fetchone()
if row != None:
white_user = row[0]
black_user = row[1]
last_move = row[2]
moves = row[3]
if last_move != None:
if playing_user == white_user:
playing_user = black_user
elif playing_user == black_user:
playing_user = white_user
else:
last_move = white_user
cur.execute("update games set next_move=(%s), updated_at=(%s) where game_id=(%s)", (playing_user, now, game_id,))
conn.commit()
cur.close()
return playing_user
except (Exception, psycopg2.DatabaseError) as error:
sys.exit(error)
finally:
if conn is not None:
conn.close()
def replying():
reply = False
moving = ''
content = cleanhtml(text)
content = unescape(content)
try:
start = content.index("@")
end = content.index(" ")
if len(content) > end:
content = content[0: start:] + content[end +1::]
neteja = content.count('@')
i = 0
while i < neteja :
start = content.rfind("@")
end = len(content)
content = content[0: start:] + content[end +1::]
i += 1
question = content.lower()
query_word = question
query_word_length = len(query_word)
if unidecode.unidecode(question)[0:query_word_length] == query_word:
if query_word[0:4] == 'nova':
reply = True
elif query_word[0:3] == 'mou':
moving = query_word[4:query_word_length].replace(" ","")
reply = True
elif query_word[0:2] == 'fi':
reply = True
else:
reply = False
return (reply, query_word, moving)
except ValueError as v_error:
print(v_error)
def mastodon():
# Load secrets from secrets file
secrets_filepath = "secrets/secrets.txt"
uc_client_id = get_parameter("uc_client_id", secrets_filepath)
uc_client_secret = get_parameter("uc_client_secret", secrets_filepath)
uc_access_token = get_parameter("uc_access_token", secrets_filepath)
# Load configuration from config file
config_filepath = "config/config.txt"
mastodon_hostname = get_parameter("mastodon_hostname", config_filepath)
bot_username = get_parameter("bot_username", config_filepath)
# Initialise Mastodon API
mastodon = Mastodon(
client_id = uc_client_id,
client_secret = uc_client_secret,
access_token = uc_access_token,
api_base_url = 'https://' + mastodon_hostname,
)
# Initialise access headers
headers={ 'Authorization': 'Bearer %s'%uc_access_token }
return (mastodon, mastodon_hostname, bot_username)
def db_config():
# Load db configuration from config file
config_filepath = "config/db_config.txt"
mastodon_db = get_parameter("mastodon_db", config_filepath)
mastodon_db_user = get_parameter("mastodon_db_user", config_filepath)
chess_db = get_parameter("chess_db", config_filepath)
chess_db_user = get_parameter("chess_db_user", config_filepath)
return (mastodon_db, mastodon_db_user, chess_db, chess_db_user)
def get_parameter( parameter, file_path ):
if not os.path.isfile(file_path):
print("File %s not found, exiting."%file_path)
sys.exit(0)
with open( file_path ) as f:
for line in f:
if line.startswith( parameter ):
return line.replace(parameter + ":", "").strip()
print(file_path + " Missing parameter %s "%parameter)
sys.exit(0)
def create_dir():
if not os.path.exists('games'):
os.makedirs('games')
def usage():
print('usage: python ' + sys.argv[0] + ' --play')
###############################################################################
# main
if __name__ == '__main__':
# usage modes
if len(sys.argv) == 1:
usage()
elif len(sys.argv) == 2:
if sys.argv[1] == '--play':
mastodon, mastodon_hostname, bot_username = mastodon()
mastodon_db, mastodon_db_user, chess_db, chess_db_user = db_config()
now = datetime.now()
create_dir()
last_posted = last_notification()
bot_id = get_bot_id()
last_notifications = get_new_notifications()
last_notifications = sorted(last_notifications)
####################################################################
i = 0
while i < len(last_notifications):
user_id = last_notifications[i][5]
activity_id = last_notifications[i][0]
n_created_at = last_notifications[i][3]
n_created_at = n_created_at + timedelta(hours=2)
n_created_datetime = n_created_at.strftime("%d/%m/%Y, %H:%M:%S")
if n_created_datetime <= last_posted:
i +=1
continue
username, domain, status_id, text, visibility = get_notification_data()
replied = check_replies(status_id)
if replied == True or domain != None:
i += 1
continue
if domain != None:
update_replies(username, status_id)
i += 1
# listen them or not
reply, query_word, moving = replying()
is_playing, game_id, white_user, black_user, on_going_game, waiting, playing_user = check_games()
if game_id == '':
game_id, game_waiting = waiting_games()
if reply == True and is_playing == False:
if query_word == 'nova' and not game_waiting:
board = chess.Board()
svgfile = chess.svg.board(board=board)
board_file = 'games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
toot_text = "@"+username+ " partida iniciada! Esperant jugador... " +"\n"
toot_text += '\n'
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
new_game()
update_replies(status_id, username, now)
elif query_word == 'nova' and game_waiting:
game_status, white_user, chess_game = join_player()
playing_user = white_user
next_move(username)
board = chess.Board(chess_game)
svgfile = chess.svg.board(board=board)
board_file = 'games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
toot_text = "@"+username + " jugues amb " + white_user + "\n"
toot_text += '\n'
toot_text += "@"+white_user + ": et toca a tu" + "\n"
toot_text += '\n'
toot_text += '#escacs' + '\n'
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
game_moves = board.ply()
update_moves(username, game_moves)
update_replies(status_id, username, now)
elif reply and is_playing:
if query_word == 'nova':
toot_text = "@"+username + ' ja estas jugant una partida!' + '\n'
if black_user != '':
toot_text += '@'+white_user + ' / ' + '@'+black_user + '\n'
else:
toot_text += "esperant a l'altre jugador" + '\n'
toot_text += '\n'
toot_text += '#escacs' + '\n'
board = chess.Board(on_going_game)
svgfile = chess.svg.board(board=board)
board_file = 'games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
update_replies(status_id, username, now)
elif query_word[0:3] == 'mou' and playing_user == username:
board = chess.Board(on_going_game)
try:
if chess.Move.from_uci(moving) in board.legal_moves == False:
toot_text = "@"+username + ' moviment il·legal!' + '\n'
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
else:
playing_user = next_move(username)
board.push(chess.Move.from_uci(moving))
if board.is_check() == True:
toot_text = "@"+username + " t'ha fet escac!"
if board.is_game_over() == True:
toot_text += "\nEscac i mat! \nEl guanyador és: " + "@"+username + '\n'
toot_text += "\n@"+playing_user + ": ben jugat!"
close_game()
else:
toot_text = "@"+playing_user + ' el teu torn.'+ '\n'
toot_text += '\n#escacs' + '\n'
if username == white_user:
svgfile = chess.svg.board(board=board, orientation=chess.BLACK, lastmove=chess.Move.from_uci(moving))
else:
svgfile = chess.svg.board(board=board, orientation=chess.WHITE, lastmove=chess.Move.from_uci(moving))
board_file = 'games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
board_game = board.fen()
update_game(board_game)
game_moves = board.ply()
update_moves(username, game_moves)
update_replies(status_id, username, now)
except ValueError as v_error:
print(v_error)
pass
except AssertionError as a_error:
print(a_error)
toot_text = "@"+username + ' moviment il·legal!' + '\n'
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
update_replies(status_id, username, now)
pass
elif query_word[0:2] == 'fi':
if black_user != '':
if username == white_user:
toot_text = "@"+username + " ha deixat la partida amb " + "@"+black_user
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
close_game()
update_replies(status_id, username, now)
i += 1
continue
else:
toot_text = "@"+username + " ha deixat la partida amb " + white_user
mastodon.statud_post(toot_text, in_reply_to_id=status_id,visibility=visibility)
close_game()
update_replies(status_id, username, now)
i += 1
continue
else:
toot_text = "@"+username + " ha abandonat la partida en espera."
mastodon.status_post(toot_text, in_reply_to_id=status_id, visibility=visibility)
close_game()
update_replies(status_id, username, now)
i += 1
continue
else:
if playing_user == None:
toot_text = "@"+username + " no és el teu torn." + "\n"
else:
toot_text = "@"+username + " és el torn de " + playing_user + "\n"
toot_text += '\n'
toot_text += '#escacs' + '\n'
board = chess.Board(on_going_game)
if username == white_user:
svgfile = chess.svg.board(board=board, orientation=chess.BLACK)
else:
svgfile = chess.svg.board(board=board, orientation=chess.WHITE)
board_file = 'games/' + str(game_id) + '_board.png'
svg2png(bytestring=svgfile,write_to=board_file)
image_id = mastodon.media_post(board_file, "image/png").id
mastodon.status_post(toot_text, in_reply_to_id=status_id,visibility=visibility, media_ids={image_id})
update_replies(status_id, username, now)
i += 1