Reinforcement Learning from Scratch - Part 2 - Deep Q Learning

Published at
7/30/2024
Categories
machinelearning
ai
python
tutorial
Author
Akshay Ballal
Hey everyone. This is the second part of my Reinforcement Learning series, where we look at different RL algorithms and discuss their implementation details. In the last part, we saw a basic RL algorithm that uses tabular Q-learning. One disadvantage of that technique was that it required us to discretize both the observation and action spaces. The number of table entries grows quickly with the resolution of the discretization, which makes learning harder and slower.

To overcome this problem, at least partially, in this part we look at Deep Q Learning (DQN), a prominent technique that brought reinforcement learning into the mainstream. With this technique we can use the continuous observation space directly, but we still need a discrete action space. First, we will put together a simple DQN algorithm and then see how we can use an LSTM architecture as the network in DQN. Let’s dive in.

DQN Theory

If you are not interested in the theory of DQN, that is fine. The code implementation below explains a lot. However, I am including some of the main driving equations below for completeness.

From the previous part, we know that the q-value of a state-action pair is the expected total return from that state if a certain action is taken. Using the Bellman equation, we can write it out as:

bellman-equation

If we are following the policy π we can say that:

bootstraping

From this, we can say that the q-value of a state-action pair is equal to the average (expectation) of the sum of the reward and γ times the maximum q-value of the next state. This is called bootstrapping.
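For reference, the two relations described above are commonly written as follows. This is a sketch in standard notation, not a copy of the original equation images; the symbols s' and a' (next state and next action) and the expectation over transitions are assumptions of that notation.

```latex
% Bellman equation for the action-value function under a policy \pi
Q^{\pi}(s, a) = \mathbb{E}\left[\, r + \gamma \, Q^{\pi}(s', a') \,\right],
\qquad a' \sim \pi(\cdot \mid s')

% Bootstrapped form used by Q-learning: act greedily in the next state
Q(s, a) = \mathbb{E}\left[\, r + \gamma \max_{a'} Q(s', a') \,\right]
```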

For example, suppose we have a transition (s, 2, r, s’), meaning we took action 2 in state s. Then, we do the following.

  1. Pass the state through the q-network and get the q-values of all the actions.
  2. Pick the q-value of the action taken, which is 2 here.
  3. Pass the next state s’ through the target q-network and get q-values of all the actions of s’.
  4. Pick the maximum q-value.

target

We can then take the Mean Squared Error loss between the target and the prediction and perform backpropagation.
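To make the four steps concrete, here is a minimal worked example for a single transition in plain Python. All numbers (the q-values, the reward and the discount factor) are made up for illustration, and the two lists stand in for the outputs of the q-network and the target q-network.

```python
# Hypothetical q-values for a 3-action problem (numbers made up for illustration)
q_values_s = [0.2, -0.1, 0.5]      # step 1: q-network output for state s
q_values_next = [0.1, 0.4, 0.3]    # step 3: target q-network output for next state s'
action, reward, gamma = 2, -0.05, 0.99

prediction = q_values_s[action]                # step 2: q-value of the action taken
target = reward + gamma * max(q_values_next)   # step 4: bootstrap from the best next q-value
loss = (target - prediction) ** 2              # squared error for this single transition

print(f"prediction={prediction:.3f} target={target:.3f} loss={loss:.4f}")
```

In a real update the same computation is done over a whole batch, and the squared errors are averaged before backpropagation.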

The image below makes this clearer.

visualization

The Algorithm

Below you can see the pseudocode for Deep Q Learning. It looks more complex than it actually is. You can follow along with the code to get a better understanding.

pseudocode

Code Implementation

We will again use the Pendulum environment we used in the last part. We will also log some metrics to TensorBoard to see how the learning is going.

You can check out the code at: GitHub Repo

Let’s start with importing the dependencies



import gymnasium as gym
import numpy as np
from tqdm import tqdm
import torch
from torch import nn
from torch.optim import AdamW
from copy import deepcopy
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F


Pendulum Environment Wrapper

We define the Gaussian Function that I introduced in Part 1, which creates the set of possible actions.



def gaussian(x, mu, sigma, max_amp):
    return max_amp * np.exp(-0.5 * ((x - mu) / sigma)**2)


Then, we create the wrapper. This is very similar to the previous wrapper and in fact simpler, because we can use the observation space from the Pendulum environment as it is and don't have to do any discretization.



class PendulumDiscreteStateAction(gym.Wrapper):

    def __init__(self, env: gym.Env, nvec_u: int, sigma: float):

        super(PendulumDiscreteStateAction, self).__init__(env)

        self.env = env
        self.nvec_u = nvec_u

        # Create a Discrete action space
        self.action_space = gym.spaces.Discrete(nvec_u)

        kernel = gaussian(np.linspace(0, 1, nvec_u//2), 0, sigma, 
                                               self.env.action_space.high[0])
        self.actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:

        action = self.actions[action]
        obs, reward, terminated, truncated, info = self.env.step([action])
        reward = reward/16.2736044 # normalize the reward between -1 and 0
        obs = obs/self.env.observation_space.high # normalize the observation between -1 and 1
        return obs, reward, terminated, truncated, info

    def reset(self) -> tuple[np.ndarray, dict]:
        """
        Resets the environment.

        Returns:
        - The initial normalized observation and additional information.
        """
        obs, info = self.env.reset()
        obs = obs/self.env.observation_space.high
        return obs, info


Notable Implementation Details

  • Reward Scaling: The rewards were scaled, in the spirit of a MinMax scaler, to make the learning smoother. This was done by dividing the returned reward by the magnitude of the minimum possible reward: rscaled = r/|rmin|, which maps the reward to the range [-1, 0]. The rmin is defined in the documentation of the Pendulum environment on Gymnasium. The intuition behind this is that the network does not have to output large q-values, and the weights can remain smaller, making learning smoother.
  • Scaled Observations: The observations are scaled by dividing by the maximum value of the observation space.
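The constant 16.2736044 and the action set can be reproduced with a few lines of NumPy. This is a small sketch, assuming the Pendulum-v1 limits from the Gymnasium documentation (maximum speed 8, maximum torque 2) and the same nvec_u = 11 and sigma = 0.4 that are used later when the environment is created.

```python
import numpy as np

def gaussian(x, mu, sigma, max_amp):
    return max_amp * np.exp(-0.5 * ((x - mu) / sigma) ** 2)

# Pendulum-v1 limits as given in the Gymnasium documentation
max_speed, max_torque = 8.0, 2.0
# Worst case of -(theta^2 + 0.1 * theta_dot^2 + 0.001 * torque^2):
# theta = pi, maximum angular speed, maximum torque
r_min = -(np.pi**2 + 0.1 * max_speed**2 + 0.001 * max_torque**2)

# Same construction as in the wrapper: mirrored Gaussian kernel around a zero action
nvec_u, sigma = 11, 0.4
kernel = gaussian(np.linspace(0, 1, nvec_u // 2), 0, sigma, max_torque)
actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()

print(f"r_min = {r_min:.7f}")
print("actions:", np.round(actions, 3))
```

Note that this construction yields nvec_u actions only when nvec_u is odd, since the kernel contributes nvec_u // 2 values on each side of zero.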

Define the Q-Network

Here we define a simple Fully Connected Network which maps the state to the q-values of all possible actions.



class QNetwork(nn.Module):
    def __init__(self, nvec_s, nvec_u):
        super(QNetwork, self).__init__()

        self.fc1 = nn.Linear(nvec_s, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, nvec_u)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x


Create ReplayMemory Class



class ReplayMemory:
    def __init__(self, capacity, env: gym.Env, device: torch.device):

        self.position = 0
        self.size = 0
        self.capacity = capacity
        self.device = device

        self.n_states = env.observation_space.shape  # Number of dimensions in the state space
        self.n_actions = env.action_space.n  # Number of discrete actions

        # Initialize arrays to store the replay memory
        self.states = np.zeros((capacity, *self.n_states))
        self.actions = np.zeros((capacity))
        self.rewards = np.zeros(capacity)
        self.next_states = np.zeros((capacity, *self.n_states))
        self.terminated = np.zeros(capacity)
        self.truncated = np.zeros(capacity)

    def push(self, state:np.ndarray, action:int, next_state:np.ndarray, reward:float, terminated: bool, truncated:bool):

        self.states[self.position] = state.flatten()
        self.actions[self.position] = action
        self.next_states[self.position] = next_state.flatten()
        self.rewards[self.position] = reward
        self.terminated[self.position] = terminated
        self.truncated[self.position] = truncated

        self.position = (self.position + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):

        indices = np.random.choice(self.size, batch_size, replace=False)

        states = torch.tensor(self.states[indices], dtype = torch.float32, device=self.device)
        actions = torch.tensor(self.actions[indices], dtype = torch.int64, device=self.device)
        next_states = torch.tensor(self.next_states[indices], dtype = torch.float32, device=self.device)
        rewards = torch.tensor(self.rewards[indices], dtype = torch.float32, device=self.device)
        terminated = torch.tensor(self.terminated[indices], dtype = torch.float32, device=self.device)
        truncated = torch.tensor(self.truncated[indices], dtype = torch.float32, device=self.device)

        return states, actions, next_states, rewards, terminated, truncated

    def __len__(self):
        return self.size  # self.size is already an int, so len() would fail here
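The circular-buffer behaviour of push is easier to see in isolation. Below is a stripped-down sketch in plain Python; RingBuffer is a hypothetical stand-in for ReplayMemory that stores plain items instead of transitions.

```python
class RingBuffer:
    """Hypothetical stand-in for ReplayMemory that stores plain items."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.data = [None] * capacity
        self.position = 0  # index that the next push writes to
        self.size = 0      # number of valid entries stored so far

    def push(self, item):
        self.data[self.position] = item
        self.position = (self.position + 1) % self.capacity  # wrap around at the end
        self.size = min(self.size + 1, self.capacity)         # saturate at capacity

    def __len__(self):
        return self.size


buffer = RingBuffer(capacity=3)
for t in range(5):
    buffer.push(t)

# The two oldest items (0 and 1) have been overwritten by 3 and 4
print(buffer.data, len(buffer), buffer.position)
```

Once the buffer is full, each new transition overwrites the oldest one, so the memory always holds the most recent capacity transitions.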


Create Agent Class

Initialization and some helper functions

Here we define the agent initialization function that sets up the various variables and hyperparameters. We also have a get_action function that takes a random action with a probability of ϵ and otherwise takes an action by passing the state through the q-network and taking the argmax, i.e., selecting the action with the highest q-value.



class Agent:
    def __init__(
        self,
        env: gym.Env,
        gamma=0.99,
        alpha=0.0003,
        initial_epsilon=1,
        min_epsilon=0.1,
        decay_rate=0.9999,
        batch_size=64,
        n_rollouts=2000,
        capacity=100000,
        device: torch.device = torch.device("cpu"),
    ):
        self.env = env  # Environment
        self.device = device  # Computation device (CPU or GPU)
        self.gamma = gamma  # Discount factor
        self.alpha = alpha  # Learning rate
        self.epsilon = initial_epsilon  # Initial epsilon value for exploration
        self.batch_size = batch_size  # Batch size for training
        self.n_rollouts = n_rollouts  # Number of rollouts to collect

        self.min_epsilon = min_epsilon  # Minimum epsilon value
        self.decay_rate = decay_rate  # Epsilon decay rate

        # Replay memory to store experiences
        self.replay_memory = ReplayMemory(capacity, env, device)
        # Q-network and target network for Q-learning
        self.q_network = QNetwork(
            env.observation_space.shape[0], env.action_space.n
        ).to(device)
        self.target_network = deepcopy(self.q_network)
        # Optimizer for Q-network
        self.optimizer = AdamW(self.q_network.parameters(), lr=alpha)

        # Number of dimensions in the state space
        self.n_states = env.observation_space.shape[0]

        # For metrics
        self.n_time_steps = 0  # Number of time steps
        self.episodes = 0  # Number of episodes
        self.n_updates = 0  # Number of gradient updates
        self.best_reward = -np.inf  # Best reward seen so far

    def get_action(self, obs, greedy=False):

        if not greedy and np.random.rand() < self.epsilon:  # Epsilon-greedy exploration
            return np.random.randint(self.env.action_space.n)  # Random action
        obs = torch.tensor(obs, dtype=torch.float32, device=self.device).unsqueeze(0)  # Convert observation to tensor
        self.q_network.eval()  # Set Q-network to evaluation mode
        with torch.no_grad():
            q_values: torch.Tensor = self.q_network(obs)  # Get Q-values for the observation
            return q_values.argmax().item()  # Return action with highest Q-value

    def sample_experience(self):

        return self.replay_memory.sample(self.batch_size)  # Sample experiences from replay memory

    def update_target(self):

        self.target_network.load_state_dict(self.q_network.state_dict())  # Update target network
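The epsilon-greedy rule used in get_action can be checked on its own with NumPy. This is a minimal sketch; the epsilon_greedy helper and the q-values are made up for illustration, and a seeded generator is used only to make the run repeatable.

```python
import numpy as np

def epsilon_greedy(q_values, epsilon, rng):
    """Pick a random action with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        return int(rng.integers(len(q_values)))  # explore
    return int(np.argmax(q_values))              # exploit

rng = np.random.default_rng(0)
q_values = np.array([0.1, 0.7, 0.3])  # made-up q-values for three actions

greedy_picks = [epsilon_greedy(q_values, 0.0, rng) for _ in range(100)]
explore_picks = [epsilon_greedy(q_values, 1.0, rng) for _ in range(1000)]

print(set(greedy_picks), sorted(set(explore_picks)))
```

With epsilon at 0 only the best action is ever chosen, while with epsilon at 1 every action gets picked, which is the behaviour the agent moves between as epsilon decays.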


Collect Rollouts:

The collect_rollouts function collects transitions by simulating the environment with the epsilon-greedy policy defined in the get_action function. Each transition is stored in the replay memory, and we decay epsilon after every step to reduce exploration over time.



def collect_rollouts(self):
    obs, info = self.env.reset()  # Reset environment
    terminated = False
    truncated = False
    rewards = 0  # Total rewards
    episodes = 0  # Total episodes
    for _ in range(self.n_rollouts):
        action = self.get_action(obs, greedy=False)  # Get action
        next_obs, reward, terminated, truncated, _ = self.env.step(action)  # Step environment
        self.replay_memory.push(
            obs, action, next_obs, reward, terminated, truncated
        )  # Save the transition
        obs = next_obs  # Update observation
        rewards += reward  # Accumulate reward
        self.n_time_steps += 1  # Increment time steps
        if terminated or truncated:  # Check if episode ended
            episodes += 1
            self.episodes += 1
            obs, info = self.env.reset()  # Reset environment

        self.epsilon = max(
            self.min_epsilon, self.decay_rate * self.epsilon
        )  # Decrease epsilon

    return rewards / episodes  # Return the average reward per episode
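To get a feel for how quickly exploration fades, the decay schedule can be evaluated separately. The sketch below uses the default values from the Agent constructor (initial epsilon 1, minimum 0.1, decay rate 0.9999, 2000 rollout steps); the variable names are only for illustration.

```python
import math

initial_epsilon, min_epsilon, decay_rate = 1.0, 0.1, 0.9999
n_rollouts = 2000

# Smallest n with initial_epsilon * decay_rate**n <= min_epsilon
steps_to_floor = math.ceil(
    math.log(min_epsilon / initial_epsilon) / math.log(decay_rate)
)
iterations_to_floor = steps_to_floor / n_rollouts

# Same schedule applied step by step, as in collect_rollouts
epsilon = initial_epsilon
for _ in range(n_rollouts):
    epsilon = max(min_epsilon, decay_rate * epsilon)

print(steps_to_floor, round(iterations_to_floor, 1), round(epsilon, 3))
```

With these defaults epsilon needs roughly 23,000 environment steps to reach its floor, which is between 11 and 12 calls to collect_rollouts.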


Learn

The learn function samples a batch of batch_size experiences from the replay memory, predicts the q-values and computes the target values. We then compute the loss and perform backpropagation. We repeat this for the given number of epochs, where each epoch is one gradient update on one sampled batch.



def learn(self, epochs):

    self.q_network.train()  # Set Q-network to training mode

    average_loss = 0
    for i in range(epochs):
        obs, action, next_obs, reward, terminated, truncated = (
            self.sample_experience()
        )  # Sample a batch of experiences
        q_values: torch.Tensor = self.q_network(obs)  # Get Q-values for the batch
        with torch.no_grad():  # The target network is not trained, so no gradients are needed
            next_q_values = self.target_network(next_obs)  # Get Q-values for the next states

        q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)  # Gather Q-values for taken actions
        next_q_value = next_q_values.max(1).values  # Get max Q-value for next states
        target = reward + self.gamma * next_q_value * (1 - terminated) * (
            1 - truncated
        )  # Compute target Q-value

        loss = F.smooth_l1_loss(q_value, target)  # Compute loss

        self.optimizer.zero_grad()  # Zero gradients
        loss.backward()  # Backpropagate
        torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), 10)  # Clip gradients
        self.optimizer.step()  # Update Q-network

        average_loss += (loss.item() - average_loss) / (i + 1)  # Update average loss
        self.n_updates += 1  # Increment update count

        if self.n_updates % 1000 == 0:  # Update target network periodically
            self.update_target()

    return average_loss  # Return average loss
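The gather, max and masking steps in learn can be reproduced on a tiny batch with NumPy to see what each line computes. This is an illustrative sketch with made-up numbers; a single done flag stands in for the terminated and truncated flags.

```python
import numpy as np

gamma = 0.99
# Made-up batch of 3 transitions over 4 actions
q_values = np.array([[0.1, 0.2, 0.3, 0.4],
                     [0.5, 0.1, 0.0, 0.2],
                     [0.0, 0.9, 0.3, 0.1]])
next_q_values = np.array([[0.3, 0.1, 0.0, 0.2],
                          [0.6, 0.7, 0.1, 0.0],
                          [0.2, 0.2, 0.5, 0.4]])
actions = np.array([3, 0, 1])
rewards = np.array([-0.1, -0.2, 0.0])
done = np.array([0.0, 0.0, 1.0])  # the last transition ended its episode

# NumPy equivalent of q_values.gather(1, action.unsqueeze(1)).squeeze(1)
q_taken = q_values[np.arange(len(actions)), actions]
# NumPy equivalent of next_q_values.max(1).values
next_q_max = next_q_values.max(axis=1)
# No bootstrapping from the next state once the episode is over
targets = rewards + gamma * next_q_max * (1.0 - done)

print(q_taken, targets)
```

For the last transition the target is just the reward, because the mask removes the bootstrapped term.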


Some Implementation Details

  • Smooth L1 Loss: This was inspired by the Stable Baselines implementation of DQN. Smooth L1 loss behaves like L2 loss for small errors and like L1 loss for large ones, which limits the gradient contributed by outliers. This is useful in off-policy algorithms because transitions sampled from the replay buffer can belong to an old, much different policy, and it is not desirable to fit to these "outliers".
  • Gradient Clipping: Before performing the optimizer step, the gradients are rescaled so that their total L2 norm is at most 10, to counteract the influence of outliers further. This helps prevent exploding gradients.
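Both ideas can be illustrated without PyTorch. The sketch below is a plain Python approximation: smooth_l1 follows the usual definition with a threshold beta of 1, and clip_by_global_norm is a hypothetical helper that mimics norm clipping on a flat list of gradient values.

```python
import math

def smooth_l1(error, beta=1.0):
    """Quadratic for |error| < beta, linear beyond it."""
    abs_error = abs(error)
    if abs_error < beta:
        return 0.5 * abs_error**2 / beta
    return abs_error - 0.5 * beta

def clip_by_global_norm(grads, max_norm):
    """Rescale gradient values so that their total L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    scale = min(1.0, max_norm / (total_norm + 1e-6))
    return [g * scale for g in grads]

# The squared error grows quadratically with the error, smooth L1 only linearly
for error in (0.5, 3.0, 10.0):
    print(error, "squared error:", error**2, "smooth_l1:", smooth_l1(error))

clipped = clip_by_global_norm([30.0, 40.0], max_norm=10.0)  # original norm is 50
print(clipped)
```

The clipping keeps the direction of the gradient and only shrinks its length, so a single outlier batch cannot cause an arbitrarily large update.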

Evaluate the current Policy

According to Zhenyi Wang, in many cases DQN starts to forget and then relearn once it has come close to the maximum reward. This can also be seen in the figure below, where the agent sometimes drops to a low reward. This can happen because even when the agent has achieved an optimal greedy policy, the rollouts are still collected with the ϵ-greedy policy. This can cause the agent to suddenly take a sub-optimal action, which can cause a large change in the network and cause the agent to unlearn. This can sometimes also be beneficial, making the agent more robust to perturbations. Nevertheless, the policy is evaluated after every learning cycle, and the best model is saved. This makes sure that, in the end, the best-performing model is retrievable.

evaluation



def evaluate(self, n_steps):
    self.q_network.eval()  # Set Q-network to evaluation mode
    rewards = 0  # Total rewards
    episodes = 0  # Total episodes
    with torch.no_grad():
        obs, info = self.env.reset()  # Reset environment
        for _ in range(n_steps):
            action = self.get_action(obs, greedy=True)  # Get action
            obs, reward, terminated, truncated, _ = self.env.step(action)  # Step environment
            rewards += reward  # Accumulate reward

            if terminated or truncated:  # Check if episode ended
                episodes += 1
                obs, info = self.env.reset()  # Reset environment and continue from the new observation

    rewards /= episodes  # Compute average reward per episode   
    if rewards > self.best_reward:  # Save best model if improved
        self.best_reward = rewards
        torch.save(self.q_network.state_dict(), "dqn_best_model.pth")
        print("New best model saved!")
    return rewards


Train

Finally we have the train function that runs the collect_rollouts, learn and evaluate functions in a loop for a number of epochs.



def train(self, epochs):
    self.writer = SummaryWriter(log_dir="dqn_logs/DQN_2")  # TensorBoard writer

    pbar = tqdm(range(epochs))  # Progress bar
    for i in pbar:
        rewards = self.collect_rollouts()  # Collect rollouts
        loss = self.learn(int(self.n_rollouts/2))  # Perform learning
        eval_reward = self.evaluate(1000)  # Evaluate agent

        pbar.set_description(
            f"Iteration {i+1} || Reward: {rewards:.3f} || Eval Reward: {eval_reward :.3f} || Loss: {loss:.3f} || Epsilon: {self.epsilon:.2f} || Time steps: {self.n_time_steps} || N updates: {self.n_updates}"
        )  # Update progress bar
        self.writer.add_scalar("Training/Loss", loss, self.n_updates)  # Log training loss
        self.writer.add_scalar(
            "Training/Rollout: Mean Episode Reward", rewards, self.n_updates
        )  # Log mean episode reward during training
        self.writer.add_scalar(
            "Evaluation/Mean Episode Reward", eval_reward, self.n_updates
        )  # Log mean episode reward during evaluation


Hyperparameters:

  • For learning and exploration: gamma, alpha, initial_epsilon, min_epsilon, decay_rate
  • For experience replay and training: batch_size, n_rollouts, capacity

Start Training



env = gym.make("Pendulum-v1", max_episode_steps=500)  # Truncate episodes after 500 steps
env = PendulumDiscreteStateAction(env, 11, 0.4)

agent = Agent(env)
agent.train(30)


Testing



import matplotlib.pyplot as plt

env = gym.make("Pendulum-v1",
               max_episode_steps=500,
            #    render_mode='human'
               )
env = PendulumDiscreteStateAction(env, 11, 0.4)

agent = Agent(env)
# Adjust this path to wherever the best model from training was saved
agent.q_network.load_state_dict(torch.load("models/deep_q_learning/dqn_best_model.pth"))

rewards = []
thetas = []
tot_reward = 0
n_episodes = 100
n_steps= 0
for _ in range(n_episodes):
    obs, info = env.reset()
    terminated = False
    truncated = False
    while not terminated and not truncated:
        with torch.no_grad():
            action = agent.get_action(obs, greedy = True)
            # print(action)
            obs, reward, terminated,  truncated, info = env.step(action)
            x = obs[0] * env.observation_space.high[0]
            y = obs[1] * env.observation_space.high[1]
            theta = np.arctan2(y, x)
            thetas.append(theta)
            rewards.append(reward)
            tot_reward += reward
            n_steps+=1
            # env.render()
print("Average reward per episode: ", tot_reward/n_episodes)


Results

Below, we can see the results of our trained agent. The agent can reach an angle of zero no matter where it starts, and the reward there is zero as well.

The average episodic reward across 100 episodes is 9.8.

results

LSTM Implementation

The transitions we collect are temporal; that is, there is a correlation between nearby transitions. To capture this temporal relation, we can change the state of the environment to be a history of past (state, action) pairs. So we need to add the previous action to the observation and maintain a list of the Nhist most recent observations. Let’s see if this improves the agent, since now we have more information and a better network.

In the figure below, you can see the LSTM network architecture. There are several ways to implement this. Apart from this, another option is to take a linear output from the last hidden layer directly. But let’s proceed with this for now.

lstm



# Qnetwork with LSTM
class QNetwork(nn.Module):
    def __init__(self, nvec_s, nvec_u):
        super(QNetwork, self).__init__()

        self.lstm = nn.LSTM(nvec_s, 64, 1, batch_first=True)
        self.fc1 = nn.Linear(64, 64)
        self.fc2 = nn.Linear(64, nvec_u)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        x, _ = self.lstm(x)
        x = F.relu(self.fc1(x))
        x = torch.mean(x, dim=1)
        x = self.fc2(x)
        return x


Also, we need to change the wrapper to get the history of observations.



from collections import deque

class PendulumDiscreteStateAction(gym.Wrapper):

    def __init__(self, env: gym.Env, nvec_u: int, sigma: float, nhist: int):

        super(PendulumDiscreteStateAction, self).__init__(env)

        self.env = env
        self.nvec_u = nvec_u

        # Check if the observation space is of type Box
        assert isinstance(
            env.observation_space, gym.spaces.Box
        ), "Error: observation space is not of type Box"

        # Create a Discrete action space
        self.action_space = gym.spaces.Discrete(nvec_u)


        # Define the possible actions (2 is the maximum torque of Pendulum-v1)
        kernel:np.ndarray = gaussian(np.linspace(0, 1, nvec_u // 2), 0, sigma, 2)
        self.actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()

        # Bounds for each row of the history: the pendulum observation plus the previous action
        low = np.array([list(self.env.observation_space.low) + [min(self.actions)] for _ in range(nhist)])
        high = np.array([list(self.env.observation_space.high) + [max(self.actions)] for _ in range(nhist)])


        self.observation_space = gym.spaces.Box(
            low=low,
            high=high,
            dtype=np.float32
        )

        self.prev_action = None
        self.history = deque(maxlen = nhist)

        # Initialize the history with zeros (3 pendulum observations + 1 previous action)
        for i in range(nhist):
            self.history.append(np.zeros(4))

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:

        action = self.actions[action]
        obs, reward, terminated, truncated, info = self.env.step([action])
        reward = reward/16.2736044 # normalize the reward between -1 and 0
        obs = obs/self.env.observation_space.high # normalize the observation between -1 and 1
        obs=np.append(obs, action/2)
        self.history.append(obs)
        obs = np.array(list(self.history))  
        return obs, reward, terminated, truncated, info

    def reset(self) -> tuple[np.ndarray, dict]:

        obs, info = self.env.reset()
        obs: np.ndarray[float] = obs/self.env.observation_space.high
        obs=np.append(obs, 0)

        # Initialize the history with the same observation
        for i in range(len(self.history)):
            self.history.append(obs)
        obs = np.array(list(self.history))
        return obs, info


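Here, you can see that we are using deque to keep a fixed-size history of the last Nhist observations. We also added the action to the observation. Note that each observation is now a 2D array of shape (Nhist, 4), so the Agent has to build the network with the feature dimension (the last axis of the observation shape) as its input size, and the replay memory has to store the observation without flattening it. With that in place, we can train the agent as usual. Below, we can see how the training compares to the FCN. It is pretty similar, which means the LSTM does not have much of an effect here. But in some complex environments this can be useful and worth trying.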
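The sliding-window behaviour of the history can be tried out separately from the environment. This is a small sketch with made-up observation values; nhist = 3 is an arbitrary choice for the example, and each row has 4 features (3 pendulum observations plus the previous action).

```python
from collections import deque

import numpy as np

nhist, n_features = 3, 4  # window length (arbitrary here) and features per row
history = deque(maxlen=nhist)

# On reset: fill the whole window with the first observation
first = np.array([1.0, 0.0, 0.0, 0.0])
for _ in range(nhist):
    history.append(first)

# On each step: appending drops the oldest entry automatically
for t in range(1, 3):
    history.append(np.full(n_features, float(t)))

stacked = np.array(list(history))  # shape (nhist, n_features), oldest row first
print(stacked.shape)
print(stacked)
```

The stacked array is what the LSTM receives for one time step of the agent: a short sequence ordered from the oldest to the most recent observation.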

results

That's all for this part. We saw how to implement a deep Q learning algorithm from scratch and how to use any network as the Q network, not just an FCN. We used an LSTM, but you can also do this with transformers for an even longer context history. In the next part we will look into how to overcome the limitation of a discrete action space by implementing the REINFORCE algorithm.

Want to connect?

🌍My Website

🐦My Twitter

👨My LinkedIn
