Skip to main content

Python Agents: Controlling Robots with rclpy

Why Python for Robotics?​

While robotics traditionally relied on C++ for performance, Python has become the lingua franca of Physical AI for three key reasons:

  1. AI/ML Integration: PyTorch, TensorFlow, and HuggingFace models are Python-first
  2. Rapid Prototyping: Test ideas in minutes, not hours
  3. Ecosystem: NumPy, OpenCV, SciPy provide rich scientific computing tools

rclpy (ROS Client Library for Python) bridges the gap between Python's ease of development and ROS 2's real-time communication infrastructure.

Performance Note

For perception and planning (where AI models run), Python is perfect. For ultra-low-latency motor control loops (1kHz+), you'd use C++ nodes. Most humanoid robots use both languages in the same system.


Anatomy of a ROS 2 Python Node​

Every ROS 2 Python node follows this structure:

import rclpy
from rclpy.node import Node

class MyNode(Node):
def __init__(self):
super().__init__('node_name')
# Initialize publishers, subscribers, timers, etc.

def callback_function(self):
# Handle events (timers, messages, etc.)
pass

def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node) # Keep node running
rclpy.shutdown()

if __name__ == '__main__':
main()

Key Components:

  1. Import rclpy: The ROS 2 Python library
  2. Create Node Class: Inherits from rclpy.node.Node
  3. Initialize Node: Call super().__init__('node_name') in constructor
  4. Setup Communication: Create publishers, subscribers, timers in __init__
  5. Spin Node: rclpy.spin(node) processes callbacks in a loop
  6. Shutdown: Clean up resources when node terminates

Publisher Node: Sending Commands​

Let's create a velocity commander that tells a robot to move forward:

Complete Code: velocity_publisher.py​

#!/usr/bin/env python3
"""
Velocity Publisher Node
Publishes movement commands to control a robot's wheels
"""

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

class VelocityPublisher(Node):
"""
Node that publishes velocity commands at 10 Hz
Topic: /cmd_vel (geometry_msgs/Twist)
"""

def __init__(self):
super().__init__('velocity_publisher')

# Create publisher on /cmd_vel topic
self.publisher_ = self.create_publisher(
Twist, # Message type
'/cmd_vel', # Topic name
10 # Queue size (keep last 10 messages if subscriber is slow)
)

# Create timer that calls publish_velocity() every 0.1 seconds (10 Hz)
self.timer = self.create_timer(0.1, self.publish_velocity)

# Initialize velocity command
self.cmd = Twist()
self.cmd.linear.x = 0.5 # Move forward at 0.5 m/s
self.cmd.angular.z = 0.2 # Rotate at 0.2 rad/s (gentle left turn)

self.get_logger().info('Velocity Publisher started - Commanding robot to move')

def publish_velocity(self):
"""Callback function called by timer"""
self.publisher_.publish(self.cmd)
self.get_logger().info(f'Publishing: linear.x={self.cmd.linear.x}, angular.z={self.cmd.angular.z}')

def main(args=None):
rclpy.init(args=args)
node = VelocityPublisher()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Code Breakdown​

Lines 20-24: Create Publisher

self.publisher_ = self.create_publisher(Twist, '/cmd_vel', 10)
  • Twist: Message type (contains linear and angular velocity)
  • /cmd_vel: Standard ROS 2 topic name for robot velocity commands
  • 10: Queue size (buffers messages if network is slow)

Lines 26-27: Create Timer

self.timer = self.create_timer(0.1, self.publish_velocity)
  • Calls publish_velocity() every 0.1 seconds (10 Hz frequency)
  • Timers are ROS 2's preferred way to execute periodic tasks

Lines 29-32: Initialize Message

self.cmd = Twist()
self.cmd.linear.x = 0.5 # Forward velocity
self.cmd.angular.z = 0.2 # Rotational velocity
  • Twist has two 3D vectors: linear (m/s) and angular (rad/s)
  • For a wheeled robot, typically only linear.x and angular.z are used

Lines 37-39: Publish Message

self.publisher_.publish(self.cmd)
  • Sends message on /cmd_vel topic
  • All subscribers receive it instantly via DDS middleware

Subscriber Node: Receiving Sensor Data​

Now let's create a node that listens for messages on a topic:

Complete Code: sensor_listener.py​

#!/usr/bin/env python3
"""
Sensor Listener Node
Subscribes to sensor data and logs it
"""

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SensorListener(Node):
"""
Node that subscribes to /sensor_data topic
Topic: /sensor_data (std_msgs/String)
"""

def __init__(self):
super().__init__('sensor_listener')

# Create subscriber
self.subscription = self.create_subscription(
String, # Message type
'/sensor_data', # Topic name
self.listener_callback, # Callback function
10 # Queue size
)

self.get_logger().info('Sensor Listener started - Waiting for data...')

def listener_callback(self, msg):
"""
Called automatically whenever a message arrives on /sensor_data

Args:
msg (std_msgs/String): The received message
"""
self.get_logger().info(f'Received: "{msg.data}"')

# Example: Parse sensor data and take action
if "warning" in msg.data.lower():
self.get_logger().warn('⚠️ SENSOR ALERT DETECTED!')

def main(args=None):
rclpy.init(args=args)
node = SensorListener()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Key Differences from Publisher​

1. Subscriber Creation (Lines 21-26)

