Python Agents: Controlling Robots with rclpy
Why Python for Robotics?β
While robotics traditionally relied on C++ for performance, Python has become the lingua franca of Physical AI for three key reasons:
- AI/ML Integration: PyTorch, TensorFlow, and HuggingFace models are Python-first
- Rapid Prototyping: Test ideas in minutes, not hours
- Ecosystem: NumPy, OpenCV, SciPy provide rich scientific computing tools
rclpy (ROS Client Library for Python) bridges the gap between Python's ease of development and ROS 2's real-time communication infrastructure.
For perception and planning (where AI models run), Python is perfect. For ultra-low-latency motor control loops (1kHz+), you'd use C++ nodes. Most humanoid robots use both languages in the same system.
Anatomy of a ROS 2 Python Nodeβ
Every ROS 2 Python node follows this structure:
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('node_name')
# Initialize publishers, subscribers, timers, etc.
def callback_function(self):
# Handle events (timers, messages, etc.)
pass
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node) # Keep node running
rclpy.shutdown()
if __name__ == '__main__':
main()
Key Components:
- Import rclpy: The ROS 2 Python library
- Create Node Class: Inherits from
rclpy.node.Node - Initialize Node: Call
super().__init__('node_name')in constructor - Setup Communication: Create publishers, subscribers, timers in
__init__ - Spin Node:
rclpy.spin(node)processes callbacks in a loop - Shutdown: Clean up resources when node terminates
Publisher Node: Sending Commandsβ
Let's create a velocity commander that tells a robot to move forward:
Complete Code: velocity_publisher.pyβ
#!/usr/bin/env python3
"""
Velocity Publisher Node
Publishes movement commands to control a robot's wheels
"""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
class VelocityPublisher(Node):
"""
Node that publishes velocity commands at 10 Hz
Topic: /cmd_vel (geometry_msgs/Twist)
"""
def __init__(self):
super().__init__('velocity_publisher')
# Create publisher on /cmd_vel topic
self.publisher_ = self.create_publisher(
Twist, # Message type
'/cmd_vel', # Topic name
10 # Queue size (keep last 10 messages if subscriber is slow)
)
# Create timer that calls publish_velocity() every 0.1 seconds (10 Hz)
self.timer = self.create_timer(0.1, self.publish_velocity)
# Initialize velocity command
self.cmd = Twist()
self.cmd.linear.x = 0.5 # Move forward at 0.5 m/s
self.cmd.angular.z = 0.2 # Rotate at 0.2 rad/s (gentle left turn)
self.get_logger().info('Velocity Publisher started - Commanding robot to move')
def publish_velocity(self):
"""Callback function called by timer"""
self.publisher_.publish(self.cmd)
self.get_logger().info(f'Publishing: linear.x={self.cmd.linear.x}, angular.z={self.cmd.angular.z}')
def main(args=None):
rclpy.init(args=args)
node = VelocityPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Code Breakdownβ
Lines 20-24: Create Publisher
self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
Twist: Message type (contains linear and angular velocity)/cmd_vel: Standard ROS 2 topic name for robot velocity commands10: Queue size (buffers messages if network is slow)
Lines 26-27: Create Timer
self.timer = self.create_timer(0.1, self.publish_velocity)
- Calls
publish_velocity()every 0.1 seconds (10 Hz frequency) - Timers are ROS 2's preferred way to execute periodic tasks
Lines 29-32: Initialize Message
self.cmd = Twist()
self.cmd.linear.x = 0.5 # Forward velocity
self.cmd.angular.z = 0.2 # Rotational velocity
Twisthas two 3D vectors:linear(m/s) andangular(rad/s)- For a wheeled robot, typically only
linear.xandangular.zare used
Lines 37-39: Publish Message
self.publisher_.publish(self.cmd)
- Sends message on
/cmd_veltopic - All subscribers receive it instantly via DDS middleware
Subscriber Node: Receiving Sensor Dataβ
Now let's create a node that listens for messages on a topic:
Complete Code: sensor_listener.pyβ
#!/usr/bin/env python3
"""
Sensor Listener Node
Subscribes to sensor data and logs it
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SensorListener(Node):
"""
Node that subscribes to /sensor_data topic
Topic: /sensor_data (std_msgs/String)
"""
def __init__(self):
super().__init__('sensor_listener')
# Create subscriber
self.subscription = self.create_subscription(
String, # Message type
'/sensor_data', # Topic name
self.listener_callback, # Callback function
10 # Queue size
)
self.get_logger().info('Sensor Listener started - Waiting for data...')
def listener_callback(self, msg):
"""
Called automatically whenever a message arrives on /sensor_data
Args:
msg (std_msgs/String): The received message
"""
self.get_logger().info(f'Received: "{msg.data}"')
# Example: Parse sensor data and take action
if "warning" in msg.data.lower():
self.get_logger().warn('β οΈ SENSOR ALERT DETECTED!')
def main(args=None):
rclpy.init(args=args)
node = SensorListener()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Key Differences from Publisherβ
1. Subscriber Creation (Lines 21-26)
self.subscription = self.create_subscription(
String,
'/sensor_data',
self.listener_callback, # β Callback function
10
)
- You provide a callback function that ROS 2 calls when messages arrive
- No timer neededβcallbacks are event-driven
2. Callback Function (Lines 30-41)
def listener_callback(self, msg):
self.get_logger().info(f'Received: "{msg.data}"')
- The
msgparameter contains the received data - This function runs asynchronously whenever a message arrives
Keep callbacks fastβdon't do heavy computation here. If processing takes >1ms, use a separate thread or queue the data for later processing.
Running Your Nodesβ
Step 1: Make Scripts Executableβ
chmod +x velocity_publisher.py
chmod +x sensor_listener.py
Step 2: Run Publisher (Terminal 1)β
python3 velocity_publisher.py
Expected Output:
[INFO] [velocity_publisher]: Velocity Publisher started - Commanding robot to move
[INFO] [velocity_publisher]: Publishing: linear.x=0.5, angular.z=0.2
[INFO] [velocity_publisher]: Publishing: linear.x=0.5, angular.z=0.2
Step 3: Verify Topic (Terminal 2)β
ros2 topic echo /cmd_vel
Expected Output:
linear:
x: 0.5
y: 0.0
z: 0.0
angular:
x: 0.0
y: 0.0
z: 0.2
---
ROS 2 Package Structureβ
To integrate nodes into a ROS 2 workspace, you need two key files:
1. package.xml - Package Metadataβ
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_robot_controller</name>
<version>0.1.0</version>
<description>Python nodes for robot control</description>
<maintainer email="you@example.com">Your Name</maintainer>
<license>Apache-2.0</license>
<!-- Build tool dependency -->
<buildtool_depend>ament_python</buildtool_depend>
<!-- Runtime dependencies -->
<exec_depend>rclpy</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>std_msgs</exec_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
Key Elements:
<name>: Package name (used inros2 run <package_name> <node_name>)<buildtool_depend>: Build system (ament_python for Python packages)<exec_depend>: Runtime dependencies (rclpy, message types)
2. setup.py - Python Package Configurationβ
from setuptools import setup
package_name = 'my_robot_controller'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='you@example.com',
description='Python nodes for robot control',
license='Apache-2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'velocity_publisher = my_robot_controller.velocity_publisher:main',
'sensor_listener = my_robot_controller.sensor_listener:main',
],
},
)
Critical Section: entry_points
'velocity_publisher = my_robot_controller.velocity_publisher:main'
- Left side: Command name (what you type after
ros2 run <package>) - Right side: Python module path and function to call
This allows you to run:
ros2 run my_robot_controller velocity_publisher
Directory Structureβ
A complete ROS 2 Python package looks like this:
my_robot_controller/
βββ my_robot_controller/ # Python module
β βββ __init__.py # Empty file (marks as Python package)
β βββ velocity_publisher.py # Publisher node
β βββ sensor_listener.py # Subscriber node
βββ resource/
β βββ my_robot_controller # Empty marker file
βββ test/
β βββ test_copyright.py
β βββ test_flake8.py
βββ package.xml # ROS 2 package metadata
βββ setup.py # Python package configuration
Building and Runningβ
# From your ROS 2 workspace root
cd ~/ros2_ws
# Build the package
colcon build --packages-select my_robot_controller
# Source the workspace
source install/setup.bash
# Run nodes
ros2 run my_robot_controller velocity_publisher
ros2 run my_robot_controller sensor_listener
Hands-On Exercise: Temperature Monitorβ
Challenge: Create a two-node system:
- Publisher: Simulates a temperature sensor, publishes random values (15-30Β°C) every second
- Subscriber: Logs temperatures and triggers a warning if temp > 25Β°C
Starter Code: Temperature Publisherβ
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import random
class TemperatureSensor(Node):
def __init__(self):
super().__init__('temperature_sensor')
self.publisher_ = self.create_publisher(Float32, '/temperature', 10)
self.timer = self.create_timer(1.0, self.publish_temperature)
def publish_temperature(self):
msg = Float32()
msg.data = random.uniform(15.0, 30.0) # Simulate sensor reading
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing temperature: {msg.data:.2f}Β°C')
def main(args=None):
rclpy.init(args=args)
node = TemperatureSensor()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
Your Task: Write the subscriber node that:
- Subscribes to
/temperature - Logs each reading
- Prints a warning if temperature exceeds 25Β°C
Key Takeawaysβ
β
rclpy is the Python client library for ROS 2
β
Nodes are Python classes that inherit from rclpy.node.Node
β
Publishers send messages with create_publisher() and timers
β
Subscribers receive messages with create_subscription() and callbacks
β
package.xml defines ROS 2 dependencies
β
setup.py defines Python package structure and entry points
What's Next?β
You've learned to write Python nodes that publish and subscribe to topics. The next chapter introduces URDF (Unified Robot Description Format)βthe XML language for defining robot bodies, which ROS 2 uses for visualization, simulation, and control.