# Copyright 2025-2026 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from __future__ import annotations
import uuid
from typing import Tuple
import habitat_sim
import quaternion as qt
from habitat_sim.agent import ActionSpec, ActuationSpec, AgentConfiguration, AgentState
from habitat_sim.sensor import SensorType
from typing_extensions import Literal
from tbp.monty.frameworks.agents import AgentID
from tbp.monty.frameworks.models.abstract_monty_classes import (
AgentObservations,
SensorObservation,
)
from tbp.monty.frameworks.sensors import SensorID
from tbp.monty.simulators.habitat.sensors import (
RGBDSensorConfig,
SemanticSensorConfig,
SensorConfig,
)
__all__ = [
"HabitatAgent",
"MultiSensorAgent",
"SingleSensorAgent",
]
ActionSpaceName = Literal["absolute_only", "distant_agent", "surface_agent"]
Vector3 = Tuple[float, float, float]
Quaternion = Tuple[float, float, float, float]
Size = Tuple[int, int]
[docs]class HabitatAgent:
"""Habitat agent wrapper.
Agents are used to define movable bodies in the environment.
Every Habitat agent will inherit from this class.
Attributes:
agent_id: Unique ID of this agent in the environment.
Observations returned by the environment will be mapped to this ID.
``{"agent_id": {"sensor": [...]}}``.
Actions provided by this sensor module will be prefixed by
this ID, i.e. "agent_id.move_forward"
position: Module initial position in meters. Defaults to (0, 1.5, 0).
rotation: Module initial rotation quaternion. Defaults to (1, 0, 0, 0).
height: Module height in meters. Defaults to 0.0.
"""
[docs] def __init__(
self,
agent_id: AgentID | None,
position: Vector3 = (0.0, 1.5, 0.0),
rotation: Quaternion = (1.0, 0.0, 0.0, 0.0),
height: float = 0.0,
):
if agent_id is None:
agent_id = AgentID(uuid.uuid4().hex)
self.agent_id = agent_id
self.position = position
self.rotation = rotation
self.height = height
self.sensors: list[SensorConfig] = []
self.habitat_sensor_to_monty_id_modality_map: dict[
str, tuple[SensorID, str] # {HabitatID: (SensorID, Modality)}
] = {}
self.sensor_type_to_modality_map = {
SensorType.COLOR: "rgba",
SensorType.DEPTH: "depth",
SensorType.SEMANTIC: "semantic",
}
[docs] def get_spec(self) -> AgentConfiguration:
"""Return a habitat-sim agent configuration.
Returns:
Spec created from this sensor module configuration.
"""
spec = AgentConfiguration()
spec.height = self.height
self.habitat_sensor_to_monty_id_modality_map.clear()
for sensor in self.sensors:
sensor_specs = sensor.get_specs()
for sensor_spec in sensor_specs:
habitat_id = sensor_spec.uuid
monty_id = SensorID(sensor.sensor_id)
modality_id = self.sensor_type_to_modality_map[sensor_spec.sensor_type]
self.habitat_sensor_to_monty_id_modality_map[habitat_id] = (
monty_id,
modality_id,
)
spec.sensor_specifications.extend(sensor_specs)
return spec
[docs] def initialize(self, simulator):
"""Initialize the habitat-sim agent's runtime state.
This method must be called to update the agent and sensors runtime
instance because some of the configuration attributes require access to
the instantiated node.
Args:
simulator: Instantiated :class:`.HabitatSim` instance
"""
# Initialize agent state
agent_state = AgentState()
agent_state.position = self.position
rotation = qt.quaternion(*self.rotation)
agent_state.rotation = rotation
simulator.initialize_agent(self.agent_id, agent_state)
[docs] def process_observations(self, agent_obs) -> AgentObservations:
"""Callback that processes raw Habitat agent observations.
Converts raw observations into Monty-compatible ones.
Args:
agent_obs: Raw habitat-sim observations from the agent
Returns:
The processed observations grouped by sensor_id
"""
# Habitat raw sensor observations are flat, where the observation key is
# composed of the `sensor_id.sensor_type`. The default agent starts by
# grouping habitat raw observations by sensor_id and sensor_type.
obs_by_sensor = AgentObservations()
for sensor_key, data in agent_obs.items():
sensor_id, modality = self.habitat_sensor_to_monty_id_modality_map[
sensor_key
]
obs_by_sensor.setdefault(sensor_id, SensorObservation())[modality] = data
# Call each sensor to post-process the observation data
for sensor in self.sensors:
sensor_id = SensorID(sensor.sensor_id)
sensor_obs = obs_by_sensor.get(sensor_id)
if sensor_obs is not None:
obs_by_sensor[sensor_id] = sensor.process_observations(sensor_obs)
return obs_by_sensor
def action_space(
action_space_type: ActionSpaceName,
agent_id: str,
translation_step: float,
rotation_step: float,
) -> dict[str, ActionSpec]:
"""Generate an action space for a given action space type.
Action space can be `absolute_only`, `distant_agent`, or `surface_agent`.
This method is currently used only in a couple of unit tests; otherwise,
use a default action space.
Action spaces are formatted as lists of tuples, with elements:
0: action name
1: (initial) action amount
2: (initial) constraint
Args:
action_space_type: The type of action space to generate.
agent_id: The ID of the agent.
translation_step: The translation step.
rotation_step: The rotation step.
Returns:
The generated action space.
"""
absolute_only_action_space: list[
tuple[str, float | list[float | qt.quaternion], float | None]
] = [
("set_yaw", 0.0, None),
("set_agent_pitch", 0.0, None),
("set_sensor_pitch", 0.0, None),
("set_agent_pose", [[0.0, 0.0, 0.0], qt.one], None),
("set_sensor_rotation", [[qt.one]], None),
("set_sensor_pose", [[0.0, 0.0, 0.0], qt.one], None),
]
distant_agent_action_space: list[
tuple[str, float | list[float | qt.quaternion], float | None]
] = [
("move_forward", translation_step, None),
("turn_left", rotation_step, None),
("turn_right", rotation_step, None),
("look_up", rotation_step, 90.0),
("look_down", rotation_step, 90.0),
("set_agent_pose", [[0.0, 0.0, 0.0], qt.one], None),
("set_sensor_rotation", [[qt.one]], None),
]
surface_agent_action_space: list[
tuple[str, float | list[float | qt.quaternion], float | None]
] = [
("move_forward", translation_step, None),
("move_tangentially", translation_step, None),
("orient_horizontal", rotation_step, None),
("orient_vertical", rotation_step, None),
("set_agent_pose", [[0.0, 0.0, 0.0], qt.one], None),
("set_sensor_rotation", [[qt.one]], None),
]
if action_space_type == "absolute_only":
action_spec = absolute_only_action_space
elif action_space_type == "distant_agent":
action_spec = distant_agent_action_space
elif action_space_type == "surface_agent":
action_spec = surface_agent_action_space
action_space: dict[str, ActionSpec] = {}
for action in action_spec:
action_space[f"{agent_id}.{action[0]}"] = ActionSpec(
f"{action[0]}",
ActuationSpec(
amount=action[1],
constraint=action[2],
), # type: ignore[arg-type]
)
return action_space
[docs]class MultiSensorAgent(HabitatAgent):
"""Minimal version of a HabitatAgent with multiple RGBD sensors mounted.
The RGBD sensors are mounted to the same movable object (like two go-pros
mounted to a helmet) with the following pre-defined actions:
- "`agent_id`.move_forward": Move camera forward using `translation_step`
- "`agent_id`.turn_left": Turn camera left `rotation_step`
- "`agent_id`.turn_right": Turn camera right `rotation_step`
- "`agent_id`.look_up": Turn the camera up `rotation_step`
- "`agent_id`.look_down": Turn the camera down `rotation_step`
- "`agent_id`.set_yaw": Set the camera agent's absolute yaw value
- "`agent_id`.set_sensor_pitch": Set the camera sensor's absolute pitch value
- "`agent_id`.set_agent_pitch": Set the camera agent's absolute pitch value
Each camera will return the following observations:
- "sensor_ids[i].rgba": Color information for every pixel (x, y, 4)
- "sensor_ids[i].depth": Depth information for every pixel (x, y, 1)
- "sensor_ids[i].semantic": Optional object semantic information for every pixel
(x, y, 1)
where i is an integer indexing the list of sensor_ids.
Note:
The parameters `resolutions`, `rotations`, and so on effectively specify
both the number of sensors, and the sensor parameters. For N sensors,
specify a list of N `resolutions`, and so on. All lists must be the same
length. By default, a list of length one will be provided. Therefore, do
not leave an argument blank if you wish to run a simulation with N > 1
sensors.
Note:
The parameters `translation_step` and `rotation_step` are set to 0 by
default. All action amounts should be specified by the MotorSystem.
Attributes:
agent_id: Actions provided by this camera will be prefixed by this ID.
Defaults to "camera".
sensor_ids: List of IDs for each sensor. Actions are prefixed with agent ID,
but observations are prefixed with sensor ID.
resolutions: List of camera resolutions (width, height). Defaults to (16, 16).
positions: List of camera initial absolute positions in meters, relative
to the agent.
rotations: List of camera rotations (quaternion). Defaults to (1, 0, 0, 0).
zooms: List of camera zoom multipliers. Use >1 to increase and 0 < factor < 1
to decrease. Defaults to 1.0.
semantics: List of booleans determining if each RGBD sensor also gets a semantic
sensor with it. Defaults to False.
height: Height of the mount itself in meters. Defaults to 0.0 (but position of
the agent will be 1.5 meters in the "height" dimension).
rotation_step: Rotation step in degrees used by the `turn_*` and
`look_*` actions. Defaults to 0 degrees.
translation_step: Translation length in meters used by the `move_*` actions.
Defaults to 0 m.
action_space_type: Decides between three action spaces:
"distant_agent" actions saccade like a ball-in-socket joint, viewing an
object from a distance.
"surface_agent" actions orient to an object surface and move tangentially
along it.
"absolute_only" actions are movements in absolute world coordinates only.
"""
[docs] def __init__(
self,
agent_id: AgentID | None,
sensor_ids: tuple[str],
position: Vector3 = (0.0, 1.5, 0.0), # Agent position
rotation: Quaternion = (1.0, 0.0, 0.0, 0.0),
height: float = 0.0,
rotation_step: float = 0.0,
translation_step: float = 0.0,
action_space_type: ActionSpaceName = "distant_agent",
resolutions: tuple[Size] = ((16, 16),),
positions: tuple[Vector3] = ((0.0, 0.0, 0.0),),
rotations: tuple[Quaternion] = ((1.0, 0.0, 0.0, 0.0),),
zooms: tuple[float] = (1.0,),
semantics: tuple[bool] = (False,),
):
super().__init__(agent_id, position, rotation, height)
if sensor_ids is None:
sensor_ids = (uuid.uuid4().hex,)
self.sensor_ids = sensor_ids
self.rotation_step = rotation_step
self.translation_step = translation_step
self.action_space_type: ActionSpaceName = action_space_type
self.resolutions = resolutions
self.positions = positions
self.rotations = rotations
self.zooms = zooms
self.semantics = semantics
param_lists = [
self.sensor_ids,
self.resolutions,
self.positions,
self.rotations,
self.zooms,
self.semantics,
]
num_sensors = len(self.sensor_ids)
assert all(len(p) == num_sensors for p in param_lists)
for sid, res, pos, rot, zoom, sem in zip(*param_lists):
# Add RGBD Camera
sensor_position = (pos[0], pos[1] + self.height, pos[2])
rgbd_sensor = RGBDSensorConfig(
sensor_id=sid,
position=sensor_position,
rotation=rot,
resolution=res,
zoom=zoom,
)
self.sensors.append(rgbd_sensor)
# Add optional semantic sensor
if sem:
semantic_sensor = SemanticSensorConfig(
sensor_id=sid,
position=sensor_position,
rotation=rot,
resolution=res,
zoom=zoom,
)
self.sensors.append(semantic_sensor)
[docs] def get_spec(self):
spec = super().get_spec()
spec.action_space = action_space(
self.action_space_type,
self.agent_id,
self.translation_step,
self.rotation_step,
)
return spec
[docs] def initialize(self, simulator):
"""Initialize agent runtime state.
This method must be called by :class:`.HabitatSim` to update the agent
and sensors runtime instance. This is necessary because some of the
configuration attributes require access to the instantiated scene node.
Args:
simulator: Instantiated :class:`.HabitatSim` instance
"""
super().initialize(simulator)
# Initialize camera zoom
agent = simulator.get_agent(self.agent_id)
for i, sensor_id in enumerate(self.sensor_ids):
zoom = self.zooms[i]
sensor_types = ["rgba", "depth"]
if self.semantics[i]:
sensor_types.append("semantic")
for s in sensor_types:
# FIXME: Using protected member `_sensor`
camera = agent._sensors[f"{sensor_id}.{s}"]
camera.zoom(zoom)
[docs]class SingleSensorAgent(HabitatAgent):
"""Minimal version of a HabitatAgent.
This is the special case of :class:`MultiSensorAgent` when there is at most 1
RGBD and 1 semantic sensor. Thus, the arguments are single values instead of
lists.
"""
[docs] def __init__(
self,
agent_id: AgentID | None,
sensor_id: SensorID,
agent_position: Vector3 = (0.0, 1.5, 0.0),
sensor_position: Vector3 = (0.0, 0.0, 0.0),
rotation: Quaternion = (1.0, 0.0, 0.0, 0.0),
height: float = 0.0,
resolution: Size = (16, 16),
zoom: float = 1.0,
semantic: bool = False,
rotation_step: float = 0.0,
translation_step: float = 0.0,
action_space_type: ActionSpaceName = "distant_agent",
):
"""Initialize agent runtime state.
Note that, like the multi-sensor agent, the position of the agent is treated
separately from the position of the sensor (which is relative to the agent).
"""
super().__init__(agent_id, agent_position, rotation, height)
self.sensor_id = sensor_id
self.sensor_position = sensor_position
self.resolution = resolution
self.zoom = zoom
self.semantic = semantic
self.rotation_step = rotation_step
self.translation_step = translation_step
self.action_space_type: ActionSpaceName = action_space_type
# Add RGBD Camera
effective_sensor_position = (
self.sensor_position[0],
self.sensor_position[1] + self.height,
self.sensor_position[2],
)
rgbd_sensor = RGBDSensorConfig(
sensor_id=self.sensor_id,
position=effective_sensor_position,
rotation=self.rotation,
resolution=self.resolution,
zoom=self.zoom,
)
self.sensors.append(rgbd_sensor)
# Add optional semantic sensor
if self.semantic:
semantic_sensor = SemanticSensorConfig(
sensor_id=self.sensor_id,
position=effective_sensor_position,
rotation=self.rotation,
resolution=self.resolution,
zoom=self.zoom,
)
self.sensors.append(semantic_sensor)
[docs] def get_spec(self):
spec = super().get_spec()
spec.action_space = action_space(
self.action_space_type,
self.agent_id,
self.translation_step,
self.rotation_step,
)
return spec
[docs] def initialize(self, simulator):
"""Initialize agent runtime state.
This method must be called by :class:`.HabitatSim` to update the agent
and sensors runtime instance. This is necessary because some of the
configuration attributes require access to the instantiated scene node.
Args:
simulator: Instantiated :class:`.HabitatSim` instance
"""
super().initialize(simulator)
# Update camera zoom only when the zoom value differs from 1x
if self.zoom != 1.0:
agent = simulator.get_agent(self.agent_id)
agent_config = agent.agent_config
for sensor in agent_config.sensor_specifications:
if isinstance(sensor, habitat_sim.CameraSensorSpec):
# FIXME: Using protected member `_sensor`
camera = agent._sensors[sensor.uuid]
camera.zoom(self.zoom)