Bonus: TD (Temporal Difference)#

Note: this is the lecture I considered omitting from the 2024 edition, so the implementation is left in Keras and it is listed as an optional read.

Our goal in this lecture is to construct a simple reinforcement learning agent while going through the most important concepts (temporal difference, eligibility traces, …). Along the way we will look for inspiration in behavioral and neuroscience research and try to construct our agent from scratch.

We start with Pavlov’s dog experiment. Most of you know the stimulus-reward result: you ring a bell and the dog starts to salivate, knowing that food is on the way. That is the first thing we expect from our system - learn to predict that a reward is coming given a stimulus. Since in the end we want to use neural nets, let’s stick to the weight update rule that we have already seen.

$$ w_{t+1} = w_{t} + \text{learning rate} \cdot \text{error} \cdot \text{input} $$

For simplicity let’s ignore time for now and treat a single trial as the input. In that case our input is just a vector of indicators showing the presence of the various stimuli, and the output is the predicted reward. For the error we can simply use the difference between the reward and our prediction.
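For example, if the current weights are $w = (0, 0.5, 0)$, the trial input is $x = (0, 1, 0)$ and the reward is $1$, then the prediction is $0.5$, the error is $1 - 0.5 = 0.5$, and with learning rate $0.2$ the weight for stimulus B increases by $0.2 \cdot 0.5 \cdot 1 = 0.1$.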

import numpy as np

Let’s start with a simple experiment that contains three trials and where stimulus B clearly predicts the reward.

#           stimuli  reward
#          [A, B, C] 
trials = [[[0, 1, 0], 1],
          [[0, 0, 0], 0],
          [[0, 1, 0], 1]]

For the model we will use the simplest thing we can think of - a weight vector.

w = np.array([0., 0., 0.])    # That's our 'model'

Since we have only three trials, we will run through them repeatedly so that the weight updates converge.

def train_model(trials, w, alpha=0.2, repeat=10):

    for v, r in trials * repeat:    # instead of * we could use a second for loop
        pred = w.dot(v)
        error = r - pred
        w += alpha * error * np.array(v)

    return w

train_model(trials, w)
array([0.        , 0.98847078, 0.        ])

As expected, our model learned to associate stimulus B with the reward. But our story only begins here.

Blocking#

Later conditioning experiments revealed a phenomenon called blocking. If you teach your dog to associate a bell with food and then introduce something new (for example, showing a picture) together with the bell, the dog will not learn to associate this new stimulus with food. Let’s test whether our model does that. If you have executed the cells above you already have trained weights, and we can introduce new trials in which stimulus C is shown alongside B. According to the experimental evidence, this new stimulus should be ignored.

#           stimuli  reward
#          [A, B, C] 
trials = [[[0, 1, 1], 1],
          [[0, 0, 0], 0],
          [[0, 1, 1], 1]]
train_model(trials, w)
array([0.        , 0.99423518, 0.0057644 ])

Since the model uses the error for its updates, a stimulus that already predicts the reward will block further learning. This is known as the Rescorla-Wagner model, and in the Barto & Sutton book it is expressed as follows.

For the prediction using weights we have a dot product

$$\hat{v}(s, w) = w^T x(s).$$

Then our error is defined as

$$\delta_t = R_t - \hat{v}(S_t, w_t).$$

As implemented above, the update rule with learning rate $\alpha$ is

$$w_{t+1} = w_t + \alpha \delta_t x(S_t).$$
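To see why blocking falls out of this update, plug in rough numbers: after the first experiment $w \approx (0, 1, 0)$, so on a compound trial with $x = (0, 1, 1)$ the prediction is $\hat{v} = w^T x \approx 1$, the error is $\delta \approx 1 - 1 = 0$, and the update $\alpha \delta x$ barely moves the weight of stimulus C - exactly what we observed above.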

Higher order conditioning#

Notice that our model does not take time into account. That is a huge limitation: we cannot watch for a stimulus-reward response for an unbounded time, and splitting experience into trials does not represent reality well. It turns out that if, after training the dog to associate the bell with food, you show an image before the bell, the dog learns to associate that image with food as well. This is called higher order conditioning, and it happens even if you give no reward after introducing the new stimulus. It seems that instead of learning that the image is associated with the reward directly, the dog learns that the image predicts the bell and the bell predicts the reward, so the know-how is passed along even when the reward itself is absent.

We will have to introduce time into our model. If you think about the experiments described above, it makes sense to say that the model should not predict the reward directly, but instead, at each time step, focus on the difference between its own consecutive predictions. This is called temporal difference and we will discuss it thoroughly in the lecture. To handle the time lag we can simply add a factor that discounts previous inputs. This trick is known as eligibility traces.

It’s time to put those ideas to the test. First let’s implement them and check that simple conditioning and blocking still work as expected.

#           stimuli  reward
#          [A, B, C] 
trials = [[[0, 1, 0], 1],
          [[0, 0, 0], 0],
          [[0, 1, 0], 1]]

To simulate time we will run through each stimulus in sequence and accumulate the history of stimulus presence (see v_accum below). Also note that the reward is received only at the end of the trial.
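For example, here is what the accumulated history looks like for the trial $v = [0, 1, 0]$ (a standalone illustration, not part of the training code): row $t$ holds the stimuli present at time step $t$, so stimulus A would switch on at step 0, B at step 1, C at step 2, and each stays on once presented.

v = [0, 1, 0]
np.tri(3, 3) * v
# array([[0., 0., 0.],
#        [0., 1., 0.],
#        [0., 1., 0.]])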

def train_model(trials, w, alpha=0.2, repeat=10):
    dim = w.shape[0]
    for v, r in trials * repeat:
        z = np.zeros(dim)    # placeholder for the eligibility trace used later
        v_accum = np.tri(dim, dim) * v
        for t in range(dim):
            v_t = v_accum[t]
            pred = w.dot(v_t)
            if t < dim - 1:
                # reward is zero while stimuli are coming in,
                # thus we look only at the TD error
                next_v_t = v_accum[t + 1]
                next_pred = w.dot(next_v_t)
                error = 0 + next_pred - pred
            else:
                # At the end there is no next prediction and
                # we finally pass the actual reward signal
                error = r - pred

            w += alpha * error * v_t

    return w

w = np.array([0., 0., 0.])    # Let's start over
train_model(trials, w)
array([0.        , 0.98847078, 0.        ])

Let’s check if blocking still works.

#           stimuli  reward
#          [A, B, C] 
trials = [[[0, 1, 1], 1],
          [[0, 0, 0], 0],
          [[0, 1, 1], 1]]

train_model(trials, w)
array([0.00000000e+00, 9.99547284e-01, 7.99164294e-04])

Now for higher order conditioning: a new stimulus A is shown before B, and the reward is still given at the end.

# Reset to the original state first
trials = [[[0, 1, 0], 1],
          [[0, 0, 0], 0],
          [[0, 1, 0], 1]]

w = np.array([0., 0., 0.])
train_model(trials, w)

#           stimuli  reward
#          [A, B, C] 
trials = [[[1, 1, 0], 1],
          [[0, 0, 0], 0],
          [[1, 1, 0], 1]]

train_model(trials, w)
array([0.94702972, 0.0944271 , 0.        ])

Even if the reward were not given every time, the model would still learn to pass the predictive signal along, in line with the real-life experiments. Our implementation is certainly not flexible enough yet, though. To get more control we will introduce a couple of parameters into the mix - decay and discount.

Formally, $\text{TD}(\lambda)$ is defined by the following equations (source: Barto & Sutton book).

The error is

$$\delta_t = R_{t+1} + \gamma \hat{v} (S_{t+1}, w_t) - \hat{v}(S_t, w_t).$$

The weight update is

$$w_{t+1} = w_t + \alpha \delta_t z_t,$$

where the eligibility trace $z_t$ is defined by

$$z_t = \gamma \lambda z_{t-1} + x(S_t).$$

Here $\lambda \in [0, 1]$ is the eligibility trace decay parameter and $\gamma \in [0, 1]$ is the discount factor. We can easily add them to our code by modifying

error = 0 + discount * next_pred - pred

and

z = decay * discount * z + v_t
w += alpha * error * z
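
Putting both modifications together, a sketch of a $\text{TD}(\lambda)$ version of train_model could look like this (the function name and the default decay and discount values are mine, chosen just for illustration):

def train_model_td_lambda(trials, w, alpha=0.2, decay=0.9, discount=0.9, repeat=10):
    dim = w.shape[0]
    for v, r in trials * repeat:
        z = np.zeros(dim)                  # eligibility trace
        v_accum = np.tri(dim, dim) * v
        for t in range(dim):
            v_t = v_accum[t]
            pred = w.dot(v_t)
            if t < dim - 1:
                next_pred = w.dot(v_accum[t + 1])
                error = 0 + discount * next_pred - pred
            else:
                error = r - pred
            z = decay * discount * z + v_t
            w += alpha * error * z
    return w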

Tic-Tac-Toe#

Most likely you have heard about the success of reinforcement learning in games like Go. Implementing MuZero from scratch would require quite a lot of tricks and we lack a bunch of theory for that. Instead we will do a similar thing to TD-Gammon, but for Tic-Tac-Toe: we will train a $\text{TD}(0)$ agent from scratch using self-play.

Let’s create a simple tic-tac-toe environment.

class TicTacToe:
    
    def __init__(self):
        self.reset()
        self.idx = [[0, 1, 2], [3, 4, 5], [6, 7, 8],  # rows
                    [0, 3, 6], [1, 4, 7], [2, 5, 8],  # cols
                    [0, 4, 8], [2, 4, 6]]  # diagonals
        
    def move(self, position):
        if position not in self.available_actions():
            raise Exception('Move is not allowed')
        self.board[position] = self.tic_or_tac
        win = self.winner()
        draw = not len(self.available_actions())
        state = self.state()
        self.tic_or_tac *= -1
        if win:
            return 1, state      # a win
        if draw:
            return 0.5, state    # a draw
        return 0, state
        
    def available_actions(self):
        return np.where(self.board == 0)[0]
    
    def available_action_states(self):
        S, A = [], []
        for a in self.available_actions():
            s = self.state()
            s[0, a] = self.tic_or_tac
            S.append(s)
            A.append(a)
        return np.vstack(S), A
    
    def winner(self):
        return np.any([self.board[i].sum() == 3 * self.tic_or_tac
                       for i in self.idx])
    
    def state(self):
        return self.board.reshape((1, 9)).astype('int')
    
    def reset(self):
        self.board = np.zeros(9).astype('int')
        self.tic_or_tac = 1   # 1 for tic, -1 for tac
        
    def __repr__(self):
        return str(self.board.astype('int').reshape((3, 3)))
        
ttt = TicTacToe()
ttt.move(4)
ttt.move(0)
ttt
[[-1  0  0]
 [ 0  1  0]
 [ 0  0  0]]

Let’s try to win this game.

assert ttt.move(1)[0] == 0
assert ttt.move(8)[0] == 0
assert ttt.move(7)[0] == 1

Make sure you understand how states are expressed.

ttt = TicTacToe()
ttt.move(4)
ttt.available_action_states()
(array([[-1,  0,  0,  0,  1,  0,  0,  0,  0],
        [ 0, -1,  0,  0,  1,  0,  0,  0,  0],
        [ 0,  0, -1,  0,  1,  0,  0,  0,  0],
        [ 0,  0,  0, -1,  1,  0,  0,  0,  0],
        [ 0,  0,  0,  0,  1, -1,  0,  0,  0],
        [ 0,  0,  0,  0,  1,  0, -1,  0,  0],
        [ 0,  0,  0,  0,  1,  0,  0, -1,  0],
        [ 0,  0,  0,  0,  1,  0,  0,  0, -1]]),
 [0, 1, 2, 3, 5, 6, 7, 8])

Following the TD ideas presented above, we want to nudge the value estimates so that the previous prediction moves closer to the next one:

$$V(S_t) \leftarrow V(S_t) + \alpha [V(S_{t+1}) - V(S_t)].$$

Training is completely driven by self-play. We also need some way to explore new strategies. For a first try we can simply store the values in a dictionary.

There is a nice visualization that uses the same method behind the scenes.
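
The core of the agent class below is just this tabular TD(0) update (a minimal sketch; the 0.5 default for unseen states and the string keys follow the same conventions as the class, the helper name is mine):

def td0_update(value_dict, key, new_key, lr=0.5, default=0.5):
    # Move the value of the previous state toward the value of the next one.
    value_dict[key] = (value_dict.get(key, default)
                       + lr * (value_dict.get(new_key, default)
                               - value_dict.get(key, default)))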

class Agent:
    
    def __init__(self, exploration=0.95, lr=0.5):
        self.value_dict = {}              # state key -> estimated value
        self.state = np.zeros((1, 9))     # state after our previous move
        self.lr = lr
        self.exploration = exploration    # probability of choosing the greedy move

    def move(self, ttt):
        if np.random.random() > self.exploration:
            # Exploration: take a random available action
            a = np.random.choice(ttt.available_actions())
            r, new_state = ttt.move(a)
            if r:    # game over
                return r
        else:
            # Exploitation: choose the best action according to the stored values
            states, actions = ttt.available_action_states()
            values = [self.value_dict.get(str(s.reshape((3, 3)).astype('int')), 0.5)
                      for s in states]
            a = actions[np.argmax(values)]
            r, new_state = ttt.move(a)
            # TD(0) update: move the value of the previous state toward
            # the value of the state we just reached (unseen states default to 0.5)
            key = str(self.state.reshape((3, 3)).astype('int'))
            new_key = str(new_state.reshape((3, 3)).astype('int'))
            self.value_dict[key] = (self.value_dict.get(key, 0.5)
                                    + self.lr * (self.value_dict.get(new_key, 0.5)
                                                 - self.value_dict.get(key, 0.5)))
            if r:   # game over: ground the terminal state on the actual reward
                self.value_dict[new_key] = (self.value_dict.get(new_key, 0.5)
                                            + self.lr * ((r if r == 1 else 0)
                                                         - self.value_dict.get(new_key, 0.5)))
                return r
        self.state = new_state
        return 0.
    
    def lost(self):
        # The opponent just won: push the value of our last state toward 0
        key = str(self.state.reshape((3, 3)).astype('int'))
        self.value_dict[key] = (self.value_dict.get(key, 0.5)
                                + self.lr * (0 - self.value_dict.get(key, 0.5)))

Let’s train our agents for 10k games.

%%time 

episodes = 10_001
agent_1 = Agent(0.95, 0.5)
agent_2 = Agent(0.95, 0.5)

win_history = []
ttt = TicTacToe()
# Loop for each episode:
for ep in range(episodes):
    # Initialize S
    ttt.reset()
    # Loop for each step of episode:
    # until S is terminated
    while True:
        r = agent_1.move(ttt)
        if r:
            agent_2.lost()
            win_history.append(r)
            break

        r = agent_2.move(ttt)
        if r:
            agent_1.lost()
            win_history.append(1 - r)
            break

    if ep % 1000 == 0:
        print('win or draw rate (last 1000 games) - {0:.02%}'.format(
              np.mean(np.array(win_history[-1000:]) != 0.)))
win or draw rate (last 1000 games) - 100.00%
win or draw rate (last 1000 games) - 82.30%
win or draw rate (last 1000 games) - 95.30%
win or draw rate (last 1000 games) - 93.10%
win or draw rate (last 1000 games) - 92.80%
win or draw rate (last 1000 games) - 85.30%
win or draw rate (last 1000 games) - 93.90%
win or draw rate (last 1000 games) - 86.50%
win or draw rate (last 1000 games) - 91.40%
win or draw rate (last 1000 games) - 81.90%
win or draw rate (last 1000 games) - 88.70%
CPU times: user 1min, sys: 22.7 ms, total: 1min
Wall time: 1min 1s

Agent 1 learns that it is optimal to start the game in the middle.

ttt = TicTacToe()
states, actions = ttt.available_action_states()
values = [agent_1.value_dict.get(str(s.reshape((3, 3)).astype('int')), 0.5) for s in states]
np.array(values).reshape((3, 3))
array([[0.57454762, 0.56282788, 0.61569813],
       [0.50418249, 0.81350581, 0.52416274],
       [0.61962225, 0.51681234, 0.56206647]])

And agent 2 understands that it is doomed in that case and cannot win.

ttt = TicTacToe()
ttt.move(4)
states, actions = ttt.available_action_states()
values = [agent_2.value_dict.get(str(s.reshape((3, 3)).astype('int')), 0.5) for s in states]
T = np.zeros((3, 3))
for v, a in zip(values, actions):
    T[a // 3, a % 3] = v
T
array([[0.16927708, 0.06483458, 0.17500661],
       [0.14831087, 0.        , 0.1324976 ],
       [0.08615713, 0.10270979, 0.08706275]])

As you can see, self-play is a critical component if we want to end up with a smart agent.
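
As a quick sanity check, here is a minimal sketch (assuming the trained agent_1 and agent_2 from the cells above are still in memory; the helper name is mine) that lets the two agents play one fully greedy game and prints the final board:

def greedy_move(agent, ttt):
    # Pick the action whose resulting state has the highest stored value.
    states, actions = ttt.available_action_states()
    values = [agent.value_dict.get(str(s.reshape((3, 3)).astype('int')), 0.5)
              for s in states]
    return ttt.move(actions[np.argmax(values)])

ttt = TicTacToe()
while True:
    r, _ = greedy_move(agent_1, ttt)
    if r:                    # win or draw
        break
    r, _ = greedy_move(agent_2, ttt)
    if r:
        break
print(ttt)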

Deep TD(0)#

Instead of using a value dictionary we could try to use a neural network. That might be useful if the game is much more complex. In such a case our value network takes the board state as input and returns the expected win probability.

There is one big problem: while training a NN we need the data to be (roughly) i.i.d., and it is not if we update on every consecutive step. Instead we will gather the played transitions into a list and sample from it randomly to train the network. This trick is called experience replay and it is a vital component of many RL systems.
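Conceptually the trick looks like the minimal sketch below (illustrative names and sizes; the class that follows keeps the buffer in self.hist and samples it inside step()):

replay = []                          # list of (input state, training target) pairs

def replay_fit(model, batch_size=16):
    # Train on a random sample of past experience instead of only the latest step.
    if len(replay) < batch_size:
        return
    idx = np.random.choice(len(replay), batch_size)
    X = np.vstack([replay[i][0] for i in idx])
    y = np.vstack([replay[i][1] for i in idx])
    model.fit(X, y, verbose=0)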

import tensorflow as tf
from tensorflow import keras
class Agent:
    
    def __init__(self, exploration=0.9):
        self.state = np.zeros((1, 9))
        self.exploration = exploration
        self.v = keras.models.Sequential()
        self.v.add(keras.layers.Dense(9, activation='relu', input_shape=(9,)))
        self.v.add(keras.layers.Dense(9, activation='relu'))
        self.v.add(keras.layers.Dense(9, activation='relu'))
        self.v.add(keras.layers.Dense(1, activation='sigmoid'))
        self.v.compile(loss='mse', optimizer='adam')
        self.hist = []

    def step(self):
        # Experience replay: once enough history is gathered, sample a batch
        # and fit the value network. Terminal entries store an integer target
        # (1 for a win, 0 for a draw or loss); for the rest we bootstrap with
        # the network's current prediction for the next state.
        if len(self.hist) > 90:
            hist = [self.hist[idx] for idx in
                    np.random.choice(range(len(self.hist)), 16 * 5)]
            self.v.fit(np.vstack([h[0] for h in hist]),
                       np.vstack([h[1] if type(h[1]) == int
                                  else self.v(h[1]) for h in hist]),
                       verbose=0, batch_size=16)

    def move(self, ttt):
        if np.random.random() > self.exploration:
            a = np.random.choice(ttt.available_actions())
            r, new_state = ttt.move(a)
            if r:    # game over
                self.step()
                return r
        else:
            # Choose the best action according to the stored values
            states, actions = ttt.available_action_states()
            a = actions[np.argmax(self.v(states))]
            r, new_state = ttt.move(a)
            self.hist.append([self.state, new_state])
            if r:   # game over
                self.state = new_state
                for _ in range(5):
                    self.hist.append([new_state, 1 if r == 1 else 0])
                self.step()
                return r
        self.state = new_state
        return 0.

    def lost(self):
        for _ in range(5):
            self.hist.append([self.state, 0])
        self.step()

Since we are training a NN, this will take a while.

%%time 

episodes = 10_001
agent_1 = Agent()
agent_2 = Agent()

win_history = []
ttt = TicTacToe()
# Loop for each episode:
for ep in range(episodes):
    # Initialize S
    ttt.reset()
    # Loop for each step of episode:
    # until S is terminated
    while True:
        r = agent_1.move(ttt)
        if r:
            agent_2.lost()
            win_history.append(r)
            break

        r = agent_2.move(ttt)
        if r:
            agent_1.lost()
            win_history.append(1 - r)
            break

    if ep % 1000 == 0:
        print('win or draw rate (last 1000 games) - {0:.02%}'.format(
              np.mean(np.array(win_history[-1000:]) != 0.)))
        ttt = TicTacToe()
        states, actions = ttt.available_action_states()
        print(agent_1.v(states).numpy().reshape((3, 3)))
        ttt.move(4)
        states, actions = ttt.available_action_states()
        T = np.zeros((3, 3))
        for w, a in zip(agent_2.v(states), actions):
            T[a // 3, a % 3] = w
        print(T)
win or draw rate (last 1000 games) - 100.00%
[[0.4916704  0.48525652 0.47802982]
 [0.48747227 0.49330837 0.48380294]
 [0.51800615 0.48726496 0.52977437]]
[[0.50695962 0.53919619 0.57110035]
 [0.52761245 0.         0.56992632]
 [0.58140391 0.53762287 0.55702502]]
win or draw rate (last 1000 games) - 70.80%
[[0.19857982 0.5731377  0.22716764]
 [0.24610418 0.8355091  0.19589329]
 [0.3072345  0.10940447 0.5762914 ]]
[[0.00848639 0.02569416 0.02345979]
 [0.01719093 0.         0.02280116]
 [0.00972328 0.03122777 0.0841741 ]]
win or draw rate (last 1000 games) - 87.10%
[[0.2558624  0.48461166 0.5048217 ]
 [0.4042833  0.924652   0.18331254]
 [0.33432338 0.0753983  0.44958222]]
[[0.00572839 0.02248144 0.01194683]
 [0.01206854 0.         0.01220462]
 [0.00607294 0.02383551 0.05225745]]
win or draw rate (last 1000 games) - 92.50%
[[0.40035123 0.54496217 0.7670125 ]
 [0.48041937 0.9245795  0.17145503]
 [0.5033201  0.06989989 0.50753075]]
[[0.00171757 0.00827879 0.00377765]
 [0.00448915 0.         0.00560331]
 [0.00186938 0.01140457 0.04909718]]
win or draw rate (last 1000 games) - 94.30%
[[0.55405456 0.49907514 0.79478425]
 [0.60511076 0.9764682  0.15467757]
 [0.61028516 0.06207812 0.5619402 ]]
[[0.00067696 0.0036236  0.00148234]
 [0.00243032 0.         0.00312361]
 [0.00106969 0.00739855 0.02599847]]
win or draw rate (last 1000 games) - 95.20%
[[0.6284584  0.46854228 0.8208797 ]
 [0.6521456  0.97109735 0.11898199]
 [0.3457625  0.04561237 0.42221943]]
[[0.00142226 0.00576219 0.00131583]
 [0.00481987 0.         0.00633064]
 [0.00144213 0.01076797 0.01400068]]
win or draw rate (last 1000 games) - 94.10%
[[0.8050296  0.489751   0.8788645 ]
 [0.72182167 0.9754179  0.13875425]
 [0.47207278 0.05375874 0.49411008]]
[[0.00182512 0.0086399  0.00112212]
 [0.0077593  0.         0.00856066]
 [0.00193074 0.01376402 0.01478055]]
win or draw rate (last 1000 games) - 87.40%
[[0.6750169  0.42116672 0.81997144]
 [0.7628833  0.98143387 0.16021743]
 [0.5187866  0.04705718 0.4210554 ]]
[[0.00334474 0.01778463 0.00178617]
 [0.01105863 0.         0.00800505]
 [0.00590411 0.02112225 0.01209691]]
win or draw rate (last 1000 games) - 91.10%
[[0.7669873  0.36637706 0.90033984]
 [0.6709896  0.990423   0.22330934]
 [0.41973612 0.04740518 0.36507422]]
[[0.00517729 0.03302112 0.00217733]
 [0.01435807 0.         0.02991852]
 [0.01144838 0.03529805 0.01022997]]
win or draw rate (last 1000 games) - 91.10%
[[0.45975676 0.44303674 0.85523885]
 [0.47312692 0.9946798  0.192976  ]
 [0.3131255  0.08917099 0.42057443]]
[[0.00182348 0.02626097 0.00140223]
 [0.01025584 0.         0.02912375]
 [0.00474551 0.01831719 0.00718066]]
win or draw rate (last 1000 games) - 92.30%
[[0.4421326  0.50690556 0.63432586]
 [0.45558614 0.9926535  0.26947683]
 [0.32076532 0.10975155 0.4054354 ]]
[[0.00166827 0.01705    0.00143492]
 [0.01052824 0.         0.02090362]
 [0.00661147 0.0184623  0.00693658]]
CPU times: user 22min 46s, sys: 28.5 s, total: 23min 14s
Wall time: 23min 26s

As you can see, it was able to produce results similar to the dictionary approach. We could extend this further; for example, an AlphaZero-like implementation can be found in AlphaToe.

Of course, using TD for tic-tac-toe is overkill. Keep in mind that what we did here was for illustrative purposes - the aim was to show that TD is a general principle that can be used in games. Similar principles are applied in TD-Gammon, AlphaZero, MuZero and other RL agents.

Where to go next?#

If you are interested in digging deeper:

  • Read what DeepMind is doing.

  • Read what OpenAI is doing.

  • Watch the AlphaGo movie and read the paper.

  • Read the Barto & Sutton book.

  • Work through Deep RL, which contains more examples and intuitive lower-level implementations. This Medium series is great.

  • Read the book “Deep Reinforcement Learning Hands-On” by Maxim Lapan.