Reinforcement Learning for Robotic Control
Introduction
Reinforcement Learning (RL) enables robots to learn complex behaviors through trial and error, making it ideal for tasks where explicit programming is infeasible.
1. RL Fundamentals
1.1 Markov Decision Process (MDP)
An MDP is defined by:
- States (S): Robot configuration and environment state
- Actions (A): Motor commands, joint torques
- Rewards (R): Task-specific feedback (e.g., distance to goal)
- Transition Dynamics (P): P(s' | s, a)
- Discount Factor (γ): Future reward importance (0 < γ < 1)
Objective: Find a policy π that maximizes the expected discounted return:
J(π) = E[Σ_t γ^t · r_t]
For intuition: with γ = 0.9, a reward of 1 received three steps from now contributes 0.9³ = 0.729 to the return.
1.2 Value Functions
State-Value Function V^π(s): the expected return when starting in state s and following π:
V^π(s) = E[Σ_t γ^t · r_t | s_0 = s, π]
Action-Value Function Q^π(s,a): the expected return when taking action a in state s and following π thereafter:
Q^π(s,a) = E[r + γ · V^π(s') | s_0 = s, a_0 = a]
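These two quantities are linked by the standard Bellman expectation identities, which make the recursion explicit:
V^π(s) = Σ_a π(a|s) · Q^π(s,a)
V^π(s) = E[r + γ · V^π(s') | s_0 = s, π]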
2. Classical RL Algorithms
2.1 Q-Learning (Off-Policy TD)
import numpy as np

class QLearningAgent:
    def __init__(self, state_size, action_size, lr=0.1, gamma=0.99, epsilon=0.1):
        self.Q = np.zeros((state_size, action_size))
        self.lr = lr            # learning rate (alpha)
        self.gamma = gamma      # discount factor
        self.epsilon = epsilon  # exploration rate

    def select_action(self, state):
        """Epsilon-greedy policy."""
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.Q.shape[1])  # Explore
        else:
            return np.argmax(self.Q[state])  # Exploit

    def update(self, state, action, reward, next_state):
        """Q-learning update rule (bootstraps off the greedy next action)."""
        best_next_action = np.argmax(self.Q[next_state])
        td_target = reward + self.gamma * self.Q[next_state, best_next_action]
        td_error = td_target - self.Q[state, action]
        self.Q[state, action] += self.lr * td_error
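A minimal training-loop sketch, assuming the Gymnasium API and its FrozenLake-v1 environment (any environment with discrete states and actions would work the same way):

import gymnasium as gym

env = gym.make("FrozenLake-v1", is_slippery=False)
agent = QLearningAgent(env.observation_space.n, env.action_space.n)

for episode in range(5000):
    state, _ = env.reset()
    done = False
    while not done:
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        agent.update(state, action, reward, next_state)
        state = next_state
        done = terminated or truncated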
2.2 SARSA (On-Policy TD)
class SARSAgent(QLearningAgent):
    """Reuses QLearningAgent's Q-table and epsilon-greedy action selection."""

    def update(self, state, action, reward, next_state, next_action):
        """SARSA update (uses the action actually taken next, not the max)."""
        td_target = reward + self.gamma * self.Q[next_state, next_action]
        td_error = td_target - self.Q[state, action]
        self.Q[state, action] += self.lr * td_error
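Because SARSA is on-policy, the next action fed into the update must come from the same epsilon-greedy policy that acts in the environment. A sketch of the resulting interaction loop, again assuming the Gymnasium API (CliffWalking-v0 is the classic environment for contrasting the two updates):

import gymnasium as gym

env = gym.make("CliffWalking-v0")
agent = SARSAgent(env.observation_space.n, env.action_space.n)

for episode in range(500):
    state, _ = env.reset()
    action = agent.select_action(state)
    done = False
    while not done:
        next_state, reward, terminated, truncated, _ = env.step(action)
        next_action = agent.select_action(next_state)  # drawn from the behavior policy
        agent.update(state, action, reward, next_state, next_action)
        state, action = next_state, next_action
        done = terminated or truncated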
3. Deep Reinforcement Learning
3.1 Deep Q-Network (DQN)
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # Q-values for each action
class DQNAgent:
    def __init__(self, state_dim, action_dim):
        self.policy_net = DQN(state_dim, action_dim)
        self.target_net = DQN(state_dim, action_dim)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=1e-3)
        self.memory = []  # replay buffer of (s, a, r, s', done) tuples
        self.gamma = 0.99

    def store_transition(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def select_action(self, state, epsilon=0.1):
        if np.random.rand() < epsilon:
            return np.random.randint(self.policy_net.fc3.out_features)
        else:
            with torch.no_grad():
                q_values = self.policy_net(torch.FloatTensor(state))
                return q_values.argmax().item()

    def train(self, batch_size=64):
        if len(self.memory) < batch_size:
            return
        # Sample a random minibatch from the replay buffer
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones)
        # Compute Q(s,a) for the actions actually taken
        q_values = self.policy_net(states).gather(1, actions.unsqueeze(1))
        # Compute target: r + γ · max_a' Q_target(s',a'), zeroed at terminal states
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0]
            target = rewards + self.gamma * next_q_values * (1 - dones)
        # TD loss
        loss = nn.MSELoss()(q_values.squeeze(1), target)
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
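A sketch of the outer training loop, assuming Gymnasium's CartPole-v1; the target-network sync interval and the epsilon decay schedule are illustrative choices, not fixed parts of DQN:

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = DQNAgent(state_dim=4, action_dim=2)
epsilon, sync_every, step = 1.0, 500, 0

for episode in range(300):
    state, _ = env.reset()
    done = False
    while not done:
        action = agent.select_action(state, epsilon)
        next_state, reward, terminated, truncated, _ = env.step(action)
        agent.store_transition(state, action, reward, next_state, float(terminated))
        agent.train()
        state = next_state
        done = terminated or truncated
        step += 1
        if step % sync_every == 0:  # periodic hard update of the target network
            agent.target_net.load_state_dict(agent.policy_net.state_dict())
    epsilon = max(0.05, epsilon * 0.99)  # decay exploration over episodes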
4. Policy Gradient Methods
4.1 REINFORCE Algorithm
class REINFORCEAgent:
    def __init__(self, state_dim, action_dim, lr=1e-3):
        self.policy = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = 0.99

    def select_action(self, state):
        probs = self.policy(torch.FloatTensor(state))
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        # Return the log-probability (not the raw probability) so the
        # policy-gradient loss can backpropagate through it
        return action.item(), dist.log_prob(action)

    def train_episode(self, episode_rewards, episode_log_probs):
        """Update the policy after a full episode (Monte Carlo returns)."""
        # Compute returns (discounted cumulative rewards), back to front
        returns = []
        G = 0
        for r in reversed(episode_rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        returns = torch.FloatTensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)  # Normalize
        # Policy gradient loss: -Σ log π(a_t|s_t) · G_t
        loss = 0
        for log_prob, G in zip(episode_log_probs, returns):
            loss += -log_prob * G
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
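The agent updates only at episode boundaries; a collection-loop sketch, again assuming Gymnasium's CartPole-v1:

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = REINFORCEAgent(state_dim=4, action_dim=2)

for episode in range(1000):
    state, _ = env.reset()
    rewards, log_probs = [], []
    done = False
    while not done:
        action, log_prob = agent.select_action(state)
        state, reward, terminated, truncated, _ = env.step(action)
        rewards.append(reward)
        log_probs.append(log_prob)  # keeps the graph needed for backward()
        done = terminated or truncated
    agent.train_episode(rewards, log_probs)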
4.2 Proximal Policy Optimization (PPO)
class PPOAgent:
    def __init__(self, state_dim, action_dim):
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.Tanh(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.Tanh(),
            nn.Linear(128, 1)
        )
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        self.clip_epsilon = 0.2
        self.gamma = 0.99
        self.gae_lambda = 0.95  # GAE bias/variance trade-off

    def compute_advantage(self, rewards, values, next_values, dones):
        """Generalized Advantage Estimation (GAE)."""
        advantages = []
        gae = 0
        for i in reversed(range(len(rewards))):
            delta = rewards[i] + self.gamma * next_values[i] * (1 - dones[i]) - values[i]
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[i]) * gae
            advantages.insert(0, gae)
        return torch.FloatTensor(advantages)

    def update(self, states, actions, old_log_probs, returns, advantages):
        """PPO clipped surrogate objective (old_log_probs must be detached)."""
        for _ in range(10):  # Multiple epochs over the same rollout
            # Actor update
            probs = self.actor(states)
            dist = torch.distributions.Categorical(probs)
            new_log_probs = dist.log_prob(actions)
            ratio = torch.exp(new_log_probs - old_log_probs)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()
            # Critic update (regress V(s) toward the empirical returns)
            values = self.critic(states).squeeze(1)
            critic_loss = nn.MSELoss()(values, returns)
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()
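How the pieces fit together: a rollout-collection sketch under Gymnasium's CartPole-v1. The critic targets are recovered as returns = advantages + values (a common construction), and the bootstrap term next_values is built by shifting the value estimates, which ignores episode boundaries for brevity:

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = PPOAgent(state_dim=4, action_dim=2)

# Collect a fixed-length rollout with the current policy
states, actions, log_probs, rewards, dones = [], [], [], [], []
state, _ = env.reset()
for _ in range(2048):
    s = torch.FloatTensor(state)
    dist = torch.distributions.Categorical(agent.actor(s))
    action = dist.sample()
    next_state, reward, terminated, truncated, _ = env.step(action.item())
    states.append(s)
    actions.append(action)
    log_probs.append(dist.log_prob(action).detach())  # "old" log-probs, no gradient
    rewards.append(reward)
    dones.append(float(terminated))
    state = next_state if not (terminated or truncated) else env.reset()[0]

states_t = torch.stack(states)
with torch.no_grad():
    values = agent.critic(states_t).squeeze(1)
next_values = torch.cat([values[1:], torch.zeros(1)])  # simplified bootstrap
advantages = agent.compute_advantage(rewards, values, next_values, dones)
returns = advantages + values  # regression targets for the critic
agent.update(states_t, torch.stack(actions), torch.stack(log_probs), returns, advantages)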
5. Sim-to-Real Transfer
5.1 Domain Randomization
class RandomizedEnv:
    """Wrapper that re-samples physics and visual parameters on every reset."""

    def __init__(self, base_env):
        self.base_env = base_env

    def reset(self):
        # Randomize physics parameters
        self.base_env.set_gravity(np.random.uniform(8.0, 11.0))
        self.base_env.set_friction(np.random.uniform(0.3, 0.9))
        self.base_env.set_mass_scale(np.random.uniform(0.8, 1.2))
        # Randomize visual appearance
        self.base_env.set_light_intensity(np.random.uniform(0.5, 1.5))
        self.base_env.set_color_jitter(np.random.rand(3))
        return self.base_env.reset()
5.2 System Identification
from scipy import optimize

def system_identification(real_robot_data, sim_env, initial_guess):
    """
    Optimize simulation parameters to match trajectories recorded
    on the real robot, starting from an initial parameter guess.
    """
    def loss_function(sim_params):
        sim_env.set_parameters(sim_params)
        sim_trajectories = sim_env.run_experiments()
        # Mean squared error between simulated and real trajectories
        return np.mean((sim_trajectories - real_robot_data) ** 2)

    # Optimize (gradient-free methods such as CMA-ES or Bayesian
    # optimization are common here; scipy's minimize is the simplest option)
    result = optimize.minimize(loss_function, initial_guess, method="Nelder-Mead")
    return result.x
6. Isaac Gym Integration
6.1 Parallel Training
from isaacgym import gymapi
import numpy as np

def train_with_isaac_gym(num_envs=4096):
    gym = gymapi.acquire_gym()
    # Create simulation (PhysX backend)
    sim_params = gymapi.SimParams()
    sim = gym.create_sim(0, 0, gymapi.SIM_PHYSX, sim_params)
    # Create multiple environments in parallel
    lower = gymapi.Vec3(-1.0, -1.0, 0.0)  # per-env spatial bounds
    upper = gymapi.Vec3(1.0, 1.0, 1.0)
    num_per_row = int(np.sqrt(num_envs))
    envs = []
    for i in range(num_envs):
        env = gym.create_env(sim, lower, upper, num_per_row)
        envs.append(env)
    # Training loop (get_observations / apply_actions / compute_rewards
    # are task-specific helpers, left abstract here)
    for iteration in range(10000):
        # Step all environments in parallel
        gym.simulate(sim)
        gym.fetch_results(sim, True)
        # Collect observations
        states = get_observations(envs)
        # Select actions from the policy
        actions = policy.predict(states)
        # Apply actions
        apply_actions(envs, actions)
        # Compute rewards
        rewards = compute_rewards(envs)
        # Update the policy
        policy.update(states, actions, rewards)
7. Case Study: Bipedal Walking with RL
class WalkingEnv:
    def __init__(self):
        self.robot = HumanoidRobot()  # simulator interface
        self.target_velocity = 1.0  # m/s

    def reset(self):
        self.robot.reset_pose()
        return self.get_state()

    def get_state(self):
        """
        State: [joint angles, joint velocities, body orientation, CoM position]
        """
        return np.concatenate([
            self.robot.get_joint_positions(),
            self.robot.get_joint_velocities(),
            self.robot.get_orientation(),
            self.robot.get_com_position()
        ])

    def step(self, action):
        """
        Action: joint torques
        """
        self.robot.apply_torques(action)
        self.robot.simulate_step(dt=0.01)
        # Reward function
        velocity = self.robot.get_forward_velocity()
        reward = -abs(velocity - self.target_velocity)  # Velocity tracking
        reward -= 0.01 * np.sum(action ** 2)  # Energy penalty
        # Check termination
        height = self.robot.get_com_height()
        done = height < 0.5  # Fell down
        return self.get_state(), reward, done
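Before any learning, it is worth sanity-checking the environment loop. A minimal random-policy rollout, assuming the robot exposes its number of actuated joints through a hypothetical num_joints attribute:

env = WalkingEnv()
state = env.reset()
total_reward, done = 0.0, False

while not done:
    action = np.random.uniform(-1.0, 1.0, size=env.robot.num_joints)  # random torques
    state, reward, done = env.step(action)
    total_reward += reward

print(f"Episode return under a random policy: {total_reward:.2f}")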
Summary
- ✅ MDP formulation and value functions
- ✅ Q-Learning and SARSA
- ✅ Deep RL (DQN, PPO)
- ✅ Policy gradient methods
- ✅ Sim-to-real transfer techniques
- ✅ Isaac Gym for parallel training
Next: Module 6.2 - Model Predictive Control (MPC)