# Make a string for the piece (used for debug)
def __str__(self):
- return str(self.current_type) + " " + str(self.types) + " at " + str(self.x) + ","+str(self.y)
+ return str(self.colour) + " " + str(self.current_type) + " " + str(self.types) + " at " + str(self.x) + ","+str(self.y)
# Draw the piece in a pygame surface
def draw(self, window, grid_sz = [80,80], style="quantum"):
# Draw the two possible types underneath the current_type image
for i in range(len(self.types)):
- if self.types_revealed[i] == True:
+ if always_reveal_states == True or self.types_revealed[i] == True:
img = small_images[self.colour][self.types[i]]
else:
img = small_images[self.colour]["unknown"] # If the type hasn't been revealed, show a placeholder
# --- piece.py --- #
[w,h] = [8,8] # Width and height of board(s)
+always_reveal_states = False
+
# Class to represent a quantum chess board
class Board():
# Initialise; if master=True then the secondary piece types are assigned
for c in ["black", "white"]:
del self.unrevealed_types[c]["unknown"]
+ if style == "empty":
+ return
+
# Add all the pieces with known primary types
for i in range(0, 2):
def run_agent(agent):
#sys.stderr.write(sys.argv[0] + " : Running agent " + str(agent) + "\n")
- colour = sys.stdin.readline().strip(" \r\n")
- agent.colour = colour
while True:
line = sys.stdin.readline().strip(" \r\n")
if line == "SELECTION?":
class ExternalWrapper(ExternalAgent):
def __init__(self, agent):
- run = "python -u -c \"import sys;import os;from qchess import *;agent = " + agent.__class__.__name__ + "('" + agent.name + "','"+agent.colour+"');sys.exit(run_agent(agent))\""
+ run = "python -u -c \"import sys;import os;from qchess import *;agent = " + agent.__class__.__name__ + "('" + agent.name + "','"+agent.colour+"');sys.stdin.readline();sys.exit(run_agent(agent))\""
# str(run)
ExternalAgent.__init__(self, run, agent.colour)
def stopped(self):
return self._stop.isSet()
# --- thread_util.py --- #
+log_file = None
+import datetime
+import urllib2
+class LogFile():
+ def __init__(self, log):
+
+ self.log = log
+ self.logged = []
+ self.log.write("# Log starts " + str(datetime.datetime.now()) + "\n")
-log_file = None
+ def write(self, s):
+ now = datetime.datetime.now()
+ self.log.write(str(now) + " : " + s + "\n")
+ self.logged.append((now, s))
+
+ def setup(self, board, players):
+
+ for p in players:
+ self.log.write("# " + p.colour + " : " + p.name + "\n")
+
+ self.log.write("# Initial board\n")
+ for x in range(0, w):
+ for y in range(0, h):
+ if board.grid[x][y] != None:
+ self.log.write(str(board.grid[x][y]) + "\n")
+
+ self.log.write("# Start game\n")
+
+ def close(self):
+ self.log.write("# EOF\n")
+ self.log.close()
+
+class HttpLog(LogFile):
+ def __init__(self, file_name):
+ LogFile.__init__(self, open(file_name, "w", 0))
+ self.file_name = file_name
+ self.phase = 0
+
+ def write(self, s):
+ now = datetime.datetime.now()
+ self.logged.append((now, s))
+
+ if self.phase == 0:
+ self.log.close()
+ self.log = open(self.file_name, "w", 0)
+ self.log.write("# Short log updated " + str(datetime.datetime.now()) + "\n")
+ LogFile.setup(self, game.board, game.players)
+
+ elif self.phase == 1:
+ for message in self.logged[len(self.logged)-2:]:
+ self.log.write(str(message[0]) + " : " + message[1] + "\n")
+
+ self.phase = (self.phase + 1) % 2
+
+ def close(self):
+ self.log.write("# EOF\n")
+ self.log.close()
+
+
+class HeadRequest(urllib2.Request):
+ def get_method(self):
+ return "HEAD"
+
+class HttpGetter(StoppableThread):
+ def __init__(self, address):
+ StoppableThread.__init__(self)
+ self.address = address
+ self.log = urllib2.urlopen(address)
+ self.lines = []
+ self.lock = threading.RLock() #lock for access of self.state
+ self.cond = threading.Condition() # conditional
+
+ def run(self):
+ while not self.stopped():
+ line = self.log.readline()
+ if line == "":
+ date_mod = datetime.datetime.strptime(self.log.headers['last-modified'], "%a, %d %b %Y %H:%M:%S GMT")
+ self.log.close()
+
+ next_log = urllib2.urlopen(HeadRequest(self.address))
+ date_new = datetime.datetime.strptime(next_log.headers['last-modified'], "%a, %d %b %Y %H:%M:%S GMT")
+ while date_new <= date_mod and not self.stopped():
+ next_log = urllib2.urlopen(HeadRequest(self.address))
+ date_new = datetime.datetime.strptime(next_log.headers['last-modified'], "%a, %d %b %Y %H:%M:%S GMT")
+ if self.stopped():
+ break
+
+ self.log = urllib2.urlopen(self.address)
+ line = self.log.readline()
+
+ self.cond.acquire()
+ self.lines.append(line)
+ self.cond.notifyAll()
+ self.cond.release()
+ #sys.stderr.write(" HttpGetter got \'" + str(line) + "\'\n")
+
+ self.log.close()
+
+
+
+
+
+class HttpReplay():
+ def __init__(self, address):
+ self.getter = HttpGetter(address)
+ self.getter.start()
+
+ def readline(self):
+ self.getter.cond.acquire()
+ while len(self.getter.lines) == 0:
+ self.getter.cond.wait()
+
+ result = self.getter.lines[0]
+ self.getter.lines = self.getter.lines[1:]
+ self.getter.cond.release()
+
+ return result
+
+
+ def close(self):
+ self.getter.stop()
+
def log(s):
if log_file != None:
- import datetime
- log_file.write(str(datetime.datetime.now()) + " : " + s + "\n")
+ log_file.write(s)
+
+
+def log_init(board, players):
+ if log_file != None:
+ log_file.setup(board, players)
+
+# --- log.py --- #
+
self.lock = threading.RLock() #lock for access of self.state
self.cond = threading.Condition() # conditional for some reason, I forgot
self.final_result = ""
+
+
# Run the game (run in new thread with start(), run in current thread with run())
def run(self):
if self.stopped():
break
- self.board.update_move(x, y, x2, y2)
result = str(x) + " " + str(y) + " -> " + str(x2) + " " + str(y2)
+ log(result)
+
+ self.board.update_move(x, y, x2, y2)
+
for p2 in self.players:
p2.update(result) # Inform players of what happened
- log(result)
+
if isinstance(graphics, GraphicsThread):
with graphics.lock:
log(self.final_result)
- graphics.stop()
+ if isinstance(graphics, GraphicsThread):
+ graphics.stop()
# A thread that replays a log file
class ReplayThread(GameThread):
- def __init__(self, players, src):
- self.board = Board(style="agent")
+ def __init__(self, players, src, end=False,max_lines=None):
+ self.board = Board(style="empty")
GameThread.__init__(self, self.board, players)
self.src = src
+ self.max_lines = max_lines
+ self.line_number = 0
+ self.end = end
+
+ self.reset_board(self.src.readline())
+
+ def reset_board(self, line):
+ pieces = {"white" : [], "black" : []}
+ king = {"white" : None, "black" : None}
+ grid = [[None] * w for _ in range(h)]
+ for x in range(w):
+ for y in range(h):
+ self.board.grid[x][y] = None
+ while line != "# Start game":
+ if line[0] == "#":
+ line = self.src.readline().strip(" \r\n")
+ continue
- self.ended = False
+ tokens = line.split(" ")
+ [x, y] = map(int, tokens[len(tokens)-1].split(","))
+ current_type = tokens[1]
+ types = map(lambda e : e.strip("'[], "), (tokens[2]+tokens[3]).split(","))
+
+ target = Piece(tokens[0], x, y, types)
+ target.current_type = current_type
+
+ try:
+ target.choice = types.index(current_type)
+ except:
+ target.choice = -1
+
+ pieces[tokens[0]].append(target)
+ if target.current_type == "king":
+ king[tokens[0]] = target
+ grid[x][y] = target
+
+ line = self.src.readline().strip(" \r\n")
+
+ self.board.pieces = pieces
+ self.board.king = king
+ self.board.grid = grid
+
+ # Update the player's boards
def run(self):
- i = 0
- phase = 0
- for line in self.src:
-
+ move_count = 0
+ line = self.src.readline().strip(" \r\n")
+ while line != "# EOF":
if self.stopped():
- self.ended = True
break
- with self.lock:
- self.state["turn"] = self.players[i]
+
+
+ if line[0] == '#':
+ line = self.src.readline().strip(" \r\n")
+ continue
- line = line.split(":")
- result = line[len(line)-1].strip(" \r\n")
- log(result)
+ tokens = line.split(" ")
+ if tokens[0] == "white" or tokens[0] == "black":
+ self.reset_board(line)
+ line = self.src.readline().strip(" \r\n")
+ continue
+ move = line.split(":")
+ move = move[len(move)-1].strip(" \r\n")
+ tokens = move.split(" ")
+
+
try:
- self.board.update(result)
+ [x,y] = map(int, tokens[0:2])
except:
- self.ended = True
- self.final_result = result
- if isinstance(graphics, GraphicsThread):
- graphics.stop()
+ self.stop()
break
- [x,y] = map(int, result.split(" ")[0:2])
+ log(move)
+
target = self.board.grid[x][y]
+ with self.lock:
+ if target.colour == "white":
+ self.state["turn"] = self.players[0]
+ else:
+ self.state["turn"] = self.players[1]
+
+ move_piece = (tokens[2] == "->")
+ if move_piece:
+ [x2,y2] = map(int, tokens[len(tokens)-2:])
if isinstance(graphics, GraphicsThread):
- if phase == 0:
+ with graphics.lock:
+ graphics.state["select"] = target
+
+ if not move_piece:
+ self.board.update_select(x, y, int(tokens[2]), tokens[len(tokens)-1])
+ if isinstance(graphics, GraphicsThread):
with graphics.lock:
graphics.state["moves"] = self.board.possible_moves(target)
- graphics.state["select"] = target
-
time.sleep(turn_delay)
-
- elif phase == 1:
- [x2,y2] = map(int, result.split(" ")[3:5])
+ else:
+ self.board.update_move(x, y, x2, y2)
+ if isinstance(graphics, GraphicsThread):
with graphics.lock:
graphics.state["moves"] = [[x2,y2]]
-
time.sleep(turn_delay)
-
with graphics.lock:
graphics.state["select"] = None
- graphics.state["dest"] = None
graphics.state["moves"] = None
+ graphics.state["dest"] = None
+
+
+
+
+
+ for p in self.players:
+ p.update(move)
+
+ line = self.src.readline().strip(" \r\n")
+
+
+
+
+
+
+
+
- for p in self.players:
- p.update(result)
+
- phase = (phase + 1) % 2
- if phase == 0:
- i = (i + 1) % 2
+
+ if self.max_lines != None and self.max_lines > count:
+ sys.stderr.write(sys.argv[0] + " : Replaying from file; stopping at last line (" + str(count) + ")\n")
+ sys.stderr.write(sys.argv[0] + " : (You requested line " + str(self.max_lines) + ")\n")
+
+ if self.end and isinstance(graphics, GraphicsThread):
+ #graphics.stop()
+ pass # Let the user stop the display
+ elif not self.end:
+ global game
+ game = GameThread(self.board, self.players)
+ game.run()
+
else:
return "white"
# --- game.py --- #
-import pygame
+try:
+ import pygame
+except:
+ pass
import os
# Dictionary that stores the unicode character representations of the different pieces
#print "Test font"
pygame.font.Font(os.path.join(os.path.curdir, "data", "DejaVuSans.ttf"), 32).render("Hello", True,(0,0,0))
- #create_images(grid_sz)
+ #load_images()
create_images(grid_sz)
"""
global log_file
global src_file
global graphics_enabled
+ global always_reveal_states
-
+ max_lines = None
src_file = None
style = "quantum"
style = "classical"
elif arg[1] == '-' and arg[2:] == "quantum":
style = "quantum"
+ elif arg[1] == '-' and arg[2:] == "reveal":
+ always_reveal_states = True
elif (arg[1] == '-' and arg[2:] == "graphics"):
graphics_enabled = not graphics_enabled
elif (arg[1] == '-' and arg[2:].split("=")[0] == "file"):
if len(arg[2:].split("=")) == 1:
src_file = sys.stdin
else:
- src_file = open(arg[2:].split("=")[1])
+ f = arg[2:].split("=")[1]
+ if f[0] == '@':
+ src_file = HttpReplay("http://" + f.split(":")[0][1:])
+ else:
+ src_file = open(f.split(":")[0], "r", 0)
+
+ if len(f.split(":")) == 2:
+ max_lines = int(f.split(":")[1])
+
elif (arg[1] == '-' and arg[2:].split("=")[0] == "log"):
# Log file
if len(arg[2:].split("=")) == 1:
- log_file = sys.stdout
+ log_file = LogFile(sys.stdout)
else:
- log_file = open(arg[2:].split("=")[1], "w")
+ f = arg[2:].split("=")[1]
+ if f[0] == '@':
+ log_file = HttpLog(f[1:])
+ else:
+ log_file = LogFile(open(f, "w", 0))
elif (arg[1] == '-' and arg[2:].split("=")[0] == "delay"):
# Delay
if len(arg[2:].split("=")) == 1:
# Construct a GameThread! Make it global! Damn the consequences!
if src_file != None:
- if len(players) == 0:
+ # Hack to stop ReplayThread from exiting
+ #if len(players) == 0:
+ # players = [HumanPlayer("dummy", "white"), HumanPlayer("dummy", "black")]
+
+ # Normally the ReplayThread exits if there are no players
+ # TODO: Decide which behaviour to use, and fix it
+ end = (len(players) == 0)
+ if end:
players = [Player("dummy", "white"), Player("dummy", "black")]
- game = ReplayThread(players, src_file)
+ elif len(players) != 2:
+ sys.stderr.write(sys.argv[0] + " : Usage " + sys.argv[0] + " white black\n")
+ if graphics_enabled:
+ sys.stderr.write(sys.argv[0] + " : (You won't get a GUI, because --file was used, and the author is lazy)\n")
+ return 44
+ game = ReplayThread(players, src_file, end=end, max_lines=max_lines)
else:
board = Board(style)
game = GameThread(board, players)
+ log_init(game.board, players)
+
if graphics != None:
game.start() # This runs in a new thread
graphics.run()
- game.join()
+ if game.is_alive():
+ game.join()
+
+
error = game.error + graphics.error
else:
game.run()
error = game.error
+
if log_file != None and log_file != sys.stdout:
log_file.close()
# This is how python does a main() function...
if __name__ == "__main__":
- sys.exit(main(sys.argv))
+ try:
+ sys.exit(main(sys.argv))
+ except KeyboardInterrupt:
+ sys.stderr.write(sys.argv[0] + " : Got KeyboardInterrupt. Stopping everything\n")
+ if isinstance(graphics, StoppableThread):
+ graphics.stop()
+ graphics.run() # Will clean up graphics because it is stopped, not run it (a bit dodgy)
+
+ if isinstance(game, StoppableThread):
+ game.stop()
+ if game.is_alive():
+ game.join()
+
+ sys.exit(102)
+
# --- main.py --- #
-# EOF - created from make on Tue Jan 29 18:10:18 WST 2013
+# EOF - created from make on Wed Jan 30 21:00:29 WST 2013