Proximal Policy Optimization, Deep Reinforcement Learning
This notebook contains my implementation of the Proximal Policy Optimization algorithm, tested within the LunarLanderContinuous-v2 Gymnasium environment.
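Information about this environment can be found in the Gymnasium documentation for Lunar Lander: https://gymnasium.farama.org/environments/box2d/lunar_lander/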
In this environment, the agent is tasked with piloting the lander into the center of a designated landing zone without the body of the lander contacting the surface. The complete reward function involves many factors, all of which are described at the above link.
Basic imports:
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt
The cell below implements the Actor network, the Critic network, and the PPOAgent. The actor network outputs a mean $\mu$ for each action dimension (squashed into $[-1, 1]$ by a tanh) together with a standard deviation obtained from a learned, state-independent log-std parameter; these define the normal distribution that actions are sampled from. The critic network outputs a single value estimate for the current state. Both networks take the current state vector as input.
The PPOAgent instantiates an Actor network and a Critic network, along with a single optimizer shared by both, which is meant to make the training step more stable. Three additional methods are provided. First, select_action() passes a state vector through the actor network and samples an action from the resulting distribution. It returns the action vector (clamped to $[-1, 1]$), the log probability of that action, and the entropy of the distribution. Second, compute_gae() computes the generalized advantage estimate (GAE) for each timestep in the collected batch of experiences, returning an advantages vector and a returns vector. Lastly, update() generates the minibatches and runs the update loop for the two networks according to the PPO algorithm.
class Actor(nn.Module):
    """Gaussian policy head for a continuous action space.

    A two-hidden-layer MLP maps a state to a per-dimension mean that is
    squashed into [-1, 1] by ``tanh``. The standard deviation does not depend
    on the state: it is ``exp(log_std)``, where ``log_std`` is a learnable
    parameter initialised to -0.5 for every action dimension.

    Args:
        state_dim: Size of the state vector fed to the first layer.
        action_dim: Number of action dimensions (size of the mean/std).
        hidden_dim: Width of both hidden layers.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        # Layer names are part of the saved state_dict; keep them stable.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Parameter(torch.full((action_dim,), -0.5))

    def forward(self, x):
        """Return ``(mu, std)`` of the action distribution for state(s) ``x``."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        mu = torch.tanh(self.fc3(hidden))
        return mu, self.log_std.exp()
class Critic(nn.Module):
    """State-value network.

    A two-hidden-layer ReLU MLP that maps a state vector to a single scalar
    value estimate (output has a trailing dimension of size 1).

    Args:
        state_dim: Size of the state vector fed to the first layer.
        hidden_dim: Width of both hidden layers.
    """

    def __init__(self, state_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Return the value estimate for state(s) ``x``."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)
class PPOAgent:
    """Actor-critic agent trained with the clipped PPO objective.

    Holds an ``Actor`` and a ``Critic`` and one Adam optimizer over the
    parameters of both networks.

    Args:
        state_dim: Size of the state vector.
        action_dim: Number of continuous action dimensions.
        lr: Learning rate of the shared Adam optimizer.
        gamma: Discount factor.
        gae_lambda: Lambda of the generalized advantage estimator.
        clip_epsilon: Clipping range of the probability ratio.
        hidden_dim: Hidden width of both networks.
    """

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99, gae_lambda=0.95, clip_epsilon=0.2, hidden_dim=64):
        self.actor = Actor(state_dim, action_dim, hidden_dim)
        self.critic = Critic(state_dim, hidden_dim)
        self.optimizer = optim.Adam(
            list(self.actor.parameters()) + list(self.critic.parameters()), lr=lr
        )
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon

    def select_action(self, state):
        """Sample an action for ``state`` without tracking gradients.

        Args:
            state: A state vector, or a batch of state vectors, as an array
                or tensor.

        Returns:
            Tuple ``(action, log_prob, entropy)``. ``action`` is the sample
            clamped to [-1, 1]; ``log_prob`` and ``entropy`` are summed over
            the action dimensions.
        """
        state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            mu, std = self.actor(state)
            dist = Normal(mu, std)
            action_clipped = torch.clamp(dist.sample(), -1.0, 1.0)
            entropy = dist.entropy().sum(dim=-1)
            # Score the action that is actually returned (and later stored in
            # the rollout). update() re-evaluates log_prob on the stored
            # action, so scoring the unclamped sample here would make the
            # importance ratio differ from 1 before any parameter update.
            log_prob = dist.log_prob(action_clipped).sum(dim=-1)
        # Drop only the batch axis added above; a blanket squeeze() would
        # also collapse a real size-1 axis (one env, or action_dim == 1).
        return action_clipped.squeeze(0), log_prob.squeeze(0), entropy.squeeze(0)

    def compute_gae(self, rewards, values, dones, next_value):
        """Compute generalized advantage estimates over a rollout.

        The first axis of ``rewards``, ``values`` and ``dones`` is the time
        step. ``dones[t]`` flags that the transition taken at step ``t``
        ended its episode, which cuts both the bootstrap value and the
        accumulated advantage at that step.

        Args:
            rewards: Rewards per step.
            values: Critic estimates for the state at each step.
            dones: 1.0 where the step ended an episode, else 0.0.
            next_value: Critic estimate for the state after the last step.

        Returns:
            Tuple ``(advantages, returns)`` where
            ``returns = advantages + values``.
        """
        num_steps = rewards.shape[0]
        advantages = torch.zeros_like(rewards)
        gae = torch.zeros_like(next_value)
        next_val = next_value
        for step in reversed(range(num_steps)):
            not_done = 1 - dones[step]
            delta = rewards[step] + self.gamma * next_val * not_done - values[step]
            gae = delta + self.gamma * self.gae_lambda * not_done * gae
            advantages[step] = gae
            next_val = values[step]
        returns = advantages + values
        return advantages, returns

    def update(self, states, actions, log_probs_old, returns, advantages, epochs=4, batch_size=64,
               value_coef=0.5, entropy_coef=0.001):
        """Run the PPO minibatch optimisation over one collected rollout.

        All leading axes of the inputs are flattened into one sample axis,
        advantages are normalised over the whole rollout, and for each epoch
        the samples are reshuffled and split into minibatches.

        Args:
            states: Collected states; last axis is the state dimension.
            actions: Collected actions; last axis is the action dimension.
            log_probs_old: Log-probabilities recorded when acting.
            returns: Value targets for the critic.
            advantages: Advantage estimates (normalised internally).
            epochs: Number of passes over the rollout.
            batch_size: Minibatch size.
            value_coef: Weight of the critic MSE loss in the total loss.
            entropy_coef: Weight of the entropy bonus in the total loss.
        """
        b_states = states.reshape(-1, states.shape[-1])
        b_actions = actions.reshape(-1, actions.shape[-1])
        b_log_probs_old = log_probs_old.reshape(-1)
        b_returns = returns.reshape(-1)
        b_advantages = advantages.reshape(-1)
        b_advantages = (b_advantages - b_advantages.mean()) / (b_advantages.std() + 1e-8)
        dataset_size = b_states.shape[0]

        for _ in range(epochs):
            indices = np.random.permutation(dataset_size)
            for start in range(0, dataset_size, batch_size):
                batch_indices = indices[start:start + batch_size]
                batch_states = b_states[batch_indices]
                batch_actions = b_actions[batch_indices]
                batch_log_probs_old = b_log_probs_old[batch_indices]
                batch_returns = b_returns[batch_indices]
                batch_advantages = b_advantages[batch_indices]

                mu, std = self.actor(batch_states)
                dist = Normal(mu, std)
                log_probs = dist.log_prob(batch_actions).sum(dim=-1)
                entropy = dist.entropy().sum(dim=-1).mean()

                # Clipped surrogate objective.
                ratios = torch.exp(log_probs - batch_log_probs_old)
                surr1 = ratios * batch_advantages
                surr2 = torch.clamp(ratios, 1.0 - self.clip_epsilon, 1.0 + self.clip_epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                # squeeze(-1) keeps the batch axis even for a size-1
                # minibatch, so the shape always matches batch_returns.
                values = self.critic(batch_states).squeeze(-1)
                critic_loss = F.mse_loss(values, batch_returns)

                loss = actor_loss + value_coef * critic_loss - entropy_coef * entropy
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
The function below, run_ppo(), performs the entire PPO training loop for a Gymnasium environment with continuous state and action spaces: collecting the experiences for each batch, computing the advantages, and performing the update loop. Additionally, it saves both the final and the best-performing actor network (policy).
def run_ppo():
    """Train a PPO agent on LunarLanderContinuous-v2 and return episode rewards.

    Collects rollouts from 16 vectorized environments (4096 transitions per
    update), computes GAE advantages and runs the PPO update after each
    rollout. The actor weights are written to
    ``ppo_actor_lunar_lander_continuous_best.pth`` whenever the 100-episode
    average reward reaches a new maximum above 225, and training stops early
    once that average reaches 275. The final actor is always written to
    ``ppo_actor_lunar_lander_continuous.pth``.

    Returns:
        List with the total (unnormalised) reward of every finished episode,
        in completion order.
    """
    num_envs = 16
    env_name = "LunarLanderContinuous-v2"
    num_mini_batches = 32
    total_timesteps = 10000000
    steps_per_rollout = 4096
    reward_window = 100  # episodes averaged for the "best"/"solved" checks

    num_updates = total_timesteps // steps_per_rollout
    num_steps = steps_per_rollout // num_envs
    batch_size = steps_per_rollout // num_mini_batches

    episode_rewards = []
    episode_lengths = []

    envs = gym.make_vec(env_name, num_envs=num_envs)
    try:
        state_dim = envs.single_observation_space.shape[0]
        action_dim = envs.single_action_space.shape[0]
        agent = PPOAgent(state_dim, action_dim, lr=1e-4, hidden_dim=128, gae_lambda=0.95)

        state, info = envs.reset()
        state = torch.as_tensor(state, dtype=torch.float32)

        current_episode_rewards = np.zeros(num_envs)
        current_episode_lengths = np.zeros(num_envs)
        curr_max = -float('inf')

        for update in range(num_updates):
            # Rollout buffers, indexed [step, env].
            states = torch.zeros((num_steps, num_envs, state_dim))
            actions = torch.zeros((num_steps, num_envs, action_dim), dtype=torch.float)
            log_probs = torch.zeros((num_steps, num_envs))
            rewards = torch.zeros((num_steps, num_envs))
            dones = torch.zeros((num_steps, num_envs))
            values = torch.zeros((num_steps, num_envs))

            for step in range(num_steps):
                states[step] = state
                with torch.no_grad():
                    action, log_prob, _ = agent.select_action(state)
                    value = agent.critic(state).squeeze(-1)

                next_state, reward, terminated, truncated, info = envs.step(action.numpy())
                done = np.logical_or(terminated, truncated)

                actions[step] = action
                log_probs[step] = log_prob
                rewards[step] = torch.as_tensor(reward, dtype=torch.float32)
                dones[step] = torch.as_tensor(done, dtype=torch.float32)
                values[step] = value

                current_episode_rewards += reward
                current_episode_lengths += 1
                state = torch.as_tensor(next_state, dtype=torch.float32)

                # Record and reset the running totals of every env whose
                # episode just ended.
                for env_idx in np.flatnonzero(done):
                    episode_rewards.append(current_episode_rewards[env_idx])
                    episode_lengths.append(current_episode_lengths[env_idx])
                    current_episode_rewards[env_idx] = 0
                    current_episode_lengths[env_idx] = 0

            with torch.no_grad():
                next_value = agent.critic(state).squeeze(-1)

            # Rewards are standardised per rollout before GAE; the logged
            # episode rewards above use the raw values.
            rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-8)
            advantages, returns = agent.compute_gae(rewards, values, dones, next_value)
            # Advantages are normalised inside agent.update(), not here.
            agent.update(states, actions, log_probs, returns, advantages, epochs=10, batch_size=batch_size)

            recent_rewards = episode_rewards[-reward_window:]
            # Avoid np.mean([]) (NaN plus RuntimeWarning) before the first
            # episode has finished.
            avg_reward = np.mean(recent_rewards) if recent_rewards else float('nan')
            # Only trust the average once a full window of episodes exists,
            # so a few lucky early episodes cannot trigger a save or a stop.
            full_window = len(episode_rewards) >= reward_window

            if (update + 1) % 10 == 0:
                print(f"Update {update + 1}/{num_updates}, Average Reward: {avg_reward:.2f}, Episodes: {len(episode_rewards)}, Total Timesteps: {(update + 1) * steps_per_rollout}")

            if full_window and (avg_reward > curr_max) and (avg_reward > 225):
                curr_max = avg_reward
                torch.save(agent.actor.state_dict(), "ppo_actor_lunar_lander_continuous_best.pth")
                print(f"New best model saved with average reward: {curr_max:.2f}")

            if full_window and avg_reward >= 275:
                print(f"Solved in {update + 1} updates!")
                break
    finally:
        # Shut the vectorized environments down even if training raised.
        envs.close()

    torch.save(agent.actor.state_dict(), "ppo_actor_lunar_lander_continuous.pth")
    return episode_rewards
# Per-episode training rewards consumed by the plotting cell further down.
# Training runs only when this file is executed directly; on import the list
# stays empty.
if __name__ == "__main__":
    plot_rewards = run_ppo()
else:
    plot_rewards = []
In the cell below, the function run_trained_agent() lets us watch the performance of the trained policy from the cell above.
def run_trained_agent():
    """Render ten episodes of the saved best policy and print their rewards.

    Loads the actor weights from
    ``ppo_actor_lunar_lander_continuous_best.pth`` into a freshly built
    ``PPOAgent`` and runs it in a ``render_mode="human"`` environment.
    Actions are sampled from the policy distribution via
    ``select_action``.
    """
    env_name = "LunarLanderContinuous-v2"
    env = gym.make(env_name, render_mode="human")
    try:
        state_dim = env.observation_space.shape[0]
        action_dim = env.action_space.shape[0]
        agent = PPOAgent(state_dim, action_dim, hidden_dim=128, gae_lambda=0.95)
        agent.actor.load_state_dict(torch.load("ppo_actor_lunar_lander_continuous_best.pth", weights_only=True))

        for i in range(10):
            # Start every episode from the observation returned by reset();
            # otherwise the first action is chosen from the previous
            # episode's final state.
            state, info = env.reset()
            state = torch.FloatTensor(state)
            done = False
            ep_reward = 0
            while not done:
                with torch.no_grad():
                    action, _, _ = agent.select_action(state)
                next_state, reward, terminated, truncated, info = env.step(action.numpy())
                done = terminated or truncated
                state = torch.FloatTensor(next_state)
                ep_reward += reward
            print(f"Episode {i+1}: Reward = {ep_reward}")
    finally:
        # Close the render window even if loading or stepping fails.
        env.close()
# Replay the saved best policy only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    run_trained_agent()
Below is the code and the resulting plot of the reward for each episode during training. The official threshold for an episode to be considered a solution is a reward of +200. I wanted my agent to perform better than that, so I had the model train until it averaged a reward of +275 over the last 100 episodes.
# Plot the raw per-episode rewards and, when enough episodes exist, their
# moving average over `window` episodes.
window = 100

plt.figure(figsize=(10, 5))
plt.plot(plot_rewards, label='Episode Reward')

# With mode='valid' the moving average only lines up with the x-range below
# when there are at least `window` episodes; with fewer (or none) np.convolve
# returns a mismatched length or raises, so the smoothed curve is skipped.
if len(plot_rewards) >= window:
    smoothed_rewards = np.convolve(plot_rewards, np.ones(window) / window, mode='valid')
    plt.plot(
        np.arange(window - 1, len(plot_rewards)),
        smoothed_rewards,
        label=f'Smoothed Reward (window={window})',
        color='orange',
    )
else:
    smoothed_rewards = np.array([])

plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Episode Rewards and Smoothed Rewards Over Time")
plt.legend()
plt.show()

