+ # Are you sure you want to quit?
+ def quit(self, final_result):
+ if graphics == None:
+ sys.stdout.write("QUIT " + final_result + "\n")
+
+ # Completely useless function
+ def update(self, result):
+ if isinstance(graphics, GraphicsThread):
+ pass
+ else:
+ sys.stdout.write(result + "\n")
+
+
+# Default internal player (makes random moves)
+class InternalAgent(Player):
+ def __init__(self, name, colour):
+ Player.__init__(self, name, colour)
+ self.choice = None
+
+ self.board = Board(style = "agent")
+
+
+
+ def update(self, result):
+
+ self.board.update(result)
+ self.board.verify()
+
+ def quit(self, final_result):
+ pass
+
+class AgentRandom(InternalAgent):
+ def __init__(self, name, colour):
+ InternalAgent.__init__(self, name, colour)
+
+ def select(self):
+ while True:
+ self.choice = self.board.pieces[self.colour][random.randint(0, len(self.board.pieces[self.colour])-1)]
+ all_moves = []
+ # Check that the piece has some possibility to move
+ tmp = self.choice.current_type
+ if tmp == "unknown": # For unknown pieces, try both types
+ for t in self.choice.types:
+ if t == "unknown":
+ continue
+ self.choice.current_type = t
+ all_moves += self.board.possible_moves(self.choice)
+ else:
+ all_moves = self.board.possible_moves(self.choice)
+ self.choice.current_type = tmp
+ if len(all_moves) > 0:
+ break
+ return [self.choice.x, self.choice.y]
+
+ def get_move(self):
+ moves = self.board.possible_moves(self.choice)
+ move = moves[random.randint(0, len(moves)-1)]
+ return move
+
+
+# Terrible, terrible hacks
+
+def run_agent(agent):
+ #sys.stderr.write(sys.argv[0] + " : Running agent " + str(agent) + "\n")
+ while True:
+ line = sys.stdin.readline().strip(" \r\n")
+ if line == "SELECTION?":
+ #sys.stderr.write(sys.argv[0] + " : Make selection\n")
+ [x,y] = agent.select() # Gets your agent's selection
+ #sys.stderr.write(sys.argv[0] + " : Selection was " + str(agent.choice) + "\n")
+ sys.stdout.write(str(x) + " " + str(y) + "\n")
+ elif line == "MOVE?":
+ #sys.stderr.write(sys.argv[0] + " : Make move\n")
+ [x,y] = agent.get_move() # Gets your agent's move
+ sys.stdout.write(str(x) + " " + str(y) + "\n")
+ elif line.split(" ")[0] == "QUIT":
+ #sys.stderr.write(sys.argv[0] + " : Quitting\n")
+ agent.quit(" ".join(line.split(" ")[1:])) # Quits the game
+ break
+ else:
+ agent.update(line) # Updates agent.board
+ return 0
+
+
+# Sort of works?
+
+class ExternalWrapper(ExternalAgent):
+ def __init__(self, agent):
+ run = "python -u -c \"import sys;import os;from qchess import *;agent = " + agent.__class__.__name__ + "('" + agent.name + "','"+agent.colour+"');sys.stdin.readline();sys.exit(run_agent(agent))\""
+ # str(run)
+ ExternalAgent.__init__(self, run, agent.colour)
+
+
+
+# --- player.py --- #
+# A sample agent
+
+
+class AgentBishop(InternalAgent): # Inherits from InternalAgent (in qchess)
+ def __init__(self, name, colour):
+ InternalAgent.__init__(self, name, colour)
+ self.value = {"pawn" : 1, "bishop" : 3, "knight" : 3, "rook" : 5, "queen" : 9, "king" : 100, "unknown" : 4}
+
+ self.aggression = 2.0 # Multiplier for scoring due to aggressive actions
+ self.defence = 1.0 # Multiplier for scoring due to defensive actions
+
+ self.depth = 0 # Current depth
+ self.max_depth = 2 # Recurse this many times (for some reason, makes more mistakes when this is increased???)
+ self.recurse_for = -1 # Recurse for the best few moves each times (less than 0 = all moves)
+
+ for p in self.board.pieces["white"] + self.board.pieces["black"]:
+ p.last_moves = None
+ p.selected_moves = None
+
+
+
+ def get_value(self, piece):
+ if piece == None:
+ return 0.0
+ return float(self.value[piece.types[0]] + self.value[piece.types[1]]) / 2.0
+
+ # Score possible moves for the piece
+
+ def prioritise_moves(self, piece):
+
+ #sys.stderr.write(sys.argv[0] + " : " + str(self) + " prioritise called for " + str(piece) + "\n")
+
+
+
+ grid = self.board.probability_grid(piece)
+ #sys.stderr.write("\t Probability grid " + str(grid) + "\n")
+ moves = []
+ for x in range(w):
+ for y in range(h):
+ if grid[x][y] < 0.3: # Throw out moves with < 30% probability
+ #sys.stderr.write("\tReject " + str(x) + "," + str(y) + " (" + str(grid[x][y]) + ")\n")
+ continue
+
+ target = self.board.grid[x][y]
+
+
+
+
+ # Get total probability that the move is protected
+ [xx,yy] = [piece.x, piece.y]
+ [piece.x, piece.y] = [x, y]
+ self.board.grid[x][y] = piece
+ self.board.grid[xx][yy] = None
+
+ defenders = self.board.coverage(x, y, piece.colour, reject_allied = False)
+ d_prob = 0.0
+ for d in defenders.keys():
+ d_prob += defenders[d]
+ if len(defenders.keys()) > 0:
+ d_prob /= float(len(defenders.keys()))
+
+ if (d_prob > 1.0):
+ d_prob = 1.0
+
+ # Get total probability that the move is threatened
+ attackers = self.board.coverage(x, y, opponent(piece.colour), reject_allied = False)
+ a_prob = 0.0
+ for a in attackers.keys():
+ a_prob += attackers[a]
+ if len(attackers.keys()) > 0:
+ a_prob /= float(len(attackers.keys()))
+
+ if (a_prob > 1.0):
+ a_prob = 1.0
+
+ self.board.grid[x][y] = target
+ self.board.grid[xx][yy] = piece
+ [piece.x, piece.y] = [xx, yy]
+
+
+ # Score of the move
+ value = self.aggression * (1.0 + d_prob) * self.get_value(target) - self.defence * (1.0 - d_prob) * a_prob * self.get_value(piece)
+
+ # Adjust score based on movement of piece out of danger
+ attackers = self.board.coverage(piece.x, piece.y, opponent(piece.colour))
+ s_prob = 0.0
+ for a in attackers.keys():
+ s_prob += attackers[a]
+ if len(attackers.keys()) > 0:
+ s_prob /= float(len(attackers.keys()))
+
+ if (s_prob > 1.0):
+ s_prob = 1.0
+ value += self.defence * s_prob * self.get_value(piece)
+
+ # Adjust score based on probability that the move is actually possible
+ moves.append([[x, y], grid[x][y] * value])
+
+ moves.sort(key = lambda e : e[1], reverse = True)
+ #sys.stderr.write(sys.argv[0] + ": Moves for " + str(piece) + " are " + str(moves) + "\n")
+
+ piece.last_moves = moves
+ piece.selected_moves = None
+
+
+
+
+ return moves
+
+ def select_best(self, colour):
+
+ self.depth += 1
+ all_moves = {}
+ for p in self.board.pieces[colour]:
+ self.choice = p # Temporarily pick that piece
+ m = self.prioritise_moves(p)
+ if len(m) > 0:
+ all_moves.update({p : m[0]})
+
+ if len(all_moves.items()) <= 0:
+ return None
+
+
+ opts = all_moves.items()
+ opts.sort(key = lambda e : e[1][1], reverse = True)
+
+ if self.depth >= self.max_depth:
+ self.depth -= 1
+ return list(opts[0])
+
+ if self.recurse_for >= 0:
+ opts = opts[0:self.recurse_for]
+ #sys.stderr.write(sys.argv[0] + " : Before recurse, options are " + str(opts) + "\n")
+
+ # Take the best few moves, and recurse
+ for choice in opts[0:self.recurse_for]:
+ [xx,yy] = [choice[0].x, choice[0].y] # Remember position
+ [nx,ny] = choice[1][0] # Target
+ [choice[0].x, choice[0].y] = [nx, ny] # Set position
+ target = self.board.grid[nx][ny] # Remember piece in spot
+ self.board.grid[xx][yy] = None # Remove piece
+ self.board.grid[nx][ny] = choice[0] # Replace with moving piece
+
+ # Recurse
+ best_enemy_move = self.select_best(opponent(choice[0].colour))
+ choice[1][1] -= best_enemy_move[1][1] / float(self.depth + 1.0)
+
+ [choice[0].x, choice[0].y] = [xx, yy] # Restore position
+ self.board.grid[nx][ny] = target # Restore taken piece
+ self.board.grid[xx][yy] = choice[0] # Restore moved piece
+
+
+
+ opts.sort(key = lambda e : e[1][1], reverse = True)
+ #sys.stderr.write(sys.argv[0] + " : After recurse, options are " + str(opts) + "\n")
+
+ self.depth -= 1
+ return list(opts[0])
+
+
+
+ # Returns [x,y] of selected piece
+ def select(self):
+ #sys.stderr.write("Getting choice...")
+ self.choice = self.select_best(self.colour)[0]
+ #sys.stderr.write(" Done " + str(self.choice)+"\n")
+ return [self.choice.x, self.choice.y]
+
+ # Returns [x,y] of square to move selected piece into
+ def get_move(self):
+ #sys.stderr.write("Choice is " + str(self.choice) + "\n")
+ self.choice.selected_moves = self.choice.last_moves
+ moves = self.prioritise_moves(self.choice)
+ if len(moves) > 0:
+ return moves[0][0]
+ else:
+ return InternalAgent.get_move(self)
+
+# --- agent_bishop.py --- #
+import multiprocessing
+
+# Hacky alternative to using select for timing out players
+
+# WARNING: Do not wrap around HumanPlayer or things breakify
+# WARNING: Do not use in general or things breakify
+
+class Sleeper(multiprocessing.Process):
+ def __init__(self, timeout):
+ multiprocessing.Process.__init__(self)
+ self.timeout = timeout
+
+ def run(self):
+ time.sleep(self.timeout)
+
+
+class Worker(multiprocessing.Process):
+ def __init__(self, function, args, q):
+ multiprocessing.Process.__init__(self)
+ self.function = function
+ self.args = args
+ self.q = q
+
+ def run(self):
+ #print str(self) + " runs " + str(self.function) + " with args " + str(self.args)
+ self.q.put(self.function(*self.args))
+
+
+
+def TimeoutFunction(function, args, timeout):
+ q = multiprocessing.Queue()
+ w = Worker(function, args, q)
+ s = Sleeper(timeout)
+ w.start()
+ s.start()
+ while True: # Busy loop of crappyness
+ if not w.is_alive():
+ s.terminate()
+ result = q.get()
+ w.join()
+ #print "TimeoutFunction gets " + str(result)
+ return result
+ elif not s.is_alive():
+ w.terminate()
+ s.join()
+ raise Exception("TIMEOUT")
+
+
+
+
+# A player that wraps another player and times out its moves
+# Uses threads
+# A (crappy) alternative to the use of select()
+class TimeoutPlayer(Player):
+ def __init__(self, base_player, timeout):
+ Player.__init__(self, base_player.name, base_player.colour)
+ self.base_player = base_player
+ self.timeout = timeout
+
+ def select(self):
+ return TimeoutFunction(self.base_player.select, [], self.timeout)
+
+
+ def get_move(self):
+ return TimeoutFunction(self.base_player.get_move, [], self.timeout)
+
+ def update(self, result):
+ return TimeoutFunction(self.base_player.update, [result], self.timeout)
+
+ def quit(self, final_result):
+ return TimeoutFunction(self.base_player.quit, [final_result], self.timeout)
+# --- timeout_player.py --- #
+import socket
+import select
+
+network_timeout_start = -1.0 # Timeout in seconds to wait for the start of a message
+network_timeout_delay = 1.0 # Maximum time between two characters being received
+
+class Network():
+ def __init__(self, colour, address = None):
+ self.socket = socket.socket()
+ #self.socket.setblocking(0)
+
+ if colour == "white":
+ self.port = 4562
+ else:
+ self.port = 4563
+
+ self.src = None
+
+ # print str(self) + " listens on port " + str(self.port)
+
+ if address == None:
+ self.host = socket.gethostname()
+ self.socket.bind((self.host, self.port))
+ self.socket.listen(5)
+
+ self.src, self.address = self.socket.accept()
+ self.src.send("ok\n")
+ if self.get_response() == "QUIT":
+ self.src.close()
+ else:
+ self.host = address
+ self.socket.connect((address, self.port))
+ self.src = self.socket
+ self.src.send("ok\n")
+ if self.get_response() == "QUIT":
+ self.src.close()
+
+ def get_response(self):
+ # Timeout the start of the message (first character)
+ if network_timeout_start > 0.0:
+ ready = select.select([self.src], [], [], network_timeout_start)[0]
+ else:
+ ready = [self.src]
+ if self.src in ready:
+ s = self.src.recv(1)
+ else:
+ raise Exception("UNRESPONSIVE")
+
+
+ while s[len(s)-1] != '\n':
+ # Timeout on each character in the message
+ if network_timeout_delay > 0.0:
+ ready = select.select([self.src], [], [], network_timeout_delay)[0]
+ else:
+ ready = [self.src]
+ if self.src in ready:
+ s += self.src.recv(1)
+ else:
+ raise Exception("UNRESPONSIVE")
+
+ return s.strip(" \r\n")
+
+ def send_message(self,s):
+ if network_timeout_start > 0.0:
+ ready = select.select([], [self.src], [], network_timeout_start)[1]
+ else:
+ ready = [self.src]
+
+ if self.src in ready:
+ self.src.send(s + "\n")
+ else:
+ raise Exception("UNRESPONSIVE")
+
+ def check_quit(self, s):
+ s = s.split(" ")
+ if s[0] == "QUIT":
+ with game.lock:
+ game.final_result = " ".join(s[1:]) + " " + str(opponent(self.colour))
+ game.stop()
+ return True
+
+
+
+class NetworkSender(Player,Network):
+ def __init__(self, base_player, address = None):
+ self.base_player = base_player
+ Player.__init__(self, base_player.name, base_player.colour)
+
+ self.address = address
+
+ def connect(self):
+ Network.__init__(self, self.base_player.colour, self.address)
+
+
+
+ def select(self):
+ [x,y] = self.base_player.select()
+ choice = self.board.grid[x][y]
+ s = str(x) + " " + str(y)
+ #print str(self) + ".select sends " + s
+ self.send_message(s)
+ return [x,y]
+
+ def get_move(self):
+ [x,y] = self.base_player.get_move()
+ s = str(x) + " " + str(y)
+ #print str(self) + ".get_move sends " + s
+ self.send_message(s)
+ return [x,y]
+
+ def update(self, s):
+ self.base_player.update(s)
+ s = s.split(" ")
+ [x,y] = map(int, s[0:2])
+ selected = self.board.grid[x][y]
+ if selected != None and selected.colour == self.colour and len(s) > 2 and not "->" in s:
+ s = " ".join(s[0:3])
+ for i in range(2):
+ if selected.types_revealed[i] == True:
+ s += " " + str(selected.types[i])
+ else:
+ s += " unknown"
+ #print str(self) + ".update sends " + s
+ self.send_message(s)
+
+
+ def quit(self, final_result):
+ self.base_player.quit(final_result)
+ #self.src.send("QUIT " + str(final_result) + "\n")
+ self.src.close()
+
+class NetworkReceiver(Player,Network):
+ def __init__(self, colour, address=None):
+
+ Player.__init__(self, address, colour)
+
+ self.address = address
+
+ self.board = None
+
+ def connect(self):
+ Network.__init__(self, self.colour, self.address)
+
+
+ def select(self):
+
+ s = self.get_response()
+ #print str(self) + ".select gets " + s
+ [x,y] = map(int,s.split(" "))
+ if x == -1 and y == -1:
+ #print str(self) + ".select quits the game"
+ with game.lock:
+ game.final_state = "network terminated " + self.colour
+ game.stop()
+ return [x,y]
+ def get_move(self):
+ s = self.get_response()
+ #print str(self) + ".get_move gets " + s
+ [x,y] = map(int,s.split(" "))
+ if x == -1 and y == -1:
+ #print str(self) + ".get_move quits the game"
+ with game.lock:
+ game.final_state = "network terminated " + self.colour
+ game.stop()
+ return [x,y]
+
+ def update(self, result):
+
+ result = result.split(" ")
+ [x,y] = map(int, result[0:2])
+ selected = self.board.grid[x][y]
+ if selected != None and selected.colour == self.colour and len(result) > 2 and not "->" in result:
+ s = self.get_response()
+ #print str(self) + ".update - receives " + str(s)
+ s = s.split(" ")
+ selected.choice = int(s[2])
+ for i in range(2):
+ selected.types[i] = str(s[3+i])
+ if s[3+i] == "unknown":
+ selected.types_revealed[i] = False
+ else:
+ selected.types_revealed[i] = True
+ selected.current_type = selected.types[selected.choice]
+ else:
+ pass
+ #print str(self) + ".update - ignore result " + str(result)
+
+
+ def quit(self, final_result):
+ self.src.close()
+
+# --- network.py --- #
+import threading
+
+# A thread that can be stopped!
+# Except it can only be stopped if it checks self.stopped() periodically
+# So it can sort of be stopped
+class StoppableThread(threading.Thread):
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self._stop = threading.Event()
+
+ def stop(self):
+ self._stop.set()
+
+ def stopped(self):
+ return self._stop.isSet()
+# --- thread_util.py --- #
+
+
+log_file = None
+
+def log(s):
+ if log_file != None:
+ import datetime
+ log_file.write(str(datetime.datetime.now()) + " : " + s + "\n")
+
+def log_init(board, players):
+ if log_file != None:
+ import datetime
+ log_file.write("# Log starts " + str(datetime.datetime.now()) + "\n")
+ for p in players:
+ log_file.write("# " + p.colour + " : " + p.name + "\n")
+
+ log_file.write("# Initial board\n")
+ for x in range(0, w):
+ for y in range(0, h):
+ if board.grid[x][y] != None:
+ log_file.write(str(board.grid[x][y]) + "\n")
+
+ log_file.write("# Start game\n")
+
+
+# A thread that runs the game
+class GameThread(StoppableThread):
+ def __init__(self, board, players):
+ StoppableThread.__init__(self)
+ self.board = board
+ self.players = players
+ self.state = {"turn" : None} # The game state
+ self.error = 0 # Whether the thread exits with an error
+ self.lock = threading.RLock() #lock for access of self.state
+ self.cond = threading.Condition() # conditional for some reason, I forgot
+ self.final_result = ""
+
+
+
+ # Run the game (run in new thread with start(), run in current thread with run())
+ def run(self):
+ result = ""
+ while not self.stopped():
+
+ for p in self.players:
+ with self.lock:
+ if isinstance(p, NetworkSender):
+ self.state["turn"] = p.base_player # "turn" contains the player who's turn it is
+ else:
+ self.state["turn"] = p
+ #try:
+ if True:
+ [x,y] = p.select() # Player selects a square
+ if self.stopped():
+ break
+
+
+
+
+ result = self.board.select(x, y, colour = p.colour)
+ for p2 in self.players:
+ p2.update(result) # Inform players of what happened
+
+
+ log(result)
+
+ target = self.board.grid[x][y]
+ if isinstance(graphics, GraphicsThread):
+ with graphics.lock:
+ graphics.state["moves"] = self.board.possible_moves(target)
+ graphics.state["select"] = target
+
+ time.sleep(turn_delay)
+
+
+ if len(self.board.possible_moves(target)) == 0:
+ #print "Piece cannot move"
+ target.deselect()
+ if isinstance(graphics, GraphicsThread):
+ with graphics.lock:
+ graphics.state["moves"] = None
+ graphics.state["select"] = None
+ graphics.state["dest"] = None
+ continue
+
+ try:
+ [x2,y2] = p.get_move() # Player selects a destination
+ except:
+ self.stop()
+
+ if self.stopped():
+ break
+
+ self.board.update_move(x, y, x2, y2)
+ result = str(x) + " " + str(y) + " -> " + str(x2) + " " + str(y2)
+ for p2 in self.players:
+ p2.update(result) # Inform players of what happened
+
+ log(result)
+
+ if isinstance(graphics, GraphicsThread):
+ with graphics.lock:
+ graphics.state["moves"] = [[x2,y2]]
+
+ time.sleep(turn_delay)
+
+ if isinstance(graphics, GraphicsThread):
+ with graphics.lock:
+ graphics.state["select"] = None
+ graphics.state["dest"] = None
+ graphics.state["moves"] = None
+
+ # Commented out exception stuff for now, because it makes it impossible to tell if I made an IndentationError somewhere
+ # except Exception,e:
+ # result = e.message
+ # #sys.stderr.write(result + "\n")
+ #
+ # self.stop()
+ # with self.lock:
+ # self.final_result = self.state["turn"].colour + " " + e.message
+
+ if self.board.king["black"] == None:
+ if self.board.king["white"] == None:
+ with self.lock:
+ self.final_result = self.state["turn"].colour + " DRAW"
+ else:
+ with self.lock:
+ self.final_result = "white"
+ self.stop()
+ elif self.board.king["white"] == None:
+ with self.lock:
+ self.final_result = "black"
+ self.stop()
+
+
+ if self.stopped():
+ break
+
+
+ for p2 in self.players:
+ p2.quit(self.final_result)
+
+ log(self.final_result)