agents.components.vision

Module Contents

Classes

Vision

This component performs object detection and tracking on input images and outputs a list of detected objects, along with their bounding boxes and confidence scores.

API

class agents.components.vision.Vision(*, inputs: List[Union[agents.ros.Topic, agents.ros.FixedInput]], outputs: List[agents.ros.Topic], model_client: Optional[agents.clients.model_base.ModelClient] = None, config: Optional[agents.config.VisionConfig] = None, trigger: Union[agents.ros.Topic, List[agents.ros.Topic], float] = 1.0, component_name: str, **kwargs)

Bases: agents.components.model_component.ModelComponent

This component performs object detection and tracking on input images and outputs a list of detected objects, along with their bounding boxes and confidence scores.

Parameters:
  • inputs (list[Union[Topic, FixedInput]]) – The input topics for the object detection. This should be a list of Topic objects or FixedInput objects, limited to Image (or RGBD) type.

  • outputs (list[Topic]) – The output topics for the object detection. This should be a list of Topic objects; Detection and Tracking types are handled automatically.

  • model_client (Optional[ModelClient]) – Optional model client that gives the vision component access to remote vision models. This should be an instance of ModelClient. If not provided, enable_local_classifier should be set to True in VisionConfig. Defaults to None.

  • config (Optional[VisionConfig]) – The configuration for the vision component. This should be an instance of VisionConfig. If not provided, defaults to VisionConfig().

  • trigger (Union[Topic, list[Topic], float]) – The trigger for the vision component. This can be a single Topic object, a list of Topic objects, or a float value for timed components. Defaults to 1.0.

  • component_name (str) – The name of the vision component, given as a string. The signature above declares no default value, so pass it explicitly (for example “vision_component”).

Example usage:

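from agents.clients.model_base import ModelClient
from agents.components.vision import Vision
from agents.config import VisionConfig
from agents.ros import Topic

# DetectionModel must also be imported; its module path is not shown on this page.
image_topic = Topic(name="image", msg_type="Image")
detections_topic = Topic(name="detections", msg_type="Detections")
config = VisionConfig()
model_client = ModelClient(model=DetectionModel(name='yolov5'))
vision_component = Vision(
    inputs=[image_topic],
    outputs=[detections_topic],
    model_client=model_client,
    config=config,
    component_name="vision_component",
)

custom_on_configure()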

Create model client if provided and initialize model.

custom_on_deactivate()

Destroy the model client if it exists.

take_picture(topic_name: str, save_path: str = '~/emos/pictures', timeout: float = 0.5) → str

Take a picture from a specific input topic and save it to the specified location.

This method acts as an Action to capture a specific frame from a specific camera/topic. It prioritizes triggers over standard inputs if a name conflict exists (though unique names are expected).

Parameters:
  • topic_name (str) – The name of the topic to capture the image from. Must be one of the component’s registered input topics.

  • save_path (str) – The directory path where images will be saved. Defaults to “~/emos/pictures”.

  • timeout (float) – Maximum time in seconds to wait for an image to arrive on the topic. Defaults to 0.5.

Returns:

The full path to the saved image file.

Return type:

str

Raises:
  • ValueError – If the topic is not one of the component inputs.

  • TimeoutError – If no image was received within the timeout.
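
The contract described above (check the topic name, wait up to timeout for a frame, write it under save_path, return the full file path) can be sketched with the standard library alone. This is an illustrative stand-in, not the component's implementation: capture_frame, the dictionary of queues, and the file naming scheme are all hypothetical, and raw bytes stand in for an encoded image.

```python
import os
import queue
import tempfile
import time


def capture_frame(frames, topic_name, save_path="~/emos/pictures", timeout=0.5):
    """Hypothetical stand-in mirroring the documented take_picture contract.

    `frames` maps input topic names to queue.Queue objects holding raw bytes.
    """
    if topic_name not in frames:
        raise ValueError(f"'{topic_name}' is not one of the registered inputs")
    try:
        # Block for at most `timeout` seconds waiting for a frame.
        data = frames[topic_name].get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError(f"no image on '{topic_name}' within {timeout}s") from None
    # Expand '~' so the default path resolves to the user's home directory.
    directory = os.path.expanduser(save_path)
    os.makedirs(directory, exist_ok=True)
    # Assumed naming scheme: topic name plus a millisecond timestamp.
    file_path = os.path.join(directory, f"{topic_name}_{int(time.time() * 1000)}.bin")
    with open(file_path, "wb") as handle:
        handle.write(data)
    return file_path


# Usage with an in-memory queue and a temporary directory.
camera = queue.Queue()
camera.put(b"fake-image-bytes")
out_dir = tempfile.mkdtemp()
saved = capture_frame({"image": camera}, "image", save_path=out_dir)
```

Checking the topic name before waiting means a typo fails immediately with ValueError rather than being reported as a timeout.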

record_video(topic_name: str, duration: float = 5.0, save_path: str = '~/emos/videos', fps: int = 30) → str

Record a video from a specific input topic for a set duration.

This action spawns a background thread to capture frames and save them to a video file. It does not block the main execution loop.

Parameters:
  • topic_name (str) – The name of the topic to record from.

  • duration (float) – The duration of the recording in seconds. Defaults to 5.0.

  • save_path (str) – The directory path where the video will be saved. Defaults to “~/emos/videos”.

  • fps (int) – The frames per second for the recording. Defaults to 30.

Returns:

A confirmation message describing the started recording.

Return type:

str

Raises:

ValueError – If the topic is not one of the component inputs.
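
The non-blocking behaviour described above can be illustrated with a plain background thread. The sketch below is an assumption-laden stand-in rather than the component's code: start_recording and read_frame are hypothetical names, frames are collected in a list instead of being encoded to a video file, and the function returns the thread and frame list alongside the message only so the demo can inspect them (the documented method returns just the message).

```python
import threading
import time


def start_recording(read_frame, duration=5.0, fps=30):
    """Hypothetical stand-in: sample frames on a background thread.

    `read_frame` is any zero-argument callable returning the latest frame.
    Returns (message, thread, frames) so the caller can join and inspect.
    """
    frames = []
    total = int(duration * fps)  # number of frames to sample

    def worker():
        interval = 1.0 / fps
        for _ in range(total):
            frames.append(read_frame())
            time.sleep(interval)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()  # returns immediately; capture continues in the background
    message = f"Started recording {total} frames over {duration}s at {fps} fps"
    return message, thread, frames


message, thread, frames = start_recording(lambda: b"frame", duration=0.2, fps=10)
thread.join()  # only for this demo; the documented action does not wait
```

Because the call returns as soon as the thread starts, the confirmation message only says that recording began, not that the file is complete.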

track(label: str) → str

Start tracking objects matching the given label.

Configures the remote model server to enable ByteTrack trackers (reinitializing if needed) and sets the label filter so that tracking results are published on the component’s Tracking output topics.

Parameters:

label (str) – Object label to track (e.g. ‘person’, ‘cup’).

Returns:

A confirmation message describing the started tracking.

Return type:

str

Raises:

RuntimeError – If the component does not have a remote RoboML model client or a Tracking output topic.

property additional_model_clients: Optional[Dict[str, agents.clients.model_base.ModelClient]]

Get the dictionary of additional model clients registered to this component.

Returns:

A dictionary mapping client names (str) to ModelClient instances, or None if not set.

Return type:

Optional[Dict[str, ModelClient]]

fallback_to_local() → str

Switch from remote model_client to the built-in local model at runtime.

The local model is deployed on first call (lazy initialization) to avoid consuming GPU memory until actually needed. If enable_local_model is not already set in config, it is enabled automatically.

This is commonly used as a target for Actions in the Event system.

Returns:

A confirmation message describing the switch.

Return type:

str

Raises:

RuntimeError – If the local model could not be deployed.

Example:


    from agents.ros import Action

    # Define an action to switch to the 'local model' available in each component
    switch_to_local = Action(
        method=brain.fallback_to_local,
    )

    # Trigger this action if the component fails (e.g. internet outage)
    brain.on_component_fail(action=switch_to_local, max_retries=3)
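
The lazy initialization mentioned above (deploy the local model only on the first fallback call) follows a common pattern that can be sketched without the library. LazyLocalFallback, local_factory, and the returned message are hypothetical; the real component manages actual model deployment and its own configuration object.

```python
class LazyLocalFallback:
    """Hypothetical stand-in showing lazy deployment of a local model."""

    def __init__(self, remote_client, local_factory):
        self.active = remote_client          # backend currently in use
        self._local_factory = local_factory  # callable that builds the local model
        self._local = None                   # nothing deployed until first fallback
        self.enable_local_model = False

    def fallback_to_local(self):
        self.enable_local_model = True  # enabled automatically if not already set
        if self._local is None:
            try:
                self._local = self._local_factory()  # deploy on first call only
            except Exception as exc:
                raise RuntimeError("local model could not be deployed") from exc
        self.active = self._local
        return "Switched to local model"


deployments = []


def build_local():
    # Records each deployment so repeated calls can be checked.
    deployments.append("deployed")
    return "local-model"


component = LazyLocalFallback("remote-client", build_local)
first = component.fallback_to_local()
second = component.fallback_to_local()  # reuses the already deployed model
```

Deferring the factory call keeps the cost of the local model (GPU memory, in the component's case) out of the normal path where the remote client works.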

change_model_client(model_client_name: str) → str

Hot-swap the active model client at runtime.

This method replaces the component’s current model_client with one from the registered additional_model_clients. It handles the safe de-initialization of the old client and initialization of the new one.

This is commonly used as a target for Actions in the Event system.

Parameters:

model_client_name (str) – The key corresponding to the desired client in additional_model_clients.

Returns:

A confirmation message describing the swap.

Return type:

str

Raises:

RuntimeError – If no additional clients are registered, the requested client name is not found, or initialization fails.

Example:


    from agents.ros import Action

    # Define an action to switch to the 'remote_backup' client defined previously
    switch_to_backup = Action(
        method=brain.change_model_client,
        args=("remote_backup",)
    )

    # Trigger this action if the component fails (e.g. server down)
    brain.on_component_fail(action=switch_to_backup, max_retries=3)
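
The swap sequence described above (look up the named client, de-initialize the old one, initialize the new one, raise RuntimeError on any failure) can be sketched as follows. FakeClient, its initialize and deinitialize hooks, and swap_client are hypothetical stand-ins chosen for illustration; they are not the ModelClient interface.

```python
class FakeClient:
    """Hypothetical client exposing initialize/deinitialize hooks."""

    def __init__(self, name):
        self.name = name
        self.ready = False

    def initialize(self):
        self.ready = True

    def deinitialize(self):
        self.ready = False


def swap_client(current, additional, name):
    """Return the newly active client, mirroring the documented error cases."""
    if not additional:
        raise RuntimeError("no additional model clients are registered")
    if name not in additional:
        raise RuntimeError(f"unknown model client '{name}'")
    replacement = additional[name]
    current.deinitialize()  # release the old client first
    try:
        replacement.initialize()
    except Exception as exc:
        raise RuntimeError(f"could not initialize '{name}'") from exc
    return replacement


primary = FakeClient("primary")
primary.initialize()
backup = FakeClient("remote_backup")
active = swap_client(primary, {"remote_backup": backup}, "remote_backup")
```

Validating the name before touching the current client means a bad key leaves the working client untouched.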

inspect_component() → str

Return component info including additional model clients.

property warmup: bool

Enable warmup of the model.

custom_on_activate()

Custom configuration for creating triggers.

create_all_subscribers()

Override to handle trigger topics and fixed inputs. Called by the parent BaseComponent.

activate_all_triggers() → None

Activates component triggers by attaching the execution step to their callbacks.

destroy_all_subscribers() → None

Destroys all node subscribers.