tbp.monty.frameworks.utils#

tbp.monty.frameworks.utils.communication_utils#

get_percept_from_channel(percepts: list[Message], channel_name: str) Message[source]#

Given a list of percepts, return the percept of the specified channel.

Parameters:
  • percepts – List of percepts

  • channel_name – The name of the channel to return the percept for

Returns:

The percept of the specified channel

Raises:

ValueError – If the channel name is not found in the percepts
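A minimal sketch of the intended lookup, with plain dicts standing in for the Message type (the "channel" key below is an illustrative assumption, not the real Message API):

```python
def get_percept_from_channel(percepts, channel_name):
    # Linear scan over the percepts; each stand-in dict records which
    # channel it came from under the "channel" key (an assumption here).
    for percept in percepts:
        if percept["channel"] == channel_name:
            return percept
    raise ValueError(f"Channel {channel_name!r} not found in percepts")


percepts = [{"channel": "patch", "depth": 0.3}, {"channel": "view_finder"}]
match = get_percept_from_channel(percepts, "patch")
```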

tbp.monty.frameworks.utils.dataclass_utils#

class Dataclass(*args, **kwargs)[source]#

Bases: Protocol

A protocol for dataclasses to be used in type hints.

This exists because dataclasses.dataclass is not a valid type.

__init__(*args, **kwargs)#
as_dataclass_dict(obj)[source]#

Convert a dataclass instance to a serializable dataclass dict.

Parameters:

obj – The dataclass instance to convert.

Returns:

A dictionary with the dataclass fields and values.

Raises:

TypeError – If the object is not a dataclass instance.

create_dataclass_args(dataclass_name: str, function: Callable, base: type | None = None)[source]#

Create configuration dataclass args from given function arguments.

When the function arguments have type annotations, these annotations will be passed to the dataclass fields. Otherwise, the type will be inferred from any argument default value.

For example:

SingleSensorAgentArgs = create_dataclass_args(
    "SingleSensorAgentArgs", SingleSensorAgent.__init__
)

This is equivalent to:

@dataclass(frozen=True)
class SingleSensorAgentArgs:
    agent_id: AgentID
    sensor_id: str
    position: Tuple[float, float, float] = (0.0, 1.5, 0.0)
    rotation: Tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0)
    height: float = 0.0
Parameters:
  • dataclass_name (str) – The name of the new dataclass.

  • function (Callable) – The function used to extract the parameters for the dataclass.

  • base (type | None) – Optional base class for the newly created dataclass.

Returns:

New dataclass with fields defined by the function arguments.

from_dataclass_dict(datadict)[source]#

Convert a serializable dataclass dict back into the original dataclass.

Assumes that the serializable dataclass dict was created via asdict().

Parameters:

datadict – The serializable dataclass dict to convert.

Returns:

The original dataclass instance.

Raises:

TypeError – If the object is not a dict instance.
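A sketch of the intended round trip, assuming the serialized dict records the dataclass type alongside its field values (the key name and registry below are illustrative, not the actual implementation):

```python
from dataclasses import asdict, dataclass, fields


@dataclass(frozen=True)
class SensorArgs:
    sensor_id: str
    height: float = 0.0


def to_serializable(obj):
    # Record the class name alongside the field values so the dict
    # can later be turned back into the original dataclass.
    return {"__dataclass__": type(obj).__name__, **asdict(obj)}


def from_serializable(datadict, registry):
    # Look up the original class and rebuild it from its fields.
    cls = registry[datadict["__dataclass__"]]
    kwargs = {f.name: datadict[f.name] for f in fields(cls)}
    return cls(**kwargs)


d = to_serializable(SensorArgs(sensor_id="patch", height=1.5))
restored = from_serializable(d, {"SensorArgs": SensorArgs})
```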

tbp.monty.frameworks.utils.edge_detection#

tbp.monty.frameworks.utils.graph_matching_utils#

add_pose_features_to_tolerances(tolerances, default_tolerances=20) dict[source]#

Add default pose_vectors tolerances if not set.

Return type:

dict

Returns:

Tolerances dictionary with added pose_vectors if not set.

create_exponential_kernel(size, decay_rate)[source]#

Create an exponentially decaying kernel.

Used to convolve, for example, evidence history when determining whether we are on a new object.

Parameters:
  • size – Size of the kernel.

  • decay_rate – Decay rate of the kernel.

Returns:

Exponentially decaying kernel.
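One plausible construction of such a kernel, sketched in numpy (the exact normalization used by Monty may differ):

```python
import numpy as np


def exponential_kernel(size, decay_rate):
    # Weights decay exponentially with distance into the past and are
    # normalized to sum to 1, so convolving with an evidence history
    # yields a recency-weighted average. (Sketch only.)
    weights = np.exp(-decay_rate * np.arange(size))
    return weights / weights.sum()


kernel = exponential_kernel(size=5, decay_rate=3)
```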

detect_new_object_exponential(max_ev_per_step, detection_threshold=-1.0, decay_rate=3)[source]#

Detect that we’re on a new object using exponentially decaying evidence changes.

Evidence changes from multiple steps into the past are convolved by exponentially decaying constants, such that more recent steps carry more significance.

Parameters:
  • max_ev_per_step – List of the maximum evidence (across all locations/poses) for the current MLH object, across all steps of the current episode

  • detection_threshold – The total amount of negative evidence in the counter/sum that needs to be exceeded to determine that the LM has moved on to a new object.

  • decay_rate – The rate of decay that determines how much past evidence drops contribute to the current estimate of change.

Returns:

True if the total amount of negative evidence is less than or equal to the detection threshold; False otherwise.

detect_new_object_k_steps(max_ev_per_step, detection_threshold=-1.0, k=3, reset_at_positive_jump=False)[source]#

Detect that we’re on a new object using the evidence changes from multiple steps.

Evidence changes from multiple steps into the past are considered. We look at the change in evidence over k discrete steps, weighing these equally.

Parameters:
  • max_ev_per_step – List of the maximum evidence (across all locations/poses) for the current MLH object, across all steps of the current episode

  • detection_threshold – The total amount of negative evidence in the counter/sum that needs to be exceeded to determine that the LM has moved on to a new object.

  • k – How many steps into the past to look when summing the negative change in evidence.

  • reset_at_positive_jump – Boolean to reset the accumulated changes when there is a positive increase in evidence, i.e., k is further limited by this history.

Returns:

True if the total evidence change is less than or equal to the detection threshold; False otherwise.

find_step_on_new_object(stepwise_targets, primary_target, n_steps_off_primary_target)[source]#

Returns the episode step at which we’ve moved off the primary target object.

The returned episode step is the first step at which we’ve been off the primary target for a total of n_steps_off_primary_target consecutive steps.

get_correct_k_n(k_n, num_datapoints)[source]#

Determine k_n given the number of datapoints.

The k_n specified in the hyperparameter may not be possible to achieve with the number of data points collected, so this function checks and adjusts k_n.

Parameters:
  • k_n – Current number of k nearest neighbors specified for graph building.

  • num_datapoints – Number of observations available to build the graph.

Returns:

Adjusted k_n.

get_custom_distances(nearest_node_locs, search_locs, search_sns, search_curvature)[source]#

Calculate custom distances modulated by surface normal and curvature.

Parameters:
  • nearest_node_locs – Locations of nearest nodes to search_locs (shape=(num_hyp, max_nneighbors, 3)).

  • search_locs – Search locations for each hypothesis (shape=(num_hyp, 3)).

  • search_sns – Sensed surface normal rotated by the hypothesis pose (shape=(num_hyp, 3)).

  • search_curvature – Magnitude of sensed curvature (maximum if using two principal curvatures) used to modulate the search sphere thickness in the direction of the surface normal (shape=1).

Returns:

Custom distance of each nearest location from its search location, taking into account the hypothesis point normal and sensed curvature. shape=(num_hyp, max_nneighbors).

Return type:

custom_nearest_node_dists

get_initial_possible_poses(initial_possible_pose_type) list[Rotation] | None[source]#

Initialize initial_possible_poses to test based on initial_possible_pose_type.

Parameters:

initial_possible_pose_type – How to sample initial possible poses. Options are:

  • “uniform”: Sample uniformly from the space of possible poses.

  • “informed”: Sample poses that are likely to be possible based on the object’s geometry and the first observation.

  • list of euler angles: Use a list of predefined poses to test (useful for debugging).

Returns:

List of initial possible poses to test, or None if initial_possible_pose_type is “informed”.

get_relevant_curvature(features)[source]#

Get the relevant curvature from features. Used to scale search sphere.

In the case of principal_curvatures and principal_curvatures_log we use the maximum absolute curvature between the two values. Otherwise we just return the curvature value.

Note

Not sure if other curvatures work as well as log curvatures since they may have too big of a range.

Returns:

Magnitude of sensed curvature (maximum if using two principal curvatures).

get_scaled_evidences(evidences, per_object=False)[source]#

Scale evidences to be in range [-1, 1] for voting.

This is useful so that the absolute evidence values don’t distort the votes (for example if one LM has already had a lot more matching steps than another and the evidence is not bounded). It is also useful to keep the evidence added from a single voting step in a reasonable range.

By default we normalize using the maximum and minimum evidence over all objects. There is also an option to scale the evidence for each object independently but that might give low evidence objects too much of a boost. We could probably remove this option.

Returns:

Scaled evidences.
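A sketch of the default (global) scaling described above, mapping the extreme evidence values across all objects to -1 and 1:

```python
import numpy as np


def scale_evidences(evidences):
    # Min-max scale all values into [-1, 1] using the global extremes,
    # so unbounded absolute evidence levels do not dominate votes.
    ev = np.asarray(evidences, dtype=float)
    lo, hi = ev.min(), ev.max()
    return 2 * (ev - lo) / (hi - lo) - 1


scaled = scale_evidences([0.0, 5.0, 10.0])
```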

get_uniform_initial_possible_poses(n_degrees_sampled=9)[source]#

Get an initial list of possible poses.

Parameters:

n_degrees_sampled – Number of degrees sampled for each axis. Defaults to 9, which means tested degrees are in [0., 45., 90., 135., 180., 225., 270., 315.]. This results in 512 unique pose combinations.

Depending on the displacement vector, some poses are equivalent (e.g., [0, 0, 0] and [180, 180, 180] or [0, 45, 90] and [180, 135, 270]). (a, b, c) == (a + 180, -b + 180, c + 180) (see https://books.google.gr/books?id=rn3OBQAAQBAJ p.267).

Returns:

List of poses to test.
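The sampling described above can be sketched as follows: nine points from 0 to 360 degrees include a duplicate endpoint, leaving eight distinct angles per axis and 8^3 = 512 combinations (a sketch, not Monty's exact implementation):

```python
import itertools

import numpy as np


def uniform_euler_poses(n_degrees_sampled=9):
    # n_degrees_sampled evenly spaced points from 0 to 360 inclusive;
    # the endpoint duplicates 0 degrees, so it is dropped before
    # taking the Cartesian product over the three axes.
    angles = np.linspace(0.0, 360.0, n_degrees_sampled)[:-1]
    return list(itertools.product(angles, repeat=3))


poses = uniform_euler_poses()
```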

get_unique_paths(possible_paths, threshold=0.01)[source]#

Get all unique paths in a list of possible paths.

Parameters:
  • possible_paths – List of possible paths (locations).

  • threshold – Minimum distance between two paths to be considered different. Defaults to 0.01.

Returns:

List of unique paths (locations).

is_in_ranges(array, ranges)[source]#

Check for each element in an array whether it is in its specified range.

Each element can have a different tolerance range.

Parameters:
  • array – Array of elements to check.

  • ranges – Sequence of (min, max) tuples defining the valid range for each element.

Returns:

True if all elements are in their respective ranges, False otherwise.
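A minimal sketch of the per-element range check (illustrative, not the actual implementation):

```python
import numpy as np


def in_ranges(array, ranges):
    # Each element is checked against its own (min, max) tolerance
    # range; the result is True only if every element falls inside.
    return all(lo <= x <= hi for x, (lo, hi) in zip(array, ranges))


ok = in_ranges(np.array([0.5, 2.0]), [(0.0, 1.0), (1.5, 2.5)])
```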

possible_sensed_directions(sensed_directions: np.ndarray, num_hyps_per_node: int) list[np.ndarray][source]#

Return the possible sensed directions for all nodes.

This function determines the possible sensed directions for a given set of sensed directions. It relies on two different behaviors depending on the value of num_hyps_per_node.

  • If num_hyps_per_node equals 2: the pose is well defined (i.e., PC1 != PC2). A well-defined pose does not distinguish between mirrored directions of PC1 and PC2 (e.g., the object can be upside down), therefore we sample both directions.

  • If num_hyps_per_node is not 2: this function samples additional poses in the plane perpendicular to the sensed surface normal.

Parameters:
  • sensed_directions – An array of sensed directions.

  • num_hyps_per_node – Number of rotations to get for each node.

Returns:

Possible sensed directions for all nodes at each rotation.

Return type:

possible_s_d

process_delta_evidence_values(max_ev_per_step)[source]#

Pre-process the max evidence values to get the change in evidence across steps.

Clip the values to be less than or equal to 0 and return the index of the most recent positive change in evidence.

Parameters:

max_ev_per_step – Sequence of maximum evidence values.

Returns:

clipped_ev_changes – Clipped change in evidence across steps.

positive_jump_loc – Index of the most recent positive change in evidence.

Return type:

clipped_ev_changes

tbp.monty.frameworks.utils.live_plotter#

class LivePlotter[source]#

Bases: object

Class for plotting sensor observations during an experiment.

Set the show_sensor_output flag in the experiment config to True to enable live plotting.

WARNING: This plotter makes a number of assumptions right now. For example, it assumes that:

  • a sensor with ID “view_finder” exists

  • a sensor with ID “patch” exists

  • an “rgba” modality exists in the “view_finder” sensor observation

  • a “depth” modality exists in the “patch” sensor observation

__init__()[source]#
add_text(mlh, pos, possible_matches, graph_ids, evidences)[source]#
hardcoded_assumptions(observation: Observations, model: Monty)[source]#

Extract some of the hardcoded assumptions from the observation.

TODO: Don’t do this. It is here for now to highlight the fragility of the live plotter implementation at the call site. We should make this less fragile by passing the necessary information to the live plotter.

Parameters:
  • observation (Observations) – The observation from the environment interface.

  • model (Monty) – The model.

Returns:

A tuple of the first learning module, the first sensor module raw observations, the patch depth, and the view finder rgba.

initialize_online_plotting()[source]#
setup_camera_ax()[source]#
setup_mlh_ax()[source]#
setup_sensor_ax()[source]#
show_mlh(mlh, mlh_model)[source]#
show_observations(first_learning_module, first_sensor_module_raw_observations, first_sensor_depth, view_finder_rgba, mlh, mlh_model, step: int, is_saccade_on_image_data_loader=False) None[source]#
Return type:

None

show_patch(first_sensor_depth)[source]#
show_view_finder(first_sensor_module_raw_observations, first_learning_module, first_sensor_depth, view_finder_rgba, is_saccade_on_image_data_loader)[source]#

tbp.monty.frameworks.utils.logging_utils#

accuracy_stats_for_compositional_objects(eval_stats_for_lm, parent_to_child_mapping)[source]#
add_evidence_lm_episode_stats(lm, stats, consistent_child_objects)[source]#
add_policy_episode_stats(lm, stats)[source]#
add_pose_lm_episode_stats(lm, stats)[source]#

Add possible poses of an LM to episode stats.

Parameters:
  • lm – LM instance from which to add the statistics.

  • stats – Statistics dictionary to update.

Returns:

Updated stats dictionary.

calculate_fpr(fp, tn)[source]#

Calculate the False Positive Rate (equal to 1 - specificity).

Parameters:
  • fp – false positives

  • tn – true negatives

Returns:

False Positive Rate

calculate_performance(stats, performance_type, lm, target_object)[source]#

Calculate performance of an LM on a given target object.

Parameters:
  • stats – Statistics dictionary to update.

  • performance_type – performance type index into stats

  • lm – Learning module for which to generate stats.

  • target_object – target (primary or stepwise) object for the LM to have converged to

Returns:

Updated stats dictionary.

calculate_tpr(tp, fn)[source]#

Calculate True Positive Rate, aka sensitivity.

Parameters:
  • tp – true positives

  • fn – false negatives

Returns:

True Positive Rate
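Both rates follow the standard confusion-matrix definitions; a sketch (the function names here are illustrative):

```python
def tpr(tp, fn):
    # True Positive Rate (sensitivity): fraction of actual positives
    # that were correctly recovered.
    return tp / (tp + fn)


def fpr(fp, tn):
    # False Positive Rate: fraction of actual negatives incorrectly
    # flagged as positive. Specificity is 1 - FPR, not FPR itself.
    return fp / (fp + tn)


sens = tpr(8, 2)
rate = fpr(1, 9)
```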

check_detection_accuracy_at_step(stats, last_n_step=1)[source]#
check_rotation_accuracy(stats, last_n_step=1)[source]#
compositional_stats_for_all_lms(eval_stats, all_lm_ids, parent_to_child_mapping)[source]#
compute_pose_error(predicted_rotation: scipy.spatial.transform.Rotation, target_rotation: scipy.spatial.transform.Rotation, return_degrees: bool = False) float[source]#

Computes the minimum angular pose error between predicted and target rotations.

See compute_pose_errors for more details.

Parameters:
  • predicted_rotation (Rotation) – Predicted rotation(s). Can be a single or list of rotation.

  • target_rotation (Rotation) – Target rotation. Must represent a single rotation.

  • return_degrees (bool) – Whether to return the error in degrees. If False, returns the error in radians. Defaults to False.

Return type:

float

Returns:

The minimum angular error in radians (or degrees if return_degrees is True).

compute_pose_errors(predicted_rotation: Rotation, target_rotation: Rotation) npt.NDArray[np.float64] | float[source]#

Computes the angular pose errors between predicted and target rotations.

Both inputs must be instances of scipy.spatial.transform.Rotation. The predicted_rotation may contain a single rotation or a list of rotations, while target_rotation must be exactly one rotation.

The pose error is defined as the geodesic distance on SO(3) — the angle of the relative rotation between predicted and target. If predicted_rotation contains multiple rotations, this function returns the errors among them.

Note that the .inv() operation in this method is due to how geodesic distance between two rotations is calculated, not a side-effect of whether the target rotation is stored in its normal form, or as its inverse. The function therefore assumes that the orientations are already in the same coordinate system before the comparison.

Parameters:
  • predicted_rotation (Rotation) – Predicted rotation(s). Can be a single or list of rotation.

  • target_rotation (Rotation) – Target rotation. Must represent a single rotation.

Return type:

npt.NDArray[np.float64] | float

Returns:

The angular errors in radians.
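The geodesic error described above can be computed directly with scipy's Rotation type; a sketch independent of Monty's own implementation:

```python
import numpy as np
from scipy.spatial.transform import Rotation

# Geodesic distance on SO(3): the angle of the relative rotation
# between each predicted rotation and the single target rotation.
predicted = Rotation.from_euler("xyz", [[0, 0, 30], [0, 0, 90]], degrees=True)
target = Rotation.from_euler("xyz", [0, 0, 0], degrees=True)

errors = (predicted * target.inv()).magnitude()  # radians, one per prediction
min_error_deg = np.degrees(errors.min())
```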

compute_unsupervised_stats(possible_matches, target, graph_to_target, target_to_graph)[source]#

Compute statistics like how many graphs are built per object.

Parameters:
  • possible_matches – container of str that key into graph_to_target

  • target – str ground truth name of object being presented

  • graph_to_target – dict mapping each graph to the set of objects used to build

  • target_to_graph – dict mapping each object to the set of graphs that used it

Returns:

?

Return type:

dict

consistent_child_objects_accuracy(eval_stats_for_lm, parent_to_child_mapping)[source]#

Check whether most_likely_object is consistent with the parent_to_child_mapping.

Classified object is consistent if it is one of the children in the set of objects corresponding to the compositional object.

NOTE: This function is only called in compositional_stats_for_all_lms, which is a logging util called from a notebook; hence none of our previous experiments should be affected by this or raise the ValueError, even if they don’t have a parent_to_child_mapping.

Returns:

The percentage of episodes in which a consistent child object is detected.

Raises:
ValueError – If the target object of an episode is not in the parent_to_child_mapping.

deserialize_json_chunks(json_file, start=0, stop=None, episodes=None)[source]#

Deserialize one episode at a time from json file.

Only get episodes specified by arguments, which follow list / numpy like semantics.

Note

Assumes the line counter is exactly in line with the episode keys.

Parameters:
  • json_file – full path to the json file to load

  • start – int, get data starting at this episode

  • stop – int, get data ending at this episode, not inclusive as usual in Python

  • episodes – iterable of ints with episodes to pull

Returns:

dict containing contents of file_handle

Return type:

detailed_json

format_columns_for_wandb(lm_dict)[source]#

Various columns break wandb because we are playing fast and loose with types.

Put any standardizations here.

Parameters:

lm_dict – dict, part of a larger dict ~ {LM_0: lm_dict, LM_1: lm_dict}

Returns:

formatted lm_dict

get_graph_lm_episode_stats(lm)[source]#

Populate stats dictionary for one episode for an LM.

Parameters:

lm – Learning module for which to generate stats.

Returns:

dict with stats of one episode.

get_object_graph_stats(graph_to_target, target_to_graph)[source]#
get_reverse_rotation(rotation)[source]#
get_rgba_frames_single_sm(observations)[source]#

Convert a time series of rgba observations into format for wandb.Video.

Parameters:

observations – episode_stats[sm][___observations]

Returns:

formatted observations

get_stats_per_lm(model, target, episode_seed: int)[source]#

Loop through lms and get stats.

Parameters:
  • model – model instance

  • target – target object

  • episode_seed (int) – RNG seed used for the episode

Returns:

dict with stats per lm

Return type:

performance_dict

get_time_stats(all_ds, all_conditions) pandas.DataFrame[source]#

Get summary of run times in a dataframe for each condition.

Parameters:
  • all_ds – detailed stats (dict) for each condition

  • all_conditions – name of each condition

Return type:

DataFrame

Returns:

Runtime stats.

get_unique_euler_poses(poses)[source]#

Get unique poses for an object from possible poses per path.

Returns:

array of unique poses

Return type:

unique_poses

lm_stats_to_dataframe(stats, format_for_wandb=False)[source]#

Take in a dictionary and format into a dataframe.

Example:

{0: {LM_0: stats, LM_1: stats...}, 1:...} --> dataframe

Currently we are reporting once per episode, so the loop over episodes is only over a single key, value pair, but leaving it here because it is backward compatible.

Returns:

dataframe

load_models_from_dir(exp_path, pretrained_dict=None)[source]#
load_stats(exp_path, load_train=True, load_eval=True, load_detailed=True, load_models=True, pretrained_dict=None)[source]#

Load experiment statistics from an experiment for analysis.

Returns:

train_stats – pandas DataFrame with training statistics.

eval_stats – pandas DataFrame with evaluation statistics.

detailed_stats – dict with detailed statistics.

lm_models – dict with loaded learning module models.

Return type:

train_stats

matches_to_target_str(possible_matches, graph_to_target)[source]#

Get the possible target objects associated with each possible match.

Targets are concatenated into a single string name for easy saving in a csv.

Returns:

?

Return type:

dict

maybe_rename_existing_dir(dirpath: Path) None[source]#

If the given log directory already exists, rename it to <dirname>_old.

Raises:

ValueError – If dirpath is not a directory.

Return type:

None

maybe_rename_existing_file(filepath: Path) None[source]#

If the given log file already exists, rename it to <filename>_old.

Return type:

None

mean_num_steps_for_lm(eval_stats, lm_id)[source]#
overall_accuracy(eval_stats)[source]#
print_overall_stats(stats)[source]#
print_unsupervised_stats(stats, epoch_len)[source]#

Print stats of unsupervised learning experiment.

target_data_to_dict(target)[source]#

Format target params to dict.

Parameters:

target – target params

Returns:

dict with target params

total_size(o)[source]#

Returns the approximate memory footprint of an object and all of its contents.

Automatically finds the contents of the following builtin containers and their subclasses: tuple, list, deque, dict, set and frozenset. To search other containers, add handlers to iterate over their contents:

handlers = {SomeContainerClass: iter,
            OtherContainerClass: OtherContainerClass.get_elements}

This is the recursive recipe widely cited on Stack Exchange and blogs for gauging the size of Python objects in memory.

tbp.monty.frameworks.utils.object_model_utils#

class NumpyGraph(my_dict)[source]#

Bases: object

Alternative way to represent graphs without using torch.

Speeds up runtime significantly.

__init__(my_dict)[source]#
already_in_list(existing_points, new_point, features, clean_ids, query_id, graph_delta_thresholds) bool[source]#

Check if a given point is already in a list of points.

Parameters:
  • existing_points – List of x,y,z locations

  • new_point – new location

  • features – all features (both existing and candidate points)

  • clean_ids – indices (w.r.t “features”) that have been accepted into the graph and are compared to

  • query_id – index (w.r.t “features”) that is currently being considered

  • graph_delta_thresholds – Dictionary of thresholds used to determine whether a point should be considered sufficiently different so as to be included in the graph

Return type:

bool

Returns:

Whether the point is already in the list

build_point_cloud_graph(locations, features, feature_mapping)[source]#

Build a graph from observations without edges.

Parameters:
  • locations – array of x, y, z positions in space

  • features – dictionary of features at locations

  • feature_mapping

    ?

Returns:

A NumpyGraph containing the observed features at locations.

circular_mean(values)[source]#

Calculate the mean of a circular value such as hue where 0==1.

Returns:

Mean value.
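A common way to compute such a circular mean is to average unit vectors on the circle; a sketch (not necessarily the exact implementation):

```python
import numpy as np


def circular_mean(values):
    # Map values in [0, 1) onto the unit circle, average the resulting
    # unit vectors, and map the mean angle back into [0, 1). This makes
    # values just either side of the wrap point average to ~0, not 0.5.
    angles = 2 * np.pi * np.asarray(values)
    mean_angle = np.arctan2(np.sin(angles).mean(), np.cos(angles).mean())
    return (mean_angle / (2 * np.pi)) % 1.0


m = circular_mean([0.95, 0.05])  # hues just either side of the wrap point
```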

expand_index_dims(indices_3d, last_dim_size)[source]#

Expand 3d indices to 4d indices by adding a 4th dimension with size.

Parameters:
  • indices_3d – 3d indices that should be converted to 4d

  • last_dim_size – desired size of the 4th dimension (will be filled with arange indices from 0 to last_dim_size-1)

Returns:

Tensor of 4d indices.

get_cubic_patches(arr_shape, centers, size)[source]#

Cut a cubic patch around a center id out of a 3d array.

NOTE: Currently not used. Was implemented for draft of nn search in grid.

Returns:

New centers and mask.

get_most_common_bool(booleans)[source]#

Get most common value out of a list of boolean values.

Returns:

True when we have equally many True as False entries.

get_most_common_value(values)[source]#

Get most common value out of a list of values (i.e., the mode).

Returns:

Most common value.

get_values_from_dense_last_dim(tensor, index_3d)[source]#

Get values from 4d tensor at indices in last dimension.

This function assumes that the entries in the last dimension are dense. This is the case in all our sparse tensors where the first 3 dimensions represent the 3d location (sparse) and the 4th represents values at this location (dense).

Returns:

List of values.

increment_sparse_tensor_by_count(old_tensor, indices)[source]#
pose_vector_mean(pose_vecs, pose_fully_defined)[source]#

Calculate mean of pose vectors.

This takes into account that surface normals may contain observations from two surface sides and curvature directions have an ambiguous direction. It also enforces them to stay orthogonal.

If not pose_fully_defined, the curvature directions are meaningless, and we just return the first observation. Theoretically this shouldn’t matter, but it can save some computation time.

Returns:

Tuple containing the representative pose vector mean and a bool indicating whether we used curvature directions to update it.

remove_close_points(point_cloud, features, graph_delta_thresholds, old_graph_index)[source]#

Remove points from a point cloud unless sufficiently far away.

Points are removed unless sufficiently far away either by Euclidean distance, or feature-space.

Parameters:
  • point_cloud – List of 3D points

  • features

    ?

  • graph_delta_thresholds – dictionary of thresholds; if the L-2 distance between the locations of two observations (or other feature-distance measure) is below all of the given thresholds, then a point will be considered insufficiently interesting to be added

  • old_graph_index – If the graph is not new, the index associated with the final point in the old graph; we will skip this when checking for sameness, as they will already have been compared in the past to one-another, saving computation.

Returns:

List of 3D points that are sufficiently novel w.r.t one-another, along with their associated indices.

torch_graph_to_numpy(torch_graph)[source]#

Turn a torch geometric data structure into a dict with numpy arrays.

Parameters:

torch_graph – Torch geometric data structure.

Returns:

NumpyGraph.

tbp.monty.frameworks.utils.plot_utils#

A collection of plot utilities used during normal platform runtime.

add_patch_outline_to_view_finder(view_finder_image, center_pixel_id, patch_size)[source]#
mark_obs(vis_obs, patch_obs)[source]#

Mark vis_obs with the observations from a patch.

Returns:

Marked observations.

tbp.monty.frameworks.utils.plot_utils_analysis#

tbp.monty.frameworks.utils.plot_utils_dev#

tbp.monty.frameworks.utils.profile_utils#

bar_chart_cumtime(df, n_functions=None)[source]#
bar_chart_tottime(df, n_functions=None)[source]#
drop_filename(string)[source]#

Drop filename for shorter strings and easier viz.

We do this because strings for code calls are long.

Returns:

String without filename.

get_data_from_df(df, sortby='cumtime')[source]#
get_total_time(df)[source]#
linebreak_long_strings(string, chars_per_line=40)[source]#

Strings with filename are long, try to get them more readable in bar plots.

Parameters:
  • string – String to format.

  • chars_per_line – Number of characters per line. Defaults to 40.

Returns:

Formatted string.

print_top_k_functions(func_names, k=20)[source]#
sort_by_cumtime(df)[source]#
sort_by_tottime(df)[source]#

tbp.monty.frameworks.utils.sensor_processing#

arc_from_projection(tangent_projection: float, curvature: float, threshold: float = 0.001) float[source]#

Correct displacement to true arc length on a curved surface.

When a sensor moves along a curved surface, the straight-line displacement measured in the tangent plane underestimates the true distance traveled along the curve. This function corrects that by converting the displacement projection back to the actual arc length.

The correction assumes that the surface is locally a circle. This approximation holds well when curvature is approximately constant over the displacement, but is inaccurate for surfaces with rapidly varying curvature.

The relationship between arc length and its tangent-plane projection on a circle of curvature k is:

tangent_projection = sin(k * arc_length) / k

arc_length = arcsin(k * tangent_projection) / k

Reference:

Do Carmo, M.P. “Differential Geometry of Curves and Surfaces”, 2nd ed., Dover, 2016, Section 3-2.

Note

The formula works for both convex and concave surfaces because the arc-to-projection geometry on a circle is the same regardless of the sign of curvature.

Parameters:
  • tangent_projection (float) – Signed displacement component projected onto a tangent-plane basis direction.

  • curvature (float) – Normal curvature along the basis direction (from Euler’s formula). May be positive (convex) or negative (concave).

  • threshold (float) – Skip correction when |k * p| < threshold (the flat approximation is already accurate).

Return type:

float

Returns:

Estimated signed arc length. Returns tangent_projection unchanged if |k * p| < threshold (arc-chord difference is negligible) or |k * p| >= 1.0 (arcsin domain guard).
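A worked sketch of the correction, including the threshold and arcsin domain guards described above:

```python
import numpy as np


def arc_from_projection(tangent_projection, curvature, threshold=1e-3):
    # On a circle of curvature k, a tangent-plane projection p
    # corresponds to arc length arcsin(k * p) / k.
    kp = curvature * tangent_projection
    if abs(kp) < threshold or abs(kp) >= 1.0:
        # Flat approximation is already accurate, or arcsin domain guard.
        return tangent_projection
    return np.arcsin(kp) / curvature


# Quarter circle of radius 1 (k = 1): projection sin(pi/4) maps back to pi/4.
arc = arc_from_projection(np.sin(np.pi / 4), 1.0)
```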

arc_length_corrected_displacement(du: float, dv: float, basis_u: np.ndarray, basis_v: np.ndarray, principal_curvatures: np.ndarray, curvature_pose_vectors: np.ndarray) tuple[float, float][source]#

Convert chord-length displacements to arc-length along each basis axis.

Uses Euler’s formula to find the normal curvature in each basis direction, then corrects the flat-plane displacement to the corresponding arc length.

Parameters:
  • du – Displacement along basis_u (chord length).

  • dv – Displacement along basis_v (chord length).

  • basis_u – First tangent-frame basis vector.

  • basis_v – Second tangent-frame basis vector.

  • principal_curvatures – Array [k1, k2] of principal curvature magnitudes.

  • curvature_pose_vectors – Pose matrix whose rows [1] and [2] are the principal curvature directions.

Returns:

Arc-length-corrected displacements.

Return type:

(arc_u, arc_v)

center_neighbors(point_cloud, center_id, neighbor_patch_frac)[source]#

Get neighbors within a given neighborhood of the patch center.

Returns:

Locations and semantic IDs of all points within a given neighborhood of the patch center that lie on an object.

curvature_at_point(point_cloud, center_id, normal)[source]#

Compute principal curvatures from a point cloud.

Computes the two principal curvatures of a 2D surface and the corresponding principal directions.

Parameters:
  • point_cloud – Point cloud (2D numpy array) on which the local surface is approximated.

  • center_id – Center point around which the local curvature is estimated.

  • normal – Surface normal at the center point.

Returns:

k1 – First principal curvature.

k2 – Second principal curvature.

dir1 – First principal direction.

dir2 – Second principal direction.

Return type:

k1

directional_curvature(movement_direction: numpy.typing.ArrayLike, k1: float, k2: float, pc1_dir: numpy.ndarray, pc2_dir: numpy.ndarray) float[source]#

Compute normal curvature in a given direction via Euler’s curvature formula.

Returns the scalar normal curvature of the surface along movement_direction, given the two principal curvatures and their directions.

k(theta) = k1 * cos^2(theta) + k2 * sin^2(theta)

where theta is the angle between movement_direction and pc1_dir.

This formula is only valid when pc1_dir and pc2_dir are the principal curvature directions and not for arbitrary orthonormal vectors.

Reference: Weisstein, Eric W. “Euler Curvature Formula.” MathWorld. https://mathworld.wolfram.com/EulerCurvatureFormula.html

Parameters:
  • movement_direction (ArrayLike) – Direction vector (will be normalized).

  • k1 (float) – First principal curvature (corresponds to pc1_dir).

  • k2 (float) – Second principal curvature (corresponds to pc2_dir).

  • pc1_dir (ndarray) – First principal curvature direction (unit vector in tangent plane).

  • pc2_dir (ndarray) – Second principal curvature direction (unit vector in tangent plane).

Return type:

float

Returns:

Normal curvature in the given direction.

Raises:

ValueError – If pc1_dir and pc2_dir are not orthogonal, or if movement_direction does not lie in the plane spanned by pc1_dir and pc2_dir.
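
Euler's formula above can be sketched in a few lines of numpy (the function name is ours, and the orthogonality and coplanarity checks raised as ValueError above are omitted for brevity):

```python
import numpy as np

def directional_curvature_sketch(movement_direction, k1, k2, pc1_dir, pc2_dir):
    """Normal curvature along movement_direction via Euler's curvature formula."""
    d = np.asarray(movement_direction, dtype=float)
    d = d / np.linalg.norm(d)  # normalize the movement direction
    cos_theta = np.dot(d, pc1_dir)  # cos of the angle to the first principal direction
    sin_theta = np.dot(d, pc2_dir)  # sin of the same angle, since pc2_dir is orthogonal
    return k1 * cos_theta**2 + k2 * sin_theta**2
```

For example, on a cylinder of radius r (k1 = 1/r around the circumference, k2 = 0 along the axis), moving at 45 degrees between the two principal directions yields a normal curvature of 1/(2r).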

is_coplanar(basis_1: numpy.typing.ArrayLike, basis_2: numpy.typing.ArrayLike, vector: numpy.typing.ArrayLike, tolerance: float = 1e-06) bool[source]#
Return type:

bool

is_orthogonal(v1: numpy.typing.ArrayLike, v2: numpy.typing.ArrayLike, tolerance: float = 1e-06) bool[source]#
Return type:

bool

is_unit_vector(vector: numpy.typing.ArrayLike, tolerance: float = 1e-06) bool[source]#
Return type:

bool

log_sign(to_scale)[source]#

Apply symlog to the input array, preserving sign.

This implementation ensures that the sign of the input values is preserved and avoids extreme outputs when values are close to 0.

Parameters:

to_scale – Array to scale.

Returns:

Scaled values of the array.
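
The docstring does not spell out the exact scaling; a common sign-preserving symlog that matches the description (roughly linear near 0, logarithmic for large magnitudes) is:

```python
import numpy as np

def log_sign_sketch(to_scale):
    # Assumed variant: sign(x) * log(1 + |x|). Behaves like x near zero
    # (avoiding extreme outputs) and compresses large magnitudes.
    to_scale = np.asarray(to_scale, dtype=float)
    return np.sign(to_scale) * np.log1p(np.abs(to_scale))
```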

pixel_dist_to_center(n_points, patch_width, center_id)[source]#

Extract the relative distance of each pixel to the patch center (in pixel space).

Parameters:
  • n_points – Total number of points in the patch.

  • patch_width – Width of the square patch.

  • center_id – ID of the patch center.

Returns:

Relative distance of each pixel to the patch center (in pixel space).

point_pair_features(pos_i, pos_j, normal_i, normal_j)[source]#

Return point pair features between two points.

Parameters:
  • pos_i – Location of point 1.

  • pos_j – Location of point 2.

  • normal_i – Surface normal of point 1.

  • normal_j – Surface normal of point 2.

Returns:

Point pair features.
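
The docstring does not list the feature layout. A common convention for point pair features is the pair distance plus three angles; the sketch below (name and layout are our assumptions, not taken from the source) follows that convention:

```python
import numpy as np

def point_pair_features_sketch(pos_i, pos_j, normal_i, normal_j):
    # d: vector from point i to point j
    d = np.asarray(pos_j, dtype=float) - np.asarray(pos_i, dtype=float)
    dist = np.linalg.norm(d)
    d_hat = d / dist

    def angle(a, b):
        # clip guards against floating-point drift outside [-1, 1]
        return np.arccos(np.clip(np.dot(a, b), -1.0, 1.0))

    # (||d||, angle(n_i, d), angle(n_j, d), angle(n_i, n_j))
    return np.array([dist, angle(normal_i, d_hat),
                     angle(normal_j, d_hat), angle(normal_i, normal_j)])
```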

principal_curvatures(point_cloud_base, center_id, n_dir, neighbor_patch_frac=2.13, weighted=True, fit_intercept=True)[source]#

Compute principal curvatures from a point cloud.

Computes the two principal curvatures of a 2D surface and the corresponding principal directions.

Parameters:
  • point_cloud_base – Point cloud (2D numpy array) based on which the 2D surface is approximated.

  • center_id – Center point around which the local curvature is estimated.

  • n_dir – Surface normal at the center point.

  • neighbor_patch_frac – Fraction of the patch width that defines the standard deviation of the Gaussian distribution used to sample the weights; this defines a local neighborhood for principal curvature computation.

  • weighted – Boolean flag that determines if regression is weighted. The weighting scheme is defined in weight_matrix().

  • fit_intercept – Boolean flag that determines whether to fit an intercept term for the regression.

Returns:

k1: First principal curvature.

k2: Second principal curvature.

dir1: First principal direction.

dir2: Second principal direction.

scale_clip(to_scale, clip)[source]#

Clip values into a range and scale with the square root.

This can be used to bring Gaussian and mean curvatures into a reasonable range and remove outliers, which makes it easier to handle noise. The sign is preserved before applying the square root.

Parameters:
  • to_scale – Array where each element should be scaled.

  • clip – Range to which the array values should be clipped.

Returns:

Scaled values of the array.
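
A minimal sketch of such sign-preserving clip-and-square-root scaling, assuming a symmetric clip range [-clip, clip]:

```python
import numpy as np

def scale_clip_sketch(to_scale, clip):
    # Clip into [-clip, clip], then take a square root that preserves the sign.
    clipped = np.clip(np.asarray(to_scale, dtype=float), -clip, clip)
    return np.sign(clipped) * np.sqrt(np.abs(clipped))
```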

surface_normal_naive(point_cloud, patch_radius_frac=2.5)[source]#

Estimate surface normal.

This is a very simplified alternative to open3d’s estimate_normals that makes use of several assumptions specific to our case:

  • we know which locations are neighboring locations from the camera patch arrangement

  • we only need the surface normal at the center of the patch

TODO: Calculate surface normal from multiple points at different distances (tan_len values) and then take the average of them. Test if this improves robustness to raw sensor noise.

Parameters:
  • point_cloud – List of 3D coordinates with flags indicating whether each point lies on the object. Shape = [n, 4].

  • patch_radius_frac – Fraction of observation size to use for SN calculation. Default of 2.5 means that we look half_obs_dim//2.5 to the left, right, up and down. With a resolution of 64x64 that would be 12 pixels. The calculated tan_len (in this example 12) describes the distance of pixels used to span up the two tangent vectors to calculate the surface normals. These two vectors are then used to calculate the surface normal by taking the cross product. If we set tan_len to a larger value, the surface normal is more influenced by the global shape of the patch.

Returns:

norm: Estimated surface normal at the center of the patch.

valid_sn: Boolean indicating whether the surface normal was valid (True by default); an invalid surface normal means there were not enough points in the patch to make any estimate of the surface normal.

surface_normal_ordinary_least_squares(sensor_frame_data, cam_to_world, center_id, neighbor_patch_frac=3.2)[source]#

Extracts the surface normal direction from a noisy point cloud.

Uses ordinary least-squares fitting with error minimization along the view direction.

Parameters:
  • sensor_frame_data – Point cloud in sensor coordinates (assumes the full patch is provided, i.e., no preliminary filtering of off-object points).

  • cam_to_world – Matrix defining the sensor-to-world frame transformation.

  • center_id – ID of the center point in the point cloud.

  • neighbor_patch_frac – Fraction of the patch width that defines the local neighborhood within which to perform the least-squares fitting.

Returns:

surface_normal: Estimated surface normal at the center of the patch.

valid_sn: Boolean indicating whether the surface normal was valid; defaults to True. An invalid surface normal means there were not enough points in the patch to make any estimate of the surface normal.

surface_normal_total_least_squares(point_cloud_base, center_id, view_dir, neighbor_patch_frac=3.2)[source]#

Extracts the surface normal direction from a noisy point cloud.

Uses total least-squares fitting. Error minimization is independent of the view direction.

Parameters:
  • point_cloud_base – Point cloud in world coordinates (assumes the full patch is provided, i.e., no preliminary filtering of off-object points).

  • center_id – ID of the center point in the point cloud.

  • view_dir – Viewing direction used to adjust the sign of the estimated surface normal.

  • neighbor_patch_frac – Fraction of the patch width that defines the local neighborhood within which to perform the least-squares fitting.

Returns:

norm: Estimated surface normal at the center of the patch.

valid_sn: Boolean indicating whether the surface normal was valid; defaults to True. An invalid surface normal means there were not enough points in the patch to make any estimate of the surface normal.
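
The heart of a total-least-squares plane fit can be sketched with an SVD: the right singular vector with the smallest singular value of the centered neighborhood is the plane normal. The simplified sketch below (our naming) omits the neighborhood selection and validity handling:

```python
import numpy as np

def tls_normal_sketch(points, view_dir):
    # Center the neighborhood; the direction of least variance is the
    # total-least-squares plane normal (smallest singular value of the SVD).
    pts = np.asarray(points, dtype=float)
    centered = pts - pts.mean(axis=0)
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    normal = vt[-1]  # right singular vector with the smallest singular value
    # Flip the sign so the normal points against the viewing direction.
    if np.dot(normal, view_dir) > 0:
        normal = -normal
    return normal
```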

weight_matrix(n_points, center_id, neighbor_patch_frac=2.13)[source]#

Extract individual pixel weights for least-squares fitting.

Each pixel weight is sampled from a Gaussian distribution based on its distance to the patch center.

Parameters:
  • n_points – Total number of points in the full RGB-D square patch.

  • center_id – ID of the center point in the point cloud.

  • neighbor_patch_frac – Fraction of the patch width that defines the standard deviation of the Gaussian distribution used to sample the weights.

Returns:

Diagonal weight matrix of shape (n_points, 1).
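
A sketch of Gaussian distance-based pixel weights for a square patch (the exact distance metric and standard-deviation convention here are assumptions):

```python
import numpy as np

def gaussian_pixel_weights_sketch(patch_width, neighbor_patch_frac=2.13):
    # Euclidean distance of every pixel to the patch center, in pixel space.
    idx = np.arange(patch_width)
    yy, xx = np.meshgrid(idx, idx, indexing="ij")
    center = patch_width // 2
    dist = np.sqrt((yy - center) ** 2 + (xx - center) ** 2).ravel()
    # Gaussian falloff with std = patch_width / neighbor_patch_frac.
    sigma = patch_width / neighbor_patch_frac
    weights = np.exp(-0.5 * (dist / sigma) ** 2)
    return weights.reshape(-1, 1)  # one weight per pixel, shape (n_points, 1)
```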

tbp.monty.frameworks.utils.spatial_arithmetics#

class TangentFrame(surface_normal: numpy.ndarray) None[source]#

Bases: object

Orthonormal tangent frame on a surface.

Maintains a right-handed (u, v, n) basis where n is the surface normal, u is the horizontal tangent direction, and v is the vertical tangent direction. As the sensor moves across a curved surface, transport() rotates the tangent frame to match the new normal.

See:

https://en.wikipedia.org/wiki/Parallel_transport

Parameters:

surface_normal (ndarray) – Unit surface normal at the initial point.

__init__(surface_normal: numpy.ndarray) None[source]#

Initialize an orthonormal (u, v) basis in the tangent plane of a surface.

A surface normal defines a tangent plane but not a unique basis. We choose basis_u as the cross product of some_axis and the surface_normal, giving a horizontal tangent direction. basis_v follows as the cross product of the surface_normal and basis_u.

If the surface_normal is nearly parallel to some_axis (|cos(theta)| > 0.95), we fall back to using [0, 0, 1] to avoid a degenerate cross product.

Parameters:

surface_normal (ndarray) – Unit surface normal at the initial point.

transport(new_normal: numpy.ndarray) None[source]#

Parallel-transport the frame to a new surface normal.

As the sensor moves along a curved surface, the tangent plane rotates with the curvature (e.g. around a cylinder). Parallel transport transforms the basis (u, v) by exactly the rotation needed to stay in the new tangent plane. This is analogous to “unrolling” the curved surface.

Parameters:

new_normal (ndarray) – Unit surface normal at the new point.

Return type:

None

property basis_u: numpy.ndarray#

Horizontal tangent basis vector.

property basis_v: numpy.ndarray#

Vertical tangent basis vector.

property normal: numpy.ndarray#

Surface normal associated with this tangent frame.
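
The transport step can be sketched as the minimal rotation taking the old normal to the new one, applied to both basis vectors. This is a simplified stand-in for the class’s internal update, not its actual implementation:

```python
import numpy as np
from scipy.spatial.transform import Rotation

def transport_basis_sketch(basis_u, basis_v, old_normal, new_normal):
    # Minimal rotation taking old_normal to new_normal: axis = n_old x n_new.
    axis = np.cross(old_normal, new_normal)
    norm = np.linalg.norm(axis)
    if norm < 1e-12:
        return basis_u, basis_v  # normals already aligned; nothing to do
    angle = np.arctan2(norm, np.dot(old_normal, new_normal))
    rot = Rotation.from_rotvec(axis / norm * angle)
    return rot.apply(basis_u), rot.apply(basis_v)
```

Because the same rotation is applied to u, v, and n, the transported frame stays orthonormal and right-handed.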

align_multiple_orthonormal_vectors(ms1, ms2, as_scipy=True)[source]#

Calculate rotations between multiple orthonormal vector sets.

Parameters:
  • ms1 – Multiple orthonormal vector sets with shape = (N, 3, 3).

  • ms2 – Orthonormal vectors to align with, shape = (3, 3).

  • as_scipy – Whether to return a list of N scipy.Rotation objects or a np.array of rotation matrices (N, 3, 3).

Returns:

List of N Rotations that align ms2 with each element in ms1.

align_orthonormal_vectors(m1, m2, as_scipy=True)[source]#

Calculate the rotation that aligns two sets of orthonormal vectors.

Parameters:
  • m1 – First set of orthonormal vectors.

  • m2 – Second set of orthonormal vectors to align with.

  • as_scipy – Whether to return a scipy rotation object or a rotation matrix. Defaults to True.

Returns:

If as_scipy is True, a tuple (Rotation, float) containing the alignment rotation and the corresponding alignment error. Otherwise returns (np.ndarray, None), where the array is the rotation matrix aligning the vectors.
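
One way such an alignment can be computed (not necessarily how this function does it) is scipy’s Rotation.align_vectors, which returns the best-fit rotation together with the root-sum-squared alignment error:

```python
import numpy as np
from scipy.spatial.transform import Rotation

# m1, m2: rows are orthonormal vectors (e.g. pose vectors of two observations).
m2 = np.eye(3)
m1 = Rotation.from_euler("z", 90, degrees=True).apply(m2)

# Find the rotation r such that r.apply(m2) best matches m1;
# rssd is the root-sum-squared distance of the aligned vectors.
r, rssd = Rotation.align_vectors(m1, m2)
```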

apply_rf_transform_to_points(locations, features, location_rel_model, object_location_rel_body, object_rotation, object_scale=1)[source]#

Apply location and rotation transform to locations and features.

These transforms tell us how to transform new observations into the existing model reference frame. They are calculated from the detected object pose.

Parameters:
  • locations – Locations to transform (in body reference frame). Shape (N, 3)

  • features – Features to transform (in body reference frame). Shape (N, F)

  • location_rel_model – Detected location of the sensor on the object (object reference frame).

  • object_location_rel_body – Location of the sensor in the body reference frame.

  • object_rotation – Rotation of the object in the world relative to the learned model of the object. Expresses how the object model needs to be rotated to be consistent with the observations. To transform the observed locations (rel. body) into the model’s reference frame, the inverse of this rotation is applied.

  • object_scale – Scale of the object relative to the model. Not used yet.

Note

The function can also be used in contexts other than transforming points from the body-centric to the object-centric reference frame.

Returns:

transformed_locations: Transformed locations.

features: Transformed features.
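
One plausible composition of these transforms is sketched below. The exact order of operations is our assumption based on the parameter descriptions, not taken from the source:

```python
import numpy as np
from scipy.spatial.transform import Rotation

def rf_transform_sketch(locations, location_rel_model,
                        object_location_rel_body, object_rotation):
    # Shift the observed locations so the sensed point sits at the origin,
    # undo the detected object rotation (inverse, per the docstring), then
    # anchor the result at the detected location in the model reference frame.
    shifted = np.asarray(locations, dtype=float) - object_location_rel_body
    rotated = object_rotation.inv().apply(shifted)
    return rotated + location_rel_model
```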

check_orthonormal(matrix)[source]#

euler_to_quats(euler_rots, invert=False)[source]#

Convert Euler rotations to quaternions.

Parameters:
  • euler_rots – Euler rotations

  • invert – Whether to invert the rotation. Defaults to False.

Returns:

Quaternions

get_angle(vec1, vec2)[source]#

Get angle between two vectors.

NOTE: For efficiency reasons we assume vec1 and vec2 are already normalized (which is the case for surface normals and curvature directions).

Parameters:
  • vec1 – Vector 1

  • vec2 – Vector 2

Returns:

angle in radians
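
For already-normalized inputs this reduces to an arccos of the dot product; a minimal sketch (clipping guards against floating-point drift outside [-1, 1]):

```python
import numpy as np

def get_angle_sketch(vec1, vec2):
    # Assumes vec1 and vec2 are unit length, as surface normals and
    # curvature directions are.
    return np.arccos(np.clip(np.dot(vec1, vec2), -1.0, 1.0))
```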

get_angle_beefed_up(v1, v2)[source]#

Return the angle in radians between vectors ‘v1’ and ‘v2’.

If one of the vectors is undefined, return an arbitrarily large distance.

If one of the vectors is the zero vector, return an arbitrarily large distance.

Also enforces that vectors are unit vectors, which makes it less efficient than the standard get_angle.

>>> get_angle_beefed_up((1, 0, 0), (0, 1, 0))
1.5707963267948966
>>> get_angle_beefed_up((1, 0, 0), (1, 0, 0))
0.0
>>> get_angle_beefed_up((1, 0, 0), (-1, 0, 0))
3.141592653589793

get_angle_torch(v1, v2)[source]#

Get angle between two torch vectors.

Parameters:
  • v1 – Vector 1

  • v2 – Vector 2

Returns:

angle in radians

get_angles_for_all_hypotheses(hyp_f, query_f)[source]#

Get all angles for hypotheses and their neighbors at once.

hyp_f shape = (num_hyp, num_nn, 3)

query_f shape = (num_hyp, 3)

For each hypothesis we want to get num_nn angles.

Return shape = (num_hyp, num_nn)

Parameters:
  • hyp_f – Hypotheses features three pose vectors

  • query_f – Query features three pose vectors

Returns:

Angles between hypotheses and query pose vectors

get_more_directions_in_plane(vecs, n_poses) list[numpy.ndarray][source]#

Get a list of unit vectors, evenly spaced in a plane orthogonal to vecs[0].

This is used to sample possible poses orthogonal to the surface normal when the curvature directions are undefined (like on a flat surface).

Parameters:
  • vecs – Vectors; the returned directions lie in the plane orthogonal to vecs[0]

  • n_poses – Number of poses to get

Returns:

List of vectors evenly spaced in a plane orthogonal to vecs[0]
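
Sampling evenly spaced directions in the plane can be sketched by building an orthonormal basis of the plane and sweeping the angle. The helper-axis fallback mirrors the degenerate-case handling described for TangentFrame above; the details here are our assumptions:

```python
import numpy as np

def directions_in_plane_sketch(normal, n_poses):
    # Build any orthonormal basis (u, v) of the plane orthogonal to `normal`.
    normal = np.asarray(normal, dtype=float)
    normal = normal / np.linalg.norm(normal)
    helper = np.array([1.0, 0.0, 0.0])
    if abs(np.dot(helper, normal)) > 0.95:  # avoid a near-parallel helper axis
        helper = np.array([0.0, 0.0, 1.0])
    u = np.cross(normal, helper)
    u /= np.linalg.norm(u)
    v = np.cross(normal, u)
    # Sweep evenly spaced angles around the plane.
    angles = np.linspace(0.0, 2 * np.pi, n_poses, endpoint=False)
    return [np.cos(a) * u + np.sin(a) * v for a in angles]
```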

get_right_hand_angle(v1, v2, surface_normal)[source]#

get_unique_rotations(poses, similarity_th, get_reverse_r=True)[source]#

Get unique scipy.Rotations out of a list, given a similarity threshold.

Parameters:
  • poses – List of poses to get unique rotations from

  • similarity_th – Similarity threshold

  • get_reverse_r – Whether to get the reverse rotation. Defaults to True.

Returns:

euler_poses: Unique Euler poses.

r_poses: Unique rotations corresponding to euler_poses.

is_parallel(v1: numpy.typing.ArrayLike, v2: numpy.typing.ArrayLike, tolerance: float = 1e-06) bool[source]#

True when v1 and v2 point in the same or opposite direction.

Assumes unit-length inputs. The metric 1 - |cos(theta)| is compared against tolerance.

Parameters:
  • v1 (ArrayLike) – First unit vector.

  • v2 (ArrayLike) – Second unit vector.

  • tolerance (float) – Maximum value of 1 - |cos(theta)| to consider parallel.

Return type:

bool

Returns:

True if v1 and v2 are parallel (same or opposite direction).
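
The stated metric in a couple of lines (assuming unit-length inputs, as the docstring does):

```python
import numpy as np

def is_parallel_sketch(v1, v2, tolerance=1e-6):
    # 1 - |cos(theta)| is ~0 when the unit vectors point in the same
    # or opposite direction.
    return 1.0 - abs(np.dot(v1, v2)) <= tolerance
```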

non_singular_mat(a)[source]#

Return True if a matrix is non-singular, i.e. can be inverted.

Uses the condition number of the matrix, which approaches a very large value, 1 / sys.float_info.epsilon (where epsilon is the smallest possible floating-point difference), as the matrix becomes singular.

normalize(v: numpy.typing.ArrayLike, epsilon: float = 1e-06) numpy.ndarray[source]#

Normalize a vector to unit length.

Parameters:
  • v (ArrayLike) – Input vector to normalize.

  • epsilon (float) – Small epsilon value below which the vector is considered zero.

Return type:

ndarray

Returns:

Unit vector in the direction of v, with the same dtype as v.

Raises:

ValueError – If the vector has near-zero length (norm < epsilon).

pose_is_new(all_poses, new_pose, similarity_th) bool[source]#

Check if a pose is different from a list of poses.

Use the magnitude of the difference between quaternions as a measure of similarity and check that it is below similarity_th.

Return type:

bool

Returns:

True if the pose is new, False otherwise
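
The quaternion-difference check can be sketched as below. Taking the smaller of ||q - q'|| and ||q + q'|| accounts for q and -q encoding the same rotation; whether the library does this is our assumption:

```python
import numpy as np
from scipy.spatial.transform import Rotation

def pose_is_new_sketch(all_poses, new_pose, similarity_th):
    # Small quaternion-difference magnitude means the poses are similar.
    nq = new_pose.as_quat()
    for pose in all_poses:
        q = pose.as_quat()
        # q and -q represent the same rotation, so compare both signs.
        diff = min(np.linalg.norm(q - nq), np.linalg.norm(q + nq))
        if diff < similarity_th:
            return False  # a similar pose already exists
    return True
```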

project_onto_tangent_plane(v: numpy.typing.ArrayLike, n: numpy.typing.ArrayLike) numpy.ndarray[source]#

Project a vector onto the tangent plane perpendicular to a normal.

Removes the component of v that is parallel to n, leaving only the component that lies in the plane perpendicular to n.

Parameters:
  • v (ArrayLike) – Vector to project.

  • n (ArrayLike) – Normal vector defining the tangent plane. Normalized internally.

Return type:

ndarray

Returns:

The projection of v onto the plane perpendicular to n.
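
This is the standard rejection of v from n, sketched here with internal normalization of the normal as the docstring describes:

```python
import numpy as np

def project_onto_tangent_plane_sketch(v, n):
    n = np.asarray(n, dtype=float)
    n = n / np.linalg.norm(n)  # normalize the plane normal
    v = np.asarray(v, dtype=float)
    return v - np.dot(v, n) * n  # remove the component parallel to n
```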

rot_mats_to_quats(rot_mats, invert=False)[source]#

Convert rotation matrices to quaternions.

Parameters:
  • rot_mats – Rotation matrices

  • invert – Whether to invert the rotation. Defaults to False.

Returns:

Quaternions

rotate_multiple_pose_dependent_features(features, ref_frame_rot) dict[source]#

Rotate surface normals and curvature directions given a rotation matrix.

Parameters:
  • features – dict of features with pose vectors to rotate. Pose vectors have shape (N, 9)

  • ref_frame_rot – scipy rotation to rotate pose vectors with.

Return type:

dict

Returns:

Features with rotated pose vectors

rotate_pose_dependent_features(features, ref_frame_rots) dict[source]#

Rotate pose_vectors given a list of rotation matrices.

Parameters:
  • features – Dict of features with pose vectors to rotate. Pose vectors have shape (3, 3).

  • ref_frame_rots – Rotation matrices to rotate pose features by. Can be either a single scipy rotation (as used in FeatureGraphLM) or an array of rotation matrices of shape (N, 3, 3) or (3, 3) (as used in EvidenceGraphLM).

Return type:

dict

Returns:

Original features but with the pose_vectors rotated. If multiple rotations were given, pose_vectors entry will now contain multiple entries of shape (N, 3, 3).

rotations_to_quats(rotations, invert=False)[source]#

tbp.monty.frameworks.utils.transform_utils#

numpy_to_scipy_quat(quat)[source]#

Convert from wxyz to xyzw format of quaternions.

i.e. identity rotation in scipy is (0,0,0,1).

Parameters:

quat – A quaternion in wxyz format

Returns:

A quaternion in xyzw format
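
The conversion is just a reordering of the scalar (w) component from the front to the back, e.g. identity (1, 0, 0, 0) in wxyz becomes (0, 0, 0, 1) in xyzw:

```python
import numpy as np

def wxyz_to_xyzw(quat):
    # Move the scalar (w) component from the front to the back.
    w, x, y, z = quat
    return np.array([x, y, z, w])
```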

rotation_as_quat(rot: scipy.spatial.transform.Rotation, scalar_first: bool = True) numpy.ndarray[source]#

Convert a scipy rotation to its quaternion representation.

Scipy added a scalar_first argument to Rotation.as_quat in version 1.14.0. (https://scipy.github.io/devdocs/release/1.14.0-notes.html). This function backports that behavior. Note, however, that scipy defaults to scalar-last format.

Parameters:
  • rot (Rotation) – The scipy rotation object to convert.

  • scalar_first (bool) – Whether to return the array in (w, x, y, z) or (x, y, z, w) order. Defaults to True, i.e., (w, x, y, z) order.

Return type:

ndarray

Returns:

An array with shape (4,) representing a single quaternion, or an array with shape (N, 4) representing N quaternions.
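
Such a backport can be sketched as a roll of scipy’s scalar-last output; this is one way to implement the described behavior, not necessarily the library’s:

```python
import numpy as np
from scipy.spatial.transform import Rotation

def rotation_as_quat_sketch(rot, scalar_first=True):
    # Rotation.as_quat() returns scalar-last (x, y, z, w); roll the last
    # axis by one to move w to the front when scalar_first is requested.
    q = rot.as_quat()
    return np.roll(q, 1, axis=-1) if scalar_first else q
```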

rotation_from_quat(quat: numpy.typing.ArrayLike, scalar_first: bool = True) scipy.spatial.transform.Rotation[source]#

Create a scipy rotation object from a quaternion.

Scipy added a scalar_first argument to Rotation.from_quat in version 1.14.0. (https://scipy.github.io/devdocs/release/1.14.0-notes.html). This function backports that behavior. Note, however, that scipy defaults to scalar-last format.

Parameters:
  • quat (ArrayLike) – An array with shape (4,) for a single quaternion, or an array with shape (N, 4) for N quaternions.

  • scalar_first (bool) – Whether the scalar component is first or last. Defaults to True, i.e., (w, x, y, z) order.

Return type:

Rotation

Returns:

The scipy rotation object.

scipy_to_numpy_quat(quat: numpy.ndarray) quaternion.quaternion[source]#
Return type:

quaternion