Source code for tbp.monty.simulators.habitat.simulator

# Copyright 2025 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

"""AI Habitat simulator interface for Monty.

See Also:
    https://github.com/facebookresearch/habitat-sim
"""

from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import habitat_sim
import magnum as mn
import numpy as np
from habitat_sim.utils import common as sim_utils
from importlib_resources import files

import tbp.monty.simulators.resources as resources
from tbp.monty.frameworks.actions.actions import Action
from tbp.monty.frameworks.environment_utils.habitat_utils import get_bounding_corners
from tbp.monty.simulators.habitat.actuator import HabitatActuator

from .agents import HabitatAgent

__all__ = [
    "HabitatSim",
    "PRIMITIVE_OBJECT_TYPES",
]

DEFAULT_SCENE = "NONE"
DEFAULT_PHYSICS_CONFIG = str(files(resources) / "default.physics_config.json")

#: Maps habitat-sim pre-configure primitive object types to semantic IDs
PRIMITIVE_OBJECT_TYPES = {
    "capsule3DSolid": 101,
    "coneSolid": 102,
    "cubeSolid": 103,
    "cylinderSolid": 104,
    "icosphereSolid": 105,
    "uvSphereSolid": 106,
}


Vector3 = Tuple[float, float, float]
Quaternion = Tuple[float, float, float, float]


