tbp.monty.frameworks.utils#
tbp.monty.frameworks.utils.communication_utils#
- get_first_sensory_state(states)[source]#
Given a list of states, return the first one from a sensory channel.
- Returns:
First state from a sensory channel.
- get_state_from_channel(states, channel_name)[source]#
Given a list of states, return the state of the specified channel.
- Parameters:
states – List of states
channel_name – The name of the channel to return the state for
- Returns:
The state of the specified channel
- Raises:
ValueError – If the channel name is not found in the states
tbp.monty.frameworks.utils.dataclass_utils#
- class Dataclass(*args, **kwargs)[source]#
Bases: Protocol
A protocol for dataclasses to be used in type hints.
This exists because dataclass.dataclass is not a valid type.
- __init__(*args, **kwargs)#
- as_dataclass_dict(obj)[source]#
Convert a dataclass instance to a serializable dataclass dict.
- Parameters:
obj – The dataclass instance to convert.
- Returns:
A dictionary with the dataclass fields and values.
- Raises:
TypeError – If the object is not a dataclass instance.
- create_dataclass_args(dataclass_name: str, function: Callable, base: type | None = None)[source]#
Create configuration dataclass args from given function arguments.
When the function arguments have type annotations, these annotations will be passed to the dataclass fields. Otherwise, the type will be inferred from any argument default value.
For example:
    SingleSensorAgentArgs = create_dataclass_args(
        "SingleSensorAgentArgs", SingleSensorAgent.__init__
    )
This is equivalent to:
    @dataclass(frozen=True)
    class SingleSensorAgentArgs:
        agent_id: AgentID
        sensor_id: str
        position: Tuple[float, float, float] = (0.0, 1.5, 0.0)
        rotation: Tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0)
        height: float = 0.0
- Parameters:
dataclass_name (str) – The name of the new dataclass.
function (Callable) – The function used to extract the parameters for the dataclass.
base (type | None) – Optional base class for the newly created dataclass.
- Returns:
New dataclass with fields defined by the function arguments.
- from_dataclass_dict(datadict)[source]#
Convert a serializable dataclass dict back into the original dataclass.
Assumes that the serializable dataclass dict was created via asdict().
- Parameters:
datadict – The serializable dataclass dict to convert.
- Returns:
The original dataclass instance.
- Raises:
TypeError – If the object is not a dict instance.
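A minimal round-trip sketch of the two functions above; the RunArgs dataclass here is made up for illustration:

    from dataclasses import dataclass

    from tbp.monty.frameworks.utils.dataclass_utils import (
        as_dataclass_dict,
        from_dataclass_dict,
    )

    @dataclass(frozen=True)
    class RunArgs:
        n_epochs: int = 1
        do_eval: bool = True

    serialized = as_dataclass_dict(RunArgs(n_epochs=3))  # plain, serializable dict
    restored = from_dataclass_dict(serialized)           # expected: RunArgs(n_epochs=3, do_eval=True)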
tbp.monty.frameworks.utils.evidence_matching#
- exception InvalidEvidenceThresholdConfig[source]#
Bases: ValueError
Raised when the evidence update threshold is invalid.
- class ChannelMapper(channel_sizes: dict[str, int] | None = None) None[source]#
Bases: object
Marks the range of hypotheses that correspond to each input channel.
The EvidenceGraphLM implementation stacks the hypotheses from all input channels in the same array to perform efficient vector operations on them. Therefore, we need to keep track of which indices in the stacked array correspond to which input channel. This class stores only the sizes of the input channels in an ordered data structure (OrderedDict), and computes the range of indices for each channel. Storing the sizes of channels in an ordered dictionary allows us to insert or remove channels, as well as dynamically resize them.
- __init__(channel_sizes=None)[source]#
Initializes the ChannelMapper with an ordered dictionary of channel sizes.
- Parameters:
channel_sizes – Dictionary of {channel_name: size}.
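A usage sketch of the bookkeeping described above (channel names and sizes are made up; the expected values follow from the size-stacking behavior documented here):

    from tbp.monty.frameworks.utils.evidence_matching import ChannelMapper

    mapper = ChannelMapper({"patch_0": 10, "patch_1": 5})
    mapper.channel_range("patch_1")    # expected: (10, 15)
    mapper.add_channel("patch_2", 3)   # appended at the end
    mapper.resize_channel_to("patch_0", 12)
    mapper.channel_size("patch_0")     # expected: 12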
- add_channel(channel_name: str, size: int, position: int | None = None) None[source]#
Adds a new channel at a specified position (default is at the end).
- Parameters:
channel_name (str) – The name of the new channel.
size (int) – The size of the new channel.
position (int | None) – The index at which to insert the channel.
- Raises:
ValueError – If the channel already exists or position is out of bounds.
- Return type:
None
- channel_range(channel_name: str) tuple[int, int][source]#
Returns the start and end indices of the given channel.
- Parameters:
channel_name – The name of the channel.
- Returns:
The start and end indices of the channel.
- Raises:
ValueError – If the channel is not found.
- channel_size(channel_name: str) int[source]#
Returns the total number of hypotheses for a specific channel.
- Return type:
- Returns:
Size of channel
- Raises:
ValueError – If the channel is not found.
- delete_channel(channel_name: str) None[source]#
Delete a channel from the mapping.
- Parameters:
channel_name (str) – The name of the channel to delete.
- Raises:
ValueError – If the channel is not found.
- Return type:
- extract(original: numpy.ndarray, channel: str) numpy.ndarray[source]#
Extracts the portion of the original array corresponding to a given channel.
- Parameters:
- Return type:
- Returns:
The extracted slice of the original array. Returns a view, not a copy of the original array.
- Raises:
ValueError – If the channel is not found.
- extract_hypotheses(hypotheses: Hypotheses, channel: str) ChannelHypotheses[source]#
Extracts the hypotheses corresponding to a given channel.
- Parameters:
hypotheses (Hypotheses) – The full hypotheses array across all channels.
channel (str) – The name of the channel to extract.
- Return type:
- Returns:
The hypotheses corresponding to the given channel.
- resize_channel_to(channel_name: str, new_size: int) None[source]#
Sets the size of the given channel to a specific value.
This function will also delete the channel if the new_size is 0.
- Parameters:
- Raises:
ValueError – If the channel is not found or if the new size is not positive.
- Return type:
- update(original: numpy.ndarray, channel: str, data: numpy.ndarray) numpy.ndarray[source]#
Inserts data into the original array at the position of the given channel.
This function inserts new data at the index range previously associated with the provided channel. If the new data is of the same shape as the existing channel data shape, we simply replace the data at the channel range indices. Otherwise, we split the original array around the input channel range, then concatenate the before and after splits with the data to be inserted. This accommodates ‘data’ being of a different size than the current channel size.
For example, if original has the shape (20, 3), channel start index is 10, channel end index is 13, and the data has the shape (5, 3). We would concatenate as such: (original[0:10], data, original[13:]). This will result in an array of the shape (22, 3), i.e., we removed 3 rows and added new 5 rows.
- Parameters:
- Return type:
- Returns:
The resulting array after insertion. Can return a new copy or a view, depending on whether the inserted data is of the same size as the existing channel.
- Raises:
ValueError – If the channel is not found.
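The resize case in the example above corresponds to a plain NumPy concatenation; a standalone sketch (not calling the class) of the described splitting and re-joining:

    import numpy as np

    original = np.zeros((20, 3))
    start, end = 10, 13                 # index range currently mapped to the channel
    data = np.ones((5, 3))              # new channel data of a different size
    result = np.concatenate([original[:start], data, original[end:]])
    result.shape                        # -> (22, 3): 3 rows removed, 5 rows added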
- class EvidenceSlopeTracker(window_size: int = 10, min_age: int = 5) None[source]#
Bases: object
Tracks the slopes of evidence streams over a sliding window per input channel.
Each input channel maintains its own hypotheses with independent evidence histories. This tracker supports adding, updating, pruning, and analyzing hypotheses per channel.
Note
- One optimization might be to treat the array of tracked values as a ring-like structure. Rather than shifting the values every time they are updated, we could just advance an index that determines where in the ring we are. Then we would update one column which, based on the index, corresponds to the most recent values.
- Another optimization is to only track slopes, not the actual evidence values. The pairwise slopes for previous scores are not expected to change over time and can therefore be calculated a single time and stored.
- We can also test returning a random subsample of indices with slopes < mean(slopes) for to_remove, instead of using np.argsort.
- window_size#
Number of past values to consider for slope calculation.
- min_age#
Minimum number of updates before a hypothesis can be considered for removal.
- evidence_buffer#
Maps channel names to their hypothesis evidence buffers.
- hyp_age#
Maps channel names to hypothesis age counters.
- __init__(window_size: int = 10, min_age: int = 5) None[source]#
Initializes the EvidenceSlopeTracker.
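A usage sketch of the tracker (channel name and evidence values are illustrative):

    import numpy as np

    from tbp.monty.frameworks.utils.evidence_matching import EvidenceSlopeTracker

    tracker = EvidenceSlopeTracker(window_size=10, min_age=5)
    tracker.add_hyp(3, "patch")                        # three hypotheses on channel "patch"
    for step_values in ([0.1, 0.2, 0.0], [0.2, 0.1, 0.0], [0.3, 0.0, 0.1]):
        tracker.update(np.array(step_values), "patch")
    slopes = tracker.calculate_slopes("patch")         # one average slope per hypothesis
    # With min_age=5 and only three updates, no hypothesis is removable yet,
    # so the selection below is expected to maintain all of them.
    selection = tracker.select_hypotheses(slope_threshold=0.0, channel="patch")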
- add_hyp(num_new_hyp: int, channel: str) None[source]#
Adds new hypotheses to the specified input channel.
- calculate_slopes(channel: str) numpy.typing.NDArray.numpy.float64[source]#
Computes the average slope of hypotheses in a channel.
This method calculates the slope of the evidence signal for each hypothesis by subtracting adjacent values along the time dimension (i.e., computing deltas between consecutive evidence values). It then computes the average of these differences while ignoring any missing (NaN) values. For hypotheses with no valid evidence differences (e.g., all NaNs), the slope is returned as NaN.
- Parameters:
channel (str) – Name of the input channel.
- Return type:
float64
- Returns:
Array of average slopes, one per hypothesis.
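The described computation amounts to a nanmean over consecutive differences. A standalone NumPy sketch, assuming a hypotheses-by-time buffer layout:

    import numpy as np

    buffer = np.array([[0.1, 0.2, 0.4],
                       [np.nan, 0.3, 0.2]])   # NaN = slot not yet filled
    slopes = np.nanmean(np.diff(buffer, axis=1), axis=1)
    # -> approximately array([0.15, -0.1])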
- hyp_ages(channel: str) numpy.typing.NDArray.numpy.int_[source]#
Returns the ages of hypotheses in a channel.
- Parameters:
channel (str) – Name of the input channel.
- Return type:
int_
- removable_indices_mask(channel: str) numpy.typing.NDArray.numpy.bool_[source]#
Returns a boolean mask for removable hypotheses in a channel.
- Parameters:
channel (str) – Name of the input channel.
- Return type:
bool_
- Returns:
Boolean array indicating removable hypotheses (age >= min_age).
- remove_hyp(hyp_ids: numpy.typing.NDArray.numpy.int_, channel: str) None[source]#
Removes specific hypotheses by index in the specified channel.
- select_hypotheses(slope_threshold: float, channel: str) HypothesesSelection[source]#
Returns a hypotheses selection given a slope threshold.
- A hypothesis is maintained if:
Its slope is >= the threshold, OR
It is not yet removable due to age.
- Parameters:
- Return type:
- Returns:
A selection of hypotheses to maintain.
- Raises:
ValueError – If the channel does not exist.
- update(values: numpy.typing.NDArray.numpy.float64, channel: str) None[source]#
Updates all hypotheses in a channel with new evidence values.
- Parameters:
values (float64) – List or array of new evidence values.
channel (str) – Name of the input channel.
- Raises:
ValueError – If the channel doesn’t exist or the number of values is incorrect.
- Return type:
- class HypothesesSelection(maintain_mask: numpy.typing.NDArray.numpy.bool_) None[source]#
Bases: object
Encapsulates the selection of hypotheses to maintain or remove.
This class stores a boolean mask indicating which hypotheses should be maintained. From this mask, it can return the indices and masks for both the maintained and removed hypotheses. It also provides convenience constructors for creating a selection from maintain/remove masks or from maintain/remove index lists.
- _maintain_mask#
Boolean mask of shape (N,) where True indicates a maintain hypothesis and False indicates a remove hypothesis.
- __init__(maintain_mask: numpy.typing.NDArray.numpy.bool_) None[source]#
Initializes a HypothesesSelection from a maintain mask.
- Parameters:
maintain_mask (bool_) – Boolean array-like of shape (N,) where True indicates a maintained hypothesis and False indicates a removed hypothesis.
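A small sketch of the equivalent ways to build a selection (the mask and indices are made up):

    import numpy as np

    from tbp.monty.frameworks.utils.evidence_matching import HypothesesSelection

    selection = HypothesesSelection(maintain_mask=np.array([True, False, True, True]))
    selection.maintain_ids    # expected: array([0, 2, 3])
    selection.remove_ids      # expected: array([1])

    # Equivalent constructions from index lists:
    HypothesesSelection.from_remove_ids(total_size=4, ids=np.array([1]))
    HypothesesSelection.from_maintain_ids(total_size=4, ids=np.array([0, 2, 3]))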
- classmethod from_maintain_ids(total_size: int, ids: numpy.typing.NDArray.numpy.int_) HypothesesSelection[source]#
Creates a hypotheses selection from maintain indices.
- Parameters:
total_size (int) – Total number of hypotheses.
ids (int_) – Indices of hypotheses to maintain.
- Return type:
- Returns:
A HypothesesSelection instance.
- Raises:
IndexError – If any index is out of range [0, total_size).
- classmethod from_maintain_mask(mask: numpy.typing.NDArray.numpy.bool_) HypothesesSelection[source]#
Creates a selection from a maintain mask.
- Parameters:
mask (bool_) – Boolean array-like where True indicates a maintained hypothesis.
- Return type:
- Returns:
A HypothesesSelection instance.
Note
This method is added for completeness, but it is redundant as it calls the default class __init__ function.
- classmethod from_remove_ids(total_size: int, ids: numpy.typing.NDArray.numpy.int_) HypothesesSelection[source]#
Creates a selection from remove indices.
- Parameters:
total_size (int) – Total number of hypotheses.
ids (int_) – Indices of hypotheses to remove.
- Return type:
- Returns:
A HypothesesSelection instance.
- Raises:
IndexError – If any index is out of range [0, total_size).
- classmethod from_remove_mask(mask: numpy.typing.NDArray.numpy.bool_) HypothesesSelection[source]#
Creates a hypotheses selection from a remove mask.
- Parameters:
mask (bool_) – Boolean array-like where True indicates a hypothesis to remove.
- Return type:
- Returns:
A HypothesesSelection instance.
- property maintain_ids: numpy.typing.NDArray.numpy.int_#
Returns the indices of maintained hypotheses.
- property maintain_mask: numpy.typing.NDArray.numpy.bool_#
Returns the maintain mask.
- property remove_ids: numpy.typing.NDArray.numpy.int_#
Returns the indices of removed hypotheses.
- property remove_mask: numpy.typing.NDArray.numpy.bool_#
Returns the remove mask.
- evidence_update_threshold(evidence_threshold_config: float | str, x_percent_threshold: float | str, max_global_evidence: float, evidence_all_channels: np.ndarray) float[source]#
Determine how much evidence a hypothesis should have to be updated.
- Parameters:
evidence_threshold_config (float | str) – The heuristic for deciding which hypotheses should be updated.
x_percent_threshold (float | str) – The x_percent value to use for deciding on the evidence_update_threshold when the x_percent_threshold is used as a heuristic.
max_global_evidence (float) – Highest evidence of all hypotheses (i.e., current MLH evidence).
evidence_all_channels (np.ndarray) – Evidence values for all hypotheses.
- Return type:
float
- Returns:
The evidence update threshold.
Note
The logic of evidence_threshold_config="all" can be optimized by bypassing the np.min function here and the indexing via the np.where function in the displacer. We want to update all the existing hypotheses; therefore, there is no need to find the specific indices for them in the hypotheses space.
- Raises:
InvalidEvidenceThresholdConfig – If evidence_threshold_config is not in the allowed values
tbp.monty.frameworks.utils.graph_matching_utils#
- add_pose_features_to_tolerances(tolerances, default_tolerances=20) dict[source]#
Add default pose_vectors tolerances if not set.
- Return type:
- Returns:
Tolerances dictionary with added pose_vectors if not set.
- create_exponential_kernel(size, decay_rate)[source]#
Create an exponentially decaying kernel.
Used to convolve, for example, evidence history when determining whether we are on a new object.
- Parameters:
size – Size of the kernel.
decay_rate – Decay rate of the kernel.
- Returns:
Exponentially decaying kernel.
- detect_new_object_exponential(max_ev_per_step, detection_threshold=-1.0, decay_rate=3)[source]#
Detect we’re on a new object using exponentially decaying evidence changes.
Evidence changes from multiple steps into the past are convolved by exponentially decaying constants, such that more recent steps carry more significance.
- Parameters:
max_ev_per_step – List of the maximum evidence (across all locations/poses) for the current MLH object, across all steps of the current episode
detection_threshold – The total amount of negative evidence in the counter/sum that needs to be exceeded to determine that the LM has moved on to a new object.
decay_rate – The rate of decay that determines how much past evidence drops contribute to the current estimate of change.
- Returns:
True if the total amount of negative evidence is less than or equal to the detection threshold; False otherwise.
- detect_new_object_k_steps(max_ev_per_step, detection_threshold=-1.0, k=3, reset_at_positive_jump=False)[source]#
Detect we’re on a new object using the evidence changes from multiple steps.
Evidence changes from multiple steps into the past are considered. We look at the change in evidence over k discrete steps, weighing these equally.
- Parameters:
max_ev_per_step – List of the maximum evidence (across all locations/poses) for the current MLH object, across all steps of the current episode
detection_threshold – The total amount of negative evidence in the counter/sum that needs to be exceeded to determine that the LM has moved on to a new object.
k – How many steps into the past to look when summing the negative change in evidence.
reset_at_positive_jump – Boolean to reset the accumulated changes when there is a positive increase in evidence, i.e., k is further limited by this history.
- Returns:
True if the total evidence change is less than or equal to the detection threshold; False otherwise.
- find_step_on_new_object(stepwise_targets, primary_target, n_steps_off_primary_target)[source]#
Returns the episode step at which we’ve moved off the primary target object.
The returned episode step is the first step at which we’ve been off the primary target for a total of n_steps_off_primary_target consecutive steps.
- get_correct_k_n(k_n, num_datapoints)[source]#
Determine k_n given the number of datapoints.
The k_n specified in the hyperparameter may not be possible to achieve with the number of data points collected, so this function checks and adjusts k_n.
- Parameters:
k_n – Current number of k nearest neighbors specified for graph building.
num_datapoints – Number of observations available to build the graph.
- Returns:
Adjusted k_n.
- get_custom_distances(nearest_node_locs, search_locs, search_sns, search_curvature)[source]#
Calculate custom distances modulated by surface normal and curvature.
- Parameters:
nearest_node_locs – Locations of nearest nodes to search_locs (shape=(num_hyp, max_nneighbors, 3)).
search_locs – Search locations for each hypothesis (shape=(num_hyp, 3)).
search_sns – Sensed surface normal rotated by the hypothesis pose (shape=(num_hyp, 3)).
search_curvature – Magnitude of sensed curvature (maximum if using two principal curvatures) used to modulate the search sphere thickness in the direction of the surface normal (shape=1).
- Returns:
Custom distances of each nearest location from its search location, taking into account the hypothesis point normal and sensed curvature. shape=(num_hyp, max_nneighbors).
- Return type:
custom_nearest_node_dists
- get_initial_possible_poses(initial_possible_pose_type) list[Rotation] | None[source]#
Initialize initial_possible_poses to test based on initial_possible_pose_type.
- Parameters:
initial_possible_pose_type – How to sample initial possible poses. Options are:
- “uniform”: Sample uniformly from the space of possible poses.
- “informed”: Sample poses that are likely to be possible based on the object’s geometry and the first observation.
- list of euler angles: Use a list of predefined poses to test (useful for debugging).
- Returns:
List of initial possible poses to test, or None if initial_possible_pose_type is “informed”.
- get_relevant_curvature(features)[source]#
Get the relevant curvature from features. Used to scale search sphere.
In the case of principal_curvatures and principal_curvatures_log we use the maximum absolute curvature between the two values. Otherwise we just return the curvature value.
Note
Not sure if other curvatures work as well as log curvatures since they may have too big of a range.
- Returns:
Magnitude of sensed curvature (maximum if using two principal curvatures).
- get_scaled_evidences(evidences, per_object=False)[source]#
Scale evidences to be in range [-1, 1] for voting.
This is useful so that the absolute evidence values don’t distort the votes (for example if one LM has already had a lot more matching steps than another and the evidence is not bounded). It is also useful to keep the evidence added from a single voting step in a reasonable range.
By default we normalize using the maximum and minimum evidence over all objects. There is also an option to scale the evidence for each object independently but that might give low evidence objects too much of a boost. We could probably remove this option.
- Returns:
Scaled evidences.
- get_uniform_initial_possible_poses(n_degrees_sampled=9)[source]#
Get an initial list of possible poses.
- Parameters:
n_degrees_sampled – Number of degrees sampled for each axis. Defaults to 9, which means tested degrees are in [0., 45., 90., 135., 180., 225., 270., 315.]. This results in 512 unique pose combinations.
Depending on the displacement vector, some poses are equivalent (e.g., [0, 0, 0] and [180, 180, 180] or [0, 45, 90] and [180, 135, 270]). (a, b, c) == (a + 180, -b + 180, c + 180) (see https://books.google.gr/books?id=rn3OBQAAQBAJ p.267).
- Returns:
List of poses to test.
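A sketch of the sampling that the default implies (8 distinct angles per axis, 512 Euler-angle triples); whether the library returns plain triples or rotation objects is not shown here:

    import itertools

    import numpy as np

    degrees = np.arange(0.0, 360.0, 45.0)                   # [0., 45., ..., 315.]
    poses = [list(p) for p in itertools.product(degrees, repeat=3)]
    len(poses)                                              # -> 512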
- get_unique_paths(possible_paths, threshold=0.01)[source]#
Get all unique paths in a list of possible paths.
- Parameters:
possible_paths – List of possible paths (locations).
threshold – Minimum distance between two paths to be considered different. Defaults to 0.01.
- Returns:
List of unique paths (locations).
- is_in_ranges(array, ranges)[source]#
Check for each element in an array whether it is in its specified range.
Each element can have a different tolerance range.
- Parameters:
array – Array of elements to check.
ranges – Sequence of (min, max) tuples defining the valid range for each element.
- Returns:
True if all elements are in their respective ranges, False otherwise.
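A minimal standalone sketch of this kind of element-wise range check (not the library call itself):

    import numpy as np

    array = np.array([0.5, 10.0, -1.0])
    ranges = [(0.0, 1.0), (8.0, 12.0), (-2.0, 0.0)]
    all(lo <= v <= hi for v, (lo, hi) in zip(array, ranges))   # -> True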
- possible_sensed_directions(sensed_directions: np.ndarray, num_hyps_per_node: int) list[np.ndarray][source]#
Return the possible sensed directions for all nodes.
This function determines the possible sensed directions for a given set of sensed directions. It relies on two different behaviors depending on the value of num_hyps_per_node.
- If num_hyps_per_node equals 2: the pose is well defined (i.e., PC1 != PC2). A well-defined pose does not distinguish between mirrored directions of PC1 and PC2 (e.g., the object can be upside down), therefore we sample both directions.
- If num_hyps_per_node is not 2: this function samples additional poses in the plane perpendicular to the sensed surface normal.
- Parameters:
sensed_directions – An array of sensed directions.
num_hyps_per_node – Number of rotations to get for each node.
- Returns:
Possible sensed directions for all nodes at each rotation.
- Return type:
possible_s_d
- process_delta_evidence_values(max_ev_per_step)[source]#
Pre-process the max evidence values to get the change in evidence across steps.
Clip the values to be less than or equal to 0 and return the index of the most recent positive change in evidence.
- Parameters:
max_ev_per_step – Sequence of maximum evidence values.
- Returns:
Clipped change in evidence across steps. positive_jump_loc: Index of the most recent positive change in evidence.
- Return type:
clipped_ev_changes
tbp.monty.frameworks.utils.live_plotter#
- class LivePlotter[source]#
Bases:
objectClass for plotting sensor observations during an experiment.
Set the show_sensor_output flag in the experiment config to True to enable live plotting.
WARNING: This plotter makes a number of assumptions right now. For example, it assumes that:
- a sensor with ID “view_finder” exists
- a sensor with ID “patch” exists
- an “rgba” modality exists in the “view_finder” sensor observation
- a “depth” modality exists in the “patch” sensor observation
- hardcoded_assumptions(observation, model)[source]#
Extract some of the hardcoded assumptions from the observation.
TODO: Don’t do this. It is here for now to highlight the fragility of the live plotter implementation at the call site. We should make this less fragile by passing the necessary information to the live plotter.
- Parameters:
observation – The observation from the environment interface.
model – The model.
- Returns:
A tuple of the first learning module, the first sensor module raw observations, the patch depth, and the view finder rgba.
tbp.monty.frameworks.utils.logging_utils#
- add_pose_lm_episode_stats(lm, stats)[source]#
Add possible poses of an LM to episode stats.
- Parameters:
lm – LM instance from which to add the statistics.
stats – Statistics dictionary to update.
- Returns:
Updated stats dictionary.
- calculate_fpr(fp, tn)[source]#
Calculate False Positive Rate, aka specificity.
- Parameters:
fp – false positives
tn – true negatives
- Returns:
False Positive Rate
- calculate_performance(stats, performance_type, lm, target_object)[source]#
Calculate performance of an LM on a given target object.
- Parameters:
stats – Statistics dictionary to update.
performance_type – performance type index into stats
lm – Learning module for which to generate stats.
target_object – target (primary or stepwise) object for the LM to have converged to
- Returns:
Updated stats dictionary.
- calculate_tpr(tp, fn)[source]#
Calculate True Positive Rate, aka sensitivity.
- Parameters:
tp – true positives
fn – false negatives
- Returns:
True Positive Rate
- compute_pose_error(predicted_rotation: scipy.spatial.transform.Rotation, target_rotation: scipy.spatial.transform.Rotation, return_degrees: bool = False) float[source]#
Computes the minimum angular pose error between predicted and target rotations.
See compute_pose_errors for more details.
- Parameters:
predicted_rotation (Rotation) – Predicted rotation(s). Can be a single rotation or a list of rotations.
target_rotation (Rotation) – Target rotation. Must represent a single rotation.
return_degrees (bool) – Whether to return the error in degrees. If False, returns the error in radians. Defaults to False.
- Return type:
- Returns:
The minimum angular error in radians (or degrees if return_degrees is True).
- compute_pose_errors(predicted_rotation: Rotation, target_rotation: Rotation) npt.NDArray[np.float64] | float[source]#
Computes the angular pose errors between predicted and target rotations.
Both inputs must be instances of scipy.spatial.transform.Rotation. The predicted_rotation may contain a single rotation or a list of rotations, while target_rotation must be exactly one rotation.
The pose error is defined as the geodesic distance on SO(3), i.e., the angle of the relative rotation between predicted and target. If predicted_rotation contains multiple rotations, this function returns one error for each of them.
Note that the .inv() operation in this method is due to how geodesic distance between two rotations is calculated, not a side-effect of whether the target rotation is stored in its normal form, or as its inverse. The function therefore assumes that the orientations are already in the same coordinate system before the comparison.
- Parameters:
predicted_rotation (Rotation) – Predicted rotation(s). Can be a single rotation or a list of rotations.
target_rotation (Rotation) – Target rotation. Must represent a single rotation.
- Return type:
npt.NDArray[np.float64] | float
- Returns:
The angular errors in radians.
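A sketch of the geodesic error described above, computed with scipy directly (the utility itself may handle broadcasting and unit conversion differently):

    import numpy as np
    from scipy.spatial.transform import Rotation

    predicted = Rotation.from_euler("xyz", [[0, 0, 10], [0, 0, 50]], degrees=True)
    target = Rotation.from_euler("xyz", [0, 0, 30], degrees=True)
    errors = (target.inv() * predicted).magnitude()    # angles of the relative rotations, in radians
    np.degrees(errors)                                  # -> approximately [20., 20.]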
- compute_unsupervised_stats(possible_matches, target, graph_to_target, target_to_graph)[source]#
Compute statistics like how many graphs are built per object.
- Parameters:
possible_matches – container of str that key into graph_to_target
target – str ground truth name of object being presented
graph_to_target – dict mapping each graph to the set of objects used to build it
target_to_graph – dict mapping each object to the set of graphs that used it
- Returns:
?
- Return type:
- consistent_child_objects_accuracy(eval_stats_for_lm, parent_to_child_mapping)[source]#
Check whether most_likely_object is consistent with the parent_to_child_mapping.
Classified object is consistent if it is one of the children in the set of objects corresponding to the compositional object.
NOTE: This function is only called in compositional_stats_for_all_lms, which is a logging util, called from a notebook and hence none of our previous experiments should be affected by this or raise the ValueError, even if they don’t have a parent_to_child_mapping.
- Returns:
The percentage of episodes in which a consistent child object is detected.
- Raises:
ValueError – If the target object of an episode is not in the parent_to_child_mapping.
- deserialize_json_chunks(json_file, start=0, stop=None, episodes=None)[source]#
Deserialize one episode at a time from json file.
Only get episodes specified by arguments, which follow list / numpy like semantics.
Note
assumes line counter is exactly in line with episode keys
- Parameters:
json_file – full path to the json file to load
start – int, get data starting at this episode
stop – int, get data ending at this episode, not inclusive as usual in Python
episodes – iterable of ints with episodes to pull
- Returns:
dict containing contents of file_handle
- Return type:
detailed_json
- format_columns_for_wandb(lm_dict)[source]#
Various columns break wandb because we are playing fast and loose with types.
Put any standardizations here.
- Parameters:
lm_dict – dict, part of a larger dict ~ {LM_0: lm_dict, LM_1: lm_dict}
- Returns:
formatted lm_dict
- get_graph_lm_episode_stats(lm)[source]#
Populate stats dictionary for one episode for an LM.
- Parameters:
lm – Learning module for which to generate stats.
- Returns:
dict with stats of one episode.
- get_rgba_frames_single_sm(observations)[source]#
Convert a time series of rgba observations into format for wandb.Video.
- Parameters:
observations – episode_stats[sm][___observations]
- Returns:
formatted observations
- get_stats_per_lm(model, target, episode_seed: int)[source]#
Loop through lms and get stats.
- Parameters:
model – model instance
target – target object
episode_seed (int) – RNG seed used for the episode
- Returns:
dict with stats per lm
- Return type:
performance_dict
- get_time_stats(all_ds, all_conditions) pandas.DataFrame[source]#
Get summary of run times in a dataframe for each condition.
- Parameters:
all_ds – detailed stats (dict) for each condition
all_conditions – name of each condition
- Return type:
- Returns:
Runtime stats.
- get_unique_euler_poses(poses)[source]#
Get unique poses for an object from possible poses per path.
- Returns:
array of unique poses
- Return type:
unique_poses
- lm_stats_to_dataframe(stats, format_for_wandb=False)[source]#
Take in a dictionary and format into a dataframe.
Example:
{0: {LM_0: stats, LM_1: stats...}, 1:...} --> dataframe
Currently we are reporting once per episode, so the loop over episodes is only over a single key, value pair, but leaving it here because it is backward compatible.
- Returns:
dataframe
- load_stats(exp_path, load_train=True, load_eval=True, load_detailed=True, load_models=True, pretrained_dict=None)[source]#
Load experiment statistics from an experiment for analysis.
- Returns:
pandas DataFrame with training statistics.
eval_stats: pandas DataFrame with evaluation statistics.
detailed_stats: dict with detailed statistics.
lm_models: dict with loaded learning module models.
- Return type:
train_stats
- matches_to_target_str(possible_matches, graph_to_target)[source]#
Get the possible target objects associated with each possible match.
Targets are concatenated into a single string name for easy saving in a csv.
- Returns:
?
- Return type:
- maybe_rename_existing_dir(dirpath: Path) None[source]#
If the given log directory already exists, rename it to <dirname>_old.
- Raises:
ValueError – If dirpath is not a directory.
- Return type:
- maybe_rename_existing_file(filepath: Path) None[source]#
If the given log file already exists, rename it to <filename>_old.
- Return type:
- print_unsupervised_stats(stats, epoch_len)[source]#
Print stats of unsupervised learning experiment.
- target_data_to_dict(target)[source]#
Format target params to dict.
- Parameters:
target – target params
- Returns:
dict with target params
- total_size(o)[source]#
Returns the approximate memory footprint of an object and all of its contents.
Automatically finds the contents of the following builtin containers and their subclasses: tuple, list, deque, dict, set and frozenset. To search other containers, add handlers to iterate over their contents:
    handlers = {SomeContainerClass: iter,
                OtherContainerClass: OtherContainerClass.get_elements}
This is the recursive recipe universally cited on Stack Exchange and blogs for gauging the size of Python objects in memory.
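The core of that recipe looks roughly like the following simplified sketch (not the exact implementation used here):

    import sys
    from collections import deque
    from itertools import chain

    def approx_total_size(o, handlers=None):
        all_handlers = {
            tuple: iter,
            list: iter,
            deque: iter,
            dict: lambda d: chain.from_iterable(d.items()),
            set: iter,
            frozenset: iter,
        }
        all_handlers.update(handlers or {})
        seen = set()  # guard against shared and self-referential objects

        def sizeof(obj):
            if id(obj) in seen:
                return 0
            seen.add(id(obj))
            size = sys.getsizeof(obj)
            for container_type, iterate in all_handlers.items():
                if isinstance(obj, container_type):
                    size += sum(map(sizeof, iterate(obj)))
                    break
            return size

        return sizeof(o)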
tbp.monty.frameworks.utils.object_model_utils#
- class NumpyGraph(my_dict)[source]#
Bases: object
Alternative way to represent graphs without using torch.
Speeds up runtime significantly.
- already_in_list(existing_points, new_point, features, clean_ids, query_id, graph_delta_thresholds) bool[source]#
Check if a given point is already in a list of points.
- Parameters:
existing_points – List of x,y,z locations
new_point – new location
features – all features (both existing and candidate points)
clean_ids – indices (w.r.t “features”) that have already been accepted into the graph and against which the query point is compared
query_id – index (w.r.t “features”) that is currently being considered
graph_delta_thresholds – Dictionary of thresholds used to determine whether a point should be considered sufficiently different so as to be included in the graph
- Return type:
- Returns:
Whether the point is already in the list
- build_point_cloud_graph(locations, features, feature_mapping)[source]#
Build a graph from observations without edges.
- Parameters:
locations – array of x, y, z positions in space
features – dictionary of features at locations
feature_mapping –
?
- Returns:
A NumpyGraph containing the observed features at locations.
- circular_mean(values)[source]#
Calculate the mean of a circular value such as hue where 0==1.
- Returns:
Mean value.
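One common way to compute such a mean for hue-like values in [0, 1) is via the angular mean (a sketch; the utility may differ in details):

    import numpy as np

    def circular_mean_sketch(values):
        angles = 2 * np.pi * np.asarray(values)
        mean_angle = np.arctan2(np.mean(np.sin(angles)), np.mean(np.cos(angles)))
        return (mean_angle / (2 * np.pi)) % 1.0

    circular_mean_sketch([0.9, 0.2])   # -> 0.05, whereas the plain mean would be 0.55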
- expand_index_dims(indices_3d, last_dim_size)[source]#
Expand 3d indices to 4d indices by adding a 4th dimension with size.
- Parameters:
indices_3d – 3d indices that should be converted to 4d
last_dim_size – desired size of the 4th dimension (will be filled with arange indices from 0 to last_dim_size-1)
- Returns:
Tensor of 4d indices.
- get_cubic_patches(arr_shape, centers, size)[source]#
Cut a cubic patch around a center id out of a 3d array.
NOTE: Currently not used. Was implemented for draft of nn search in grid.
- Returns:
New centers and mask.
- get_most_common_bool(booleans)[source]#
Get most common value out of a list of boolean values.
- Returns:
True when we have equally many True as False entries.
- get_most_common_value(values)[source]#
Get most common value out of a list of values (i.e., the mode).
- Returns:
Most common value.
- get_values_from_dense_last_dim(tensor, index_3d)[source]#
Get values from 4d tensor at indices in last dimension.
This function assumes that the entries in the last dimension are dense. This is the case in all our sparse tensors where the first 3 dimensions represent the 3d location (sparse) and the 4th represents values at this location (dense).
- Returns:
List of values.
- pose_vector_mean(pose_vecs, pose_fully_defined)[source]#
Calculate mean of pose vectors.
This takes into account that surface normals may contain observations from two surface sides and curvature directions have an ambiguous direction. It also enforces them to stay orthogonal.
If not pose_fully_defined, the curvature directions are meaningless, and we just return the first observation. Theoretically this shouldn’t matter, but it can save some computation time.
- Returns:
Tuple containing the representative pose vector mean and a bool indicating whether we used curvature directions to update it.
- remove_close_points(point_cloud, features, graph_delta_thresholds, old_graph_index)[source]#
Remove points from a point cloud unless sufficiently far away.
Points are removed unless sufficiently far away either by Euclidean distance, or feature-space.
- Parameters:
point_cloud – List of 3D points
features –
?
graph_delta_thresholds – dictionary of thresholds; if the L-2 distance between the locations of two observations (or other feature-distance measure) is below all of the given thresholds, then a point will be considered insufficiently interesting to be added
old_graph_index – If the graph is not new, the index associated with the final point in the old graph; we will skip this when checking for sameness, as they will already have been compared in the past to one-another, saving computation.
- Returns:
List of 3D points that are sufficiently novel w.r.t one-another, along with their associated indices.
tbp.monty.frameworks.utils.plot_utils#
A collection of plot utilities used during normal platform runtime.
tbp.monty.frameworks.utils.plot_utils_analysis#
tbp.monty.frameworks.utils.plot_utils_dev#
tbp.monty.frameworks.utils.profile_utils#
- drop_filename(string)[source]#
Drop filename for shorter strings and easier viz.
We do this because strings for code calls are long.
- Returns:
String without filename.
tbp.monty.frameworks.utils.sensor_processing#
- center_neighbors(point_cloud, center_id, neighbor_patch_frac)[source]#
Get neighbors within a given neighborhood of the patch center.
- Returns:
Locations and semantic IDs of all points within a given neighborhood of the patch center that lie on an object.
- curvature_at_point(point_cloud, center_id, normal)[source]#
Compute principal curvatures from a point cloud.
Computes the two principal curvatures of a 2D surface and the corresponding principal directions.
- Parameters:
point_cloud – Point cloud (2D numpy array) on which the local surface is approximated.
center_id – Center point around which the local curvature is estimated.
normal – Surface normal at the center point.
- Returns:
First principal curvature. k2: Second principal curvature. dir1: First principal direction. dir2: Second principal direction.
- Return type:
k1
- log_sign(to_scale)[source]#
Apply symlog to the input array, preserving sign.
This implementation ensures that the sign of the input values is preserved and avoids extreme outputs when values are close to 0.
- Parameters:
to_scale – Array to scale.
- Returns:
Scaled values of the array.
- pixel_dist_to_center(n_points, patch_width, center_id)[source]#
Extract the relative distance of each pixel to the patch center (in pixel space).
- Parameters:
n_points – Total number of points in the patch.
patch_width – Width of the square patch.
center_id – ID of the patch center.
- Returns:
Relative distance of each pixel to the patch center (in pixel space).
- point_pair_features(pos_i, pos_j, normal_i, normal_j)[source]#
Return point pair features between two points.
- Parameters:
pos_i – Location of point 1.
pos_j – Location of point 2.
normal_i – Surface normal of point 1.
normal_j – Surface normal of point 2.
- Returns:
Point pair features.
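For reference, the standard point pair feature (as in Drost et al.) consists of the point distance and three angles; a standalone sketch (the library's exact ordering and normalization are not guaranteed to match):

    import numpy as np

    def ppf_sketch(pos_i, pos_j, normal_i, normal_j):
        def angle(u, v):
            u = u / np.linalg.norm(u)
            v = v / np.linalg.norm(v)
            return np.arccos(np.clip(np.dot(u, v), -1.0, 1.0))

        d = pos_j - pos_i
        return np.array([np.linalg.norm(d), angle(normal_i, d),
                         angle(normal_j, d), angle(normal_i, normal_j)])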
- principal_curvatures(point_cloud_base, center_id, n_dir, neighbor_patch_frac=2.13, weighted=True, fit_intercept=True)[source]#
Compute principal curvatures from a point cloud.
Computes the two principal curvatures of a 2D surface and the corresponding principal directions.
- Parameters:
point_cloud_base – Point cloud (2D numpy array) based on which the 2D surface is approximated.
center_id – Center point around which the local curvature is estimated.
n_dir – Surface normal at the center point.
neighbor_patch_frac – Fraction of the patch width that defines the standard deviation of the Gaussian distribution used to sample the weights; this defines a local neighborhood for principal curvature computation.
weighted – Boolean flag that determines if regression is weighted. The weighting scheme is defined in weight_matrix().
fit_intercept – Boolean flag that determines whether to fit an intercept term for the regression.
- Returns:
First principal curvature. k2: Second principal curvature. dir1: First principal direction. dir2: Second principal direction.
- Return type:
k1
- scale_clip(to_scale, clip)[source]#
Clip values into a range and scale with the square root.
This can be used to bring Gaussian and mean curvatures into a reasonable range and remove outliers, which makes it easier to handle noise. The sign is preserved before applying the square root.
- Parameters:
to_scale – Array where each element should be scaled.
clip – Range to which the array values should be clipped.
- Returns:
Scaled values of the array.
- surface_normal_naive(point_cloud, patch_radius_frac=2.5)[source]#
Estimate surface normal.
This is a very simplified alternative to open3d’s estimate_normals where we make use of several assumptions specific to our case:
- we know which locations are neighboring locations from the camera patch arrangement
- we only need the surface normal at the center of the patch
TODO: Calculate surface normal from multiple points at different distances (tan_len values) and then take the average of them. Test if this improves robustness to raw sensor noise.
- Parameters:
point_cloud – List of 3D coordinates with flags indicating whether each point lies on the object. Shape = [n, 4].
patch_radius_frac – Fraction of observation size to use for SN calculation. Default of 2.5 means that we look half_obs_dim//2.5 to the left, right, up and down. With a resolution of 64x64 that would be 12 pixels. The calculated tan_len (in this example 12) describes the distance of pixels used to span up the two tangent vectors to calculate the surface normals. These two vectors are then used to calculate the surface normal by taking the cross product. If we set tan_len to a larger value, the surface normal is more influenced by the global shape of the patch.
- Returns:
Estimated surface normal at the center of the patch. valid_sn: Boolean indicating whether the surface normal was valid (True by
default); an invalid surface normal means there were not enough points in the patch to make any estimate of the surface normal.
- Return type:
norm
- surface_normal_ordinary_least_squares(sensor_frame_data, world_camera, center_id, neighbor_patch_frac=3.2)[source]#
Extracts the surface normal direction from a noisy point cloud.
Uses ordinary least-squares fitting with error minimization along the view direction.
- Parameters:
sensor_frame_data – Point cloud in sensor coordinates (assumes the full patch is provided, i.e., no preliminary filtering of off-object points).
world_camera – Matrix defining the sensor-to-world frame transformation.
center_id – ID of the center point in the point cloud.
neighbor_patch_frac – Fraction of the patch width that defines the local neighborhood within which to perform the least-squares fitting.
- Returns:
Estimated surface normal at the center of the patch. valid_sn: Boolean indicating whether the surface normal was valid; defaults
to True. An invalid surface normal means there were not enough points in the patch to make any estimate of the surface normal.
- Return type:
surface_normal
- surface_normal_total_least_squares(point_cloud_base, center_id, view_dir, neighbor_patch_frac=3.2)[source]#
Extracts the surface normal direction from a noisy point-cloud.
Uses total least-squares fitting. Error minimization is independent of the view direction.
- Parameters:
point_cloud_base – Point cloud in world coordinates (assumes the full patch is provided, i.e., no preliminary filtering of off-object points).
center_id – ID of the center point in the point cloud.
view_dir – Viewing direction used to adjust the sign of the estimated surface normal.
neighbor_patch_frac – Fraction of the patch width that defines the local neighborhood within which to perform the least-squares fitting.
- Returns:
Estimated surface normal at the center of the patch. valid_sn: Boolean indicating whether the surface normal was valid; defaults
to True. An invalid surface normal means there were not enough points in the patch to make any estimate of the surface normal.
- Return type:
norm
- weight_matrix(n_points, center_id, neighbor_patch_frac=2.13)[source]#
Extract individual pixel weights for least-squares fitting.
Each pixel weight is sampled from a Gaussian distribution based on its distance to the patch center.
- Parameters:
n_points – Total number of points in the full RGB-D square patch.
center_id – ID of the center point in the point cloud.
neighbor_patch_frac – Fraction of the patch width that defines the standard deviation of the Gaussian distribution used to sample the weights.
- Returns:
Diagonal weight matrix of shape (n_points, 1).
tbp.monty.frameworks.utils.spatial_arithmetics#
- align_multiple_orthonormal_vectors(ms1, ms2, as_scipy=True)[source]#
Calculate rotations between multiple orthonormal vector sets.
- Parameters:
ms1 – Multiple orthonormal vector sets with shape = (N, 3, 3).
ms2 – Orthonormal vectors to align with, shape = (3, 3).
as_scipy – Whether to return a list of N scipy.Rotation objects or a np.array of rotation matrices (N, 3, 3).
- Returns:
List of N Rotations that align ms2 with each element in ms1.
- align_orthonormal_vectors(m1, m2, as_scipy=True)[source]#
Calculate the rotation that aligns two sets of orthonormal vectors.
- Parameters:
m1 – First set of orthonormal vectors.
m2 – Second set of orthonormal vectors to align with.
as_scipy – Whether to return a scipy rotation object or a rotation matrix. Defaults to True.
- Returns:
If as_scipy is True, a tuple (Rotation, float) containing the alignment rotation and the corresponding alignment error. Otherwise returns (np.ndarray, None), where the array is the rotation matrix aligning the vectors.
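One standard way to obtain such an alignment is scipy's align_vectors, shown here only to illustrate the relationship (the utility's conventions and error measure may differ):

    import numpy as np
    from scipy.spatial.transform import Rotation

    m2 = np.eye(3)                                             # reference orthonormal vectors
    m1 = Rotation.from_euler("z", 30, degrees=True).apply(m2)  # same vectors, rotated
    rot, rssd = Rotation.align_vectors(m1, m2)                 # rotation taking m2 onto m1
    rot.as_euler("xyz", degrees=True)                          # -> approximately [0., 0., 30.]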
- apply_rf_transform_to_points(locations, features, location_rel_model, object_location_rel_body, object_rotation, object_scale=1)[source]#
Apply location and rotation transform to locations and features.
These transforms tell us how to transform new observations into the existing model reference frame. They are calculated from the detected object pose.
- Parameters:
locations – Locations to transform (in body reference frame). Shape (N, 3)
features – Features to transform (in body reference frame). Shape (N, F)
location_rel_model – Detected location of the sensor on the object (object reference frame).
object_location_rel_body – Location of the sensor in the body reference frame.
object_rotation – Rotation of the object in the world relative to the learned model of the object. Expresses how the object model needs to be rotated to be consistent with the observations. To transform the observed locations (rel. body) into the model's reference frame, the inverse of this rotation is applied.
object_scale – Scale of the object relative to the model. Not used yet.
Note
Function can also be used in different contexts besides transforming points from body to object centric reference frame.
- Returns:
Transformed locations. features: Transformed features.
- Return type:
transformed_locations
- euler_to_quats(euler_rots, invert=False)[source]#
Convert Euler rotations to quaternions.
- Parameters:
euler_rots – Euler rotations
invert – Whether to invert the rotation. Defaults to False.
- Returns:
Quaternions
- get_angle(vec1, vec2)[source]#
Get angle between two vectors.
NOTE: For efficiency reasons we assume vec1 and vec2 are already normalized (which is the case for surface normals and curvature directions).
- Parameters:
vec1 – Vector 1
vec2 – Vector 2
- Returns:
angle in radians
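For already-normalized vectors this reduces to a clipped arccos of the dot product (a standalone sketch, not the library function):

    import numpy as np

    def angle_between_unit_vectors(vec1, vec2):
        return np.arccos(np.clip(np.dot(vec1, vec2), -1.0, 1.0))

    angle_between_unit_vectors(np.array([1.0, 0.0, 0.0]),
                               np.array([0.0, 1.0, 0.0]))   # -> 1.5707963... (pi / 2)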
- get_angle_beefed_up(v1, v2)[source]#
Return the angle in radians between vectors ‘v1’ and ‘v2’.
If one of the vectors is undefined, return an arbitrarily large distance.
If one of the vectors is the zero vector, return an arbitrarily large distance.
Also enforces that vectors are unit vectors, which makes it less efficient than the standard get_angle.
>>> angle_between_vecs((1, 0, 0), (0, 1, 0))
1.5707963267948966
>>> angle_between_vecs((1, 0, 0), (1, 0, 0))
0.0
>>> angle_between_vecs((1, 0, 0), (-1, 0, 0))
3.141592653589793
- get_angle_torch(v1, v2)[source]#
Get angle between two torch vectors.
- Parameters:
v1 – Vector 1
v2 – Vector 2
- Returns:
angle in radians
- get_angles_for_all_hypotheses(hyp_f, query_f)[source]#
Get all angles for hypotheses and their neighbors at once.
hyp_f shape = (num_hyp, num_nn, 3); query_f shape = (num_hyp, 3).
For each hypothesis we want to get num_nn angles.
Return shape = (num_hyp, num_nn).
- Parameters:
hyp_f – Hypotheses features three pose vectors
query_f – Query features three pose vectors
- Returns:
Angles between hypotheses and query pose vectors
- get_more_directions_in_plane(vecs, n_poses) list[np.ndarray][source]#
Get a list of unit vectors, evenly spaced in a plane orthogonal to vecs[0].
This is used to sample possible poses orthogonal to the surface normal when the curvature directions are undefined (like on a flat surface).
- Parameters:
vecs – Vector to get more directions in plane for
n_poses – Number of poses to get
- Returns:
List of vectors evenly spaced in a plane orthogonal to vecs[0]
- get_unique_rotations(poses, similarity_th, get_reverse_r=True)[source]#
Get unique scipy.Rotations out of a list, given a similarity threshold.
- Parameters:
poses – List of poses to get unique rotations from
similarity_th – Similarity threshold
get_reverse_r – Whether to get the reverse rotation. Defaults to True.
- Returns:
Unique euler poses r_poses: Unique rotations corresponding to euler_poses
- Return type:
euler_poses
- non_singular_mat(a)[source]#
Return True if a matrix is non-singular, i.e. can be inverted.
Uses the condition number of the matrix, which will approach a very large value, given by (1 / sys.float_info.epsilon), where epsilon is the smallest possible floating-point difference.
- pose_is_new(all_poses, new_pose, similarity_th) bool[source]#
Check if a pose is different from a list of poses.
Use the magnitude of the difference between quaternions as a measure for similarity and check that it is below pose_similarity_threshold.
- Return type:
- Returns:
True if the pose is new, False otherwise
- rot_mats_to_quats(rot_mats, invert=False)[source]#
Convert rotation matrices to quaternions.
- Parameters:
rot_mats – Rotation matrices
invert – Whether to invert the rotation. Defaults to False.
- Returns:
Quaternions
- rotate_multiple_pose_dependent_features(features, ref_frame_rot) dict[source]#
Rotate surface normal and curve dirs given a rotation matrix.
- Parameters:
features – dict of features with pose vectors to rotate. Pose vectors have shape (N, 9)
ref_frame_rot – scipy rotation to rotate pose vectors with.
- Return type:
- Returns:
Features with rotated pose vectors
- rotate_pose_dependent_features(features, ref_frame_rots) dict[source]#
Rotate pose_vectors given a list of rotation matrices.
- Parameters:
features – dict of features with pose vectors to rotate. pose vectors have shape (3, 3)
ref_frame_rots – Rotation matrices to rotate pose features by. Can either be - A single scipy rotation (as used in FeatureGraphLM) - An array of rotation matrices of shape (N, 3, 3) or (3, 3) (as used in EvidenceGraphLM).
- Return type:
- Returns:
Original features but with the pose_vectors rotated. If multiple rotations were given, pose_vectors entry will now contain multiple entries of shape (N, 3, 3).
tbp.monty.frameworks.utils.transform_utils#
- numpy_to_scipy_quat(quat)[source]#
Convert from wxyz to xyzw format of quaternions.
i.e. identity rotation in scipy is (0,0,0,1).
- Parameters:
quat – A quaternion in wxyz format
- Returns:
A quaternion in xyzw format
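The reordering itself is a roll of the components (a sketch; the utility's implementation may differ):

    import numpy as np

    wxyz = np.array([1.0, 0.0, 0.0, 0.0])   # identity rotation, scalar-first
    xyzw = np.roll(wxyz, -1)                 # -> [0., 0., 0., 1.], scipy's scalar-last order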
- rotation_as_quat(rot: scipy.spatial.transform.Rotation, scalar_first: bool = True) numpy.ndarray[source]#
Convert a scipy rotation to its quaternion representation.
Scipy added a scalar_first argument to Rotation.as_quat in version 1.14.0. (https://scipy.github.io/devdocs/release/1.14.0-notes.html). This function backports that behavior. Note, however, that scipy defaults to scalar-last format.
- Parameters:
rot (Rotation) – The scipy rotation object to convert.
scalar_first (bool) – Whether to return the array in (w, x, y, z) or (x, y, z, w) order. Defaults to True, i.e., (w, x, y, z) order.
- Return type:
- Returns:
An array with shape (4,) representing a single quaternion, or an array with shape (N, 4) representing N quaternions.
- rotation_from_quat(quat: numpy.typing.ArrayLike, scalar_first: bool = True) scipy.spatial.transform.Rotation[source]#
Create a scipy rotation object from a quaternion.
Scipy added a scalar_first argument to Rotation.from_quat in version 1.14.0. (https://scipy.github.io/devdocs/release/1.14.0-notes.html). This function backports that behavior. Note, however, that scipy defaults to scalar-last format.
- Parameters:
quat (ArrayLike) – An array with shape (4,) for a single quaternion, or an array with shape (N, 4) for N quaternions.
scalar_first (bool) – Whether the scalar component is first or last. Defaults to True, i.e., (w, x, y, z) order.
- Return type:
Rotation
- Returns:
The scipy rotation object.
- scipy_to_numpy_quat(quat: numpy.ndarray) quaternion.quaternion[source]#
- Return type:
quaternion