
Reinforcement Learning for Robotic Control

Introduction

Reinforcement Learning (RL) enables robots to learn complex behaviors through trial and error, making it ideal for tasks where explicit programming is infeasible.


1. RL Fundamentals

1.1 Markov Decision Process (MDP)

An MDP is defined by:

  • States (S): Robot configuration and environment state
  • Actions (A): Motor commands, joint torques
  • Rewards (R): Task-specific feedback (e.g., distance to goal)
  • Transition Dynamics (P): P(s' | s, a)
  • Discount Factor (γ): Future reward importance (0 < γ < 1)

Objective: Find policy π that maximizes expected return:

J(π) = E[Σ γ^t · r_t]

1.2 Value Functions

State-Value Function V^π(s):

V^π(s) = E[Σ γ^t · r_t | s_0 = s, π]

Action-Value Function Q^π(s,a):

Q^π(s,a) = E[r + γ · V^π(s')], where s' ~ P(s' | s, a)
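
To make the return concrete, here is a minimal sketch that computes the discounted return G = Σ γ^t · r_t for one episode (the reward values are made up for illustration):

rewards = [1.0, 0.0, 0.5, 2.0]  # hypothetical per-step rewards from one episode
gamma = 0.99

# Discounted return: G = r_0 + gamma * r_1 + gamma^2 * r_2 + ...
G = sum(gamma**t * r for t, r in enumerate(rewards))
print(G)  # 1.0 + 0.99*0.0 + 0.99^2*0.5 + 0.99^3*2.0 ≈ 3.431

V^π(s) is exactly this quantity averaged over many episodes started from s while following π.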

2. Classical RL Algorithms

2.1 Q-Learning (Off-Policy TD)

import numpy as np

class QLearningAgent:
    def __init__(self, state_size, action_size, lr=0.1, gamma=0.99, epsilon=0.1):
        self.Q = np.zeros((state_size, action_size))
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon

    def select_action(self, state):
        """Epsilon-greedy policy"""
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.Q.shape[1])  # Explore
        else:
            return np.argmax(self.Q[state])  # Exploit

    def update(self, state, action, reward, next_state):
        """Q-learning update rule"""
        best_next_action = np.argmax(self.Q[next_state])
        td_target = reward + self.gamma * self.Q[next_state, best_next_action]
        td_error = td_target - self.Q[state, action]
        self.Q[state, action] += self.lr * td_error
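
As a usage sketch, the agent can be trained with a standard interaction loop. The environment below is a made-up 1D corridor (five states; action 0 moves left, action 1 moves right; reaching the rightmost state gives reward 1 and ends the episode), not part of any library:

agent = QLearningAgent(state_size=5, action_size=2)

for episode in range(500):
    state = 0
    done = False
    while not done:
        action = agent.select_action(state)
        next_state = max(0, state - 1) if action == 0 else min(4, state + 1)
        reward = 1.0 if next_state == 4 else 0.0
        done = next_state == 4
        agent.update(state, action, reward, next_state)
        state = next_state

print(agent.Q)  # Q-values should now favor action 1 (move right) in every state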

2.2 SARSA (On-Policy TD)

class SARSAgent(QLearningAgent):  # reuses the Q-table, lr, gamma, and epsilon setup above
    def update(self, state, action, reward, next_state, next_action):
        """SARSA update (uses the action actually taken next, not the max)"""
        td_target = reward + self.gamma * self.Q[next_state, next_action]
        td_error = td_target - self.Q[state, action]
        self.Q[state, action] += self.lr * td_error

3. Deep Reinforcement Learning

3.1 Deep Q-Network (DQN)

import random

import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # Q-values for each action

class DQNAgent:
    def __init__(self, state_dim, action_dim):
        self.policy_net = DQN(state_dim, action_dim)
        self.target_net = DQN(state_dim, action_dim)
        self.target_net.load_state_dict(self.policy_net.state_dict())

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=1e-3)
        self.memory = []  # replay buffer of (s, a, r, s', done); in practice, cap its length
        self.gamma = 0.99

    def select_action(self, state, epsilon=0.1):
        if np.random.rand() < epsilon:
            return np.random.randint(self.policy_net.fc3.out_features)
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state)
            q_values = self.policy_net(state_tensor)
            return q_values.argmax().item()

    def train(self, batch_size=64):
        if len(self.memory) < batch_size:
            return

        # Sample a random batch of transitions from the replay buffer
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones)

        # Compute Q(s,a) for the actions actually taken
        q_values = self.policy_net(states).gather(1, actions.unsqueeze(1))

        # Compute target: r + γ · max_a' Q_target(s',a'), zeroed at terminal states
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0]
            target = rewards + self.gamma * next_q_values * (1 - dones)

        # Loss between predicted and target Q-values
        loss = nn.MSELoss()(q_values.squeeze(), target)

        # Optimize the policy network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
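
A minimal interaction loop might look like the sketch below. It assumes the gymnasium package and its CartPole-v1 task (4-dimensional state, 2 actions); the sync interval of 100 episodes is an arbitrary choice:

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = DQNAgent(state_dim=4, action_dim=2)

for episode in range(300):
    state, _ = env.reset()
    done = False
    while not done:
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        agent.memory.append((state, action, reward, next_state, float(done)))
        agent.train()
        state = next_state

    # Periodically copy policy weights into the target network to stabilize targets
    if episode % 100 == 0:
        agent.target_net.load_state_dict(agent.policy_net.state_dict())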

4. Policy Gradient Methods

4.1 REINFORCE Algorithm

class REINFORCEAgent:
    def __init__(self, state_dim, action_dim, lr=1e-3):
        self.policy = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = 0.99

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state)
        probs = self.policy(state_tensor)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)  # log-prob keeps the gradient

    def train_episode(self, episode_rewards, episode_log_probs):
        """
        Update policy after a full episode
        """
        # Compute returns (discounted cumulative rewards), working backwards
        returns = []
        G = 0
        for r in reversed(episode_rewards):
            G = r + self.gamma * G
            returns.insert(0, G)

        returns = torch.FloatTensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)  # Normalize to reduce variance

        # Policy gradient loss: -log π(a|s) · G
        loss = 0
        for log_prob, G in zip(episode_log_probs, returns):
            loss += -log_prob * G

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
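
A minimal episode loop, again assuming gymnasium's CartPole-v1 (hypothetical here, any discrete-action environment works):

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = REINFORCEAgent(state_dim=4, action_dim=2)

for episode in range(1000):
    state, _ = env.reset()
    rewards, log_probs = [], []
    done = False
    while not done:
        action, log_prob = agent.select_action(state)
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        rewards.append(reward)
        log_probs.append(log_prob)

    # One policy update per completed episode (Monte Carlo)
    agent.train_episode(rewards, log_probs)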

4.2 Proximal Policy Optimization (PPO)

class PPOAgent:
    def __init__(self, state_dim, action_dim):
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.Tanh(),
            nn.Linear(128, action_dim),
            nn.Softmax(dim=-1)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.Tanh(),
            nn.Linear(128, 1)
        )

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.clip_epsilon = 0.2
        self.gamma = 0.99
        self.gae_lambda = 0.95  # GAE bias-variance trade-off parameter

    def compute_advantage(self, rewards, values, next_values, dones):
        """Generalized Advantage Estimation (GAE)"""
        advantages = []
        gae = 0

        for i in reversed(range(len(rewards))):
            # TD error for step i; (1 - dones[i]) cuts the bootstrap at episode ends
            delta = rewards[i] + self.gamma * next_values[i] * (1 - dones[i]) - values[i]
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[i]) * gae
            advantages.insert(0, gae)

        return torch.FloatTensor(advantages)

    def update(self, states, actions, old_log_probs, returns, advantages):
        """PPO clipped surrogate objective"""
        for _ in range(10):  # Multiple epochs over the same batch
            # Actor update
            probs = self.actor(states)
            dist = torch.distributions.Categorical(probs)
            new_log_probs = dist.log_prob(actions)

            # Probability ratio between new and old policy
            ratio = torch.exp(new_log_probs - old_log_probs)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages

            actor_loss = -torch.min(surr1, surr2).mean()

            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # Critic update: regress state values toward the empirical returns
            values = self.critic(states).squeeze()
            critic_loss = nn.MSELoss()(values, returns)

            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()
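
Tying it together, here is a hedged sketch of one rollout-and-update cycle (CartPole-v1 via gymnasium again; the rollout length 2048 is an arbitrary but common choice, and the final-step bootstrap is a rough approximation). Note that old_log_probs must be detached so the ratio compares against a frozen snapshot of the policy:

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = PPOAgent(state_dim=4, action_dim=2)

# --- Collect one fixed-length rollout ---
states, actions, rewards, dones, log_probs = [], [], [], [], []
state, _ = env.reset()
for _ in range(2048):
    state_tensor = torch.FloatTensor(state)
    dist = torch.distributions.Categorical(agent.actor(state_tensor))
    action = dist.sample()

    next_state, reward, terminated, truncated, _ = env.step(action.item())
    done = terminated or truncated

    states.append(state)
    actions.append(action.item())
    rewards.append(reward)
    dones.append(float(done))
    log_probs.append(dist.log_prob(action).detach())

    state = next_state if not done else env.reset()[0]

# --- Compute advantages and returns, then update ---
states_t = torch.FloatTensor(np.array(states))
actions_t = torch.LongTensor(actions)
old_log_probs = torch.stack(log_probs)

with torch.no_grad():
    values = agent.critic(states_t).squeeze()
next_values = torch.cat([values[1:], torch.zeros(1)])  # rough bootstrap for the last step

advantages = agent.compute_advantage(rewards, values.tolist(), next_values.tolist(), dones)
returns = advantages + values  # returns = advantages + value baseline

agent.update(states_t, actions_t, old_log_probs, returns, advantages)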

5. Sim-to-Real Transfer

5.1 Domain Randomization

class RandomizedEnv:
    def __init__(self, base_env):
        self.base_env = base_env

    def reset(self):
        # Randomize physics parameters
        self.base_env.set_gravity(np.random.uniform(8.0, 11.0))
        self.base_env.set_friction(np.random.uniform(0.3, 0.9))
        self.base_env.set_mass_scale(np.random.uniform(0.8, 1.2))

        # Randomize visual appearance
        self.base_env.set_light_intensity(np.random.uniform(0.5, 1.5))
        self.base_env.set_color_jitter(np.random.rand(3))

        return self.base_env.reset()
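
The ranges above are a design trade-off: wide enough to bracket the real robot's unknown parameters (gravity sampled over 8.0-11.0 m/s² around the true 9.81, mass scaled ±20%), yet narrow enough that a single policy can still succeed under every sampled variant. Because a fresh set of parameters is drawn on every reset, the policy cannot overfit to any one simulator instance and must learn behavior robust to dynamics it never observes directly.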

5.2 System Identification

import scipy.optimize

def system_identification(real_robot_data, sim_env, initial_guess):
    """
    Optimize simulation parameters to match real robot trajectories
    """
    def loss_function(sim_params):
        sim_env.set_parameters(sim_params)
        sim_trajectories = sim_env.run_experiments()

        # Mean squared error between simulated and real trajectories
        error = np.mean((sim_trajectories - real_robot_data)**2)
        return error

    # Optimize starting from initial_guess (gradient-free methods such as
    # CMA-ES or Bayesian optimization are also common, since the simulator
    # is effectively a black box)
    result = scipy.optimize.minimize(loss_function, initial_guess)

    return result.x  # the fitted parameter vector

6. Isaac Gym Integration

6.1 Parallel Training

from isaacgym import gymapi

def train_with_isaac_gym(num_envs=4096):
    gym = gymapi.acquire_gym()

    # Simulation parameters (PhysX backend, compute and graphics on device 0)
    sim_params = gymapi.SimParams()
    sim = gym.create_sim(0, 0, gymapi.SIM_PHYSX, sim_params)

    # Create multiple environments in parallel, tiled on a grid
    lower = gymapi.Vec3(-1.0, -1.0, 0.0)
    upper = gymapi.Vec3(1.0, 1.0, 2.0)
    num_per_row = 64  # 64 x 64 grid for 4096 envs
    envs = []
    for _ in range(num_envs):
        env = gym.create_env(sim, lower, upper, num_per_row)
        envs.append(env)

    # Training loop (get_observations, apply_actions, compute_rewards,
    # and policy are task-specific placeholders, not defined here)
    for iteration in range(10000):
        # Step all environments in parallel
        gym.simulate(sim)
        gym.fetch_results(sim, True)

        # Collect observations
        states = get_observations(envs)

        # Select actions from policy
        actions = policy.predict(states)

        # Apply actions
        apply_actions(envs, actions)

        # Compute rewards
        rewards = compute_rewards(envs)

        # Update policy
        policy.update(states, actions, rewards)
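
Because every call to gym.simulate(sim) advances all 4,096 environments at once, data collection, which is usually the bottleneck in robot RL, scales with simulator throughput rather than with wall-clock robot time. This is what makes sample-hungry on-policy methods like PPO practical for locomotion tasks.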

7. Case Study: Bipedal Walking with RL

class WalkingEnv:
    def __init__(self):
        self.robot = HumanoidRobot()  # assumed simulator interface, not defined here
        self.target_velocity = 1.0  # m/s

    def reset(self):
        self.robot.reset_pose()
        return self.get_state()

    def get_state(self):
        """
        State: [joint angles, joint velocities, body orientation, CoM position]
        """
        return np.concatenate([
            self.robot.get_joint_positions(),
            self.robot.get_joint_velocities(),
            self.robot.get_orientation(),
            self.robot.get_com_position()
        ])

    def step(self, action):
        """
        Action: joint torques
        """
        self.robot.apply_torques(action)
        self.robot.simulate_step(dt=0.01)

        # Reward: track the target forward velocity, penalize energy use
        velocity = self.robot.get_forward_velocity()
        reward = -abs(velocity - self.target_velocity)  # Velocity tracking
        reward -= 0.01 * np.sum(action**2)  # Energy penalty

        # Terminate if the robot has fallen (center of mass too low)
        height = self.robot.get_com_height()
        done = height < 0.5  # Fell down

        return self.get_state(), reward, done
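
Note that the torque actions here are continuous, while the PPO actor in Section 4.2 outputs a categorical distribution over discrete actions. A common adaptation, sketched below under that assumption (the hidden size and zero log-std initialization are arbitrary choices), is a Gaussian policy head whose mean comes from the network and whose log standard deviation is a learned parameter:

class GaussianActor(nn.Module):
    """Outputs a Normal distribution over continuous joint torques."""
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.Tanh(),
            nn.Linear(128, action_dim)
        )
        # State-independent log standard deviation, learned during training
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, state):
        mean = self.net(state)
        std = torch.exp(self.log_std)
        return torch.distributions.Normal(mean, std)

# Sampling torques; sum log-probs over action dimensions for the PPO ratio:
# dist = actor(state_tensor)
# action = dist.sample()
# log_prob = dist.log_prob(action).sum(-1)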

Summary

  • ✅ MDP formulation and value functions
  • ✅ Q-Learning and SARSA
  • ✅ Deep RL (DQN, PPO)
  • ✅ Policy gradient methods
  • ✅ Sim-to-real transfer techniques
  • ✅ Isaac Gym for parallel training

Next: Module 6.2 - Model Predictive Control (MPC)