Reinforcement Learning from Scratch - Part 2 - Deep Q Learning
Hey everyone. This is the second part of my Reinforcement Learning series, where we look at different RL algorithms and discuss their implementation details. In the last part, we saw a basic RL algorithm that uses tabular Q-learning. One disadvantage of that technique was that it required us to discretize the observation and action spaces. The resulting table grows quickly with the number of bins per dimension, which makes learning harder and slower.
To overcome this problem, at least partially, in this part we look at Deep Q Learning (DQN), a prominent technique that brought reinforcement learning into the mainstream. With this technique we can use the continuous observation space directly, but we still need a discrete action space. First, we will put together a simple DQN algorithm and then see how we can use an LSTM architecture as the network in DQN. Let’s dive in.
DQN Theory
If you are not interested in the theory of DQN, that is fine. The code implementation below explains a lot. However, I am including some of the main driving equations below for completeness.
From the previous part, we know that the q value of a state action pair is the expected total return from that state if a certain action is taken. Using the Bellman Equation, we can write it out as:
If we are following the policy π we can say that:
From this, we can say that the q-value of a state-action pair is equal to the average (expectation) of the sum of the reward and γ times the maximum q-value of the next state. Using our own estimate of the next state's q-value to update the current one is called bootstrapping.
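In symbols, the two relations described above can be written roughly as follows. This is a sketch in standard notation for a transition (s, a, r, s’), assuming the second line uses a policy π that is greedy with respect to Q; the exact notation is an assumption.

```latex
Q(s, a) = \mathbb{E}\left[\, r + \gamma \, Q(s', a') \,\right]

Q(s, a) = \mathbb{E}\left[\, r + \gamma \max_{a'} Q(s', a') \,\right]
```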
For example, suppose we have a transition (s, 2, r, s’), i.e. we took action 2 in state s. Then, we do the following:
- Pass the state s through the q-network and get the q-values of all the actions.
- Pick the q-value of the action taken, which is 2 here. This is the prediction.
- Pass the next state s’ through the target q-network and get the q-values of all the actions of s’.
- Pick the maximum q-value and form the target as r plus γ times that maximum.
We can then take a Mean Square Error Loss between the target and the prediction and perform backward propagation.
The image below makes this clearer.
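The four steps above can be sketched with plain numbers. This is only an illustration: the q-values, the reward and the variable names are made up, and the two lists stand in for the outputs of the q-network and the target q-network.

```python
# Hypothetical numbers for one transition (s, a=2, r, s'); not from a trained network.
gamma = 0.99
action = 2
reward = -0.5
terminated = False

q_values_s = [0.1, -0.3, -1.2, 0.4]        # stand-in for q_network(s)
q_values_next = [-0.9, -0.2, -0.7, -1.5]   # stand-in for target_network(s')

prediction = q_values_s[action]            # q-value of the action that was taken
max_next_q = max(q_values_next)            # best q-value available in s'

# Bootstrapped target: no future value is added if the episode terminated.
target = reward + gamma * max_next_q * (0.0 if terminated else 1.0)

squared_error = (target - prediction) ** 2  # MSE for a batch of size one
print(prediction, target, squared_error)
```

In the real implementation the same computation is done on batches of tensors, and only the q-network receives gradients.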
The Algorithm
Below you can see the pseudocode for the Deep Q Learning. It looks more complex than it actually is. You can follow along with the code to get a better understanding.
Code Implementation
We will again use the Pendulum environment we used in the last part. We will also log some metrics to TensorBoard to see how the learning is going.
You can check out the code at: GitHub Repo
Let’s start with importing the dependencies
import gymnasium as gym
import numpy as np
from tqdm import tqdm
import torch
from torch import nn
from torch.optim import AdamW
from copy import deepcopy
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F
Pendulum Environment Wrapper
We define the Gaussian Function that I introduced in Part 1, which creates the set of possible actions.
def gaussian(x, mu, sigma, max_amp):
    return max_amp * np.exp(-0.5 * ((x - mu) / sigma)**2)
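To get a feel for the action set this produces, here is a small sketch in plain Python. The helper `build_actions` is hypothetical (it is not part of the repo) and assumes 11 actions, sigma = 0.4 and a maximum torque of 2.0, which are the values used later in this post.

```python
import math

def build_actions(nvec_u: int, sigma: float, max_torque: float) -> list[float]:
    """Hypothetical helper that mirrors the Gaussian kernel construction."""
    half = nvec_u // 2
    # Evenly spaced points in [0, 1], like np.linspace(0, 1, half)
    xs = [i / (half - 1) for i in range(half)]
    kernel = [max_torque * math.exp(-0.5 * (x / sigma) ** 2) for x in xs]
    # Negative torques (largest magnitude first), zero, then positive torques
    return [-k for k in kernel] + [0.0] + kernel[::-1]

actions = build_actions(nvec_u=11, sigma=0.4, max_torque=2.0)
print([round(a, 3) for a in actions])
```

The torques are symmetric around zero and packed more densely near zero, which gives the agent finer control when the pendulum is close to upright.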
Then, we create the wrapper. This is very similar to the previous wrapper, and in fact simpler, because we can use the observation space of the Pendulum environment as it is and don't have to do any discretization.
class PendulumDiscreteStateAction(gym.Wrapper):
    def __init__(self, env: gym.Env, nvec_u: int, sigma: float):
        super(PendulumDiscreteStateAction, self).__init__(env)
        self.env = env
        self.nvec_u = nvec_u
        # Create a Discrete action space
        self.action_space = gym.spaces.Discrete(nvec_u)
        # Torque magnitudes spaced along a Gaussian; nvec_u is expected to be odd
        kernel = gaussian(np.linspace(0, 1, nvec_u//2), 0, sigma,
                          self.env.action_space.high[0])
        self.actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:
        action = self.actions[action]  # map the discrete index to a torque
        obs, reward, terminated, truncated, info = self.env.step([action])
        reward = reward/16.2736044  # normalize the reward between -1 and 0
        obs = obs/self.env.observation_space.high  # normalize the observation between -1 and 1
        return obs, reward, terminated, truncated, info

    def reset(self) -> tuple[np.ndarray, dict]:
        """
        Resets the environment.
        Returns:
        - The initial normalized observation and additional information.
        """
        obs, info = self.env.reset()
        obs = obs/self.env.observation_space.high
        return obs, info
Notable Implementation Details
- Reward Scaling: The rewards are scaled to make the learning smoother. This is done by dividing the returned reward by the magnitude of the minimum possible reward: rscaled = r/|rmin|, which maps the rewards to the range [-1, 0]. The value of rmin (-16.2736044) is given in the documentation of the Pendulum environment on Gymnasium. The intuition behind this is that the network does not have to output very large q-values, so the weights can remain smaller, making learning smoother.
- Scaled Observations: The observations are scaled by dividing them by the maximum value of the observation space, which maps each component to the range [-1, 1].
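The scaling constant can be reproduced from the Pendulum reward function. The sketch below assumes the documented limits of the environment (angle up to π, angular velocity up to 8, torque up to 2); `scale_reward` is a hypothetical helper used only for illustration.

```python
import math

max_angle = math.pi   # largest pole angle (rad)
max_speed = 8.0       # largest angular velocity (rad/s)
max_torque = 2.0      # largest torque

# Pendulum reward: -(theta^2 + 0.1 * theta_dot^2 + 0.001 * torque^2)
r_min = -(max_angle**2 + 0.1 * max_speed**2 + 0.001 * max_torque**2)

def scale_reward(reward: float) -> float:
    """Map a raw reward in [r_min, 0] to [-1, 0]."""
    return reward / abs(r_min)

print(r_min, scale_reward(r_min), scale_reward(0.0))
```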
Define the Q-Network
Here we define a simple Fully Connected Network which maps the state to the q-values of all possible actions.
class QNetwork(nn.Module):
    def __init__(self, nvec_s, nvec_u):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(nvec_s, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, nvec_u)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)  # one q-value per discrete action
        return x
Create ReplayMemory Class
class ReplayMemory:
    def __init__(self, capacity, env: gym.Env, device: torch.device):
        self.position = 0  # Index where the next transition is written
        self.size = 0  # Number of transitions currently stored
        self.capacity = capacity
        self.device = device
        self.n_states = env.observation_space.shape  # Shape of a single observation
        self.n_actions = env.action_space.n  # Number of discrete actions
        # Initialize arrays to store the replay memory
        self.states = np.zeros((capacity, *self.n_states))
        self.actions = np.zeros((capacity))
        self.rewards = np.zeros(capacity)
        self.next_states = np.zeros((capacity, *self.n_states))
        self.terminated = np.zeros(capacity)
        self.truncated = np.zeros(capacity)

    def push(self, state: np.ndarray, action: int, next_state: np.ndarray, reward: float, terminated: bool, truncated: bool):
        # reshape keeps this working for flat observations and for (history, features) observations
        self.states[self.position] = np.asarray(state).reshape(self.n_states)
        self.actions[self.position] = action
        self.next_states[self.position] = np.asarray(next_state).reshape(self.n_states)
        self.rewards[self.position] = reward
        self.terminated[self.position] = terminated
        self.truncated[self.position] = truncated
        # Circular buffer: overwrite the oldest transition once the memory is full
        self.position = (self.position + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):
        # Requires at least batch_size stored transitions
        indices = np.random.choice(self.size, batch_size, replace=False)
        states = torch.tensor(self.states[indices], dtype=torch.float32, device=self.device)
        actions = torch.tensor(self.actions[indices], dtype=torch.int64, device=self.device)
        next_states = torch.tensor(self.next_states[indices], dtype=torch.float32, device=self.device)
        rewards = torch.tensor(self.rewards[indices], dtype=torch.float32, device=self.device)
        terminated = torch.tensor(self.terminated[indices], dtype=torch.float32, device=self.device)
        truncated = torch.tensor(self.truncated[indices], dtype=torch.float32, device=self.device)
        return states, actions, next_states, rewards, terminated, truncated

    def __len__(self):
        return self.size  # self.size is already an int
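The position and size bookkeeping in the replay memory is a circular buffer. Here is a stripped-down sketch of just that indexing logic; the `RingBuffer` class is hypothetical and stores plain Python objects instead of NumPy arrays.

```python
class RingBuffer:
    """Hypothetical, minimal version of the replay memory's indexing."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.data = [None] * capacity
        self.position = 0  # where the next item is written
        self.size = 0      # how many slots are filled

    def push(self, item):
        self.data[self.position] = item
        self.position = (self.position + 1) % self.capacity  # wrap around
        self.size = min(self.size + 1, self.capacity)

buffer = RingBuffer(capacity=3)
for step in range(5):
    buffer.push(step)
print(buffer.data, buffer.position, buffer.size)
```

After five pushes into a buffer of capacity three, the two oldest items have been overwritten, which is how old transitions are eventually dropped.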
Create Agent Class
Initialization and some helper functions
Here we define the agent's initialization function, which sets up the various variables and hyperparameters. We also have a get_action function that takes a random action with probability ϵ and otherwise passes the state through the q-network and takes the argmax, i.e. selects the action with the highest q-value.
class Agent:
    def __init__(
        self,
        env: gym.Env,
        gamma=0.99,
        alpha=0.0003,
        initial_epsilon=1,
        min_epsilon=0.1,
        decay_rate=0.9999,
        batch_size=64,
        n_rollouts=2000,
        capacity=100000,
        device: torch.device = torch.device("cpu"),
    ):
        self.env = env  # Environment
        self.device = device  # Computation device (CPU or GPU)
        self.gamma = gamma  # Discount factor
        self.alpha = alpha  # Learning rate
        self.epsilon = initial_epsilon  # Initial epsilon value for exploration
        self.batch_size = batch_size  # Batch size for training
        self.n_rollouts = n_rollouts  # Number of rollouts to collect
        self.min_epsilon = min_epsilon  # Minimum epsilon value
        self.decay_rate = decay_rate  # Epsilon decay rate
        # Replay memory to store experiences
        self.replay_memory = ReplayMemory(capacity, env, device)
        # Q-network and target network for Q-learning
        self.q_network = QNetwork(
            env.observation_space.shape[0], env.action_space.n
        ).to(device)
        self.target_network = deepcopy(self.q_network)
        # Optimizer for Q-network
        self.optimizer = AdamW(self.q_network.parameters(), lr=alpha)
        # Number of dimensions in the state space
        self.n_states = env.observation_space.shape[0]
        # For metrics
        self.n_time_steps = 0  # Number of time steps
        self.episodes = 0  # Number of episodes
        self.n_updates = 0  # Number of gradient updates
        self.best_reward = -np.inf  # Best reward seen so far

    def get_action(self, obs, greedy=False):
        if not greedy and np.random.rand() < self.epsilon:  # Epsilon-greedy exploration
            return np.random.randint(self.env.action_space.n)  # Random action
        obs = torch.tensor(obs, dtype=torch.float32, device=self.device).unsqueeze(0)  # Convert observation to a batch of one
        self.q_network.eval()  # Set Q-network to evaluation mode
        with torch.no_grad():
            q_values: torch.Tensor = self.q_network(obs)  # Get Q-values for the observation
        return q_values.argmax().item()  # Return action with highest Q-value

    def sample_experience(self):
        return self.replay_memory.sample(self.batch_size)  # Sample experiences from replay memory

    def update_target(self):
        self.target_network.load_state_dict(self.q_network.state_dict())  # Copy Q-network weights into the target network
Collect Rollouts:
The collect_rollouts function collects transitions by simulating the environment with the epsilon-greedy policy that we defined in the get_action function. Each transition is stored in the replay memory, and we decay epsilon to reduce exploration over time.
    def collect_rollouts(self):
        obs, info = self.env.reset()  # Reset environment
        terminated = False
        truncated = False
        rewards = 0  # Total rewards
        episodes = 0  # Total episodes
        for _ in range(self.n_rollouts):
            action = self.get_action(obs, greedy=False)  # Get action
            next_obs, reward, terminated, truncated, _ = self.env.step(action)  # Step environment
            self.replay_memory.push(
                obs, action, next_obs, reward, terminated, truncated
            )  # Save the transition
            obs = next_obs  # Update observation
            rewards += reward  # Accumulate reward
            self.n_time_steps += 1  # Increment time steps
            if terminated or truncated:  # Check if episode ended
                episodes += 1
                self.episodes += 1
                obs, info = self.env.reset()  # Reset environment
            self.epsilon = max(
                self.min_epsilon, self.decay_rate * self.epsilon
            )  # Decrease epsilon after every step
        return rewards / max(episodes, 1)  # Average reward per episode (guard against zero finished episodes)
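It helps to know how long exploration actually lasts with these settings. The sketch below uses the default hyperparameters from the Agent class and assumes epsilon is multiplied by decay_rate once per environment step; `epsilon_after` is a hypothetical helper.

```python
import math

initial_epsilon, min_epsilon, decay_rate = 1.0, 0.1, 0.9999

# Smallest n with initial_epsilon * decay_rate**n <= min_epsilon
steps_to_floor = math.ceil(
    math.log(min_epsilon / initial_epsilon) / math.log(decay_rate)
)

def epsilon_after(n_steps: int) -> float:
    """Epsilon after n_steps multiplicative decays, clipped at the minimum."""
    return max(min_epsilon, initial_epsilon * decay_rate**n_steps)

print(steps_to_floor, epsilon_after(2000))
```

Under these assumptions epsilon reaches its floor after roughly 23,000 steps, which is about 12 calls to collect_rollouts with n_rollouts = 2000.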
Learn
The learn function samples batch_size experiences from the replay memory, predicts the q-values and computes the target values. We then compute the loss and perform backward propagation. We do this for the defined number of epochs.
    def learn(self, epochs):
        self.q_network.train()  # Set Q-network to training mode
        average_loss = 0
        for i in range(epochs):
            obs, action, next_obs, reward, terminated, truncated = (
                self.sample_experience()
            )  # Sample a batch of experiences
            q_values: torch.Tensor = self.q_network(obs)  # Get Q-values for the batch
            q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)  # Gather Q-values for taken actions
            with torch.no_grad():  # The target is a constant; no gradients through the target network
                next_q_values = self.target_network(next_obs)  # Get Q-values for the next states
                next_q_value = next_q_values.max(1).values  # Get max Q-value for next states
                target = reward + self.gamma * next_q_value * (1 - terminated) * (
                    1 - truncated
                )  # Compute target Q-value
            loss = F.smooth_l1_loss(q_value, target)  # Compute loss
            self.optimizer.zero_grad()  # Zero gradients
            loss.backward()  # Backpropagate
            torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), 10)  # Clip gradients
            self.optimizer.step()  # Update Q-network
            average_loss += (loss.item() - average_loss) / (i + 1)  # Update running average of the loss
            self.n_updates += 1  # Increment update count
            if self.n_updates % 1000 == 0:  # Update target network periodically
                self.update_target()
        return average_loss  # Return average loss
Some Implementation Details
- Smooth L1 Loss: This was inspired by the Stable Baselines implementation of DQN. Smooth L1 loss behaves like L2 loss for small errors and like L1 loss for large errors, which limits the gradient contributed by outliers. This is useful in off-policy algorithms because transitions sampled from the replay buffer can belong to an old, much different policy, and it is not desirable to fit to these "outliers".
- Gradient Clipping: Before performing the optimizer step, the gradients are rescaled so that their combined L2 norm is at most 10. This further counteracts the influence of outliers and prevents exploding gradients.
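Both ideas are easy to check with a few numbers. The functions below are hand-written re-implementations for illustration, not PyTorch's code: `smooth_l1` assumes the default threshold of 1.0, and `clip_by_global_norm` follows the same rescaling idea as clip_grad_norm_.

```python
import math

def smooth_l1(error: float, beta: float = 1.0) -> float:
    """Quadratic for |error| < beta, linear beyond that."""
    abs_error = abs(error)
    if abs_error < beta:
        return 0.5 * abs_error**2 / beta
    return abs_error - 0.5 * beta

def clip_by_global_norm(grads: list[float], max_norm: float) -> list[float]:
    """Rescale all gradients together so their combined L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    scale = min(1.0, max_norm / (total_norm + 1e-6))
    return [g * scale for g in grads]

small, large = smooth_l1(0.5), smooth_l1(10.0)
clipped = clip_by_global_norm([30.0, 40.0], max_norm=10.0)
print(small, large, clipped)
```

An error of 10 costs 9.5 under smooth L1, whereas a squared error would cost 100. Clipping shrinks the whole gradient vector but keeps its direction.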
Evaluate the current Policy
According to Zhenyi Wang, in many cases DQN starts to forget and then relearns once it has come close to the maximum reward. This can also be seen in the figure below, where the agent sometimes drops to a low reward. One possible reason is that even when the agent has reached an optimal greedy policy, the rollouts are still collected with the ϵ-greedy policy. The agent can then suddenly take a sub-optimal action, which can cause a large change in the network and make the agent unlearn. This can sometimes be beneficial, since it can make the agent more robust to perturbations. Nevertheless, the policy is evaluated after every learning cycle, and the best model is saved. This makes sure that, in the end, the best-performing model is retrievable.
    def evaluate(self, n_steps):
        self.q_network.eval()  # Set Q-network to evaluation mode
        rewards = 0  # Total rewards
        episodes = 0  # Total episodes
        with torch.no_grad():
            obs, info = self.env.reset()  # Reset environment
            for _ in range(n_steps):
                action = self.get_action(obs, greedy=True)  # Get greedy action
                obs, reward, terminated, truncated, _ = self.env.step(action)  # Step environment
                rewards += reward  # Accumulate reward
                if terminated or truncated:  # Check if episode ended
                    episodes += 1
                    obs, info = self.env.reset()  # Reset environment and use the fresh observation
        rewards /= max(episodes, 1)  # Compute average reward per episode
        if rewards > self.best_reward:  # Save best model if improved
            self.best_reward = rewards
            torch.save(self.q_network.state_dict(), "dqn_best_model.pth")
            print("New best model saved!")
        return rewards
Train
Finally, we have the train function that runs the collect_rollouts, learn and evaluate functions in a loop for a number of epochs.
    def train(self, epochs):
        self.writer = SummaryWriter(log_dir="dqn_logs/DQN_2")  # TensorBoard writer
        pbar = tqdm(range(epochs))  # Progress bar
        for i in pbar:
            rewards = self.collect_rollouts()  # Collect rollouts
            loss = self.learn(int(self.n_rollouts/2))  # Perform learning
            eval_reward = self.evaluate(1000)  # Evaluate agent
            pbar.set_description(
                f"Iteration {i+1} || Reward: {rewards:.3f} || Eval Reward: {eval_reward:.3f} || Loss: {loss:.3f} || Epsilon: {self.epsilon:.2f} || Time steps: {self.n_time_steps} || N updates: {self.n_updates}"
            )  # Update progress bar
            self.writer.add_scalar("Training/Loss", loss, self.n_updates)  # Log training loss
            self.writer.add_scalar(
                "Training/Rollout: Mean Episode Reward", rewards, self.n_updates
            )  # Log mean episode reward during training
            self.writer.add_scalar(
                "Evaluation/Mean Episode Reward", eval_reward, self.n_updates
            )  # Log mean episode reward during evaluation
Hyperparameters:
- For learning and exploration: gamma, alpha, initial_epsilon, min_epsilon, decay_rate
- For experience replay and training: batch_size, n_rollouts, capacity
Start Training
env = gym.make("Pendulum-v1", max_episode_steps=500)  # 500-step episodes instead of the default 200
env = PendulumDiscreteStateAction(env, 11, 0.4)  # 11 discrete actions, sigma = 0.4
agent = Agent(env)
agent.train(30)
Testing
import matplotlib.pyplot as plt

env = gym.make("Pendulum-v1",
               max_episode_steps=500,
               # render_mode='human'
               )
env = PendulumDiscreteStateAction(env, 11, 0.4)
agent = Agent(env)
agent.q_network.load_state_dict(torch.load("models/deep_q_learning/dqn_best_model.pth"))
rewards = []
thetas = []
tot_reward = 0
n_episodes = 100
n_steps = 0
for _ in range(n_episodes):
    obs, info = env.reset()
    terminated = False
    truncated = False
    while not terminated and not truncated:
        with torch.no_grad():
            action = agent.get_action(obs, greedy=True)
        # print(action)
        obs, reward, terminated, truncated, info = env.step(action)
        # Undo the normalization to recover cos(theta) and sin(theta)
        x = obs[0] * env.observation_space.high[0]
        y = obs[1] * env.observation_space.high[1]
        theta = np.arctan2(y, x)
        thetas.append(theta)
        rewards.append(reward)
        tot_reward += reward
        n_steps += 1
        # env.render()
print("Average episodic reward: ", tot_reward/n_episodes)
Results
Below, we can see the results of our trained agent. The agent can reach an angle of zero no matter where it starts. Also, the reward is zero.
The average episodic reward across 100 episodes is 9.8
LSTM Implementation
The transitions we collect are temporal. That is, there is a correlation between nearby transitions. To capture this temporal relation, we can change the state of the environment to be a history of past (state, action) pairs. So we need to add the previous action to the observation and maintain a list of the Nhist previous observations. Let’s see if this improves the agent, since now we have more information and a more expressive network.
In the figure below, you can see the LSTM network architecture. There are several ways to implement this. Apart from this, another option is to take a linear output from the last hidden layer directly. But let’s proceed with this for now.
# Qnetwork with LSTM
class QNetwork(nn.Module):
    def __init__(self, nvec_s, nvec_u):
        super(QNetwork, self).__init__()
        self.lstm = nn.LSTM(nvec_s, 64, 1, batch_first=True)
        self.fc1 = nn.Linear(64, 64)
        self.fc2 = nn.Linear(64, nvec_u)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x, _ = self.lstm(x)       # (batch, history, 64)
        x = F.relu(self.fc1(x))   # applied to every time step
        x = torch.mean(x, dim=1)  # average over the history dimension
        x = self.fc2(x)           # one q-value per discrete action
        return x
Also, we need to change the wrapper to get the history of observations.
from collections import deque

class PendulumDiscreteStateAction(gym.Wrapper):
    def __init__(self, env: gym.Env, nvec_u: int, sigma: float, nhist: int):
        super(PendulumDiscreteStateAction, self).__init__(env)
        self.env = env
        self.nvec_u = nvec_u
        # Check if the observation space is of type Box
        assert isinstance(
            env.observation_space, gym.spaces.Box
        ), "Error: observation space is not of type Box"
        # Create a Discrete action space
        self.action_space = gym.spaces.Discrete(nvec_u)
        # Define the possible actions (5 per side plus zero, so nvec_u must be 11; max torque is 2)
        kernel: np.ndarray = gaussian(np.linspace(0, 1, 5), 0, sigma, 2)
        self.actions = (-kernel).tolist() + [0] + np.flip(kernel).tolist()
        # Each row of the observation is (original observation, previous action)
        low = np.array([list(self.env.observation_space.low) + [min(self.actions)] for _ in range(nhist)], dtype=np.float32)
        high = np.array([list(self.env.observation_space.high) + [max(self.actions)] for _ in range(nhist)], dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=low,
            high=high,
            dtype=np.float32
        )
        self.prev_action = None
        self.history = deque(maxlen=nhist)
        # Initialize the history with zeros (observation plus one slot for the action)
        for i in range(nhist):
            self.history.append(np.zeros(env.observation_space.shape[0] + 1))

    def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]:
        action = self.actions[action]
        obs, reward, terminated, truncated, info = self.env.step([action])
        reward = reward/16.2736044  # normalize the reward between -1 and 0
        obs = obs/self.env.observation_space.high  # normalize the observation between -1 and 1
        obs = np.append(obs, action/2)  # append the action, normalized by the max torque
        self.history.append(obs)  # the oldest entry is dropped automatically
        obs = np.array(list(self.history))
        return obs, reward, terminated, truncated, info

    def reset(self) -> tuple[np.ndarray, dict]:
        obs, info = self.env.reset()
        obs = obs/self.env.observation_space.high
        obs = np.append(obs, 0)  # no action has been taken yet
        # Initialize the history with the same observation
        for i in range(len(self.history)):
            self.history.append(obs)
        obs = np.array(list(self.history))
        return obs, info
Here, you can see that we are using a deque to keep a fixed-size history of length Nhist. We also added the action to the observation. Note that with this wrapper env.observation_space.shape is (Nhist, 4), so the LSTM's input size must be the last dimension (4) rather than shape[0]. With that in place, we can train the agent as usual. Below, we can see how the training goes in comparison to the FCN. It is pretty similar, which means the LSTM does not have much of an effect here. But in some more complex environments it can be useful and is worth trying.
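The sliding-window behaviour of the deque can be seen in isolation. The observation tuples below are made-up values standing in for normalized (cos, sin, velocity, previous action) vectors.

```python
from collections import deque

nhist = 3
history = deque(maxlen=nhist)

# On reset: fill the whole window with the first (observation + action) vector
first = (1.0, 0.0, 0.0, 0.0)  # hypothetical values
for _ in range(nhist):
    history.append(first)

# On each step: appending drops the oldest entry automatically
history.append((0.9, 0.1, 0.2, -1.0))
history.append((0.8, 0.2, 0.3, 0.5))

window = list(history)  # becomes an array of shape (nhist, 4) in the wrapper
print(window)
```

Because maxlen is fixed, the network always receives an input with the same number of time steps, even right after a reset.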
That's all for this part. We saw how to implement a Deep Q Learning algorithm from scratch and how to use any network as the Q-network, not just an FCN. We looked at an LSTM, but you can do the same with transformers for an even longer context history. In the next part, we will look into how to overcome the limitation of a discrete action space by implementing the REINFORCE algorithm.