Source code for algorithms.mcts

import math
from random import choice
from tqdm import tqdm

from gameai.core import Algorithm
from .utils import assign_rewards

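# Default exploration constant for the UCB1 selection rule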
DEFAULT_C_PUNT = 1.4


class MCTS(Algorithm):
    '''
    Implementation of a Monte Carlo Tree Search. We want to learn how to
    play a game by keeping track of the best action in any state. We will
    do this by propagating whether or not the current player won the game
    back up through the game history. After enough iterations of game
    simulations we can choose the best move based on this stored
    information.

    Attributes:
        wins (dict): A dictionary where the key is a tuple
            :code:`(player, state_hash)` and the value is the number of
            wins that occurred at that state for the player. Note that the
            player represents whoever *played* the move in the state.
        plays (dict): A dictionary of the same format as wins which
            represents the number of times the player made a move in the
            given state
    '''

    def __init__(self):
        self.wins = {}
        self.plays = {}

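    # Illustrative only: after a few simulations the bookkeeping tables
    # might look like this (hashes and counts are made up):
    #
    #   self.plays = {(0, 'h1'): 12, (1, 'h2'): 9}
    #   self.wins  = {(0, 'h1'): 7,  (1, 'h2'): 3}
    #
    # i.e. player 0 has moved into state 'h1' twelve times and won from
    # it seven times.
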
    def search(self, g, num_iters=100, verbose=False, nnet=None, c_punt=DEFAULT_C_PUNT):
        '''
        Play out a certain number of games, each time updating our win and
        play counts for any state that we visit during the game. As we
        continue to play, num_wins / num_plays for a given state should
        begin to converge on the true optimality of a state.

        Args:
            g (Game): Game to train on
            num_iters (int): Number of search iterations
            verbose (bool): Whether or not to render a progress bar
            nnet (Network): Optional network to be used for the policy
                instead of a naive random playout
            c_punt (float): The degree of exploration. Defaults to 1.4
        '''
        iterator = tqdm(range(num_iters)) if verbose else range(num_iters)
        for _ in iterator:
            self.execute_episode(g, nnet=nnet, c_punt=c_punt)

    def execute_episode(self, g, nnet=None, c_punt=DEFAULT_C_PUNT):
        '''
        Execute a single iteration of the search and update the internal
        state based on the generated examples

        Args:
            g (Game): The game
            nnet (Network): Optional network to be used for the policy
                instead of a naive random playout
            c_punt (float): The degree of exploration. Defaults to 1.4
        '''
        examples = self.search_episode(g, nnet=nnet, c_punt=c_punt)
        self.update(examples)

    def search_episode(self, g, nnet=None, c_punt=DEFAULT_C_PUNT):
        '''
        We play a game by starting in the board's starting state and then
        choosing a random move. We then move to the next state, keeping
        track of which moves we chose. At the end of the game we go
        through our visited list and update the values of wins and plays
        so that we have a better understanding of which states are good
        and which are bad.

        Args:
            g (Game): Game to search
            nnet (Network): Optional network to be used for the policy
                instead of a naive random playout
            c_punt (float): The degree of exploration. Defaults to 1.4

        Returns:
            list: List of examples where each entry is of the format
                :code:`[player, state_hash, reward]`
        '''
        s = g.initial_state()
        p = 0
        examples = []
        while True:
            # Update visited with the next state
            a, expand = self.monte_carlo_action(g, s, p, c_punt)
            s = g.next_state(s, a, p)
            examples.append([p, g.to_hash(s), None])
            p = 1 - p
            if g.terminal(s):
                examples = assign_rewards(examples, g.winner(s))
                return examples
            if expand:
                # Do a random playout until we reach a terminal state
                # TODO: If it is a network use the policy to choose an action
                # instead of just choosing a truly random action...not sure
                # how much of this should be done in the agent???
                winner = self.random_playout(g, s, p)
                examples = assign_rewards(examples, winner)
                return examples

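    # Illustrative only: assuming assign_rewards credits a reward of 1 to
    # the winner's moves and 0 to the loser's, a finished episode won by
    # player 0 might return
    #
    #   [[0, 'hash-a', 1], [1, 'hash-b', 0], [0, 'hash-c', 1]]
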
    def monte_carlo_action(self, g, s, p, c_punt):
        '''
        Choose an action during self play based on the UCB1 algorithm.
        Instead of just choosing the action that led to the most wins in
        the past, we choose the action that balances this concern with
        exploration.

        Args:
            g (Game): The game
            s (any): The state of the game
            p (int): The player who is about to make a move
            c_punt (float): The degree of exploration

        Returns:
            tuple: Tuple :code:`(best_move, expand)`, where expand is a
                boolean denoting whether or not the expansion phase has
                begun
        '''
        actions = g.action_space(s)
        expand = False

        # Stop out early if there is only one choice
        if len(actions) == 1:
            return actions[0], False

        next_state_hashes = [g.to_hash(g.next_state(s, a, p)) for a in actions]
        best_move = None

        # We first check that this player has been in each of the subsequent
        # states. If they have not, then we simply choose a random action
        if all((p, s_hash) in self.plays for s_hash in next_state_hashes):
            log_total = math.log(
                sum(self.plays[(p, s_hash)] for s_hash in next_state_hashes))
            values = [
                (self.wins[(p, s_hash)] / self.plays[(p, s_hash)])
                + c_punt * math.sqrt(log_total / self.plays[(p, s_hash)])
                for s_hash in next_state_hashes
            ]
            next_move_index = values.index(max(values))
            best_move = actions[next_move_index]
        else:
            best_move = choice(actions)
            expand = True
        return (best_move, expand)

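    # For reference, the UCB1 score computed above for a candidate state i is
    #
    #   w_i / n_i + c_punt * sqrt(ln(N) / n_i)
    #
    # where w_i and n_i are this player's win and play counts for state i,
    # and N is the total number of plays across all candidate states. The
    # first term exploits moves known to be good; the second favours
    # under-explored ones.
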
    def pi(self, g, s):
        '''
        Return the favorability of each action in a given state

        Args:
            g (Game): The game
            s (any): The state to evaluate

        Returns:
            :obj:`list` of :obj:`float`: The favorability of each action
        '''
        actions = g.action_space(s)
        # NOTE: the original body is truncated after the line above. The
        # completion below is a sketch only (an assumption, not the
        # library's implementation): favorability is taken as each
        # action's share of recorded plays, summed over both players
        # since no player is passed in.
        counts = [
            sum(self.plays.get((p, g.to_hash(g.next_state(s, a, p))), 0)
                for p in (0, 1))
            for a in actions
        ]
        total = sum(counts)
        return [c / total if total else 1 / len(actions) for c in counts]

    def update(self, examples):
        '''
        Backpropagate the result of the training episodes

        Args:
            examples (list): List of examples where each entry is of the
                format :code:`[player, state_hash, reward]`
        '''
        for [p, s, reward] in examples:
            self.plays[(p, s)] = self.plays.get((p, s), 0) + 1
            self.wins[(p, s)] = self.wins.get((p, s), 0) + reward

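    # Illustrative only: update([[0, 'h1', 1], [1, 'h2', 0]]) increments
    # plays[(0, 'h1')] and plays[(1, 'h2')] by one each, but only
    # wins[(0, 'h1')] gains a win.
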
    def best_action(self, g, s, p):
        '''
        Get the best action for a given player in a given game state

        Args:
            g (Game): The game
            s (state): The current state of the game
            p (int): The current player

        Returns:
            int: The best action given the current knowledge of the game
        '''
        actions = g.action_space(s)

        # Stop out early if there is only one choice
        if len(actions) == 1:
            return actions[0]

        best_move = None
        next_state_hashes = [
            g.to_hash(g.next_state(s, a, p)) for a in actions]

        # We first check that this player has been in each of the subsequent
        # states. If they have not, then we simply choose a random action
        if all((p, s_hash) in self.plays for s_hash in next_state_hashes):
            q_values = [self.wins[(p, s_hash)] / self.plays[(p, s_hash)]
                        for s_hash in next_state_hashes]
            best_move_index = q_values.index(max(q_values))
            best_move = actions[best_move_index]
        else:
            best_move = choice(actions)
        return best_move

    @staticmethod
    def random_playout(g, s, p, max_moves=1000):
        '''
        Perform a random playout and return the winner

        Args:
            g (Game): The game
            s (any): The state of the game to start the playout from
            p (int): The player whose turn it currently is
            max_moves (int): Maximum number of moves before the function
                exits

        Returns:
            int: The winner of the game, or -1 if there is not one
        '''
        for _ in range(max_moves):
            a = choice(g.action_space(s))
            s = g.next_state(s, a, p)
            p = 1 - p
            if g.terminal(s):
                return g.winner(s)
        return -1
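
# A minimal usage sketch (illustrative only -- ``TicTacToe`` stands in for
# any concrete Game implementation; this module does not provide one):
#
#   game = TicTacToe()
#   mcts = MCTS()
#   mcts.search(game, num_iters=1000, verbose=True)
#   s = game.initial_state()
#   best = mcts.best_action(game, s, 0)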