agents.components.vision

Module Contents

Classes

Vision

This component performs object detection and tracking on input images and outputs a list of detected objects, along with their bounding boxes and confidence scores.

API

class agents.components.vision.Vision(*, inputs: List[Union[agents.ros.Topic, agents.ros.FixedInput]], outputs: List[agents.ros.Topic], model_client: Optional[agents.clients.model_base.ModelClient] = None, config: Optional[agents.config.VisionConfig] = None, trigger: Union[agents.ros.Topic, List[agents.ros.Topic], float] = 1.0, component_name: str, **kwargs)

Bases: agents.components.model_component.ModelComponent

This component performs object detection and tracking on input images and outputs a list of detected objects, along with their bounding boxes and confidence scores.

Parameters:
  • inputs (list[Union[Topic, FixedInput]]) – The input topics for the object detection. This should be a list of Topic objects or FixedInput objects, limited to Image (or RGBD) type.

  • outputs (list[Topic]) – The output topics for the object detection. This should be a list of Topic objects; Detection and Tracking types are handled automatically.

  • model_client (Optional[ModelClient]) – Optional model client that gives the vision component access to remote vision models. This should be an instance of ModelClient. If not provided, enable_local_classifier should be set to True in VisionConfig. Defaults to None.

  • config (Optional[VisionConfig]) – The configuration for the vision component. This should be an instance of VisionConfig. If not provided, defaults to VisionConfig().

  • trigger (Union[Topic, list[Topic], float]) – The trigger value or topic for the vision component. This can be a single Topic object, a list of Topic objects, or a float value for timed components.

  • component_name (str) – The name of the vision component, given as a string. The signature above declares it as a keyword-only argument without a default value.
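The relationship between model_client and enable_local_classifier described above can be sketched as a small check. This is a minimal illustration only: SketchVisionConfig and check_model_source are hypothetical names, not part of the agents package, and raising ValueError is an assumption about how such a mismatch might be reported.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchVisionConfig:
    """Hypothetical stand-in for VisionConfig; models only the one field discussed above."""
    enable_local_classifier: bool = False


def check_model_source(
    model_client: Optional[object],
    config: Optional[SketchVisionConfig],
) -> SketchVisionConfig:
    """Return a usable config, or raise if neither model source is available."""
    # Mirrors "if not provided, defaults to VisionConfig()".
    config = config or SketchVisionConfig()
    # Without a remote client, the local classifier has to be enabled.
    if model_client is None and not config.enable_local_classifier:
        raise ValueError(
            "Provide a model_client or set enable_local_classifier=True in the config."
        )
    return config
```

Calling check_model_source(None, SketchVisionConfig(enable_local_classifier=True)) returns the config unchanged, while calling it with neither a client nor the flag raises.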

Example usage:

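from agents.clients.model_base import ModelClient
from agents.components.vision import Vision
from agents.config import VisionConfig
from agents.ros import Topic
# DetectionModel must also be imported; its module path is not shown in this reference.

image_topic = Topic(name="image", msg_type="Image")
detections_topic = Topic(name="detections", msg_type="Detections")
config = VisionConfig()
model_client = ModelClient(model=DetectionModel(name='yolov5'))
vision_component = Vision(
    inputs=[image_topic],
    outputs=[detections_topic],
    model_client=model_client,
    config=config,
    component_name="vision_component",
)

custom_on_configure()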

Create model client if provided and initialize model.

custom_on_deactivate()

Destroy model client if it exists.

take_picture(topic_name: str, save_path: str = '~/emos/pictures', timeout: float = 0.5) → bool

Take a picture from a specific input topic and save it to the specified location.

This method acts as an Action to capture a specific frame from a specific camera/topic. It prioritizes triggers over standard inputs if a name conflict exists (though unique names are expected).

Parameters:
  • topic_name (str) – The name of the topic to capture the image from. Must be one of the component’s registered input topics.

  • save_path (str) – The directory path where images will be saved. Defaults to “~/emos/pictures”.

  • timeout (float) – Maximum time, in seconds, to wait for an image to become available on the topic. Defaults to 0.5.

Returns:

True if successful, False otherwise.

Return type:

bool

Raises:

ValueError – If the provided topic_name is not found in inputs.
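The contract described above (reject unknown topics, wait up to timeout for a frame, save it under save_path) can be sketched without ROS. Everything here is an assumption made for illustration: take_picture_sketch and frames_by_topic are hypothetical names, frames are plain bytes taken from a queue.Queue, and the file naming is invented. It is not the component's actual implementation.

```python
import os
import queue
import time


def take_picture_sketch(topic_name, frames_by_topic, save_path="~/emos/pictures", timeout=0.5):
    """Save one frame from the named topic; return True on success, False on timeout."""
    # Unknown topics are rejected up front, matching the documented ValueError.
    if topic_name not in frames_by_topic:
        raise ValueError(f"Topic '{topic_name}' is not a registered input")
    try:
        # Block for at most `timeout` seconds waiting for a frame.
        frame = frames_by_topic[topic_name].get(timeout=timeout)
    except queue.Empty:
        return False
    # Expand "~" and create the target directory if it is missing.
    directory = os.path.expanduser(save_path)
    os.makedirs(directory, exist_ok=True)
    # Hypothetical naming scheme: topic name plus a timestamp.
    filename = f"{topic_name}_{time.strftime('%Y%m%d_%H%M%S')}.bin"
    with open(os.path.join(directory, filename), "wb") as fh:
        fh.write(frame)
    return True
```

Returning False for a timeout while raising only for an unknown topic keeps the two failure modes distinguishable, which matches the Returns and Raises entries above.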

record_video(topic_name: str, duration: float = 5.0, save_path: str = '~/emos/videos', fps: int = 30) → bool

Record a video from a specific input topic for a set duration.

This action spawns a background thread to capture frames and save them to a video file. It does not block the main execution loop.

Parameters:
  • topic_name (str) – The name of the topic to record from.

  • duration (float) – The duration of the recording in seconds. Defaults to 5.0.

  • save_path (str) – The directory path where the video will be saved. Defaults to “~/emos/videos”.

  • fps (int) – The frames per second for the recording. Defaults to 30.

Returns:

True if the recording thread started successfully, False otherwise.

Return type:

bool

Raises:

ValueError – If the topic_name is not registered.
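The non-blocking behaviour described above can be illustrated with a plain background thread. This is a sketch under stated assumptions, not the component's code: record_video_sketch and get_frame are hypothetical names, frames are collected in a list instead of being encoded to a video file, and the thread is returned (rather than a bool) so the caller can join it.

```python
import threading
import time


def record_video_sketch(get_frame, duration=5.0, fps=30):
    """Start a background thread that samples get_frame() at roughly `fps` for `duration` seconds."""
    frames = []
    interval = 1.0 / fps

    def _worker():
        # Use a monotonic clock so wall-clock adjustments do not affect the duration.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            frames.append(get_frame())
            time.sleep(interval)

    # A daemon thread does not keep the process alive if the caller exits early.
    thread = threading.Thread(target=_worker, daemon=True)
    thread.start()
    # Control returns immediately; recording continues in the background.
    return thread, frames
```

A caller would typically continue with other work and call thread.join() only when the recorded frames are needed.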

property additional_model_clients: Optional[Dict[str, agents.clients.model_base.ModelClient]]

Get the dictionary of additional model clients registered to this component.

Returns:

A dictionary mapping client names (str) to ModelClient instances, or None if not set.

Return type:

Optional[Dict[str, ModelClient]]

change_model_client(model_client_name: str) → bool

Hot-swap the active model client at runtime.

This method replaces the component’s current model_client with one from the registered additional_model_clients. It handles the safe de-initialization of the old client and initialization of the new one.

This is commonly used as a target for Actions in the Event system.

Parameters:

model_client_name (str) – The key corresponding to the desired client in additional_model_clients.

Returns:

True if the swap was successful, False otherwise (e.g., if the name was not found or initialization failed).

Return type:

bool
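The swap sequence described above (look up the name, de-initialize the old client, initialize the new one, report success) can be sketched with stand-in classes. SketchClient, SketchComponent, and the initialize/deinitialize hooks are hypothetical names chosen for this illustration. The reference does not say what happens to the old client when initialization of the new one fails; restoring it, as done here, is one possible choice and an assumption.

```python
class SketchClient:
    """Hypothetical stand-in for ModelClient with explicit init/deinit hooks."""

    def __init__(self, name, healthy=True):
        self.name = name
        self.healthy = healthy
        self.active = False

    def initialize(self):
        if not self.healthy:
            raise ConnectionError(f"{self.name} is unreachable")
        self.active = True

    def deinitialize(self):
        self.active = False


class SketchComponent:
    """Hypothetical component holding one active client and a dict of alternatives."""

    def __init__(self, model_client, additional_model_clients=None):
        self.model_client = model_client
        self.additional_model_clients = additional_model_clients
        self.model_client.initialize()

    def change_model_client(self, model_client_name):
        candidates = self.additional_model_clients or {}
        new_client = candidates.get(model_client_name)
        if new_client is None:
            return False  # name not registered
        old_client = self.model_client
        old_client.deinitialize()
        try:
            new_client.initialize()
        except Exception:
            # Assumption: fall back to the previous client on failure.
            old_client.initialize()
            return False
        self.model_client = new_client
        return True
```

With a healthy "local_backup" entry registered, change_model_client("local_backup") returns True and leaves the backup as the active client; an unknown or unreachable entry returns False.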

Example:


    from agents.ros import Action

    # `brain` is a component instance whose additional_model_clients
    # contains a 'local_backup' entry.
    # Define an action that switches to that client.
    switch_to_local = Action(
        method=brain.change_model_client,
        args=("local_backup",)
    )

    # Trigger this action if the component fails (e.g. internet loss)
    brain.on_component_fail(action=switch_to_local, max_retries=3)

property warmup: bool

Enable warmup of the model.

custom_on_activate()

Custom configuration for creating triggers.

create_all_subscribers()

Override to handle trigger topics and fixed inputs. Called by the parent BaseComponent.

activate_all_triggers() → None

Activates component triggers by attaching the execution step to callbacks.

destroy_all_subscribers() → None

Destroys all node subscribers.