self.subscription = self.create_subscription(
String,
'/sensor_data',
self.listener_callback, # ← Callback function
10
)
  • You provide a callback function that ROS 2 calls when messages arrive
  • No timer neededβ€”callbacks are event-driven

2. Callback Function (Lines 30-41)

def listener_callback(self, msg):
self.get_logger().info(f'Received: "{msg.data}"')
  • The msg parameter contains the received data
  • This function runs asynchronously whenever a message arrives
Best Practice

Keep callbacks fastβ€”don't do heavy computation here. If processing takes >1ms, use a separate thread or queue the data for later processing.


Running Your Nodes​

Step 1: Make Scripts Executable​

chmod +x velocity_publisher.py
chmod +x sensor_listener.py

Step 2: Run Publisher (Terminal 1)​

python3 velocity_publisher.py

Expected Output:

[INFO] [velocity_publisher]: Velocity Publisher started - Commanding robot to move
[INFO] [velocity_publisher]: Publishing: linear.x=0.5, angular.z=0.2
[INFO] [velocity_publisher]: Publishing: linear.x=0.5, angular.z=0.2

Step 3: Verify Topic (Terminal 2)​

ros2 topic echo /cmd_vel

Expected Output:

linear:
x: 0.5
y: 0.0
z: 0.0
angular:
x: 0.0
y: 0.0
z: 0.2
---

ROS 2 Package Structure​

To integrate nodes into a ROS 2 workspace, you need two key files:

1. package.xml - Package Metadata​

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_robot_controller</name>
<version>0.1.0</version>
<description>Python nodes for robot control</description>
<maintainer email="you@example.com">Your Name</maintainer>
<license>Apache-2.0</license>

<!-- Build tool dependency -->
<buildtool_depend>ament_python</buildtool_depend>

<!-- Runtime dependencies -->
<exec_depend>rclpy</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>std_msgs</exec_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>

Key Elements:

  • <name>: Package name (used in ros2 run <package_name> <node_name>)
  • <buildtool_depend>: Build system (ament_python for Python packages)
  • <exec_depend>: Runtime dependencies (rclpy, message types)

2. setup.py - Python Package Configuration​

from setuptools import setup

package_name = 'my_robot_controller'

setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='you@example.com',
description='Python nodes for robot control',
license='Apache-2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'velocity_publisher = my_robot_controller.velocity_publisher:main',
'sensor_listener = my_robot_controller.sensor_listener:main',
],
},
)

Critical Section: entry_points

'velocity_publisher = my_robot_controller.velocity_publisher:main'
  • Left side: Command name (what you type after ros2 run <package>)
  • Right side: Python module path and function to call

This allows you to run:

ros2 run my_robot_controller velocity_publisher

Directory Structure​

A complete ROS 2 Python package looks like this:

my_robot_controller/
β”œβ”€β”€ my_robot_controller/ # Python module
β”‚ β”œβ”€β”€ __init__.py # Empty file (marks as Python package)
β”‚ β”œβ”€β”€ velocity_publisher.py # Publisher node
β”‚ └── sensor_listener.py # Subscriber node
β”œβ”€β”€ resource/
β”‚ └── my_robot_controller # Empty marker file
β”œβ”€β”€ test/
β”‚ β”œβ”€β”€ test_copyright.py
β”‚ └── test_flake8.py
β”œβ”€β”€ package.xml # ROS 2 package metadata
└── setup.py # Python package configuration

Building and Running​

# From your ROS 2 workspace root
cd ~/ros2_ws

# Build the package
colcon build --packages-select my_robot_controller

# Source the workspace
source install/setup.bash

# Run nodes
ros2 run my_robot_controller velocity_publisher
ros2 run my_robot_controller sensor_listener

Hands-On Exercise: Temperature Monitor​

Challenge: Create a two-node system:

  1. Publisher: Simulates a temperature sensor, publishes random values (15-30Β°C) every second
  2. Subscriber: Logs temperatures and triggers a warning if temp > 25Β°C

Starter Code: Temperature Publisher​

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import random

class TemperatureSensor(Node):
def __init__(self):
super().__init__('temperature_sensor')
self.publisher_ = self.create_publisher(Float32, '/temperature', 10)
self.timer = self.create_timer(1.0, self.publish_temperature)

def publish_temperature(self):
msg = Float32()
msg.data = random.uniform(15.0, 30.0) # Simulate sensor reading
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing temperature: {msg.data:.2f}Β°C')

def main(args=None):
rclpy.init(args=args)
node = TemperatureSensor()
rclpy.spin(node)
rclpy.shutdown()

if __name__ == '__main__':
main()

Your Task: Write the subscriber node that:

  • Subscribes to /temperature
  • Logs each reading
  • Prints a warning if temperature exceeds 25Β°C

Key Takeaways​

βœ… rclpy is the Python client library for ROS 2
βœ… Nodes are Python classes that inherit from rclpy.node.Node
βœ… Publishers send messages with create_publisher() and timers
βœ… Subscribers receive messages with create_subscription() and callbacks
βœ… package.xml defines ROS 2 dependencies
βœ… setup.py defines Python package structure and entry points


What's Next?​

You've learned to write Python nodes that publish and subscribe to topics. The next chapter introduces URDF (Unified Robot Description Format)β€”the XML language for defining robot bodies, which ROS 2 uses for visualization, simulation, and control.


Further Reading​