Source code for tbp.monty.frameworks.environment_utils.transforms

# Copyright 2025 Thousand Brains Project
# Copyright 2021-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

from numbers import Number
from typing import Optional, Tuple

import numpy as np
import quaternion as qt
import scipy

from tbp.monty.frameworks.models.states import State

__all__ = [
    "AddNoiseToRawDepthImage",
    "DepthTo3DLocations",
    "GaussianSmoothing",
    "MissingToMaxDepth",
]


[docs]class MissingToMaxDepth: """Return max depth when no mesh is present at a location. Habitat depth sensors return 0 when no mesh is present at a location. Instead, return max_depth. See: https://github.com/facebookresearch/habitat-sim/issues/1157 for discussion. """ def __init__(self, agent_id, max_depth, threshold=0): """Initialize the transform. Args: agent_id: agent id of the agent where the transform should be applied. max_depth: numeric that will replace missing threshold: (optional) numeric, anything less than this is counted as missing. Defaults to 0. """ self.agent_id = agent_id self.max_depth = max_depth self.threshold = threshold self.needs_rng = False def __call__(self, observation, state=None): """Replace missing depth values with max_depth. Args: observation: observation to modify in place. state: not used. Returns: observation, same as input, with missing data modified in place """ # loop over sensor modules for sm in observation[self.agent_id].keys(): m = np.where(observation[self.agent_id][sm]["depth"] <= self.threshold) observation[self.agent_id][sm]["depth"][m] = self.max_depth return observation
[docs]class AddNoiseToRawDepthImage: """Add gaussian noise to raw sensory input.""" def __init__(self, agent_id, sigma): """Initialize the transform. Args: agent_id: agent id of the agent where the transform should be applied. Transform will be applied to all depth sensors of the agent. sigma: standard deviation of noise distribution. """ self.agent_id = agent_id self.sigma = sigma self.needs_rng = True def __call__(self, observation, state=None): """Add gaussian noise to raw sensory input. Args: observation: observation to modify in place. state: not used. Returns: observation, same as input, with added gaussian noise to depth values. Raises: Exception: if no depth sensor is present. """ # loop over sensor modules for sm in observation[self.agent_id].keys(): if "depth" in observation[self.agent_id][sm].keys(): noise = self.rng.normal( 0, self.sigma, observation[self.agent_id][sm]["depth"].shape, ) observation[self.agent_id][sm]["depth"] += noise else: raise Exception("NO DEPTH SENSOR PRESENT. Don't use this transform") return observation
[docs]class GaussianSmoothing: """Deals with gaussian noise on the raw depth image. This transform is designed to deal with gaussian noise on the raw depth image. It remains to be tested whether it will also help with the kind of noise in a real-world depth camera. """ def __init__(self, agent_id, sigma=2, kernel_width=3): """Initialize the transform. Args: agent_id: agent id of the agent where the transform should be applied. Transform will be applied to all depth sensors of the agent. sigma: sigma of gaussian smoothing kernel. Default is 2. kernel_width: width of the smoothing kernel. Default is 3. """ self.agent_id = agent_id self.sigma = sigma self.kernel_width = kernel_width self.pad_size = kernel_width // 2 self.kernel = self.create_kernel() self.needs_rng = False def __call__(self, observation, state=None): """Apply gaussian smoothing to depth images. Args: observation: observation to modify in place. state: not used. Returns: observation, same as input, with smoothed depth values. Raises: Exception: if no depth sensor is present. """ # loop over sensor modules for sm in observation[self.agent_id].keys(): if "depth" in observation[self.agent_id][sm].keys(): depth_img = observation[self.agent_id][sm]["depth"].copy() padded_img = self.get_padded_img(depth_img, pad_type="edge") filtered_img = scipy.signal.convolve( padded_img, self.kernel, mode="valid" ) observation[self.agent_id][sm]["depth"] = filtered_img else: raise Exception("NO DEPTH SENSOR PRESENT. Don't use this transform") return observation
[docs] def create_kernel(self): """Create a normalized gaussian kernel. Returns: normalized gaussian kernel. Array of size (kernel_width, kernel_width). """ x = np.linspace(-self.pad_size, self.pad_size, self.kernel_width) kernel_1d = ( 1.0 / (np.sqrt(2 * np.pi) * self.sigma) * np.exp(-np.square(x) / (2 * self.sigma**2)) ) kernel_2d = np.outer(kernel_1d, kernel_1d) return kernel_2d / np.sum(kernel_2d)
[docs] def get_padded_img(self, img, pad_type="edge"): if pad_type == "edge": padded_img = np.pad(img.astype(float), pad_width=self.pad_size, mode="edge") elif pad_type == "empty": padded_img = np.pad( img.astype(float), pad_width=self.pad_size, mode="constant", constant_values=np.nan, ) return padded_img
[docs] def conv2d(self, img, kernel_renorm=False): """Apply a 2D convolution to the image. Args: img: 2D image to be filtered. kernel_renorm: flag that specifies whether kernel values should be renormalized (based on the number on non-NaN values in image window). Returns: filtered version of the input image. """ [n_rows, n_cols] = img.shape filtered_img = img[ self.pad_size : (n_rows - self.pad_size), self.pad_size : (n_cols - self.pad_size), ].copy() # TODO: Investigate vectorizing this for i in range(n_rows - self.kernel_width + 1): for j in range(n_cols - self.kernel_width + 1): # Extracts image subset to be averaged out by the smoothing kernel. # Identify indices of non-NaN values, and sum the corresponding kernel # weights to get the normalization factor. img_subset = img[i : i + self.kernel_width, j : j + self.kernel_width] mask = ~np.isnan(img_subset) norm_factor = np.sum(mask * self.kernel) if kernel_renorm else 1.0 normalized_kernel = self.kernel / norm_factor filtered_img[i, j] = np.nansum(normalized_kernel * img_subset) return filtered_img
[docs]class DepthTo3DLocations: """Transform semantic and depth observations from 2D into 3D. Transform semantic and depth observations from camera coordinate (2D) into agent (or world) coordinate (3D). This transform will add the transformed results as a new observation called "semantic_3d" which will contain the 3d coordinates relative to the agent (or world) with the semantic ID and 3D location of every object observed:: "semantic_3d" : [ # x-pos , y-pos , z-pos , semantic_id [-0.06000001, 1.56666668, -0.30000007, 25.], [ 0.06000001, 1.56666668, -0.30000007, 25.], [-0.06000001, 1.43333332, -0.30000007, 25.], [ 0.06000001, 1.43333332, -0.30000007, 25.]]) ] Attributes: agent_id: Agent ID to get observations from resolution: Camera resolution (H, W) zoom: Camera zoom factor. Defaul 1.0 (no zoom) hfov: Camera HFOV, default 90 degrees semantic_sensor: Semantic sensor id. Default "semantic" depth_sensor: Depth sensor id. Default "depth" world_coord: Whether to return 3D locations in world coordinates. If enabled, then :meth:`__call__` must be called with the agent and sensor states in addition to observations. Default True. get_all_points: Whether to return all 3D coordinates or only the ones that land on an object. depth_clip_sensors: tuple of sensor indices to which to apply a clipping transform where all values > clip_value are set to clip_value. Empty tuple ~ apply to none of them. clip_value: depth parameter for the clipping transform Warning: This transformation is only valid for pinhole cameras """ def __init__( self, agent_id, sensor_ids, resolutions, zooms=1.0, hfov=90.0, clip_value=0.05, depth_clip_sensors=(), world_coord=True, get_all_points=False, use_semantic_sensor=False, ): self.needs_rng = False self.inv_k = [] self.h, self.w = [], [] if isinstance(zooms, (int, float)): zooms = [zooms] * len(sensor_ids) if isinstance(hfov, (int, float)): hfov = [hfov] * len(sensor_ids) for i, zoom in enumerate(zooms): # Pinhole camera, focal length fx = fy hfov[i] = float(hfov[i] * np.pi / 180.0) fx = np.tan(hfov[i] / 2.0) / zoom fy = fx # Adjust fy for aspect ratio self.h.append(resolutions[i][0]) self.w.append(resolutions[i][1]) fy = fy * self.h[i] / self.w[i] # Intrinsic matrix, K # Assuming skew is 0 for pinhole camera and center at (0,0) k = np.array( [ [1.0 / fx, 0.0, 0.0, 0.0], [0.0, 1 / fy, 0.0, 0.0], [0.0, 0.0, 1.0, 0], [0.0, 0.0, 0.0, 1.0], ] ) # Inverse K self.inv_k.append(np.linalg.inv(k)) self.agent_id = agent_id self.sensor_ids = sensor_ids self.world_coord = world_coord self.get_all_points = get_all_points self.use_semantic_sensor = use_semantic_sensor self.clip_value = clip_value self.depth_clip_sensors = depth_clip_sensors def __call__(self, observations: dict, state: Optional[State] = None): """Apply the depth-to-3D-locations transform to sensor observations. Applies spatial transforms to the observations and generates a mask used to determine which pixels are the object's surface. This modifies the observations in-place. Some items may be modified, such as depth values, while other values are added (see Returns section). In the first part of this function, we build a mask that indicates not only which part of the image is on-object, but also which part of the image is "on-surface". The distinction between on-object and on-surface arises from the fact that the field of view may contain parts of the object that are far away from each other. For example, we may be looking at the front lip of a mug, but the back lip of the mug is also in the field of view. When we compute surface point normals or surface curvature for the front lip of the mug, we don't want to include pixels from the back lip of the mug. To do this, we first need a semantic map that indicates only whether the pixel is on- or off-object. We then analyze the depth data to determine whether the pixels in the field of view appear to belong to the same part of the object (see `get_surface_from_depth` for details). The intersection of these two maps forms the on-surface mask (called `semantic_obs`) that is embedded into the observation dict and is used later when performing point-normal and curvature estimation. How we decide to build these masks is dependent on several factors, such as whether we are using a distant agent or a surface agent, and whether a ground-truth semantic map should be used. This necessitates a few different code paths. Here is a brief outline of the parameters that reflect these factors as they are commonly used in Monty: - when using a surface agent, self.depth_clip_sensors is a non-empty tuple. More specifically, we know which sensor is the surface agent since it's index will be in self.depth_clip_sensors. We only apply depth clipping to the surface agent. - surface agents also have their depth and semantic data clipped to a a very short range from the sensor. This is done to more closely model a finger which has short reach. - `use_semantic_sensor` is currently only used with multi-object experiments, and when this is `True`, the observation dict will have an item called "semantic". In the future, we would like to include semantic estimation for multi-object experiments. But until then, we continue to support use of the semantic sensor for multi-object experiments. - When two parts of an object are visible, we separate the two parts according to depth. However, the threshold we use to separate the two parts is dependent on distant vs surface agents. The parameter `default_on_surface_th` changes based on whether we are using a distant or surface agent. After the mask is generated, we unproject the 2D camera coordinates into 3D coordinates relative to the agent. We then add the transformed observations to the original observations dict. Args: observations (dict): Observations returned by the data loader. state (State, optional): Optionally supplied CMP-compliant state object. Returns: dict: The original observations dict with the following possibly added: - "semantic_3d": 3D coordinates for each pixel. If `self.world_coord` is `True` (default), then the coordinates are in the world's reference frame and are in the sensor's reference frame otherwise. It is structured as a 2D array with shape (n_pixels, 4) with columns containing x-, y-, z-coordinates, and a semantic ID. - "world_camera": Sensor-to-world coordinate frame transform. Included only when `self.world_coord` is `True` (default). - "sensor_frame_data": 3D coordinates for each pixel relative to the sensor. Has the same structure as "semantic_3d". Included only when `self.get_all_points` is `True`. """ for i, sensor_id in enumerate(self.sensor_ids): agent_obs = observations[self.agent_id][sensor_id] depth_patch = agent_obs["depth"] # We need a semantic map that masks off-object pixels. We can use the # ground-truth semantic map if it's available. Otherwise, we generate one # from the depth map and (temporarily) add it to the observation dict. if "semantic" in agent_obs.keys(): semantic_patch = agent_obs["semantic"] else: # The generated map uses depth observations to determine whether # pixels are on object using 1 meter as a threshold since # `MissingToMaxDepth` sets the background void to 1. semantic_patch = np.ones_like(depth_patch, dtype=int) semantic_patch[depth_patch >= 1] = 0 # Apply depth clipping to the surface agent, and initialize the # surface-separation threshold for later use. if i in self.depth_clip_sensors: # Surface agent: clip depth and semantic data (in place), and set # the default surface-separation threshold to be very short. self.clip(depth_patch, semantic_patch) default_on_surface_th = self.clip_value else: # Distance agent: do not clip depth or semantic data, and set the # default surface-separation threshold to be very far away. default_on_surface_th = 1000.0 # Build a mask that only includes the pixels that are on-surface, where # on-surface means the pixels are on-object and locally connected to # the center of the patch's field of view. However, if we are using a # surface agent and are using the semantic sensor, we may use the # (clipped) ground-truth semantic mask as a shortcut (though it doesn't # use surface estimation--just on-objectness). if self.depth_clip_sensors and self.use_semantic_sensor: # NOTE: this particular combination of self.depth_clip_sensors and # self.use_semantic_sensor is not commonly used at present, if ever. # self.depth_clip_sensors implies a surface agent, and # self.use_semantic_sensor implies multi-object experiments. surface_patch = agent_obs["semantic"] else: surface_patch = self.get_surface_from_depth( depth_patch, semantic_patch, default_on_surface_th, ) # Approximate true world coordinates x, y = np.meshgrid( np.linspace(-1, 1, self.w[i]), np.linspace(1, -1, self.h[i]) ) x = x.reshape(1, self.h[i], self.w[i]) y = y.reshape(1, self.h[i], self.w[i]) # Unproject 2D camera coordinates into 3D coordinates relative to the agent depth = depth_patch.reshape(1, self.h[i], self.w[i]) xyz = np.vstack((x * depth, y * depth, -depth, np.ones(depth.shape))) xyz = xyz.reshape(4, -1) xyz = np.matmul(self.inv_k[i], xyz) sensor_frame_data = xyz.T.copy() if self.world_coord and state is not None: # Get agent and sensor states from state dictionary agent_state = state[self.agent_id] depth_state = agent_state["sensors"][sensor_id + ".depth"] agent_rotation = agent_state["rotation"] agent_rotation_matrix = qt.as_rotation_matrix(agent_rotation) agent_position = agent_state["position"] sensor_rotation = depth_state["rotation"] sensor_position = depth_state["position"] # --- Apply camera transformations to get world coordinates --- # Combine body and sensor rotation (since sensor rotation is relative to # the agent this will give us the sensor rotation in world coordinates) sensor_rotation_rel_world = agent_rotation * sensor_rotation # Calculate sensor position in world coordinates -> sensor_position is # in the agent's coordinate frame, so we need to rotate it first by # agent_rotation_matrix and then add it to the agent's position rotated_sensor_position = agent_rotation_matrix @ sensor_position sensor_translation_rel_world = agent_position + rotated_sensor_position # Apply the rotation and translation to get the world coordinates rotation_matrix = qt.as_rotation_matrix(sensor_rotation_rel_world) world_camera = np.eye(4) world_camera[0:3, 0:3] = rotation_matrix world_camera[0:3, 3] = sensor_translation_rel_world xyz = np.matmul(world_camera, xyz) # Add sensor-to-world coordinate frame transform, used for point-normal # extraction. View direction is the third column of the matrix. observations[self.agent_id][sensor_id]["world_camera"] = world_camera # Extract 3D coordinates of detected objects (semantic_id != 0) semantic = surface_patch.reshape(1, -1) if self.get_all_points: semantic_3d = xyz.transpose(1, 0) semantic_3d[:, 3] = semantic[0] sensor_frame_data[:, 3] = semantic[0] # Add point-cloud data expressed in sensor coordinate frame. Used for # point-normal extraction observations[self.agent_id][sensor_id]["sensor_frame_data"] = ( sensor_frame_data ) else: detected = semantic.any(axis=0) xyz = xyz.transpose(1, 0) semantic_3d = xyz[detected] semantic_3d[:, 3] = semantic[0, detected] # Add transformed observation to existing dict. We don't need to create # a deepcopy because we are appending a new observation observations[self.agent_id][sensor_id]["semantic_3d"] = semantic_3d return observations
[docs] def clip(self, depth_patch: np.ndarray, semantic_patch: np.ndarray) -> None: """Clip the depth and semantic data that lie beyond a certain depth threshold. This is currently used for surface agent observations to limit the "reach" of the agent. Pixels beyond the clip value are set to 0 in the semantic patch, and the depth values beyond the clip value (or equal to 0) are set to the clip value. This function this modifies `depth_patch` and `semantic_patch` in-place. Args: depth_patch: depth observations semantic_patch: binary mask indicating on-object locations """ semantic_patch[depth_patch >= self.clip_value] = 0 depth_patch[depth_patch > self.clip_value] = self.clip_value depth_patch[depth_patch == 0] = self.clip_value
[docs] def get_on_surface_th( self, depth_patch: np.ndarray, semantic_patch: np.ndarray, min_depth_range: Number, default_on_surface_th: Number, ) -> Tuple[Number, bool]: """Return a depth threshold if we have a bimodal depth distribution. If the depth values are in a large enough range (> min_depth_range) we may be looking at more than one surface within our patch. This could either be two disjoint surfaces of the object or the object and the background. To figure out if we have two disjoint sets of depth values we look at the histogram and check for empty bins in the middle. The center of the empty part if the histogram will be defined as the threshold. If we do have a bimodal depth distribution, we effectively have two surfaces. This could be the mug's handle vs the mug's body, or the front lip of a mug vs the rear of the mug. We will use the depth threshold defined above to mask out the surface that is not in the center of the patch. Args: depth_patch: sensor patch observations of depth semantic_patch: binary mask indicating on-object locations min_depth_range: minimum range of depth values to even be considered default_on_surface_th: default threshold to use if no bimodal distribution is found Returns: threshold and whether we want to use values above or below threshold """ center_loc = (depth_patch.shape[0] // 2, depth_patch.shape[1] // 2) depth_center = depth_patch[center_loc[0], center_loc[1]] semantic_center = semantic_patch[center_loc[0], center_loc[1]] depths = np.asarray(depth_patch).flatten() flip_sign = False th = default_on_surface_th if (max(depths) - min(depths)) > min_depth_range: # only check for bimodal distribution if we have a large enough # range in depth values height, bins = np.histogram( np.array(depth_patch).flatten(), bins=8, density=False ) gap = np.where(height == 0)[0] if len(gap) > 0: # There is a bimodal distribution gap_center = len(gap) // 2 th_id = gap[gap_center] th = bins[th_id] if depth_center > th and semantic_center > 0: # if the FOV's center is on the further away surface and the FOV's # center is on-object, then we want to use the further-away surface. flip_sign = True return th, flip_sign
[docs] def get_surface_from_depth( self, depth_patch: np.ndarray, semantic_patch: np.ndarray, default_on_surface_th: Number, ) -> np.ndarray: """Return surface patch information from heuristics on depth patch. This function returns a binary mask indicating whether each pixel is "on-surface" using heuristics on the depth patch and the semantic mask. When we say "on-surface", we mean "on the part of the object that the sensor is centered on". For example, the sensor may be looking directly at the front lip of a mug, but the back lip of the mug is also in view. While both parts of the mug are on-object, we only want to use the front lip of the mug for later computation (such as point-normal extraction and curvature estimation), and so we want to mask out the back lip of the mug. Continuing with the the mug front/back lip example, we separate the front and back lips by their depth data. When we generate a histogram of pixel depths, we should see two distincts peaks of the histogram (or 3 peaks if there is a part of the field of view that is off-object entirely). We would then compute which depth threshold separates the two peaks that correspond to the two surfaces, and this is performed by `get_on_surface_th`. This function uses that value to mask out the pixels that are beyond that threshold. Finally, we element-wise multiply this surface mask by the semantic (i.e., object mask) mask which ensures that off-object observations are also masked out. Note that we most often don't have multiple surfaces in view. For example, when exploring a mug, we are most often looking direclty at one locally connected part of the mug, such as a patch on the mug's cylindrical body. In this case, we don't attempt to find a surface-separating threshold, and we instead use the default threshold `default_on_surface_th`. As with a computed threshold, anything beyond it is zeroed out. For surface agents, this threshold is usually quite small (e.g., 5 cm). For distant agents, the threshold is large (e.g., 1000 meters) which is functionally infinite. Args: depth_patch: sensor patch observations of depth semantic_patch: binary mask indicating on-object locations default_on_surface_th: default threshold to use if no bimodal distribution is found Returns: sensor patch shaped info about whether each pixel is on surface of not """ # avoid large range when seeing the table (goes up to almost 100 and then # just using 8 bins will not work anymore) depth_patch = np.array(depth_patch) depth_patch[depth_patch > 1] = 1.0 # If all depth values are at maximum (1.0), then we are automatically # off-object. if np.all(depth_patch >= 1.0): return np.zeros_like(depth_patch, dtype=bool) # Compute the on-suface depth threshold (and whether we need to flip the # sign), and apply it to the depth to get the semantic patch. th, flip_sign = self.get_on_surface_th( depth_patch, semantic_patch, min_depth_range=0.01, default_on_surface_th=default_on_surface_th, ) if flip_sign is False: surface_patch = depth_patch < th else: surface_patch = depth_patch > th return surface_patch * semantic_patch