[docs]class HabitatSim(HabitatActuator): """Habitat-sim interface for tbp.monty. This class wraps `habitat-sim <https://aihabitat.org/docs/habitat-sim>`_ simulator for tbp.monty. It aims to hide habitat-sim internals simplifying experiments configuration within the tbp.monty framework. Example:: camera = SingleSensorAgent( agent_id="camera", sensor_id="camera_id", resolution=(64, 64), ) with HabitatSim(agents=[camera]) as sim: sim.add_object(name="coneSolid", position=(0.0, 1.5, -0.2)) obs = sim.get_observations() plot_image(obs["camera"]["camera_id"]["rgba"]) plot_image(obs["camera"]["camera_id"]["depth"]) Attributes: agents: List of :class:`HabitatAgents` to place in the simulator data_path: Habitat data path location, usually the same path used by :class:`habitat_sim.utils.environments_download` scene_id: Scene to use or None for empty environment. seed: Simulator seed to use enable_physics: Whether or not to enable the physics engine. Default False """ def __init__( self, agents: List[HabitatAgent], data_path: Optional[str] = None, scene_id: Optional[str] = None, seed: int = 42, enable_physics: bool = True, # Physics is now standard ): backend_config = habitat_sim.SimulatorConfiguration() backend_config.physics_config_file = DEFAULT_PHYSICS_CONFIG # NOTE that currently we do not have gravity, although this can be adjusted in # the above config by setting "gravity": [0, -9.8, 0] self.sim_enable_physics = enable_physics backend_config.enable_physics = self.sim_enable_physics backend_config.scene_id = scene_id or DEFAULT_SCENE backend_config.random_seed = seed self.np_rng = np.random.default_rng(seed) agent_configs = [] self._agents = agents self._action_space = set() self._agent_id_to_index = dict() self._object_counter = 0 # Track the number of objects added to an environment for index, agent in enumerate(self._agents): config = agent.get_spec() # Update global action space self._action_space.update(config.action_space.keys()) # Holds a dict mapping monty's agent id to habitat agent index self._agent_id_to_index[agent.agent_id] = index agent_configs.append(config) self._sim = habitat_sim.Simulator( habitat_sim.Configuration(backend_config, agent_configs) ) # Load objects from data_path if data_path is not None: obj_mgr = self._sim.get_object_template_manager() absolute_data_path = Path(data_path).expanduser().absolute() v1_2_path = absolute_data_path / "configs" if v1_2_path.is_dir(): # "object" sub-directory no longer exists for YCB version 1.2 (present # in Habitat v0.22); instead config folder is directly present objects_path = absolute_data_path else: # Objects downloaded with `habitat_sim.utils.environments_download` are # stored in the sub-dir called "objects" for older versions of YCB (eg. # 1.0) objects_path = absolute_data_path / "objects" # The appended /objects is also key to triggering the below -else- # "dataset downloaded some other way" in unit tests if objects_path.is_dir(): # Search "objects" dir for habitat objects. # Habitat dataset objects are stored in a directory containing # json files with the attribures of each object in the dataset. # The json file name is in this format: # "{object_name}.object_config.json". # See https://aihabitat.org/docs/habitat-sim/attributesJSON.html#objectattributes # noqa: E501 objects_data_path = { f.parent for f in objects_path.glob("*/**/*.object_config.json") } else: # The dataset was downloaded some other way. # The data path must be the path to the object config files objects_data_path = [absolute_data_path] # Add each object data path to the simulator objects_added = False for path in objects_data_path: valid_objs = obj_mgr.load_configs(str(path), True) if valid_objs: objects_added = True if not objects_added: self.close() raise ValueError(f"No valid habitat data found in {data_path}") for agent in self._agents: agent.initialize(self)
[docs] def initialize_agent(self, agent_id, agent_state): """Update agent runtime state. Usually called first thing to update agent initial pose. Args: agent_id: Agent id of the agent to be updated agent_state: Agent state to update to """ agent_index = self._agent_id_to_index[agent_id] self._sim.initialize_agent(agent_index, agent_state)
[docs] def remove_all_objects(self): """Remove all objects from simulated environment.""" rigid_mgr = self._sim.get_rigid_object_manager() rigid_mgr.remove_all_objects() self._object_counter = 0
[docs] def add_object( self, name: str, position: Vector3 = (0.0, 0.0, 0.0), rotation: Quaternion = (1.0, 0.0, 0.0, 0.0), scale: Vector3 = (1.0, 1.0, 1.0), semantic_id: Optional[str] = None, enable_physics: Optional[bool] = False, object_to_avoid=False, primary_target_bb=None, ): """Add new object to simulated environment. Args: name: Registered object name. It could be any of habitat-sim primitive objects or any configured habitat object. For a list of primitive objects see :const:`PRIMITIVE_OBJECT_TYPES` position: Object initial absolute position rotation: Object rotation quaternion. Default (1, 0, 0, 0) scale: Object scale. Default (1, 1, 1) semantic_id: Optional override object semantic ID enable_physics: Whether or not to enable physics on this objects object_to_avoid: If True, run collision checks to ensure the object is not colliding with any other objects in the scene, and otherwise move it primary_target_bb: If not None, the bounding box of the primary target object; passed when we're adding multiple objects, such that we ensure that the added object obscures the initial view of the primary target object (which avoiding collision alone cannot guarantee); defined by a list of the min and max corners Returns: RigidObject: The newly added object """ obj_mgr = self._sim.get_object_template_manager() rigid_mgr = self._sim.get_rigid_object_manager() # Get first match obj_handle = obj_mgr.get_template_handles(name)[0] # Check if we are changing the object scale scale = tuple(scale) if scale != (1.0, 1.0, 1.0): # Get scaled object template scaled_obj_handle = f"{obj_handle}_scale_{scale}" scaled_tpl = obj_mgr.get_template_handles(scaled_obj_handle) if not scaled_tpl: # Add new template for scaled object scaled_tpl = obj_mgr.get_template_by_handle(obj_handle) scaled_tpl.scale *= mn.Vector3d(*scale) obj_mgr.register_template(scaled_tpl, scaled_obj_handle) obj_handle = scaled_obj_handle else: obj_handle = scaled_tpl[0] obj = rigid_mgr.add_object_by_template_handle(obj_handle) # Update pose obj.translation = position if isinstance(rotation, (list, tuple)): rotation = np.quaternion(*rotation) obj.rotation = sim_utils.quat_to_magnum(rotation) if object_to_avoid: assert ( self.sim_enable_physics ), "Sim-level physics must be enabled to support collision detection" # Temporarily enable *object* physics for collision detection obj.motion_type = habitat_sim.physics.MotionType.DYNAMIC obj = self.find_non_colliding_positions( obj, start_position=position, start_orientation=rotation, primary_obj_bb=primary_target_bb, ) # Update object-specific physics # This physics setting will persist into the environment e.g. during inference if enable_physics: obj.motion_type = habitat_sim.physics.MotionType.DYNAMIC else: # Set the motion-type to kinematic # (i.e. only a user-specified force/motion will affect the object), rather # than dynamic (i.e. object subject to forces like gravity, friction, and # collision detection according to physics simulations) obj.motion_type = habitat_sim.physics.MotionType.KINEMATIC # Update semantic id if semantic_id is not None: # Override default semantic id obj.semantic_id = semantic_id elif name in PRIMITIVE_OBJECT_TYPES: # Update semantic ID for primitive types obj.semantic_id = PRIMITIVE_OBJECT_TYPES[name] elif obj_handle in PRIMITIVE_OBJECT_TYPES: # Update semantic ID for primitive types obj.semantic_id = PRIMITIVE_OBJECT_TYPES[obj_handle] # Compare the intended number of objects added (counter) vs the number # instantiated in the Habitat environmnet self._object_counter += 1 num_objects_added = self.get_num_objects() if isinstance(num_objects_added, int): # In some units tests (e.g. MontyRunTest.test_main_with_single_experiment), # a simulator mock object is used, and so self.get_num_objects does not # return a meaningful int, but instead another mock object # TODO make this test more robust and move to its own unit test assert self._object_counter == num_objects_added, "Not all objects added" return obj
[docs] def non_conflicting_vector(self): """Find a non-conflicting vector. A non-conflicting vector avoids sampling directions that will be just in front of or behind a target object. Returns: np.array: The non-conflicting vector """ angle_ranges = [ (0, 30), # Forbidden 120 degrees (150, 180), (180, 210), # Forbidden 120 degrees (330, 360), ] # Choose which angle range to use selected_range = self.np_rng.choice(np.array(angle_ranges)) angle_z = self.np_rng.uniform(selected_range[0], selected_range[1]) z = np.sin(np.deg2rad(angle_z)) x = self.np_rng.choice([-1.0, 1.0]) return np.array([x, 0, z])
[docs] def check_viewpoint_collision( self, primary_obj_bb, new_obj_bb, overlap_threshold=0.75, ): """Check if the object being added overlaps in the x-axis with target. The object overlapping the primary target object risks obstructing the initial view of the agent at the start of the experiment. Recall that +z is out of the page, where the agent starts facing in the -z direction at the beginning of the episode; +y is the up vector, and +x is the right-ward direction Args: primary_obj_bb: the bounding box of the primary target in the scene new_obj_bb: the bounding box fo the new object being added overlap_threshold: The threshold for overlap. Defaults to 0.75. Returns: bool: True if the overlap is greater than overlap_threshold; 1.0 corresponds to total overlap (the primary target is potentially not visible) """ primary_start, primary_end = primary_obj_bb[0][0], primary_obj_bb[1][0] new_start, new_end = new_obj_bb[0][0], new_obj_bb[1][0] overlap_start = max(primary_start, new_start) overlap_end = min(primary_end, new_end) if overlap_start >= overlap_end: return False # 0.0 percent overlap overlap_length = overlap_end - overlap_start primary_length = primary_end - primary_start overlap_proportion = overlap_length / primary_length return overlap_proportion > overlap_threshold
[docs] def find_non_colliding_positions( self, new_object, start_position, start_orientation, primary_obj_bb, max_distance=1, step_size=0.00005, ): """Find a position for the object being added. The criteria are such that the object does not: i) have a physical collision with other objects (i.e. collision meshes intersect) ii) "collide" with the initial view of the primary target object, i.e. obscure the ability of the agent to start on the primary target at the beginning of an experiment Args: new_object: The object being added start_position: The starting position of the new object start_orientation: The initial orientation of the new object primary_obj_bb: Bounding box of the primary target object (list of two defining corners) max_distance: The maximum distance to attempt moving the new object step_size: The step size for moving the new object Returns: The newly added object (position updated) Raises: RuntimeError: If failed to find a non-colliding position """ direction = self.non_conflicting_vector() direction /= np.linalg.norm(direction) # Move the second object along the direction vector until they no longer collide for distance in np.arange(0, max_distance, step_size): obj_pos = start_position + distance * direction new_object.translation = obj_pos # Extract updated bounding box of new object being added min_corner, max_corner = get_bounding_corners(new_object) # Step the physics simulation to allow objects to settle and compute # collisions self._sim.step_physics(0.0001) # 0.0001 appears to be the smallest possible # time step physical_collision = self._sim.contact_test(new_object.object_id) if physical_collision: # Reset the pose of the object if any collision, as the physics # timestep can cause the object to rotate from this new_object.rotation = sim_utils.quat_to_magnum(start_orientation) viewpoint_collision = self.check_viewpoint_collision( primary_obj_bb=primary_obj_bb, new_obj_bb=[min_corner, max_corner] ) if not physical_collision and not viewpoint_collision: # No collision, so not necessary to reset the pose return new_object raise RuntimeError("Failed to find non-colliding positions")
[docs] def get_num_objects(self): """Return the number of instantiated objects.""" rigid_mgr = self._sim.get_rigid_object_manager() return rigid_mgr.get_num_objects()
[docs] def get_action_space(self): """Returns a set with all available actions. All available actions are those registered by all agents in the environment. """ return set(self._action_space)
[docs] def get_agent(self, agent_id): """Return habitat agent instance.""" agent_index = self._agent_id_to_index[agent_id] return self._sim.get_agent(agent_index)
[docs] def apply_action(self, action: Action) -> Dict[str, Dict]: """Execute given action in the environment. Args: action: The action to execute Returns: A dictionary with the observations grouped by agent_id Raises: ValueError: If the action name is invalid """ action_name = self.action_name(action) if action_name not in self._action_space: raise ValueError(f"Invalid action name: {action_name}") action.act(self) observations = self.get_observations() return observations
[docs] def get_observations(self): """Get sensor observations. Returns: dict: A dictionary with all sensor observations grouped by sensor module. For example: { "agent1": { "sensor1": { "rgba": [....], "depth": [....], : }, "agent2": { "sensor2": "rgba": [....], "depth": [....], : } } """ agent_indices = range(len(self._agents)) obs = self._sim.get_sensor_observations(agent_ids=agent_indices) obs = self.process_observations(obs) return obs
[docs] def process_observations(self, obs): """Habitat returns observations grouped by agent_index. Initially, we group observations by agent_id instead and call all agents to further process the observations. Args: obs: The observations to process Returns: dict: The processed observations grouped by agent_id """ processed_obs = defaultdict(dict) for agent_index, agent_obs in obs.items(): agent = self._agents[agent_index] agent_id = self._agents[agent_index].agent_id processed_obs[agent_id] = agent.process_observations(agent_obs) return processed_obs
[docs] def get_states(self): """Get agent and sensor states (position, rotation, etc..). Returns: dict: A dictionary with the agent pose in world coordinates and any other agent specific state as well as every sensor pose relative to the agent as well as any sensor specific state that is not returned by :meth:`get_observations`. For example: { "camera": { "position": [2.125, 1.5, -5.278], "rotation": [0.707107, 0.0, 0.0.707107, 0.0], "sensors" : { "rgba": { "position": [0.0, 1.5, 0.0], "rotation": [1.0, 0.0, 0.0, 0.0], }, "depth": { "position": [0.0, 1.5, 0.0], "rotation": [1.0, 0.0, 0.0, 0.0], }, : } }, : } """ result = {} for agent_index, sim_agent in enumerate(self._sim.agents): # Get agent and sensor poses from simulator agent_node = sim_agent.scene_node sensors = {} for sensor_id, sensor in agent_node.node_sensors.items(): rotation = sim_utils.quat_from_magnum(sensor.node.rotation) sensors[sensor_id] = { "position": sensor.node.translation, "rotation": rotation, } # Update agent/module state agent_id = self._agents[agent_index].agent_id rotation = sim_utils.quat_from_magnum(agent_node.rotation) result[agent_id] = { "position": agent_node.translation, "rotation": rotation, "sensors": sensors, } return result
[docs] def reset(self): # All agents managed by this simulator agent_indices = range(len(self._agents)) obs = self._sim.reset(agent_ids=agent_indices) obs = self.process_observations(obs) return obs
[docs] def close(self): """Close simulator and release resources.""" sim = getattr(self, "_sim", None) if sim is not None: sim.close() self._sim = None
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()