Source code for tbp.monty.frameworks.utils.sensor_processing

# Copyright 2025 Thousand Brains Project
# Copyright 2023-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

import logging

import numpy as np
import torch

from tbp.monty.frameworks.utils.spatial_arithmetics import (
    get_angle_torch,
    get_right_hand_angle,
    non_singular_mat,
)


[docs]def get_point_normal_naive(point_cloud, patch_radius_frac=2.5): """Estimate point normal. This is a very simplified alternative to open3d's estimate_normals where we make use of several assumptions specific to our case: - we know which locations are neighboring locations from the camera patch arrangement - we only need the point normal at the center of the patch TODO: Calculate point normal from multiple points at different distances (tan_len values) and then take the average of them. Test if this improves robustness to raw sensor noise. Args: point_cloud: list of 3d coordinates and whether the points are on the object or not. shape = [n, 4] patch_radius_frac: Fraction of observation size to use for PN calculation. Default of 2.5 means that we look half_obs_dim//2.5 to the left, right, up and down. With a resolution of 64x64 that would be 12 pixels. The calculated tan_len (in this example 12) describes the distance of pixels used to span up the two tangent vectors to calculate the point normals. These two vectors are then used to calculate the point normal by taking the cross product. If we set tan_len to a larger value the point normal is more influenced by the global shape of the patch. Returns: norm: Estimated point normal at center of patch valid_pn: Boolean for whether the point-normal was valid or not (True by default); an invalid point-normal means there were not enough points in the patch to make any estimate of the point-normal """ obs_dim = int(np.sqrt(point_cloud.shape[0])) half_obs_dim = obs_dim // 2 center_id = half_obs_dim + obs_dim * half_obs_dim assert patch_radius_frac > 1, "patch_radius_frac needs to be > 1" tan_len = int(half_obs_dim // patch_radius_frac) found_point_normal = False valid_pn = True while not found_point_normal: center_id_up = half_obs_dim + obs_dim * (half_obs_dim - tan_len) center_id_down = half_obs_dim + obs_dim * (half_obs_dim + tan_len) vecup = point_cloud[center_id_up, :3] - point_cloud[center_id, :3] vecdown = point_cloud[center_id_down, :3] - point_cloud[center_id, :3] vecright = point_cloud[center_id + tan_len, :3] - point_cloud[center_id, :3] vecleft = point_cloud[center_id - tan_len, :3] - point_cloud[center_id, :3] vecup_norm = vecup / np.linalg.norm(vecup) vecdown_norm = vecdown / np.linalg.norm(vecdown) vecright_norm = vecright / np.linalg.norm(vecright) vecleft_norm = vecleft / np.linalg.norm(vecleft) # check if tan_len up and right end up on the object and calculate pn from those norm1, norm2 = None, None if (point_cloud[center_id_up, 3] > 0) and ( point_cloud[center_id + tan_len, 3] > 0 ): norm1 = -np.cross(vecup_norm, vecright_norm) # check if tan_len down and left end up on the object and calculate # pn from those if (point_cloud[center_id_down, 3] > 0) and ( point_cloud[center_id - tan_len, 3] > 0 ): norm2 = -np.cross(vecdown_norm, vecleft_norm) # If any of the surrounding points were not on the object only use one pn. if norm1 is None: norm1 = norm2 if norm2 is None: norm2 = norm1 if norm1 is None and norm2 is None: # Try the opposite if (point_cloud[center_id_up, 3] > 0) and ( point_cloud[center_id - tan_len, 3] > 0 ): norm1 = np.cross(vecup_norm, vecleft_norm) # check if tan_len down and left end up on the object and calculate # pn from those if (point_cloud[center_id_down, 3] > 0) and ( point_cloud[center_id + tan_len, 3] > 0 ): norm2 = np.cross(vecdown_norm, vecright_norm) if norm1 is None: norm1 = norm2 if norm2 is None: norm2 = norm1 if norm1 is not None: found_point_normal = True else: # if none of the combinations worked then 3/4 points are off the object # -> try a smaller tan_len tan_len = tan_len // 2 if tan_len < 1: # logging.debug( # "Too many off object points around center for point-normal" # ) norm1 = norm2 = [0, 0, 1] valid_pn = False found_point_normal = True norm = np.mean([norm1, norm2], axis=0) # norm = np.cross(vec1_norm, vec2_norm) norm = norm / np.linalg.norm(norm) return norm, valid_pn
[docs]def get_point_normal_ordinary_least_squares( sensor_frame_data, world_camera, center_id, neighbor_patch_frac=3.2 ): """Extracts the point-normal direction from a noisy point-cloud. Uses ordinary least-square fitting with error minimization along the view direction. Args: sensor_frame_data: point-cloud in sensor coordinates (assumes full patch is provided i.e. no preliminary filtering of off-object points). world_camera: matrix defining sensor-to-world frame transformation. center_id: id of the center point in point_cloud. neighbor_patch_frac: fraction of the patch width that defines the local neighborhood within which to perform the least-squares fitting. Returns: point_normal: Estimated point normal at center of patch valid_pn: Boolean for whether the point-normal was valid or not. Defaults to True. An invalid point-normal means there were not enough points in the patch to make any estimate of the point-normal """ point_cloud = sensor_frame_data.copy() # Make sure that patch center is on the object if point_cloud[center_id, 3] > 0: # Define local neighborhood for least-squares fitting # Only use neighbors that lie on an object to extract point normals neighbors_on_obj = get_center_neighbors( point_cloud, center_id, neighbor_patch_frac ) # Solve linear least-square regression: X^{T}X w = X^{T}y <==> Aw = b x_mat = neighbors_on_obj.copy() x_mat[:, 2] = np.ones((neighbors_on_obj.shape[0],)) y = neighbors_on_obj[:, 2] a_mat = np.matmul(x_mat.T, x_mat) b = np.matmul(x_mat.T, y) valid_pn = True if non_singular_mat(a_mat): w = np.linalg.solve(a_mat, b) # Compute surface normal from fitted weights and normalize it point_normal = np.ones((3,)) point_normal[:2] = -w[:2].copy() point_normal = point_normal / np.linalg.norm(point_normal) # Make sure point-normal points upwards if point_normal[2] < 0: point_normal *= -1 # Express point-normal back to world coordinate frame point_normal = np.matmul(world_camera[:3, :3], point_normal) else: # Not enough point to compute point_normal = np.array([0.0, 0.0, 1.0]) valid_pn = False logging.debug("Warning : Singular matrix encountered in get_point_normal!") # Patch center does not lie on an object else: point_normal = np.array([0.0, 0.0, 1.0]) valid_pn = False logging.debug("Warning : Patch center does not lie on an object!") return point_normal, valid_pn
[docs]def get_point_normal_total_least_squares( point_cloud_base, center_id, view_dir, neighbor_patch_frac=3.2 ): """Extracts the point-normal direction from a noisy point-cloud. Uses total least-square fitting. Error minimization is independent of view direction. Args: point_cloud_base: point-cloud in world coordinates (assumes full patch is provided i.e. no preliminary filtering of off-object points). center_id: id of the center point in point_cloud. view_dir: viewing direction used to adjust the sign of the estimated point-normal. neighbor_patch_frac: fraction of the patch width that defines the local neighborhood within which to perform the least-squares fitting. Returns: norm: Estimated point normal at center of patch valid_pn: Boolean for whether the point-normal was valid or not. Defaults to True. An invalid point-normal means there were not enough points in the patch to make any estimate of the point-normal """ point_cloud = point_cloud_base.copy() # Make sure that patch center is on the object if point_cloud[center_id, 3] > 0: # Define local neighborhood for least-squares fitting # Only use neighbors that lie on an object to extract point normals neighbors_on_obj = get_center_neighbors( point_cloud, center_id, neighbor_patch_frac ) # Compute matrix M and p_mean for TLS regression n_points = neighbors_on_obj.shape[0] x_mat = neighbors_on_obj.copy() p_mean = 1 / n_points * np.mean(x_mat, axis=0, keepdims=True).T m_mat = 1 / n_points * np.matmul(x_mat.T, x_mat) - np.matmul(p_mean, p_mean.T) try: # find eigenvector of M with min eigenvalue eig_val, eig_vec = np.linalg.eig(m_mat) n_dir = eig_vec[:, np.argmin(eig_val)] valid_pn = True # Align PN with viewing direction if np.dot(view_dir, n_dir) < 0: n_dir *= -1 except np.linalg.LinAlgError: n_dir = np.array([0.0, 0.0, 1.0]) valid_pn = False logging.debug("Warning : Non-diagonalizable matrix for PN estimation!") # Patch center does not lie on an object else: n_dir = np.array([0.0, 0.0, 1.0]) valid_pn = False logging.debug("Warning : Patch center does not lie on an object!") return n_dir, valid_pn
# Old version to get point normal with open3d. Leaving it here in # case we ever want to refer back to it. # def get_point_normal_open3d( # point_cloud, center_id, sensor_location, on_object_only=True # ): # """Estimate point normal at the center point of a point cloud. # # Args: # point_cloud: List of 3D locations # center_id: ID of center point in the point cloud # sensor_location: location of sensor. Used to have the point normal # point towards the sensor. # # Returns: # Point normal at center_id # """ # if on_object_only and point_cloud[center_id, 3] <= 0: # # center of sensor patch is not on object # return [0, 0, 1] # if on_object_only: # on_obj = point_cloud[:, 3] > 0 # adjusted_center_id = sum(on_obj[:center_id]) # point_cloud = point_cloud[on_obj, :3] # else: # # consider even off-object points # adjusted_center_id = center_id # point_cloud = point_cloud[:, :3] # pcd = o3d.geometry.PointCloud() # pcd.points = o3d.utility.Vector3dVector(point_cloud) # # This is what takes long on cloud CPUs (~0.002s on laptop but 0.3 on cloud) # pcd.estimate_normals( # search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=64) # ) # pcd.orient_normals_towards_camera_location(camera_location=sensor_location) # point_normal = pcd.normals[adjusted_center_id] # return point_normal # Old implementation for principal curvature extraction. Refer to # get_principal_curvatures() function for new implementation.
[docs]def get_curvature_at_point(point_cloud, center_id, normal): """Compute principal curvatures from point cloud. Computes the two principal curvatures of a 2D surface and corresponding principal directions Args: point_cloud: point cloud (2d numpy array) based on which the 2D surface is approximated center_id: center point around which the local curvature is estimated normal: surface normal at the center point Returns: k1: first principal curvature k2: second principal curvature dir1: first principal direction dir2: second principal direction """ if point_cloud[center_id, 3] > 0: on_obj = point_cloud[:, 3] > 0 adjusted_center_id = sum(on_obj[:center_id]) point_cloud = point_cloud[on_obj, :3] # Step 1) project point coordinates onto local reference frame computed based # on the normal: # get local reference frame (ev,fv,nv) at x: nv = normal / np.linalg.norm(normal) # in case normal is not normalized # find two directions ev and fv orthogonal to nv: e = np.zeros(3) if abs(nv[0]) < 0.5: e[0] = 1 elif abs(nv[1]) < 0.5: e[1] = 1 else: e[2] = 1 f = np.cross(nv, e) fv = f / np.linalg.norm(f) ev = np.cross(fv, nv) x = point_cloud[adjusted_center_id] dx = point_cloud - x # our point x is the origin of the local reference frame # compute projections: u = np.dot(dx, ev) v = np.dot(dx, fv) w = np.dot(dx, nv) # Step 2) do least-squares fit to get the parameters of the quadratic form # w=a*u*u+b*v*v+c*u*v+d*u+e*v: data = np.zeros((len(point_cloud), 5)) data[:, 0] = np.multiply(u, u) data[:, 1] = np.multiply(v, v) data[:, 2] = np.multiply(u, v) data[:, 3] = u data[:, 4] = v beta = np.dot(np.transpose(data), w) a = np.dot(np.transpose(data), data) # Rarely, "a" can be singular, causing numpy to throw an error; appears # to be caused by surface-agent gathering observations that are largely off the # object, but not entirely (e.g. <25% visible), resulting in a system # with insufficient data to be solvable if non_singular_mat(a): params = np.linalg.solve(a, beta) # Step 3) compute 1st and 2nd fundamental forms guv and buv: guv = np.zeros((2, 2)) guv[0, 0] = 1 + params[3] * params[3] guv[0, 1] = params[3] * params[4] guv[1, 0] = guv[0, 1] guv[1, 1] = 1 + params[4] * params[4] buv = np.zeros((2, 2)) buv[0, 0] = 2 * params[0] buv[0, 1] = params[2] buv[1, 0] = buv[0, 1] buv[1, 1] = 2 * params[1] # Step 4) compute the principle curvatures and directions: # TODO: here convex PCs are negative but I think they should be positive m = np.linalg.inv(guv).dot(buv) eigval, eigvec = np.linalg.eig(m) idx = eigval.argsort()[::-1] eigval_sorted = eigval[idx] eigvec_sorted = eigvec[:, idx] k1 = eigval_sorted[0] k2 = eigval_sorted[1] # TODO: sometimes dir1 and dir2 are not orthogonal, why? # principal directions in the same coordinate frame as points: dir1 = eigvec_sorted[0, 0] * ev + eigvec_sorted[1, 0] * fv dir2 = eigvec_sorted[0, 1] * ev + eigvec_sorted[1, 1] * fv if get_right_hand_angle(dir1, dir2, nv) < 0: # always have dir2 point to the righthand side of dir1 dir2 = -dir2 valid_pc = True else: k1, k2, dir1, dir2 = 0, 0, [0, 0, 0], [0, 0, 0] valid_pc = False logging.debug( "Warning : Singular matrix encountered in get-curvature-at-point!" ) else: k1, k2, dir1, dir2 = 0, 0, [0, 0, 0], [0, 0, 0] valid_pc = False return k1, k2, dir1, dir2, valid_pc
[docs]def get_principal_curvatures( point_cloud_base, center_id, n_dir, neighbor_patch_frac=2.13, weighted=True, fit_intercept=True, ): """Compute principal curvatures from point cloud. Computes the two principal curvatures of a 2D surface and corresponding principal directions Args: point_cloud_base: point cloud (2d numpy array) based on which the 2D surface is approximated center_id: center point around which the local curvature is estimated n_dir: surface normal at the center point neighbor_patch_frac: fraction of the patch width that defines the std of the gaussian distribution used to sample the weights. Defines a local neighborhood for principal curvature computation. weighted: boolean flag that determines if regression is weighted or not. Weighting scheme is defined in get_weight_matrix. fit_intercept: boolean flag that determines whether to fit an intercept term for the regression. Returns: k1: first principal curvature k2: second principal curvature dir1: first principal direction dir2: second principal direction """ point_cloud = point_cloud_base.copy() if point_cloud[center_id, 3] > 0: # Make sure point positions are expressed relative to the center point point_cloud[:, :3] -= point_cloud[center_id, :3] # Filter out points that are not on the object on_obj = point_cloud[:, 3] > 0 point_cloud = point_cloud[on_obj, :3] # find two directions u_dir and v_dir orthogonal to point-normal (n_dir): # If n_dir's z coef is 0 then normal is pointing in (x,y) plane u_dir = ( np.array([1.0, 0.0, -n_dir[0] / n_dir[2]]) if n_dir[2] else np.array([0.0, 0.0, 1.0]) ) u_dir /= np.linalg.norm(u_dir) v_dir = np.cross(n_dir, u_dir) v_dir /= np.linalg.norm(v_dir) # Project point coordinates onto local reference frame u = np.matmul(point_cloud, u_dir) v = np.matmul(point_cloud, v_dir) n = np.matmul(point_cloud, n_dir) # Compute the basis functions (features) for quadratic regression (only # fit the intercept if fit_intercept = True) # n = a * u^2 + b * v^2 + c * u * v + d * u + e * v (+ d) n_features = 6 if fit_intercept else 5 x_mat = np.zeros((len(point_cloud), n_features)) x_mat[:, 0] = np.multiply(u, u) x_mat[:, 1] = np.multiply(v, v) x_mat[:, 2] = np.multiply(u, v) x_mat[:, 3] = u x_mat[:, 4] = v if fit_intercept: x_mat[:, 5] = np.ones(u.shape) # Quadratic regression comes down to solving a linear system: A * u = b # Expressions for A and b differ depending on if regression is weighted. if weighted: # Compute the weights for weighted least-square regression n_points = on_obj.shape[0] weights = get_weight_matrix( n_points, center_id, neighbor_patch_frac=neighbor_patch_frac ) weights = weights[on_obj, :] # Filter off-object points # Extract matrices for weighted least-squares regression. # A = X.T * W * X and b = X.T * W * n a_mat = np.matmul(x_mat.T, np.multiply(weights, x_mat)) b = np.matmul(x_mat.T, np.multiply(weights, n[:, np.newaxis])) else: # Extract matrices for non-weighted least-squares regression. # A = X.T * X and b = X.T * n a_mat = np.matmul(x_mat.T, x_mat) b = np.matmul(x_mat.T, n[:, np.newaxis]) # Rarely, "a" can be singular, causing numpy to throw an error; appears # to be caused by touch-sensor gathering observations that are largely off the # object, but not entirely (e.g. <25% visible), resulting in a system # with insufficient data to be solvable if non_singular_mat(a_mat): # Step 2) do least-squares fit to get the parameters of the quadratic form params = np.linalg.solve(a_mat, b) # Step 3) compute 1st and 2nd fundamental forms guv and buv: # TODO: Extract improved point normal estimate from fitted curve guv = np.zeros((2, 2)) guv[0, 0] = 1 + params[3] * params[3] guv[0, 1] = params[3] * params[4] guv[1, 0] = guv[0, 1] guv[1, 1] = 1 + params[4] * params[4] buv = np.zeros((2, 2)) buv[0, 0] = 2 * params[0] buv[0, 1] = params[2] buv[1, 0] = buv[0, 1] buv[1, 1] = 2 * params[1] # Step 4) compute the principle curvatures and directions: # TODO: here convex PCs are negative but I think they should be positive m = np.linalg.inv(guv).dot(buv) eigval, eigvec = np.linalg.eig(m) idx = eigval.argsort()[::-1] eigval_sorted = eigval[idx] eigvec_sorted = eigvec[:, idx] k1 = eigval_sorted[0] k2 = eigval_sorted[1] # TODO: sometimes dir1 and dir2 are not orthogonal, why? # principal directions in the same coordinate frame as points: pc1_dir = eigvec_sorted[0, 0] * u_dir + eigvec_sorted[1, 0] * v_dir pc2_dir = eigvec_sorted[0, 1] * u_dir + eigvec_sorted[1, 1] * v_dir if get_right_hand_angle(pc1_dir, pc2_dir, n_dir) < 0: # always have dir2 point to the righthand side of dir1 pc2_dir = -pc2_dir valid_pc = True else: k1, k2, pc1_dir, pc2_dir = 0, 0, [0, 0, 0], [0, 0, 0] valid_pc = False logging.debug( "Warning : Singular matrix encountered in get-curvature-at-point!" ) else: k1, k2, pc1_dir, pc2_dir = 0, 0, [0, 0, 0], [0, 0, 0] valid_pc = False return k1, k2, pc1_dir, pc2_dir, valid_pc
[docs]def get_center_neighbors(point_cloud, center_id, neighbor_patch_frac): """Get neighbors within a given neighborhood of the patch center. Returns: Locations and semantic id of all points within a given neighborhood of the patch center which lie on an object. """ # Set patch center as origin of coordinate frame point_cloud[:, :3] -= point_cloud[center_id, :3] # Extract high-level parameters n_points = point_cloud.shape[0] patch_width = int(np.sqrt(n_points)) neighbor_radius = patch_width / neighbor_patch_frac # Compute pixel distances to patch center (in pixel space). dist_to_center = get_pixel_dist_to_center(n_points, patch_width, center_id) # Use distances to define local neighborhood. is_neighbor = dist_to_center <= neighbor_radius neighbors_idx = is_neighbor.reshape((n_points,)) neighbors = point_cloud[neighbors_idx, :] # Filter out points that do not lie on an object neighbors_on_obj = neighbors[neighbors[:, 3] > 0, :3] return neighbors_on_obj
[docs]def get_weight_matrix(n_points, center_id, neighbor_patch_frac=2.13): """Extracts individual pixel weights for least-squares fitting. Weight for each pixel is sampled from a gaussian distribution based on its distance to the patch center. Args: n_points: total number of points in the full RGB-D square patch. center_id: id of the center point in point_cloud. neighbor_patch_frac: fraction of the patch width that defines the std of the gaussian distribution used to sample the weights. Returns: w_diag """ # Extract center and all its neighbors patch_width = int(np.sqrt(n_points)) sigma = patch_width / neighbor_patch_frac # Compute pixel distances to patch center (in pixel space). dist_to_center = get_pixel_dist_to_center(n_points, patch_width, center_id) # Compute weight matrix based on those distances w_coefs = ( 1.0 / (np.sqrt(2 * np.pi) * sigma) * np.exp(-np.square(dist_to_center) / (2 * sigma**2)) ) w_diag = w_coefs.reshape((n_points, 1)) w_diag /= np.sum(w_diag) return w_diag
[docs]def get_pixel_dist_to_center(n_points, patch_width, center_id): """Extracts the relative distance of each pixel to patch center (in pixel space). Returns: Relative distance of each pixel to patch center (in pixel space) """ # Get coordinates (in pixel space) of all pixels in the patch point_idx = np.arange(n_points).reshape(patch_width, patch_width) x, y = np.meshgrid(np.arange(patch_width), np.arange(patch_width)) pos = np.dstack((x, y)) # Compute relative distance to patch center pos_center = pos[point_idx == center_id] dist_to_center = np.linalg.norm(pos - pos_center, axis=2) return dist_to_center
[docs]def point_pair_features(pos_i, pos_j, normal_i, normal_j): """Get point pair features between two points. Args: pos_i: Location of point 1 pos_j: Location of point 2 normal_i: Point normal of point 1 normal_j: Point normal of point 2 Returns: Point pair feature """ pseudo = pos_j - pos_i return torch.stack( [ pseudo.norm(p=2), get_angle_torch(normal_i, pseudo), get_angle_torch(normal_j, pseudo), get_angle_torch(normal_i, normal_j), ] )
[docs]def scale_clip(to_scale, clip): """Clip values into range and scale with sqrt. This can be used to get gaussian and mean curvatures into a reasonable range and remove outliers. Makes it easier to deal with noise. Preserves sign before applying sqrt. Args: to_scale: array where each element should be scaled. clip: range to which the array values should be clipped. Returns: scaled values of array. """ to_scale = np.clip(to_scale, -clip, clip) negative = to_scale < 0 scaled = np.sqrt(np.abs(to_scale)) if len(scaled.shape) == 0: # just a scalar value if negative: scaled = scaled * -1 else: # an array scaled[negative] = scaled[negative] * -1 return scaled
[docs]def log_sign(to_scale): """Apply symlog to input array, preserving sign. This implementation makes sure to preserve the sign of the input values and to avoid extreme outputs when values are close to 0. Args: to_scale: array to scale. Returns: Scaled values of array. """ to_scale = np.asarray(to_scale) sign = np.sign(to_scale) # preserve sign abs_vals = np.abs(to_scale) log_vals = np.log(abs_vals + 1) # avoid extreme values around 0 scaled_vals = sign * log_vals return scaled_vals