You are an expert robotics software engineer and developer assistant for **EmbodiedAgents**, a production-grade Physical AI framework built on ROS2 by Automatika Robotics. You have been provided with the official EmbodiedAgents developer documentation, which covers the internal architecture, extension patterns, and custom type system. This documentation is structured with file headers like `## File: filename.md`. For usage documentation, tutorials, and example recipes, refer to the EMOS documentation at https://emos.automatikarobotics.com/. Your primary task is to answer developer questions about extending EmbodiedAgents: creating custom components, model clients, model wrappers, and ROS message types. Follow these rules rigorously: 1. **Strict Grounding:** Base your answers ONLY on the provided documentation. Do not invent, guess, or hallucinate components, config parameters, clients, or API methods that are not explicitly mentioned in the text. 2. **Handle Unknowns Gracefully:** If the user asks a question that cannot be answered using the provided context, politely inform them that the documentation does not cover that specific topic. Do not attempt to fill in the blanks using outside knowledge of ROS2, general AI, or generic Python libraries. 3. **Write Idiomatic Code:** When providing code examples, strictly follow the patterns shown in the documentation. Ensure accurate imports (e.g., `from agents.components import ...`, `from agents.ros import Topic, Launcher`), correct config instantiation, and proper use of the `Launcher` class for execution. 4. **Emphasize the Framework's Philosophy:** Keep in mind that EmbodiedAgents uses a pure Python, event-driven, and multi-modal architecture. Emphasize modularity, self-referential design, and production-readiness (fallback mechanisms, multiprocessing) where relevant. 5. **Cite Your Sources:** When explaining a concept or providing a solution, briefly mention the file (e.g., "According to `development/architecture.md`...") so the user knows where to read more. Think step-by-step before answering. Parse the user's request, search the provided documentation for relevant files, synthesize the solution, and format your response clearly using Markdown and well-commented Python code blocks. ## File: development/architecture.md ```markdown # Architecture Overview This document describes the internal architecture of EmbodiedAgents for developers who want to extend the framework, contribute new components, or understand how the pieces fit together. ## Component Hierarchy Every processing unit in EmbodiedAgents is a **component**. Components form a strict inheritance chain: ``` BaseComponent (ros_sugar) └── Component (agents.components.component_base) └── ModelComponent (agents.components.model_component) ├── LLM ├── VLM / MLLM ├── VLA ├── Vision ├── SpeechToText ├── TextToSpeech ├── SemanticRouter ├── MapEncoding └── VideoMessageMaker ``` ### BaseComponent `BaseComponent` is provided by [Sugarcoat](https://github.com/automatika-robotics/sugarcoat) and wraps a ROS2 Lifecycle Node. It manages the node lifecycle (configure, activate, deactivate, shutdown), subscriber/publisher creation, and the multiprocess execution model. You should never subclass `BaseComponent` directly. ### Component `Component` (defined in `agents.components.component_base`) adds: - **Input/output validation** via `allowed_inputs` and `allowed_outputs` dictionaries. - **Trigger system** for deciding when `_execution_step()` fires. 
- **Event/Action wiring** through `custom_on_configure()` and `activate_all_triggers()`.

### ModelComponent

`ModelComponent` (defined in `agents.components.model_component`) extends `Component` with:

- A `model_client` slot (`ModelClient` instance) initialized during the configure lifecycle phase.
- Support for `additional_model_clients` and hot-swapping via `change_model_client()`.
- `_call_inference()` dispatching to HTTP or WebSocket clients.
- Output topic validation against `handled_outputs`.
- Warmup logic.
- Streaming support via a fast timer (`_handle_websocket_streaming()`).

All specialized components (`LLM`, `VLM`, `Vision`, etc.) subclass `ModelComponent`.

## The `_execution_step()` Pattern

Every concrete component must implement `_execution_step(**kwargs)`. This is the core processing callback that runs each time the component is triggered. The general flow inside a `ModelComponent._execution_step()` is:

1. **Gather inputs** -- read the latest data from all input callbacks.
2. **Create inference input** -- call `_create_input()` to assemble a dict suitable for the model client.
3. **Run inference** -- call `_call_inference(inference_input)`, which delegates to the `ModelClient`.
4. **Publish results** -- call `_publish(result)` to send output to all registered publishers.

```python
def _execution_step(self, **kwargs):
    # 1. Read inputs from callbacks
    text = self.callbacks["text0"].get_output()

    # 2. Assemble inference dict
    inference_input = self._create_input(text)
    if inference_input is None:
        return

    # 3. Call the model
    result = self._call_inference(inference_input)
    if result is None:
        return

    # 4. Publish
    self._publish(result)
```

For non-model components (subclassing `Component` directly), `_execution_step()` performs custom logic without calling a model client.

## Input/Output Validation

Components declare what topic types they accept using two dictionaries, set as instance attributes in the component's constructor:

```python
self.allowed_inputs = {
    "Required": [Image, [String, Audio]],
    "Optional": [CompressedImage],
}
self.allowed_outputs = {
    "Required": [String],
}
```

- **Required**: Each entry must have at least one matching topic. A list entry like `[String, Audio]` means "one of these types."
- **Optional**: Accepted but not mandatory.

Validation runs in `Component.__init__()` via `_validate_topics()`. It checks that every provided topic's `msg_type` is a subclass of at least one allowed type, and that all required types are covered.

## Trigger System

The `trigger` parameter controls when `_execution_step()` fires:

| Trigger Type | Value | Behavior |
|---|---|---|
| **Timed** | `float` (e.g., `1.0`) | Runs at a fixed frequency (Hz). Sets `ComponentRunType.TIMED`. |
| **Topic** | `Topic` instance | Fires when a message arrives on that topic. The topic must be one of the component's inputs. Sets `ComponentRunType.EVENT`. |
| **Multi-topic** | `List[Topic]` | Fires when any of the listed topics receives a message. |
| **Event** | `Event` instance | Fires when an external event is raised. Wired via an `Action` in `custom_on_configure()`. |
| **None** | `None` | Only valid for `ACTION_SERVER` or `SERVER` run types (e.g., VLA). |

When a `Topic` trigger is set, the topic's callback is moved from `self.callbacks` to `self.trig_callbacks`. The trigger callback's `on_callback_execute()` is wired to call `_execution_step()` in `activate_all_triggers()`.
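For illustration, here is a minimal sketch of the three most common trigger forms, using the `LLM` component (topic names and the Ollama checkpoint are placeholders):

```python
from agents.clients.ollama import OllamaClient
from agents.components import LLM
from agents.models import OllamaModel
from agents.ros import Topic

model = OllamaModel(name="llm", checkpoint="llama3.2:3b")
client = OllamaClient(model)

text_in = Topic(name="text_in", msg_type="String")
prompt_in = Topic(name="prompt_in", msg_type="String")
text_out = Topic(name="text_out", msg_type="String")

# Topic trigger: fires whenever text_in receives a message
llm = LLM(inputs=[text_in], outputs=[text_out], model_client=client, trigger=text_in)

# Timed trigger: fires at 2 Hz, reading the latest buffered inputs
llm = LLM(inputs=[text_in], outputs=[text_out], model_client=client, trigger=2.0)

# Multi-topic trigger: fires when any of the listed topics receives a message
llm = LLM(
    inputs=[text_in, prompt_in],
    outputs=[text_out],
    model_client=client,
    trigger=[text_in, prompt_in],
)
```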
## Streaming

Components that support streaming (LLM, TextToSpeech) use WebSocket-based model clients. The flow:

1. `ModelComponent.custom_on_configure()` detects `config.stream == True` with a `RoboMLWSClient`.
2. A fast timer (1ms period) is created calling `_handle_websocket_streaming()`.
3. Inference requests go into `self.req_queue`; responses come back via `self.resp_queue`.
4. The child component's `_handle_websocket_streaming()` reads partial results from the queue and publishes them incrementally (e.g., token-by-token for LLM, audio chunks for TTS).

For HTTP clients, streaming is handled at the client level using generator-based responses.

## Configuration Chain

All configuration uses the `attrs` library with the `@define` decorator:

```
BaseComponentConfig (ros_sugar)
├── ModelComponentConfig (agents.config)
│   ├── LLMConfig
│   │   └── MLLMConfig (aliased as VLMConfig)
│   ├── VLAConfig
│   ├── VisionConfig
│   ├── SpeechToTextConfig
│   ├── TextToSpeechConfig
│   ├── SemanticRouterConfig
│   └── VideoMessageMakerConfig
└── MapConfig (extends BaseComponentConfig directly)
```

Each config class:

- Uses `@define(kw_only=True)` for explicit, keyword-only construction.
- Declares fields with `field()`, including defaults, validators (from `base_validators`), and converters.
- Implements `_get_inference_params() -> Dict` to extract the subset of parameters passed to the model client at inference time.

The config is deep-copied at component init so that multiple component instances sharing the same config class do not interfere with each other.

## Model / Client / Component Relationship

The three-layer pattern is central to the architecture:

```
Model (data class) --> ModelClient (connection logic) --> ModelComponent (ROS node)
```

- **Model** (`agents.models.Model`): An `attrs` `@define` class holding model metadata (name, checkpoint, platform-specific options). Its `_get_init_params()` returns a dict sent to the serving platform.
- **ModelClient** (`agents.clients.model_base.ModelClient`): Manages the connection to a model serving platform. Implements `_check_connection()`, `_initialize()`, `_inference()`, `_deinitialize()`. Must be serializable for multiprocess execution.
- **ModelComponent**: Holds a `ModelClient` instance, calls it during `_execution_step()`, and manages the ROS lifecycle around it.

This separation means the same model can be served by different clients (Ollama, RoboML, GenericHTTP), and the same client can be used across different component types.

## Local Model Deployment

Components that subclass `ModelComponent` can optionally run without a remote model client by enabling a built-in local model. This is controlled via `enable_local_model=True` in the component's config.

### How It Works

When `enable_local_model` is set, the component's `custom_on_configure()` calls `_deploy_local_model()`, which instantiates a lightweight local inference wrapper. The `_call_inference()` dispatcher in `ModelComponent` automatically routes to the local model when no `model_client` is set:

```
ModelComponent._call_inference()
├── model_client (RoboML, Ollama, GenericHTTP, ...)
└── local_model (LocalLLM, LocalVLM, LocalVisionModel, LocalSTT, LocalTTS)
```
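In practice this means a component can be brought up with no remote client at all. A minimal sketch, assuming `LLMConfig` is importable from `agents.config` alongside `ModelComponentConfig` (topic names are placeholders):

```python
from agents.components import LLM
from agents.config import LLMConfig
from agents.ros import Topic, Launcher

text_in = Topic(name="text_in", msg_type="String")
text_out = Topic(name="text_out", msg_type="String")

# No model_client is passed, so _call_inference() routes
# to the built-in local backend (llama.cpp for LLM).
llm = LLM(
    inputs=[text_in],
    outputs=[text_out],
    config=LLMConfig(enable_local_model=True),
    trigger=text_in,
)

launcher = Launcher()
launcher.add_pkg(components=[llm])
launcher.bringup()
```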
### Runtime Backends

Each component type uses a runtime optimized for edge deployment:

| Component | Backend | Package | Default Model |
|-----------|---------|---------|---------------|
| LLM | llama.cpp | `llama-cpp-python` | Qwen3-0.6B (GGUF) |
| MLLM/VLM | llama.cpp + MoondreamChatHandler | `llama-cpp-python` | Moondream2 (GGUF) |
| Vision | ONNX Runtime | `onnxruntime` | DEIM (CVPR 2025) |
| SpeechToText | sherpa-onnx (Whisper) | `sherpa-onnx` | Whisper tiny.en |
| TextToSpeech | sherpa-onnx (Kokoro) | `sherpa-onnx` | Kokoro English |

These backends require no PyTorch, no Transformers, and no heavy ML frameworks -- they are designed for robots and edge devices including NVIDIA Jetson.

### Local Model Wrappers

The local model wrappers live in `agents/utils/` and follow a simple callable interface (a construction sketch follows the list below):

- `LocalLLM(model_path, device, ncpu)` -- wraps `llama-cpp-python`, returns `{"output": str}` or `{"output": generator}` for streaming, with optional `"tool_calls"`
- `LocalVLM(model_path, device, ncpu)` -- wraps `llama-cpp-python` with `MoondreamChatHandler`, accepts images as RGB numpy arrays
- `LocalVisionModel(model_path, device, ncpu)` -- wraps `onnxruntime` for object detection, returns bounding boxes, labels, and scores
- `LocalSTT(model_path, device, ncpu)` -- wraps `sherpa-onnx` `OfflineRecognizer`, accepts audio bytes or numpy arrays
- `LocalTTS(model_path, device, ncpu)` -- wraps `sherpa-onnx` `OfflineTts`, returns WAV bytes
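A construction-level sketch of that interface -- the `agents.utils` import path is assumed from the directory noted above, and since this guide does not document the invocation payload, only construction and the output contract are shown:

```python
from agents.utils import LocalLLM  # assumed import path

# Wrapper signature as documented: (model_path, device, ncpu).
# model_path accepts a HuggingFace repo ID or a local file path.
llm = LocalLLM(
    model_path="bartowski/Llama-3.2-1B-Instruct-GGUF",
    device="cpu",
    ncpu=4,
)

# When invoked, LocalLLM returns {"output": str}, or {"output": generator}
# when streaming, optionally accompanied by "tool_calls".
```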
### Customizing Local Models

Each config exposes a `local_model_path` field that accepts a HuggingFace repository ID or a local file path. Users can swap in any compatible model by setting this field:

```python
config = LLMConfig(
    enable_local_model=True,
    local_model_path="bartowski/Llama-3.2-1B-Instruct-GGUF",  # any GGUF model
)
```

For available STT and TTS models, see the [sherpa-onnx pretrained models catalog](https://k2-fsa.github.io/sherpa/onnx/pretrained_models/index.html).
```

## File: development/custom_component.md

```markdown
# Creating a Custom Component

This guide walks through building a new EmbodiedAgents component from scratch.

## When to Subclass What

Choose your base class based on whether your component needs a model client:

| Base Class | Use When |
|---|---|
| `Component` | Your component performs pure data processing, transformations, or routing without calling an ML model. |
| `ModelComponent` | Your component wraps an ML model and needs inference via a `ModelClient`. |

Most custom components will subclass `ModelComponent`.

Think of components as **capabilities**. Each built-in component represents a distinct capability: Vision sees, SpeechToText hears, TextToSpeech speaks, LLM reasons, VLM understands images. A good custom component adds a new capability that isn't already covered.

## Defining Allowed Inputs and Outputs

Every component must declare what topic types it accepts. These are set as instance attributes before the `super().__init__()` call:

```python
from agents.ros import SupportedType, String, Image, Audio

class DepthEstimator(ModelComponent):
    def __init__(self, ...):
        self.allowed_inputs = {
            "Required": [Image],  # Must have at least one Image input
        }
        self.allowed_outputs = {
            "Required": [Image],  # Outputs a depth map as an Image
        }
        super().__init__(...)
```

### Cardinality Rules

- Each entry in the `"Required"` list must have at least one matching topic in the provided inputs/outputs.
- A nested list like `[String, Audio]` means "at least one topic of type `String` **or** `Audio`."
- `"Optional"` entries are accepted but not enforced.
- Subtypes are matched: if `StreamingString` is a subclass of the allowed type, it passes validation.

## Implementing `_execution_step()`

This is the core logic of your component. `ModelComponent` subclasses must also provide concrete implementations of `_create_input()`, `_warmup()`, and `_handle_websocket_streaming()`:

```python
class DepthEstimator(ModelComponent):
    def _execution_step(self, **kwargs):
        """Called each time the component is triggered."""
        ...

    def _create_input(self, *args, **kwargs):
        """Assemble the inference input dict from callback data."""
        ...

    def _warmup(self, *args, **kwargs):
        """Optional warmup call during configure phase."""
        ...

    def _handle_websocket_streaming(self):
        """Handle streaming responses from WebSocket clients."""
        ...
```

For `Component` subclasses (no model client), you only need to implement `_execution_step()`.

## Configuration Class Pattern

Define a config class using `attrs`:

```python
from attrs import define, field
from agents.config import ModelComponentConfig
from agents.ros import base_validators

@define(kw_only=True)
class DepthEstimatorConfig(ModelComponentConfig):
    """Configuration for the Depth Estimator component."""

    input_height: int = field(default=518, validator=base_validators.gt(0))
    input_width: int = field(default=518, validator=base_validators.gt(0))
    max_depth: float = field(default=20.0, validator=base_validators.gt(0.0))

    def _get_inference_params(self):
        return {
            "input_height": self.input_height,
            "input_width": self.input_width,
        }
```

Key points:

- Always use `@define(kw_only=True)`.
- Extend `ModelComponentConfig` (which itself extends `BaseComponentConfig`).
- Implement `_get_inference_params()` to return the dict passed to the model at inference time.
- Use `base_validators` for field validation (`gt`, `in_range`, `in_`).

### Adding Local Model Support

If your custom component should support local inference (without a remote model client), add the standard local model fields to your config:

```python
@define(kw_only=True)
class MyConfig(ModelComponentConfig):
    enable_local_model: bool = field(default=False)
    device_local_model: Literal["cpu", "cuda"] = field(default="cuda")
    ncpu_local_model: int = field(default=1)
    local_model_path: Optional[str] = field(default="your/default-model")
```

Then implement `_deploy_local_model()` in your component to instantiate the appropriate local wrapper (a minimal sketch appears at the end of this section, just before the complete skeleton). See `agents/components/llm.py` for a reference implementation.

## Wiring the Trigger

The trigger determines when your component's `_execution_step()` fires. Set it in the constructor:

```python
# Trigger on a specific input topic
depth = DepthEstimator(
    inputs=[camera],
    outputs=[depth_map],
    model_client=my_client,
    trigger=camera,  # fires when a new frame arrives
)

# Trigger on a timer (2 Hz)
depth = DepthEstimator(
    ...,
    trigger=2.0,  # fires twice per second
)

# Trigger on an external event
from agents.ros import Event

my_event = Event(name="estimate_depth")
depth = DepthEstimator(
    ...,
    trigger=my_event,
)
```

When a `Topic` is used as trigger, it must be one of the component's inputs. Internally, the topic's callback is moved from `self.callbacks` to `self.trig_callbacks`, and `_execution_step()` is wired as a post-callback.
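Before the full skeleton, here is the local-model hook mentioned above, as a minimal sketch. The `agents.utils` import path follows the wrapper location given in `development/architecture.md`, and the `self.local_model` attribute name is illustrative -- consult `agents/components/llm.py` for the authoritative pattern:

```python
def _deploy_local_model(self) -> None:
    """Instantiate the built-in local inference wrapper (illustrative sketch)."""
    from agents.utils import LocalLLM  # assumed import path

    # Wrapper signature as documented: (model_path, device, ncpu)
    self.local_model = LocalLLM(
        model_path=self.config.local_model_path,
        device=self.config.device_local_model,
        ncpu=self.config.ncpu_local_model,
    )
```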
## Complete Skeleton: A Depth Estimation Component Below is a complete, working skeleton for a component that takes a camera image, sends it to a depth estimation model, and publishes the depth map. This represents a distinct perception capability -- monocular depth estimation -- that is not covered by the built-in Vision (detection), VLM (visual Q&A), or other components. ```python from typing import Any, Dict, List, Optional, Sequence, Type, Union from types import NoneType import numpy as np from attrs import define, field from agents.components.model_component import ModelComponent from agents.clients.model_base import ModelClient from agents.config import ModelComponentConfig from agents.ros import ( Topic, FixedInput, Image, SupportedType, Event, base_validators, ) # --- Config --- @define(kw_only=True) class DepthEstimatorConfig(ModelComponentConfig): """Configuration for the Depth Estimator component.""" input_height: int = field(default=518, validator=base_validators.gt(0)) input_width: int = field(default=518, validator=base_validators.gt(0)) max_depth: float = field(default=20.0, validator=base_validators.gt(0.0)) def _get_inference_params(self) -> Dict: return { "input_height": self.input_height, "input_width": self.input_width, } # --- Component --- class DepthEstimator(ModelComponent): """A component that estimates depth from monocular camera images. This capability enables spatial understanding for navigation, obstacle avoidance, and manipulation tasks. """ def __init__( self, inputs: Optional[Sequence[Union[Topic, FixedInput]]] = None, outputs: Optional[Sequence[Topic]] = None, model_client: Optional[ModelClient] = None, config: Optional[DepthEstimatorConfig] = None, trigger: Union[Topic, List[Topic], float, Event, NoneType] = 1.0, component_name: str = "depth_estimator", **kwargs, ): # Declare allowed I/O before super().__init__ self.allowed_inputs = { "Required": [Image], } self.handled_outputs: List[Type[SupportedType]] = [Image] if not config: config = DepthEstimatorConfig() super().__init__( inputs=inputs, outputs=outputs, model_client=model_client, config=config, trigger=trigger, component_name=component_name, **kwargs, ) def _create_input(self, *args, **kwargs) -> Optional[Dict[str, Any]]: """Assemble inference input from the latest camera frame.""" image = None # Read from trigger callback for cb in self.trig_callbacks.values(): image = cb.get_output() # Fall back to regular callbacks if image is None: for cb in self.callbacks.values(): image = cb.get_output() if image is None: self.get_logger().warning("No image received yet") return None return { "images": [image], **self.inference_params, } def _execution_step(self, **kwargs): """Main processing loop: receive image, estimate depth, publish.""" inference_input = self._create_input() if inference_input is None: return result = self._call_inference(inference_input) if result is None: return self._publish(result) def _warmup(self, *args, **kwargs): """Send a dummy image to warm up the model.""" dummy = np.zeros( (self.config.input_height, self.config.input_width, 3), dtype=np.uint8 ) self._call_inference({"images": [dummy], **self.inference_params}) def _handle_websocket_streaming(self) -> Optional[Any]: """Not used -- depth estimation is not a streaming task.""" pass ``` ### Usage ```python from agents.clients.ollama import OllamaClient from agents.models import OllamaModel from agents.ros import Topic, Launcher camera = Topic(name="camera", msg_type="Image") depth_map = Topic(name="depth", msg_type="Image") model = 
OllamaModel(name="depth_model", checkpoint="depth-anything-v2") client = OllamaClient(model) depth = DepthEstimator( inputs=[camera], outputs=[depth_map], model_client=client, trigger=camera, config=DepthEstimatorConfig(max_depth=10.0), ) launcher = Launcher() launcher.add_pkg(components=[depth]) launcher.bringup() ``` ``` ## File: development/custom_client.md ```markdown # Creating a Custom Model Client Model clients in EmbodiedAgents manage the connection to a model serving platform. This guide covers the `ModelClient` base contract, serialization requirements, tool calling support, the `DBClient` variant, and a complete skeleton for an HTTP-based client. ## ModelClient Base Contract All model clients must subclass `ModelClient` (defined in `agents.clients.model_base`) and implement four abstract methods: ```python from agents.clients.model_base import ModelClient class MyClient(ModelClient): def _check_connection(self) -> None: """Verify that the serving platform is reachable. Called during the component's configure lifecycle phase. Raise an exception if the connection cannot be established.""" ... def _initialize(self) -> None: """Initialize the model on the serving platform. Called after _check_connection() during configure. Only runs if init_on_activation is True (default).""" ... def _inference(self, inference_input: dict) -> dict | None: """Run a single inference call. Takes a dict assembled by the component's _create_input(). Returns a dict with at least an 'output' key, or None on failure. For streaming, may return a Generator as the 'output' value.""" ... def _deinitialize(self) -> None: """Clean up model resources on the serving platform. Called during the component's deactivate lifecycle phase.""" ... ``` ### Constructor The `ModelClient.__init__()` accepts: | Parameter | Type | Description | |---|---|---| | `model` | `Model` or `Dict` | The model definition (attrs class or deserialized dict). | | `host` | `str` or `None` | Hostname of the serving platform. | | `port` | `int` or `None` | Port of the serving platform. | | `inference_timeout` | `int` | Timeout in seconds for inference calls. Default: 30. | | `init_on_activation` | `bool` | Whether to call `_initialize()` on activation. Default: True. | | `logging_level` | `str` | Logging level (e.g., "info", "debug"). | Your subclass constructor must call `super().__init__()`: ```python def __init__(self, model, host="127.0.0.1", port=8080, **kwargs): super().__init__(model=model, host=host, port=port, **kwargs) # Custom init here ``` ## Serialization for Multiprocess Execution EmbodiedAgents components run in separate processes. The model client must be serializable so it can be reconstructed in the child process. 
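As a rough mental model of that round trip (purely illustrative -- reconstruction is performed by the framework's launch machinery, not by user code), using the `MyClient` constructor sketched above:

```python
# Parent process: capture the client's state as a plain, picklable dict
state = client.serialize()

# Child process: rebuild an equivalent client from the dict.
# A deserialized dict is accepted wherever a Model instance is expected.
restored = MyClient(
    model=state["model"],
    host=state["host"],
    port=state["port"],
)
```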
The base `ModelClient.serialize()` method returns a dict: ```python { "client_type": "MyClient", "model": { "model_name": "my_model", "model_type": "OllamaModel", "init_timeout": None, "model_init_params": {"checkpoint": "llama3.2:3b", ...}, }, "host": "127.0.0.1", "port": 8080, "init_on_activation": True, "logging_level": "INFO", "inference_timeout": 30, } ``` If your client stores additional state (e.g., API keys, custom headers), override `serialize()` to include them, and handle deserialization from a `Dict` in your `__init__()`: ```python def __init__(self, model, host=None, port=None, api_key=None, **kwargs): super().__init__(model=model, host=host, port=port, **kwargs) self.api_key = api_key def serialize(self): base = super().serialize() base["api_key"] = self.api_key return base ``` ## Tool Calling Support If your model serving platform supports function/tool calling, override the `supports_tool_calls` property: ```python @property def supports_tool_calls(self) -> bool: return True ``` When `supports_tool_calls` returns `True`, components like `LLM` will include tool descriptions in the inference payload. Your `_inference()` method must then handle the tool calling protocol (parsing tool call responses, executing tools, and returning final results). The default implementation returns `False`. ## DBClient for Vector Databases `DBClient` (defined in `agents.clients.db_base`) is the equivalent of `ModelClient` for vector database connections. It is used by `MapEncoding` and `SemanticRouter` components. The abstract methods: ```python from agents.clients.db_base import DBClient class MyDBClient(DBClient): def _check_connection(self) -> None: ... def _initialize(self) -> None: ... def _add(self, db_input: dict) -> dict | None: ... def _conditional_add(self, db_input: dict) -> dict | None: ... def _query(self, db_input: dict) -> dict | None: ... def _metadata_query(self, db_input: dict) -> dict | None: ... def _deinitialize(self) -> None: ... ``` The constructor takes a `DB` instance (from `agents.vectordbs`) instead of a `Model`. ## Complete Skeleton: Custom HTTP Client Below is a complete skeleton for a client that communicates with a custom HTTP inference server. 
```python from typing import Any, Dict, Optional, Union, MutableMapping import httpx from agents.clients.model_base import ModelClient from agents.models import Model class CustomHTTPClient(ModelClient): """Client for a custom HTTP model serving endpoint.""" def __init__( self, model: Union[Model, Dict], host: str = "127.0.0.1", port: int = 5000, inference_timeout: int = 30, api_key: Optional[str] = None, logging_level: str = "info", **kwargs, ): super().__init__( model=model, host=host, port=port, inference_timeout=inference_timeout, logging_level=logging_level, **kwargs, ) self.api_key = api_key self.base_url = f"http://{self.host}:{self.port}" self._client: Optional[httpx.Client] = None def _check_connection(self) -> None: """Ping the server health endpoint.""" try: resp = httpx.get( f"{self.base_url}/health", timeout=5.0, ) resp.raise_for_status() self.logger.info("Server is reachable") except httpx.HTTPError as e: raise ConnectionError( f"Cannot reach server at {self.base_url}: {e}" ) def _initialize(self) -> None: """Load the model on the server and create the HTTP client.""" headers = {} if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" self._client = httpx.Client( base_url=self.base_url, timeout=self.inference_timeout, headers=headers, ) # Send model init params to the server resp = self._client.post( "/v1/models/load", json=self.model_init_params, ) resp.raise_for_status() self.logger.info(f"Model {self.model_name} initialized on server") def _inference( self, inference_input: Dict[str, Any] ) -> Optional[MutableMapping]: """Send an inference request and return the result.""" if not self._client: self.logger.error("Client not initialized") return None try: resp = self._client.post( "/v1/inference", json=inference_input, ) resp.raise_for_status() data = resp.json() return {"output": data.get("result", "")} except httpx.HTTPError as e: self.logger.error(f"Inference failed: {e}") return None def _deinitialize(self) -> None: """Unload the model and close the HTTP client.""" if self._client: try: self._client.post("/v1/models/unload") except httpx.HTTPError: pass self._client.close() self._client = None self.logger.info(f"Model {self.model_name} deinitialized") def serialize(self) -> Dict: """Include api_key in serialization for multiprocess.""" base = super().serialize() base["api_key"] = self.api_key return base ``` ### Usage ```python from agents.models import GenericLLM from agents.components import LLM from agents.ros import Topic, Launcher model = GenericLLM(name="my_model", checkpoint="my-custom-model") client = CustomHTTPClient(model, host="10.0.0.5", port=5000, api_key="sk-...") text_in = Topic(name="input_text", msg_type="String") text_out = Topic(name="output_text", msg_type="String") llm = LLM( inputs=[text_in], outputs=[text_out], model_client=client, trigger=text_in, ) launcher = Launcher() launcher.add_pkg(components=[llm]) launcher.bringup() ``` ``` ## File: development/custom_model.md ```markdown # Creating a Custom Model Wrapper Model wrappers in EmbodiedAgents are data classes that describe an ML model's identity and initialization parameters. They are passed to `ModelClient` instances, which use them to load and configure the model on the serving platform. This guide covers the `attrs` pattern, required fields, and how to wrap a new model. 
## The `attrs` `@define` Pattern

All model classes use the [attrs](https://www.attrs.org/) library:

```python
from attrs import define, field

from agents.models import Model

@define(kw_only=True)
class MyModel(Model): ...
```

Key conventions:

- **Always use `@define(kw_only=True)`**: this forces keyword-only construction, preventing positional argument bugs.
- **Inherit from `Model`** (defined in `agents.models`), which itself inherits from `BaseAttrs`.
- **Use `field()`** for every attribute, with `default`, `validator`, and `converter` as needed.
- **Validators** from `base_validators` (imported from `agents.ros`) provide common checks: `gt()`, `in_range()`, `in_()`.

## Required Fields

The `Model` base class defines three fields that every model must have:

```python
@define(kw_only=True)
class Model(BaseAttrs):
    name: str        # Arbitrary identifier
    checkpoint: str  # Model checkpoint / HuggingFace repo ID
    init_timeout: Optional[int] = field(default=None)  # Timeout for initialization
```

- `name`: A user-chosen identifier. Used for logging and to key the model in the client.
- `checkpoint`: The model identifier on the serving platform (e.g., an Ollama tag, a HuggingFace repo, or a custom endpoint path).
- `init_timeout`: Optional timeout for model loading on the server side.

## Implementing `_get_init_params()`

Every model subclass must override `_get_init_params()` to return a dictionary of parameters sent to the serving platform during initialization:

```python
def _get_init_params(self) -> Dict:
    return {
        "checkpoint": self.checkpoint,
        "my_custom_param": self.my_custom_param,
    }
```

This dict is stored in the client as `self.model_init_params` and included in serialization. The serving platform uses these parameters to load the model correctly.

## Existing Model Hierarchy

```
Model
├── LLM (adds quantization)
│   ├── OllamaModel (adds port, options)
│   ├── TransformersLLM (HuggingFace checkpoint)
│   │   └── TransformersMLLM
│   └── RoboBrain2
├── GenericLLM (OpenAI-compatible)
│   └── GenericMLLM
├── GenericTTS
├── GenericSTT
├── Whisper (adds compute_type)
├── TransformersTTS (adds voice, vocoder_checkpoint)
├── VisionModel (adds tracking options)
└── LeRobotPolicy (adds policy_type, features, actions)
```

## Example: Wrapping a New Platform

Suppose you want to integrate a hypothetical "TurboServe" platform that serves LLMs and requires a `precision` and `max_batch_size` parameter.

### Step 1: Define the Model

```python
from typing import Dict

from attrs import define, field

from agents.models import Model
from agents.ros import base_validators

@define(kw_only=True)
class TurboServeModel(Model):
    """Model configuration for the TurboServe inference platform.

    :param name: Arbitrary model name.
    :type name: str
    :param checkpoint: Model identifier on TurboServe.
    :type checkpoint: str
    :param precision: Inference precision. One of "fp16", "fp32", "int8".
    :type precision: str
    :param max_batch_size: Maximum batch size for inference.
    :type max_batch_size: int
    :param init_timeout: Timeout for model loading in seconds.
    :type init_timeout: int, optional
    """

    precision: str = field(
        default="fp16",
        validator=base_validators.in_(["fp16", "fp32", "int8"]),
    )
    max_batch_size: int = field(
        default=1,
        validator=base_validators.gt(0),
    )

    def _get_init_params(self) -> Dict:
        return {
            "checkpoint": self.checkpoint,
            "precision": self.precision,
            "max_batch_size": self.max_batch_size,
        }
```

### Step 2: Use with a Client

The model is passed to a `ModelClient` subclass.
The client reads `self.model_init_params` (set from `_get_init_params()`) to configure the serving platform: ```python model = TurboServeModel( name="turbo_llama", checkpoint="meta-llama/Llama-3.1-8B", precision="fp16", max_batch_size=4, ) client = TurboServeClient(model, host="gpu-server.local", port=9000) ``` ### Step 3: Adding Validation with `__attrs_post_init__` For complex validation or derived fields, use `__attrs_post_init__()`: ```python @define(kw_only=True) class TurboServeModel(Model): precision: str = field(default="fp16") max_batch_size: int = field(default=1) _effective_memory: Optional[int] = field(default=None) def __attrs_post_init__(self): # Compute derived field based on precision memory_map = {"fp32": 4, "fp16": 2, "int8": 1} self._effective_memory = memory_map.get(self.precision, 2) def _get_init_params(self) -> Dict: return { "checkpoint": self.checkpoint, "precision": self.precision, "max_batch_size": self.max_batch_size, "effective_memory": self._effective_memory, } ``` ### Step 4: Options Dict Pattern For platforms with many tunable parameters, use a validated `options` dict (following the `OllamaModel` pattern): ```python @define(kw_only=True) class TurboServeModel(Model): precision: str = field(default="fp16") options: Optional[Dict[str, Any]] = field(default=None) @options.validator def _validate_options(self, _, value): if value is None: return allowed_keys = { "temperature": float, "top_p": float, "max_tokens": int, } for key, val in value.items(): if key not in allowed_keys: raise ValueError(f"Invalid option: {key}") if not isinstance(val, allowed_keys[key]): raise TypeError( f"Option '{key}' must be {allowed_keys[key].__name__}" ) def _get_init_params(self) -> Dict: return { "checkpoint": self.checkpoint, "precision": self.precision, "options": self.options, } ``` ``` ## File: development/messages.md ```markdown # Custom ROS Message Types EmbodiedAgents defines custom ROS2 message and action types for data that does not fit standard ROS messages. This guide covers the existing types, how they connect to the Python type system via `SupportedType`, and how to add new message types. ## Existing Message Types The following custom messages are defined in the `automatika_embodied_agents` package under `msg/`: | Message | File | Description | |---|---|---| | `Point2D` | `Point2D.msg` | A 2D point with `x` and `y` float fields. Used as a building block in other messages. | | `Bbox2D` | `Bbox2D.msg` | A 2D bounding box with `top_left_x`, `top_left_y`, `bottom_right_x`, `bottom_right_y`. | | `Detections2D` | `Detections2D.msg` | Object detection results: arrays of `scores`, `labels`, `boxes` (Bbox2D[]), plus optional `image` and `depth`. | | `Detections2DMultiSource` | `Detections2DMultiSource.msg` | An array of `Detections2D` for multi-camera setups. | | `Trackings` | `Trackings.msg` | Tracked object data: `ids`, `labels`, `boxes`, `centroids`, `estimated_velocities`, plus source image. | | `TrackingsMultiSource` | `TrackingsMultiSource.msg` | An array of `Trackings` for multi-camera tracking. | | `StreamingString` | `StreamingString.msg` | A string with `stream` (bool) and `done` (bool) flags for token-by-token LLM output. | | `Video` | `Video.msg` | A sequence of `Image` and/or `CompressedImage` frames bundled as a single message. | | `PointsOfInterest` | `PointsOfInterest.msg` | A list of `Point2D` coordinates on an image, plus the source image/depth. 
|

### Action Types

| Action | File | Description |
|---|---|---|
| `VisionLanguageAction` | `VisionLanguageAction.action` | ROS2 action for VLA inference. Defines goal, feedback, and result for vision-language-action loops. |

## The `SupportedType` System

Every ROS message used as a topic type in EmbodiedAgents must have a corresponding Python `SupportedType` wrapper class. This class (from Sugarcoat's `ros_sugar.supported_types`) provides three things:

1. **`_ros_type`**: Class attribute pointing to the actual ROS message class.
2. **`callback`**: A callback class that handles deserialization and buffering of incoming messages.
3. **`convert()` class method**: Converts Python data (dicts, numpy arrays, etc.) into a ROS message for publishing.

Example from the codebase -- the `StreamingString` wrapper (the imports shown here follow the pattern used in Step 5 below):

```python
from ros_sugar.supported_types import SupportedType

from automatika_embodied_agents.msg import StreamingString as ROSStreamingString

from .callbacks import StreamingStringCallback

class StreamingString(SupportedType):
    callback = StreamingStringCallback
    _ros_type = ROSStreamingString

    @classmethod
    def convert(cls, output: str, stream: bool = False, done: bool = True, **_):
        msg = ROSStreamingString()
        msg.stream = stream
        msg.done = done
        msg.data = output
        return msg
```

### Registration with `add_additional_datatypes()`

After defining wrapper classes, they must be registered so that `Topic(msg_type="StreamingString")` resolves correctly:

```python
from ros_sugar.supported_types import add_additional_datatypes

agent_types = [StreamingString, Video, Detections, ...]
add_additional_datatypes(agent_types)
```

This is done at the bottom of `agents/ros.py` and makes these types available as string identifiers in `Topic` definitions.

## When to Create New Message Types

Create a new custom message when:

- Standard ROS messages (`String`, `Image`, `Odometry`, etc.) do not capture the data structure you need.
- Your component produces structured output that downstream components must parse (e.g., detection results with labels and bounding boxes).
- You need to bundle multiple pieces of data atomically (e.g., image + detections, or video frames).

Do **not** create a new message if a standard type suffices -- use `String`, `Image`, `Audio`, `Odometry`, `OccupancyGrid`, `JointState`, `JointTrajectory`, etc. from the existing supported types.

## Step-by-Step: Adding a New Message Type

### Step 1: Define the `.msg` File

Create a new `.msg` file in the `msg/` directory of the `automatika_embodied_agents` package:

```
# msg/SemanticLabel.msg
string label
float32 confidence
sensor_msgs/Image image
```

### Step 2: Register in CMakeLists.txt

Add the new message file to the `rosidl_generate_interfaces` call in `CMakeLists.txt`:

```cmake
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SemanticLabel.msg"
  # ... existing messages ...
  DEPENDENCIES std_msgs sensor_msgs
)
```

### Step 3: Build the Package

```bash
cd
colcon build --packages-select automatika_embodied_agents
source install/setup.bash
```

After building, the ROS message class will be available as `automatika_embodied_agents.msg.SemanticLabel`.

### Step 4: Create a Callback Class

Define a callback class in `agents/callbacks.py` (or a new file) that extends the base callback from Sugarcoat.
The callback handles deserializing the ROS message into Python data: ```python from ros_sugar.io.callbacks import GenericCallback class SemanticLabelCallback(GenericCallback): """Callback for SemanticLabel messages.""" def _msg_to_output(self, msg) -> dict: """Convert a SemanticLabel ROS message to a Python dict.""" return { "label": msg.label, "confidence": msg.confidence, } ``` ### Step 5: Create the `SupportedType` Wrapper In `agents/ros.py`, define the wrapper class: ```python from automatika_embodied_agents.msg import SemanticLabel as ROSSemanticLabel from .callbacks import SemanticLabelCallback class SemanticLabel(SupportedType): """Wraps automatika_embodied_agents/msg/SemanticLabel.""" _ros_type = ROSSemanticLabel callback = SemanticLabelCallback @classmethod def convert(cls, output: dict, **_) -> ROSSemanticLabel: msg = ROSSemanticLabel() msg.label = output["label"] msg.confidence = float(output["confidence"]) return msg ``` ### Step 6: Register the New Type Add it to the `agent_types` list at the bottom of `agents/ros.py`: ```python agent_types = [ StreamingString, Video, Detections, # ... existing types ... SemanticLabel, # <-- add here ] add_additional_datatypes(agent_types) ``` ### Step 7: Export and Use Add the new type to `__all__` in `agents/ros.py`: ```python __all__ = [ # ... existing exports ... "SemanticLabel", ] ``` Now it can be used in component definitions: ```python from agents.ros import Topic label_topic = Topic(name="semantic_label", msg_type="SemanticLabel") ``` ## Callback Architecture Each `SupportedType.callback` class inherits from `GenericCallback` (Sugarcoat). The callback: 1. Is instantiated per-topic when the component creates subscribers. 2. Receives raw ROS messages via its subscriber. 3. Converts them to Python objects via `_msg_to_output()`. 4. Buffers the latest output so the component can read it via `get_output()`. For types like `Image`, the callback may also handle conversion to numpy arrays. For `Detections`, it parses bounding boxes into Python dicts. This conversion layer is what allows `_execution_step()` to work with clean Python data. ```