Adding a New Modality End-to-End¶
This guide walks through adding a completely new modality to EmbodiedAgents – from ROS message definition through to a working component. Read Custom ROS Message Types, Creating a Custom Component, and Creating a Custom Model Client first; this guide ties all three layers together.
Overview¶
A “modality” in EmbodiedAgents is a complete data pipeline: a ROS message carries the data, a SupportedType wrapper bridges it to Python, a callback deserializes incoming messages, and a component produces or consumes the data via a model client.
Adding a new modality touches these files:
| Layer | File(s) | What you add |
|---|---|---|
| ROS message | `msg/`, `CMakeLists.txt` | Message definition, build registration |
| Callback | `agents/callbacks.py` | Deserialization from ROS to Python |
| SupportedType | `agents/ros.py` | Type wrapper, convert method, registration |
| UI (optional) | `agents/ui_elements.py` | Visualization rendering |
| Config | `agents/config.py` | Component configuration class |
| Component | `agents/components/` | Processing logic |
| Component registration | `agents/components/__init__.py` | Export |
The rest of this guide walks through each layer using a concrete example: Haptic Sensing – a component that processes tactile sensor data through an ML model and publishes grasp quality predictions.
Step 1: Define the ROS Message¶
Create a .msg file in the automatika_embodied_agents package:
```
# msg/HapticReading.msg
float32[] pressures
float32[] temperatures
uint32 sensor_id
std_msgs/Header header
```
Register it in CMakeLists.txt:
```cmake
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/HapticReading.msg"
  # ... existing messages ...
  DEPENDENCIES std_msgs sensor_msgs
)
```
Build the package:
```bash
cd <workspace_root>
colcon build --packages-select automatika_embodied_agents
source install/setup.bash
```
After building, the message is available as automatika_embodied_agents.msg.HapticReading.
Step 2: Create the Callback¶
Add a callback class to agents/callbacks.py. The callback converts the raw ROS message into Python data that components work with:
```python
import numpy as np
from ros_sugar.io.callbacks import GenericCallback


class HapticReadingCallback(GenericCallback):
    """Callback for HapticReading messages."""

    def _get_output(self, **_):
        """Convert HapticReading ROS message to a numpy array."""
        if self.msg is None:
            return None
        pressures = np.array(self.msg.pressures, dtype=np.float32)
        temperatures = np.array(self.msg.temperatures, dtype=np.float32)
        # Stack into a single feature array: shape (2, N_sensors)
        return np.stack([pressures, temperatures], axis=0)
```
Key points:

- Inherit from `GenericCallback` (or `TextCallback` for string data).
- Implement `_get_output(self, **_)` – this is what `callback.get_output()` returns to the component.
- Handle the `self.msg is None` case (no message received yet).
- Optionally implement `_get_ui_content(self, **_)` to return rendered HTML or JPEG-encoded bytes for UI visualization.
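The deserialization logic above can be exercised outside ROS with a stub message object. A minimal sketch, in which `SimpleNamespace` stands in for the generated ROS message class and the sensor values are made up:

```python
from types import SimpleNamespace

import numpy as np

# Stub standing in for a HapticReading message instance (illustration only)
msg = SimpleNamespace(
    pressures=[0.1, 0.4, 0.9],
    temperatures=[36.5, 36.7, 37.0],
)

# The same conversion _get_output() performs
pressures = np.array(msg.pressures, dtype=np.float32)
temperatures = np.array(msg.temperatures, dtype=np.float32)
features = np.stack([pressures, temperatures], axis=0)

print(features.shape)  # (2, 3): one row per channel, one column per sensor element
```

Writing this kind of check first makes it easy to confirm the array layout your component expects before any ROS plumbing is in place.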
Handling Fixed Inputs¶
If your modality should support FixedInput (static data from a file), check for it in the callback:
```python
def _get_output(self, **_):
    # FixedInput case: self.msg is a string path
    if isinstance(self.msg, str):
        return np.load(self.msg)
    if self.msg is None:
        return None
    # Normal ROS message case
    return np.stack([
        np.array(self.msg.pressures, dtype=np.float32),
        np.array(self.msg.temperatures, dtype=np.float32),
    ], axis=0)
```
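The FixedInput branch can be tested by saving an array to disk and passing the file path in place of a message. A sketch (the fixture path and array contents are hypothetical):

```python
import os
import tempfile

import numpy as np

# Write a fake pre-recorded haptic frame to a .npy file
fixed = np.random.rand(2, 16).astype(np.float32)
path = os.path.join(tempfile.mkdtemp(), "haptic_fixture.npy")
np.save(path, fixed)

# The FixedInput branch of the callback boils down to this check:
msg = path  # in the FixedInput case, self.msg is a string path
output = np.load(msg) if isinstance(msg, str) else None

print(output.shape)  # (2, 16)
```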
Step 3: Create the SupportedType Wrapper¶
Add the wrapper class in agents/ros.py. This bridges the ROS message, the callback, and the publish-side conversion:
```python
from automatika_embodied_agents.msg import HapticReading as ROSHapticReading

from .callbacks import HapticReadingCallback


class HapticReading(SupportedType):
    """Wraps automatika_embodied_agents/msg/HapticReading."""

    _ros_type = ROSHapticReading
    callback = HapticReadingCallback

    @classmethod
    def convert(cls, output, **_):
        """Convert a numpy array back to a HapticReading ROS message."""
        msg = ROSHapticReading()
        msg.pressures = output[0].tolist()
        msg.temperatures = output[1].tolist()
        return msg
```
Then register it by adding to the agent_types list at the bottom of the same file:
```python
agent_types = [
    StreamingString,
    Video,
    Detections,
    # ... existing types ...
    HapticReading,  # <-- add here
]

add_additional_datatypes(agent_types)
```
And export it in __all__:
```python
__all__ = [
    # ... existing exports ...
    "HapticReading",
]
```
Lazy-Loading External Message Types¶
If your ROS message comes from an external package that might not be installed, use get_ros_type() instead of _ros_type:
```python
class HapticReading(SupportedType):
    callback = HapticReadingCallback

    @classmethod
    def get_ros_type(cls):
        from some_external_pkg.msg import HapticReading as ROSHapticReading
        return ROSHapticReading

    @classmethod
    def convert(cls, output, **_):
        ros_type = cls.get_ros_type()
        msg = ros_type()
        msg.pressures = output[0].tolist()
        return msg
```
See RGBD in agents/ros.py for a real example of this pattern.
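Whichever variant you use, it is worth checking that `convert()` and the callback's `_get_output()` are inverses, so data published by one component can be consumed by another without loss. A standalone sketch of that roundtrip, with `SimpleNamespace` standing in for the ROS message class and hypothetical stand-in functions mirroring the two conversions:

```python
from types import SimpleNamespace

import numpy as np

def convert(output):
    # Mirrors HapticReading.convert(): numpy array -> message fields
    msg = SimpleNamespace()
    msg.pressures = output[0].tolist()
    msg.temperatures = output[1].tolist()
    return msg

def deserialize(msg):
    # Mirrors HapticReadingCallback._get_output(): message -> numpy array
    return np.stack([
        np.array(msg.pressures, dtype=np.float32),
        np.array(msg.temperatures, dtype=np.float32),
    ], axis=0)

original = np.random.rand(2, 8).astype(np.float32)
roundtrip = deserialize(convert(original))
assert np.allclose(original, roundtrip)
```

Because both sides work in `float32`, the roundtrip through Python lists is exact here; if your modality mixes precisions, the roundtrip check will surface that early.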
Step 4: Add UI Visualization (Optional)¶
If your modality has meaningful visual output, register a UI element in agents/ui_elements.py:
```python
from .ros import HapticReading

def _log_haptic_element(logging_card, output, data_src: str):
    """Render haptic data as a text summary in the logging card."""
    summary = f"Sensor pressures: mean={output[0].mean():.2f}, max={output[0].max():.2f}"
    return _log_text_element(logging_card, summary, data_src)

OUTPUT_ELEMENTS[HapticReading] = _log_haptic_element
```
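The summary string itself is easy to verify without the UI stack. A quick sketch with made-up sensor values:

```python
import numpy as np

# Row 0: pressures, row 1: temperatures (illustrative values)
output = np.array([[0.2, 0.8, 0.5], [36.0, 36.5, 37.0]], dtype=np.float32)
summary = f"Sensor pressures: mean={output[0].mean():.2f}, max={output[0].max():.2f}"
print(summary)  # Sensor pressures: mean=0.50, max=0.80
```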
Step 5: Define the Component Configuration¶
Add a config class to agents/config.py:
```python
from attrs import define, field


@define(kw_only=True)
class HapticConfig(ModelComponentConfig):
    """Configuration for the Haptic Sensing component."""

    pressure_threshold: float = field(
        default=0.5,
        validator=base_validators.in_range(min_value=0.0, max_value=1.0),
    )
    window_size: int = field(default=10, validator=base_validators.gt(0))
    enable_local_model: bool = field(default=False)
    device_local_model: Literal["cpu", "cuda"] = field(default="cpu")

    def _get_inference_params(self):
        return {
            "pressure_threshold": self.pressure_threshold,
            "window_size": self.window_size,
        }
```
Key points:

- Always use `@define(kw_only=True)`.
- Extend `ModelComponentConfig` for model-based components, or `BaseComponentConfig` for pure data-processing components.
- Implement `_get_inference_params()` to return the dict passed to the model at inference time.
- Use `base_validators` for field validation.
- Add `enable_local_model` and `device_local_model` fields if you plan to support local inference.
Step 6: Build the Component¶
Create agents/components/haptic.py:
```python
from typing import Any, Dict, List, Optional, Sequence, Type, Union
from types import NoneType

import numpy as np

from agents.components.model_component import ModelComponent
from agents.clients.model_base import ModelClient
from agents.config import HapticConfig
from agents.ros import (
    Topic,
    FixedInput,
    HapticReading,
    String,
    SupportedType,
    Event,
)


class Haptic(ModelComponent):
    """A component that processes tactile sensor data through an ML model.

    This capability enables grasp quality estimation, texture
    classification, and contact event detection.
    """

    def __init__(
        self,
        inputs: Optional[Sequence[Union[Topic, FixedInput]]] = None,
        outputs: Optional[Sequence[Topic]] = None,
        model_client: Optional[ModelClient] = None,
        config: Optional[HapticConfig] = None,
        trigger: Union[Topic, List[Topic], float, Event, NoneType] = 1.0,
        component_name: str = "haptic",
        **kwargs,
    ):
        # Declare allowed I/O before super().__init__
        self.allowed_inputs = {
            "Required": [HapticReading],
        }
        self.allowed_outputs = {
            "Required": [String],  # Grasp quality prediction
            "Optional": [HapticReading],  # Processed/filtered readings
        }
        self.handled_outputs: List[Type[SupportedType]] = [String]

        if not config:
            config = HapticConfig()

        super().__init__(
            inputs=inputs,
            outputs=outputs,
            model_client=model_client,
            config=config,
            trigger=trigger,
            component_name=component_name,
            **kwargs,
        )

        self._reading_buffer: List[np.ndarray] = []

    def _create_input(self, *args, **kwargs) -> Optional[Dict[str, Any]]:
        """Assemble inference input from buffered haptic readings."""
        reading = None
        for cb in self.trig_callbacks.values():
            reading = cb.get_output()
        if reading is None:
            for cb in self.callbacks.values():
                reading = cb.get_output()
        if reading is None:
            self.get_logger().warning("No haptic reading received yet")
            return None

        self._reading_buffer.append(reading)
        if len(self._reading_buffer) > self.config.window_size:
            self._reading_buffer = self._reading_buffer[-self.config.window_size:]

        # Stack readings into a time-series array
        window = np.stack(self._reading_buffer, axis=0)
        return {
            "readings": window,
            **self.inference_params,
        }

    def _execution_step(self, **kwargs):
        """Main loop: buffer readings, run inference, publish prediction."""
        inference_input = self._create_input()
        if inference_input is None:
            return
        result = self._call_inference(inference_input)
        if result is None:
            return
        self._publish(result)

    def _warmup(self, *args, **kwargs):
        """Send dummy data to warm up the model."""
        dummy = np.zeros((self.config.window_size, 2, 16), dtype=np.float32)
        self._call_inference({"readings": dummy, **self.inference_params})

    def _handle_websocket_streaming(self) -> Optional[Any]:
        """Not used -- haptic inference is not a streaming task."""
        pass
```
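The sliding-window bookkeeping in `_create_input` can be verified in isolation: the buffer never grows past `window_size`, and the stacked window has shape `(window, channels, sensors)`. A sketch with hypothetical dimensions:

```python
import numpy as np

window_size = 10
buffer = []

# Simulate 25 incoming readings, each shaped (channels, taxels) = (2, 16)
for _ in range(25):
    buffer.append(np.random.rand(2, 16).astype(np.float32))
    if len(buffer) > window_size:
        buffer = buffer[-window_size:]

window = np.stack(buffer, axis=0)
print(window.shape)  # (10, 2, 16)
```

Note that `np.stack` requires all buffered readings to share one shape; if your sensor can change taxel count at runtime, pad or resample before buffering.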
Register the Component¶
Export it in agents/components/__init__.py:
```python
from .haptic import Haptic

__all__ = [
    # ... existing exports ...
    "Haptic",
]
```
Step 7: Use It¶
```python
from agents.components import Haptic
from agents.config import HapticConfig
from agents.clients.roboml import RoboMLHTTPClient
from agents.models import RoboMLModel
from agents.ros import Topic, Launcher

# Topics
tactile = Topic(name="/tactile_sensor", msg_type="HapticReading")
grasp_quality = Topic(name="/grasp_quality", msg_type="String")

# Model
model = RoboMLModel(name="grasp_net", checkpoint="grasp-quality-v1")
client = RoboMLHTTPClient(model)

# Component
haptic = Haptic(
    inputs=[tactile],
    outputs=[grasp_quality],
    model_client=client,
    trigger=tactile,
    config=HapticConfig(pressure_threshold=0.3, window_size=20),
)

# Launch
launcher = Launcher()
launcher.add_pkg(components=[haptic])
launcher.bringup()
```
Checklist¶
When adding a new modality, verify:
- `.msg` file defined and registered in `CMakeLists.txt`
- Package builds cleanly with `colcon build`
- Callback class in `agents/callbacks.py` with `_get_output()` implemented
- `SupportedType` wrapper in `agents/ros.py` with `_ros_type`, `callback`, and `convert()`
- Type added to the `agent_types` list and `__all__` in `agents/ros.py`
- Config class in `agents/config.py` with `_get_inference_params()`
- Component in `agents/components/` with all abstract methods implemented
- Component exported in `agents/components/__init__.py`
- UI element registered in `agents/ui_elements.py` (if applicable)