tbp.monty.frameworks.models#

tbp.monty.frameworks.models.abstract_monty_classes#

class GoalStateGenerator[source]#

Bases: object

Generate goal-states that other learning modules and motor-systems will attempt.

Generate goal-states, potentially (in the case of LMs) by outputting their own sub-goal-states. Provides a mechanism for implementing hierarchical action policies that are informed by world models/hypotheses.

abstract output_goal_states() list[GoalState][source]#

Return output goal-states.

abstract set_driving_goal_state()[source]#

Set the driving goal state.

e.g., from a human operator or a high-level LM.

abstract step()[source]#

Called on each step of the LM to which the GSG belongs.
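A minimal sketch of how a concrete generator might satisfy this interface. To stay self-contained, it defines a stand-in abstract base with the same three method names instead of importing tbp.monty. The `EchoGoalStateGenerator` class, the `goal_state` argument, and the use of plain dicts as goal states are all hypothetical.

```python
from abc import ABC, abstractmethod


class GoalStateGeneratorSketch(ABC):
    """Stand-in mirroring the three abstract methods documented above."""

    @abstractmethod
    def output_goal_states(self): ...

    @abstractmethod
    def set_driving_goal_state(self, goal_state): ...

    @abstractmethod
    def step(self): ...


class EchoGoalStateGenerator(GoalStateGeneratorSketch):
    """Hypothetical GSG that forwards its driving goal state as one sub-goal."""

    def __init__(self):
        self.driving_goal_state = None
        self._outputs = []

    def set_driving_goal_state(self, goal_state):
        # Assumption: the driving goal state is passed in as an argument.
        self.driving_goal_state = goal_state

    def step(self):
        # Recompute the outputs on every step of the owning LM.
        if self.driving_goal_state is None:
            self._outputs = []
        else:
            self._outputs = [self.driving_goal_state]

    def output_goal_states(self):
        return list(self._outputs)


gsg = EchoGoalStateGenerator()
gsg.set_driving_goal_state({"target": "mug handle"})
gsg.step()
goal_states = gsg.output_goal_states()
```

A real generator would derive sub-goals from the LM's hypotheses rather than echoing its input; the sketch only shows the call order (set, step, output).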

class LMMemory[source]#

Bases: object

Like a long-term memory storing all the knowledge an LM has.

abstract load_state_dict()[source]#

Take a state dict as an argument and set state for the memory.

abstract memory_consolidation()[source]#

Consolidate/clean up models stored in memory.

abstract state_dict()[source]#

Return a serializable dict with everything needed to save/load the memory.

abstract update_memory(observations)[source]#

Update models stored in memory given new observation & classification.

class LearningModule[source]#

Bases: object

abstract exploratory_step()[source]#

Model building step called inside of monty._step_learning_modules.

abstract get_output()[source]#

Return learning module output (same format as input).

abstract load_state_dict(state_dict)[source]#

Take a state dict as an argument and set state for this LM.

abstract matching_step()[source]#

Matching / inference step called inside of monty._step_learning_modules.

abstract post_episode()[source]#

Do things like update object models with stored data after an episode.

abstract pre_episode()[source]#

Do things like reset buffers or possible_matches before training.

abstract propose_goal_states() list[GoalState][source]#

Return the goal-states proposed by this LM’s GSG if they exist.

abstract receive_votes(votes)[source]#

Process voting data sent out from other learning modules.

abstract reset()[source]#

Do things like reset buffers or possible_matches before training.

abstract send_out_vote()[source]#

This method defines what data are sent to other learning modules.

abstract set_experiment_mode(mode)[source]#

Set the experiment mode.

Update state variables based on which method (train or evaluate) is being called at the experiment level.

abstract state_dict()[source]#

Return a serializable dict with everything needed to save/load this LM.

class Monty[source]#

Bases: object

abstract aggregate_sensory_inputs(observation)[source]#

Receive data from dataloader/env, organize on a per sensor module basis.

abstract is_done()[source]#

Return bool to tell the experiment if we are done with this episode.

abstract load_state_dict(state_dict)[source]#

Take a state dict as an argument and set state for monty and children.

abstract post_episode()[source]#

Recursively call post_episode on child classes.

abstract pre_episode()[source]#

Recursively call pre_episode on child classes.

abstract set_experiment_mode(mode)[source]#

Set the experiment mode.

Update state variables based on which method (train or evaluate) is being called at the experiment level.

abstract state_dict()[source]#

Return a serializable dict with everything needed to save/load monty.

abstract step(observation)[source]#

Take a matching, exploratory, or custom user-defined step.

Step taken depends on the value of self.step_type.

class ObjectModel[source]#

Bases: object

Model of an object. Is stored in Memory and used by LM.

abstract build_model(observations)[source]#

Build a new model.

abstract update_model(obersevations)[source]#

Update an existing model with new observations.

class SensorModule[source]#

Bases: object

abstract pre_episode()[source]#

This method is called before each episode.

abstract state_dict()[source]#

Return a serializable dict with this sensor module’s state.

Includes everything needed to save/load this sensor module.

abstract step(data)[source]#

Called on each step.

Parameters:

data – Sensor observations

abstract update_state(state)[source]#

tbp.monty.frameworks.models.buffer#

class BaseBuffer[source]#

Bases: object

abstract classmethod append()[source]#

abstract classmethod reset()[source]#

class BufferEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#

Bases: JSONEncoder

Encoder to turn the buffer into a JSON compliant format.

default(obj: Any) Any[source]#

Turn non-compliant types into a JSON-compliant format.

Parameters:

obj (Any) – The object to turn into a JSON-compliant format.

Return type:

Any

Returns:

The JSON-compliant data.

classmethod register(obj_type: type, encoder: Callable | Type[JSONEncoder]) None[source]#

Register an encoder.

Parameters:
  • obj_type (type) – The type to associate with encoder.

  • encoder (Union[Callable, Type[JSONEncoder]]) – A function or JSONEncoder class that converts objects of type obj_type into JSON-compliant data.

Raises:

TypeError – If encoder is not a JSONEncoder subclass or a callable.

Return type:

None

classmethod unregister(obj_type: type) None[source]#

Unregister an encoder.

Parameters:

obj_type (type) – The type to unregister the encoder for.

Return type:

None
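The register/unregister pattern described above can be sketched with a small stand-in encoder. This is not BufferEncoder's implementation. `RegistryEncoder` and its `_encoders` dictionary are hypothetical names, and only the standard-library `json` API is used.

```python
import json
from json import JSONEncoder


class RegistryEncoder(JSONEncoder):
    """Hypothetical encoder with a per-type registry (not BufferEncoder itself)."""

    _encoders = {}

    @classmethod
    def register(cls, obj_type, encoder):
        is_encoder_class = isinstance(encoder, type) and issubclass(encoder, JSONEncoder)
        if not (is_encoder_class or callable(encoder)):
            raise TypeError("encoder must be a JSONEncoder subclass or a callable")
        cls._encoders[obj_type] = encoder

    @classmethod
    def unregister(cls, obj_type):
        cls._encoders.pop(obj_type, None)

    def default(self, obj):
        for obj_type, encoder in self._encoders.items():
            if isinstance(obj, obj_type):
                if isinstance(encoder, type) and issubclass(encoder, JSONEncoder):
                    # JSONEncoder subclass: delegate to its default().
                    return encoder().default(obj)
                # Plain callable: call it directly on the object.
                return encoder(obj)
        # Unknown type: fall back to the base class, which raises TypeError.
        return super().default(obj)


RegistryEncoder.register(complex, lambda c: {"real": c.real, "imag": c.imag})
encoded = json.dumps({"z": 1 + 2j}, cls=RegistryEncoder)
RegistryEncoder.unregister(complex)
```

After `unregister`, serializing a complex number with this sketch raises `TypeError` again, because `default` falls through to the base class.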

class FeatureAtLocationBuffer[source]#

Bases: BaseBuffer

Buffer which stores features at locations coming into one LM. Also stores stats.

Used for building graph models and logging detailed stats about an episode. The location buffer is also used to calculate displacements.

__init__()[source]#

Initialize buffer dicts for locations, features, displacements and stats.

add_overall_stats(stats)[source]#

Add overall episode stats to self.stats.

append(list_of_data)[source]#

Add an observation to the buffer. Must be features at locations.

TODO S: Store state objects instead of list of data? A provisional version of this is implemented below, as the GSG uses State objects for computations.

append_input_states(input_state)[source]#

get_all_features_on_object()[source]#

Get all observed features that were on the object.

As in get_all_locations_on_object, the feature arrays should have the length of np.where(self.on_object) and contain all features that were observed when at least one of the input channels was on the object. However, we also check for each feature whether its input channel indicated on_object=True and, if not, set its value to nan.

Note

Since all inputs to an LM should have overlapping receptive fields, there should be no difference between the two at the moment (except due to noisy estimates of on_object).

Returns:

All observed features that were on the object.
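A minimal sketch of the masking described above, using NumPy only. The channel names, the `hue` feature, and the dictionary layout are hypothetical and do not reflect the buffer's internal storage.

```python
import numpy as np

# Hypothetical per-step on_object flags for two input channels.
on_object = {
    "patch_0": np.array([True, True, False, True]),
    "patch_1": np.array([True, False, False, True]),
}
# Hypothetical scalar feature observed by each channel at each step.
hue = {
    "patch_0": np.array([0.1, 0.2, 0.3, 0.4]),
    "patch_1": np.array([0.5, 0.6, 0.7, 0.8]),
}

# Keep the steps where at least one channel was on the object.
any_on_object = np.any(np.stack(list(on_object.values())), axis=0)

features_on_object = {}
for channel, values in hue.items():
    kept = values[any_on_object].astype(float)
    # Blank out entries whose own channel was off the object.
    kept[~on_object[channel][any_on_object]] = np.nan
    features_on_object[channel] = kept
```

Both arrays keep three entries (steps 0, 1, and 3), and the second entry of `patch_1` becomes nan because that channel was off the object at step 1.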

get_all_input_states()[source]#

Get all the input states that the buffer’s parent LM has observed.

Returns:

All the input states that the buffer’s parent LM has observed.

get_all_locations_on_object(input_channel=None)[source]#

Get all observed locations that were on the object.

Returns:

All observed locations that were on the object.

get_buffer_len_by_channel(input_channel)[source]#

Return the number of observations stored for that input channel.

get_current_displacement(input_channel)[source]#

Get the current displacement.

Returns:

The current displacement.
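As an illustration of how displacements relate to the stored locations, the sketch below takes differences between consecutive locations. It assumes that a displacement is the vector between successive sensed locations; the `locations` array is hypothetical.

```python
import numpy as np

# Hypothetical sequence of sensed 3D locations for one input channel.
locations = np.array([
    [0.0, 0.0, 0.0],
    [0.01, 0.0, 0.0],
    [0.01, 0.02, 0.0],
])

# One displacement per consecutive pair of locations.
displacements = np.diff(locations, axis=0)

current_displacement = displacements[-1]
first_displacement_len = np.linalg.norm(displacements[0])
```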

get_current_features(keys)[source]#

Get the current value of a specific feature.

Returns:

The current features.

get_current_location(input_channel)[source]#

Get the current location.

Note

May have to add on_object check at some point.

Returns:

The current location.

get_current_pose(input_channel)[source]#

Get currently sensed location and orientation.

Returns:

The currently sensed location and orientation.

get_current_ppf(input_channel)[source]#

Get the current ppf.

Returns:

The current ppf.

get_currently_on_object()[source]#

Check whether last sensation was on object.

Returns:

Whether the last sensation was on object.

get_first_displacement_len(input_channel)[source]#

Get length of first observed displacement.

Use for scale in DisplacementLM.

Returns:

The length of the first observed displacement.

get_first_sensory_input_channel()[source]#

Get name of first sensory (coming from SM) input channel in buffer.

Returns:

The name of the first sensory (coming from SM) input channel in buffer.

Raises:

ValueError – If no sensory channels are found in the buffer.

get_infos_for_graph_update()[source]#

Return all stored infos required to update a graph in memory.

get_last_obs_processed()[source]#

Check whether last sensation was processed by LM.

Returns:

Whether the last sensation was processed by the LM.

get_matching_step_when_output_goal_set()[source]#

Return matching step when last goal-state was generated.

Return the LM matching step associated with the last time a goal-state was generated.

get_nth_displacement(n, input_channel)[source]#

Get the nth displacement.

Returns:

The nth displacement.

get_num_goal_states_generated()[source]#

Return number of goal states generated by the LM’s GSG since episode start.

Note use of length not sum.

get_num_matching_steps()[source]#

Return number of matching steps performed by this LM since episode start.

Returns:

The number of matching steps performed by this LM since episode start.

get_num_observations_on_object()[source]#

Get the number of steps where at least 1 input was on the object.

Returns:

The number of steps where at least 1 input was on the object.

get_num_steps_post_output_goal_generated()[source]#

Return number of steps since last output goal-state.

Return the number of Monty-matching steps that have elapsed since the last time an output goal-state was generated.

get_previous_input_states()[source]#

Get previous State inputs received by the buffer’s parent LM.

i.e. in the last time step.

Returns:

The previous input states.

reset()[source]#

Reset the buffer.

set_individual_ts(object_id, pose)[source]#

Update self.stats with the individual LM’s terminal state.

update_last_stats_entry(stats)[source]#

Use this to overwrite last entry (for example after voting).

update_stats(stats, update_time=True, append=True, init_list=True)[source]#

Update statistics for this step in the episode.

tbp.monty.frameworks.models.displacement_matching#

class DisplacementGraphLM(k=None, match_attribute=None, tolerance=0.001, use_relative_len=False, graph_delta_thresholds=None)[source]#

Bases: GraphLM

Learning module that uses displacement stored in graphs to recognize objects.

__init__(k=None, match_attribute=None, tolerance=0.001, use_relative_len=False, graph_delta_thresholds=None)[source]#

Initialize Learning Module.

Parameters:
  • k – How many nearest neighbors should nodes in graphs connect to.

  • match_attribute – Which displacement to use for matching. Should be in [‘displacement’, ‘PPF’].

  • tolerance – How close an observed displacement has to be to a stored displacement to be considered a match. Defaults to 0.001.

  • use_relative_len – Whether to scale the displacements to achieve scale invariance. Only possible when using PPF.

  • graph_delta_thresholds – Thresholds used to compare nodes in the graphs being learned, and thereby whether to include a new point or not. By default, we only consider the distance between points, using a threshold of 0.001 (determined in remove_close_points). Can also specify thresholds based on e.g. surface normal angle difference, or principal curvature magnitude difference.

get_object_rotation(sensed_displacements, model_displacements, get_reverse_r=False)[source]#

Calculate the rotation between two sets of displacement vectors.

Parameters:
  • sensed_displacements – The displacements that were sensed.

  • model_displacements – The displacements in the model that were matched to the sensed displacements.

  • get_reverse_r – Whether to get the rotation that turns the model such that it would produce the sensed_displacements (False) or the rotation needed to turn the sensed_displacements into the model displacements (True).

Returns:

The rotation in Euler angles, as a matrix, and as a Rotation object.
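One standard way to estimate a rotation between two matched sets of vectors is the Kabsch algorithm, sketched below with NumPy. This is an illustration of the general technique, not necessarily what get_object_rotation does internally. The `estimate_rotation` helper and the example data are hypothetical.

```python
import numpy as np


def estimate_rotation(source, target):
    """Least-squares rotation R such that R @ source[i] ~= target[i] (Kabsch)."""
    h = source.T @ target
    u, _, vt = np.linalg.svd(h)
    # Correct for reflections so the result is a proper rotation.
    d = np.sign(np.linalg.det(vt.T @ u.T))
    return vt.T @ np.diag([1.0, 1.0, d]) @ u.T


# Hypothetical model displacements and a known 90 degree rotation about z.
model_displacements = np.array([[1.0, 0.0, 0.0], [0.0, 2.0, 0.0], [0.0, 0.0, 3.0]])
true_rotation = np.array([[0.0, -1.0, 0.0], [1.0, 0.0, 0.0], [0.0, 0.0, 1.0]])
sensed_displacements = model_displacements @ true_rotation.T

# Rotation that turns the model so that it produces the sensed displacements.
model_to_sensed = estimate_rotation(model_displacements, sensed_displacements)
# The reverse rotation is the transpose (inverse) of a rotation matrix.
sensed_to_model = model_to_sensed.T
```

Because a rotation matrix is orthogonal, the reverse direction comes for free as the transpose, which is one plausible reading of what a flag like get_reverse_r toggles.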

get_object_scale(sensed_displacement, model_displacement)[source]#

Calculate the object’s scale given sensed and model displacements.

Returns:

The scale of the object.
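A minimal sketch of one way to derive a scale from a pair of matched displacements, assuming the scale is the ratio of their lengths. That assumption and the `estimate_scale` helper are illustrative and not confirmed by the documentation above.

```python
import numpy as np


def estimate_scale(sensed_displacement, model_displacement):
    """Ratio of displacement lengths (assumed definition of scale)."""
    return np.linalg.norm(sensed_displacement) / np.linalg.norm(model_displacement)


# The sensed displacement is five times longer than the stored one.
scale = estimate_scale(np.array([0.0, 0.03, 0.04]), np.array([0.0, 0.006, 0.008]))
```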

get_unique_pose_if_available(object_id)[source]#

Compute (location, rotation, scale) of object.

If we are sure about where on the object we are, compare the sensed displacements to the observed displacements to calculate the pose; otherwise return None.

Returns:

The pose and scale of the object.

reset()[source]#

Call this before each episode.

class DisplacementGraphMemory(match_attribute, *args, **kwargs)[source]#

Bases: GraphMemory

Graph memory that stores graphs with displacements as edges.

__init__(match_attribute, *args, **kwargs)[source]#

Initialize Graph memory.

get_initial_hypotheses()[source]#

load_state_dict(state_dict)[source]#

Load graphs into memory from a state_dict and add point pair features.

tbp.monty.frameworks.models.evidence_sdr_matching#

class EncoderSDR(sdr_length=2048, sdr_on_bits=41, lr=0.01, n_epochs=1000, stability=0.0, log_flag=False)[source]#

Bases: object

The SDR Encoder class.

This class keeps track of the dense representations, and trains them to output SDRs when binarized. This class also contains its own optimizer and function to add more objects/representations.

The representations are stored as dense vectors and binarized using top-k to convert them to SDRs. During training, the pairwise overlaps between the SDRs are compared to the target overlaps. This error signal trains the dense representations.

Refer to the self.train_sdrs function for more information on the training details.

sdr_length#

The size of the SDRs (total number of bits).

sdr_on_bits#

The number of on bits in the SDRs. Controls sparsity.

lr#

The learning rate of the encoding algorithm.

n_epochs#

The number of training epochs per episode.

stability#

The stability parameter controls by how much old SDRs change relative to new SDRs. Value range is [0.0, 1.0], where 0.0 is no stability constraint applied and 1.0 is fixed SDRs. Values in between are for partial stability.

log_flag#

Flag to activate the logger.

__init__(sdr_length=2048, sdr_on_bits=41, lr=0.01, n_epochs=1000, stability=0.0, log_flag=False)[source]#

add_objects(n_objects)[source]#

Adds more objects to the available objects and re-initializes the optimizer.

We keep track of the stable representation ids (old objects) when adding new objects.

Parameters:

n_objects – Number of objects to add

binarize(emb)[source]#

Convert dense representations to SDRs (0s and 1s) using Top-k function.

Returns:

The SDRs.
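Top-k binarization can be sketched in a few lines of NumPy. The `binarize_top_k` helper is hypothetical and may differ from how binarize is implemented; the sizes reuse the documented defaults (2048 bits, 41 on bits).

```python
import numpy as np


def binarize_top_k(dense, k):
    """Set the k largest entries of each row to 1 and the rest to 0."""
    # argpartition places the indices of the k largest values in the last k slots.
    top_k = np.argpartition(dense, -k, axis=1)[:, -k:]
    sdrs = np.zeros_like(dense, dtype=np.int8)
    np.put_along_axis(sdrs, top_k, 1, axis=1)
    return sdrs


rng = np.random.default_rng(0)
dense = rng.normal(size=(3, 2048))  # three hypothetical dense representations
sdrs = binarize_top_k(dense, k=41)
```

Each row of `sdrs` has exactly 41 on bits, which is a sparsity of about 2% for a 2048-bit SDR.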

get_sdr(index)[source]#

Return the SDR at a specific index.

This index refers to the object index in the SDRs dictionary (i.e., self.obj_sdrs)

optimize(overlap_error, mask)[source]#

Compute and apply local gradient descent.

Compute based on the overlap error and mask.

Note there is no use of the chain rule, i.e. each SDR is optimized based on the derivative of its clustering error with respect to its values, with no intermediary functions.

The overlap error helps correct the sign and also provides a magnitude for the representation updates.

Parameters:
  • overlap_error – The difference between target and predicted overlaps.

  • mask – Mask indicating valid entries in the overlap matrix.

Note

num_objects = self.n_objects

Note

A vectorized version of the algorithm is provided below, although it would need to be modified to avoid repeated creation of arrays in order to be more efficient. It is left as-is for now because this algorithm is not a bottleneck (circa 10-20 seconds to learn 60 object SDRs):

    # Initialize gradients
    grad = np.zeros_like(self.obj_sdrs)

    # Compute the pairwise differences between SDRs
    diff_matrix = (
        self.obj_sdrs[:, np.newaxis, :] - self.obj_sdrs[np.newaxis, :, :]
    )

    # Compute the absolute differences for each pair
    abs_diff = np.sum(np.abs(diff_matrix), axis=2)

    # Create a mask for non-zero differences
    non_zero_mask = abs_diff > 0

    # Apply the mask to the original mask
    valid_mask = mask & non_zero_mask

    # Calculate the summed distance and gradient contributions where the mask
    # is valid for logging
    summed_distance = np.sum(overlap_error * valid_mask * abs_diff)

    # Calculate the gradients
    grad_contrib = overlap_error[:, :, np.newaxis] * 2 * diff_matrix
    grad += np.sum(grad_contrib * valid_mask[:, :, np.newaxis], axis=1)
    grad -= np.sum(grad_contrib * valid_mask[:, :, np.newaxis], axis=0)

    # Update the SDRs using the gradient
    self.obj_sdrs -= self.lr * grad

Returns:

The summed distance for logging.

train_sdrs(target_overlaps, log_epoch_every=10)[source]#

Main SDR training function.

This function receives a copy of the average target overlap 2D tensor and trains the SDR representations for n_epochs to achieve these target overlap scores.

We use the overlap target as a learning signal to move the dense representations towards or away from each other. The magnitude of the overlap error controls the strength of moving dense representations. The sign of the overlap error controls whether the representations will be moving towards or away from each other.

We want to limit the amount by which trained representations change relative to untrained object representations such that higher-level LMs would not suffer from significant changes in lower-level representations that were used to build higher-level graphs.

When adding new representations, we keep track of the ids of the older representations (i.e., self.stable_ids). This allows us to control by how much the older representations move relative to the newer ones during training. This behavior is controlled by the stability value. During each training iteration, we update these older representations with an average of the optimizer output and the original representation (weighted by the stability value). Note that too much stability restricts the SDRs from adapting to desired changes in the target overlaps caused by normalization or distribution shift, affecting the overall encoding performance.

Consider two dense representations, A_dense and B_dense. We apply the top-k operation on both to convert them to A_sdr and B_sdr, then calculate their overlap. If the overlap is less than the target overlap, we move the dense representations (A_dense and B_dense) closer to each other with strength proportional to the error in overlaps. We move them apart if they have more overlap than the target.

Note

The distance_matrix variable is calculated using the cdist function and it denotes the pairwise euclidean distances between dense representations. The term “overlap” always refers to the overlap in bits between SDRs.

Note

The overlap_error is only used to weight the distance_matrix for each pair of objects, and gradients do not flow through the sparse overlap calculations.

Returns:

The stats dictionary for logging.
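Two of the quantities described above can be sketched with small NumPy arrays: the overlap error between SDRs and the stability-weighted update of older representations. The example SDRs, the targets, and the `blend_with_stability` helper are hypothetical. The weighting follows the documented meaning of stability (0.0 applies no constraint, 1.0 keeps representations fixed), but the exact formula used by train_sdrs is an assumption.

```python
import numpy as np

# Three hypothetical 8-bit SDRs with 3 on bits each.
sdrs = np.array([
    [1, 1, 1, 0, 0, 0, 0, 0],
    [0, 1, 1, 1, 0, 0, 0, 0],
    [0, 0, 0, 0, 1, 1, 1, 0],
])

# Pairwise overlap in bits: entry (i, j) counts the shared on bits.
predicted_overlaps = sdrs @ sdrs.T

# Hypothetical targets; a positive error means "move closer together".
target_overlaps = np.array([[3, 1, 1], [1, 3, 0], [1, 0, 3]])
overlap_error = target_overlaps - predicted_overlaps


def blend_with_stability(original, optimized, stability):
    """Weighted average: 0.0 keeps the optimizer output, 1.0 keeps the original."""
    return stability * original + (1.0 - stability) * optimized


original = np.array([0.2, -0.4])
optimized = np.array([0.6, 0.0])
blended = blend_with_stability(original, optimized, stability=0.75)
```

Here objects 0 and 1 overlap by 2 bits against a target of 1, so their error is negative and they would be pushed apart, while objects 0 and 2 would be pulled together.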

property n_objects#

Return the available number of objects.

property sdrs#

Return the available SDRs.

class EvidenceSDRGraphLM(*args, **kwargs)[source]#

Bases: EvidenceSDRLMMixin, EvidenceGraphLM

Class that incorporates the EvidenceSDR Mixin with the EvidenceGraphLM.

class EvidenceSDRLMMixin(*args, **kwargs)[source]#

Bases: object

This Mixin adds training of SDR representations to the EvidenceGraphLM.

It overrides the __init__ and post_episode functions of the LM.

To use this Mixin, pass the EvidenceSDRGraphLM class as the learning_module_class in the learning_module_configs.

Additionally pass the sdr_args dictionary as an additional key in the learning_module_args.

The sdr_args dictionary should contain:
  • log_path (string): A string that points to a temporary location for saving experiment logs. “None” means don’t save to file.

  • sdr_length (int): The size of the SDR to be used for encoding.

  • sdr_on_bits (int): The number of active bits to be used with these SDRs.

  • sdr_lr (float): The learning rate of the encoding algorithm.

  • n_sdr_epochs (int): The number of epochs to train the encoding algorithm.

  • stability (float): Stability of older object SDRs. Value range is [0.0, 1.0], where 0.0 is no stability applied and 1.0 is fixed SDRs.

  • sdr_log_flag (bool): Flag indicating whether to log the results or not.

See the monty_lab repo for reference. Specifically, experiments/configs/evidence_sdr_evaluation.py
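A hedged sketch of what such a configuration fragment might look like. The keys come from the list above and the numeric values mirror the EncoderSDR defaults. The surrounding dictionary layout, the use of Python `None` for log_path, and the string placeholder for the class are assumptions; consult the referenced config file for the actual structure.

```python
# Hypothetical values; the numeric entries mirror the EncoderSDR defaults.
sdr_args = {
    "log_path": None,  # assumption: None disables saving to file
    "sdr_length": 2048,
    "sdr_on_bits": 41,
    "sdr_lr": 0.01,
    "n_sdr_epochs": 1000,
    "stability": 0.0,
    "sdr_log_flag": False,
}

learning_module_config = {
    # In a real config this would be the EvidenceSDRGraphLM class itself;
    # a string keeps this sketch import-free.
    "learning_module_class": "EvidenceSDRGraphLM",
    "learning_module_args": {
        # The usual EvidenceGraphLM arguments would also go here.
        "sdr_args": sdr_args,
    },
}
```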

TODO: This mixin adds state to the instance it is being mixed with. As such, it is attempting to reuse some common functionality of EvidenceGraphLM while being a different thing. The likely refactor is to extract the reusable EvidenceGraphLM functionality into a component (since it probably requires its own state), use that component as the default in EvidenceGraphLM, and then reuse that component in a new EvidenceSDRGraphLM class instead of inheriting from EvidenceGraphLM.

__init__(*args, **kwargs)[source]#

The mixin overrides the __init__ function of the Learning Module.

The encoding algorithm is initialized here and it stores the actual SDRs. Also, a temporary logging function is initialized here.

collect_evidences()[source]#

Collect evidence scores from the Learning Module.

We do this in three steps:
  • Step 1: We use the number of objects in the LM to update the sdr_encoder and id <-> obj tracking dictionaries, as well as the target overlap tensor.

  • Step 2: We collect evidences relative to the current most likely hypothesis (mlh). Evidences are stored in a 2d tensor.

  • Step 3: We use the stored evidences to update the target overlap, which stores the running average. Refer to EvidenceSDRTargetOverlaps for more details.

Note: We sort the ids in step 2 because the overlap values are supposed to be symmetric (e.g., “2,5” = “5,2”). This way the target overlaps for the ids “x,y” and “y,x” will be averaged together in the EvidenceSDRTargetOverlaps class.
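The effect of sorting the ids can be illustrated with a plain dictionary. The `pair_key` helper and the evidence values are hypothetical; the point is only that both orderings of a pair land on the same key.

```python
def pair_key(id_a, id_b):
    """Order-independent key for a pair of object ids."""
    low, high = sorted((id_a, id_b))
    return (low, high)


evidence_by_pair = {}
# Evidence for object 5 relative to MLH 2, and for object 2 relative to MLH 5.
for mlh_id, other_id, evidence in [(2, 5, 0.8), (5, 2, 0.4)]:
    evidence_by_pair.setdefault(pair_key(mlh_id, other_id), []).append(evidence)
```

Both entries end up under the key `(2, 5)`, so they can later be averaged together.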

post_episode(*args, **kwargs)[source]#

Overrides the LM post_episode function.

This function collects evidences, trains SDRs and logs the output.

class EvidenceSDRTargetOverlaps[source]#

Bases: object

Keep track of the running average of target overlaps for each episode.

The target overlaps are implemented as a 2D tensor where the indices of the tensor represent the ids of the objects, and the values of the tensor represent a running average of the overlap target.

To achieve this, we implement functions for expanding the size of the overlap target tensor, linear mapping for normalization, and updating the overlap tensor (i.e., running average).

Note

We are averaging over multiple overlap targets. Multiple targets can happen for different reasons:

  • Asymmetric evidences: the target overlap for object 1 w.r.t object 2 ([2,1]) is averaged with object 2 w.r.t object 1 ([1,2]). This is possible because we sort the ids when we add the evidences to overlaps. Both evidences get added to the location [1,2].

  • Additional episodes with similar MLO (most-likely object): More episodes can accumulate additional evidences on to the same key if the MLO is similar to previous MLO of another episode.

__init__()[source]#

Initialize with overlap tensor.

Initialize the class with overlap tensor to store the running average of the target scores. Additionally we store the counts to easily calculate the running average.

add_evidence(evidence, mapping_output_range)[source]#

Main function for updating the running average with evidence.

This function receives as input the relative evidence scores and maps them to overlaps in the mapping_output_range. The mapped overlaps are added to the running average in the function add_overlaps.

add_objects(new_size)[source]#

Expands the overlaps and the counts 2D tensors to accommodate new objects.

add_overlaps(mapped_overlaps)[source]#

Main function for updating the running average with overlaps.

The running average equation we use is: new_average = ((old_average * counts) + (new_val * 1))/ (counts + 1)

This calculates an equally weighted average, assuming that we keep track of the counts and increment them every time we add a new value to the average.
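The running-average equation can be sketched with NumPy arrays standing in for the overlap and count tensors. Treating nan as "no new value for this entry" is an assumption made for the example, and the free function `add_overlaps` here is a stand-in, not the method itself.

```python
import numpy as np


def add_overlaps(overlaps, counts, mapped_overlaps):
    """Fold new values into an equally weighted running average, in place."""
    valid = ~np.isnan(mapped_overlaps)  # assumption: nan marks "no new value"
    overlaps[valid] = (overlaps[valid] * counts[valid] + mapped_overlaps[valid]) / (
        counts[valid] + 1
    )
    counts[valid] += 1


overlaps = np.zeros((2, 2))
counts = np.zeros((2, 2))
for new_value in (10.0, 20.0, 30.0):
    update = np.full((2, 2), np.nan)
    update[0, 1] = new_value
    add_overlaps(overlaps, counts, update)
```

After the three updates, entry [0, 1] holds the mean of 10, 20, and 30 with a count of 3, and every other entry is untouched.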

map_to_overlaps(evidence, output_range)[source]#

Linear mapping of values from input range to output range.

Only applies to real values (i.e., ignores nan values).

Returns:

?
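A minimal sketch of a linear mapping that leaves nan entries untouched. It assumes the input range is the minimum and maximum of the non-nan values, which the documentation does not state. The `map_linear` helper is hypothetical.

```python
import numpy as np


def map_linear(values, output_range):
    """Linearly rescale the non-nan values to output_range; nan stays nan."""
    values = np.asarray(values, dtype=float)
    out_min, out_max = output_range
    # Assumption: the input range is taken from the real (non-nan) values.
    in_min, in_max = np.nanmin(values), np.nanmax(values)
    if in_max == in_min:
        # Degenerate input range: send every real value to the lower bound.
        return np.where(np.isnan(values), np.nan, float(out_min))
    scaled = (values - in_min) / (in_max - in_min)
    return scaled * (out_max - out_min) + out_min


mapped = map_linear([0.0, 5.0, np.nan, 10.0], output_range=(0, 41))
```

Arithmetic on nan yields nan, so missing entries pass through without any explicit masking.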

property overlaps#

Returns the target overlap values rounded to the nearest integer.

class LoggerSDR(path)[source]#

Bases: object

A simple logger that saves the data passed to it.

This logger maintains an episode counter and logs the data it receives under different files named by the episode counter.

See more information about what data is being logged under the `log_episode` function.

__init__(path)[source]#

log_episode(data)[source]#

Receives data dictionary and saves it as a pth file.

This function will save all the data passed to it. Here is a breakdown of the data to be logged by this function.

The data dictionary contains these key-value pairs:
  • mask: 2d tensor of the available overlap targets after this episode

  • target_overlap: 2d tensor of the target overlap at the end of this episode

  • training: Dictionary of training statistics for every epoch. Includes overlap_error, training_summed_distance, dense representations, and sdrs

  • obj2id: Objects to ids dictionary mapping

  • id2obj: Ids to objects dictionary mapping

tbp.monty.frameworks.models.feature_location_matching#

class FeatureGraphLM(max_match_distance, tolerances, path_similarity_threshold=0.1, pose_similarity_threshold=0.35, required_symmetry_evidence=5, graph_delta_thresholds=None, initial_possible_poses='informed', umbilical_num_poses=8)[source]#

Bases: GraphLM

Learning module that uses features at locations to recognize objects.

__init__(max_match_distance, tolerances, path_similarity_threshold=0.1, pose_similarity_threshold=0.35, required_symmetry_evidence=5, graph_delta_thresholds=None, initial_possible_poses='informed', umbilical_num_poses=8)[source]#

Initialize Learning Module.

Parameters:
  • max_match_distance – Maximum distance of a tested and stored location to be matched.

  • tolerances – How much can each observed feature deviate from the stored features to still be considered a match.

  • graph_delta_thresholds – Thresholds used to compare nodes in the graphs being learned, and thereby whether to include a new point or not. By default, we only consider the distance between points, using a threshold of 0.001 (determined in remove_close_points). Can also specify thresholds based on e.g. surface normal angle difference, or principal curvature magnitude difference.

  • path_similarity_threshold – How similar paths have to be to be considered the same in the terminal condition check.

  • pose_similarity_threshold – Difference between two poses to be considered unique when checking for the terminal condition (in radians).

  • required_symmetry_evidence – Number of steps with unchanged possible poses to classify an object as symmetric and go into terminal condition.

  • initial_possible_poses – Initial possible poses that should be tested for. One of “uniform”, “informed”, or a list. Defaults to “informed”.

  • umbilical_num_poses – Number of sampled rotations in the direction of the plane perpendicular to the surface normal.

get_object_rotation(graph_id, get_reverse_r=False)[source]#

Get the rotation of an object from the possible poses if resolved.

This first checks whether we have recognized a unique pose of the object or if a symmetry is detected. If one of the two is true it returns the unique rotation(s), otherwise returns None.

Parameters:
  • graph_id – The object to check poses for.

  • get_reverse_r – Whether to get the rotation that turns the model such that it would produce the sensed_displacements (False) or the rotation needed to turn the sensed_displacements into the model displacements (True).

Returns:

The rotation of the object if we know it.

get_unique_pose_if_available(object_id)[source]#

Get the pose of an object if we know it.

Scale is not implemented.

Returns:

The pose of the object if we know it.

receive_votes(vote_data)[source]#

Use votes to remove objects and poses from possible matches.

NOTE: Add object back into possible matches if majority of other modules think it is correct? This could help with dealing with noise but may also prevent LMs from narrowing down quickly. Since we are not working with this LM anymore, we probably won’t add that.

Parameters:

vote_data – positive and negative votes on object IDs + positive votes for locations and rotations on the object.

reset()[source]#

Call this before each episode.

send_out_vote()[source]#

Send out list of objects that are not possible matches.

By sending out the negative matches we avoid the problem that every LM needs to know about the same objects. We could think of this as more of an inhibitory signal (I know it can’t be this object so you all don’t need to check that anymore).

Returns:

List of objects that are not possible matches.

NUM_OTHER_LMS = 4#

class FeatureGraphMemory(graph_delta_thresholds)[source]#

Bases: GraphMemory

Graph memory that matches objects by using features at locations.

__init__(graph_delta_thresholds)[source]#

Initialize Graph Memory.

Parameters:

graph_delta_thresholds – Thresholds used to compare nodes in the graphs being learned, and thereby whether to include a new point or not.

get_initial_hypotheses()[source]#

get_nodes_with_matching_features(graph_id, features, list_of_lists=False) tuple[list, list][source]#

Get only nodes with matching features.

Get a reduced list of nodes that includes only nodes with features that match the features dict passed here.

Parameters:
  • graph_id – The graph descriptor e.g. ‘mug’

  • features – The observed features to be matched

  • list_of_lists – Whether each location in the list should be embedded in its own list (useful for some downstream operations). Defaults to False.

Returns:

The reduced lists of ids / locs.

get_rotation_features_at_node(graph_id, node_id, channel)[source]#

Get the rotation features at a node in the graph.

Returns:

The rotation features at a node in the graph.

tbp.monty.frameworks.models.goal_state_generation#

class EvidenceGoalStateGenerator(parent_lm, goal_tolerances=None, elapsed_steps_factor=10, min_post_goal_success_steps=numpy.inf, x_percent_scale_factor=0.75, desired_object_distance=0.03, wait_growth_multiplier=2, **kwargs) None[source]#

Bases: GraphGoalStateGenerator

Generator of goal states for an evidence-based graph LM.

GSG specifically set up for generating goal states for an evidence-based graph LM, which can therefore leverage the hypothesis-testing action policy. This policy uses hypotheses about the most likely objects, as well as knowledge of their structure from long-term memory, to propose test-points that should efficiently disambiguate the ID or pose of the object the agent is currently observing.

TODO M separate out the hypothesis-testing policy (which is one example of a model-based policy), from the GSG, which is the system that is capable of leveraging a variety of model-based policies.

__init__(parent_lm, goal_tolerances=None, elapsed_steps_factor=10, min_post_goal_success_steps=numpy.inf, x_percent_scale_factor=0.75, desired_object_distance=0.03, wait_growth_multiplier=2, **kwargs) None[source]#

Initialize the Evidence GSG.

Parameters:
  • parent_lm

    ?

  • goal_tolerances

    ?

  • elapsed_steps_factor – Factor that considers the number of elapsed steps as a possible condition for initiating a hypothesis-testing goal state; should be set to an integer reflecting a number of steps. In general, when we have taken number of non-goal-state driven steps greater than elapsed_steps_factor, then this is an indiciation to initiate a hypothesis-testing goal-state. In addition however, we can multiply elapsed_steps_factor by an exponentiall increasing wait-factor, such that we use longer and longer intervals as the experiment continues. Defaults to 10.

  • min_post_goal_success_steps – Number of necessary steps for a hypothesis goal-state to be considered. Unlike elapsed_steps_factor, this is a necessary criterion for us to generate a new hypothesis-testing goal-state. For example, if set to 5, then the agent must take 5 non-hypothesis-testing steps before it can even consider generating a new hypothesis-testing goal-state. Infinity by default, resulting in no use of the hypothesis-testing policy (desirable for unit tests etc.). Defaults to numpy.inf.

  • x_percent_scale_factor – Scale x-percent threshold to decide when to focus on pose rather than determining object ID; in particular, this is used to determine whether the top object is sufficiently more likely (based on MLH evidence) than the second MLH object to warrant focusing on disambiguating the pose of the first; should be bounded between 0 and 1.0. If x_percent_scale_factor=1.0, then we will wait until the standard x-percent threshold is exceeded, equivalent to the LM converging to a single object, but not a pose. If it is <1.0, then we will start testing the pose of the MLH object even before we are entirely certain about its ID. Defaults to 0.75.

  • desired_object_distance – The desired distance between the agent and the object, which is used to determine whether the agent is close enough to the object to consider it “achieved”. Note this need not be the same as the one specified for the motor-system (e.g. the surface-policy), as we may want to aim for an initially farther distance, while the surface-policy may want to stay quite close to the object. Defaults to 0.03.

  • wait_growth_multiplier – Multiplier used to increase the wait_factor, which in turn controls how long to wait before the next jump attempt. Defaults to 2.

  • **kwargs – Additional keyword arguments.
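
The interaction between elapsed_steps_factor, min_post_goal_success_steps and wait_growth_multiplier described above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: the function names, the steps_since_last_goal counter and the wait_factor argument are hypothetical, and only the elapsed-steps condition is modelled (any other trigger conditions the GSG may use are ignored).

```python
import math


def should_start_hypothesis_test(
    steps_since_last_goal: int,
    elapsed_steps_factor: int = 10,
    min_post_goal_success_steps: float = math.inf,
    wait_factor: float = 1.0,
) -> bool:
    """Hypothetical check for starting a hypothesis-testing goal state."""
    # Necessary condition: enough non-hypothesis-testing steps were taken.
    # With the default of infinity this never passes, so the policy is off.
    if steps_since_last_goal < min_post_goal_success_steps:
        return False
    # Trigger condition: more steps elapsed than the current (growing) interval.
    return steps_since_last_goal > elapsed_steps_factor * wait_factor


def grow_wait_factor(wait_factor: float, wait_growth_multiplier: float = 2) -> float:
    """Hypothetical update applied after a jump attempt to lengthen the interval."""
    return wait_factor * wait_growth_multiplier
```

Under these assumptions, repeatedly applying grow_wait_factor makes the interval between attempts grow geometrically (10, 20, 40, ... steps with the default values).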

reset()[source]#

Reset additional parameters specific to the Evidence GSG.

class GraphGoalStateGenerator(parent_lm, goal_tolerances=None, **kwargs) None[source]#

Bases: GoalStateGenerator

Generate sub-goal states until the received goal state is achieved.

A component associated with each learning module that receives a high level goal state, and generates sub-goal states until the received goal state is achieved.

Generated goal-states are received by either:

i) other learning modules, which may model world-objects (e.g. a mug), or may model internal systems (e.g. the agent’s robotic limb)

ii) motor actuators, in which case they represent simpler, primitive goal-states for the actuator to achieve (e.g. location and orientation of an actuator-sensor pair)

As well as the high-level, “driving” goal-state, generated goal-states can also be conditioned on other information, such as the LM’s current most-likely hypothesis, and the structure of known object models (i.e. information local to the LM).

Note all goal-states conform to the State-class cortical messaging protocol (CMP).

__init__(parent_lm, goal_tolerances=None, **kwargs) None[source]#

Initialize the GSG.

Parameters:
  • parent_lm – The learning-module class instance that the GSG is embedded within.

  • goal_tolerances – The tolerances for each attribute of the goal-state that can be used by the GSG when determining whether a goal-state is achieved. These are not necessarily the same as an LM’s tolerances used for matching, as here we are evaluating whether a goal-state is achieved.

  • **kwargs – Additional keyword arguments. Unused.

output_goal_states() list[GoalState][source]#

Retrieve the output goal-states of the GSG.

This is the goal-state projected to other LMs’ GSGs and/or motor actuators.

Returns:

Output goal-states of the GSG if they exist, otherwise an empty list.

reset()[source]#

Reset any stored attributes of the GSG.

set_driving_goal_state(received_goal_state)[source]#

Receive a new high-level goal to drive this goal-state-generator (GSG).

If none is provided, the goal-state generator should default to pursuing a goal-state of high confidence, with no other attributes of the state specified; in other words, it attempts to reduce uncertainty about the LM’s output (object ID and pose, whatever these may be).

TODO M: Currently GSGs always use the default, however future work will implement hierarchical action policies/GSGs, as well as the ability to specify a top goal-state by the experimenter.

TODO M: we currently just use “None” as a placeholder for the default goal-state > plan: set the default driving goal-state to a meaningful, non-None value that is compatible with the current method for checking convergence of an LM, such that achieving the driving goal-state can be used as a test for Monty convergence. This might be something like the below.

step(observations)[source]#

Step the GSG.

Check whether the GSG’s output and driving goal-states are achieved, and generate a new output goal-state if necessary.

tbp.monty.frameworks.models.graph_matching#

class GraphLM(initialize_base_modules=True)[source]#

Bases: LearningModule

General Learning Module that contains a graph memory.

__init__(initialize_base_modules=True)[source]#

Initialize general Learning Module based on graphs.

Parameters:

initialize_base_modules – Provides the option to not initialize the base modules if more specialized versions will be initialized in child LMs. Defaults to True.

add_lm_processing_to_buffer_stats(lm_processed)[source]#

Update the buffer stats with whether the LM processed an observation.

Add boolean of whether the LM processed an observation on this particular episode step.

Parameters:

lm_processed – Boolean of whether the LM processed an observation on this particular episode step

collect_stats_to_save()[source]#

Get all stats that this LM should store in the buffer for logging.

Returns:

Stats to store in the buffer.

exploratory_step(observations)[source]#

Step without trying to recognize object (updating possible matches).

get_all_known_object_ids()[source]#

Get the IDs of all object models stored in memory.

Returns:

IDs of all object models stored in memory.

get_graph(model_id, input_channel=None)[source]#

Get learned graph from graph memory.

Note

May generalize this in the future to get_object_model which doesn’t have to be a graph but currently a lot of code expects a graph to be returned so this name is more meaningful.

Returns:

Graph.

get_input_channels_in_graph(model_id)[source]#

Get input channels stored for a graph in graph memory.

Returns:

Input channels stored for a graph in graph memory.

get_object_scale(object_id)[source]#

Get object scale. TODO: implement solution for detecting scale.

Returns:

1

get_output()[source]#

Return the output of the learning module.

Is currently only implemented for the evidence LM since the other LM versions do not have a notion of MLH and therefore can’t produce an output until the last step of the episode.

get_possible_locations()[source]#
get_possible_matches()[source]#

Get list of current possible objects.

TODO: Maybe make this private -> check terminal condition

Returns:

List of current possible objects.

get_possible_paths()[source]#

Return possible paths for each object.

This is used for logging/plotting and to check if we know where on the object we are.

Returns:

Possible paths for each object.

get_possible_poses(as_euler=True)[source]#

Return possible poses for each object (for logging).

Possible poses are narrowed down in the feature matching version. When using displacements or PPF this is empty.

Returns:

Possible poses for each object.

get_unique_pose_if_available(object_id)[source]#

Return a 7d pose array if pose is uniquely identified.

This method should return a 7d pose array containing the detected object location, rotation and scale if the pose is uniquely identified. If not, it should contain None. This is used in the Monty class to determine whether we have reached a terminal state.

Returns:

7d pose array or None.

load_state_dict(state_dict)[source]#

Load state dict.

Parameters:

state_dict – State dict to load.

matching_step(observations)[source]#

Update the possible matches given an observation.

post_episode()[source]#

If training, update memory after each episode.

pre_episode(primary_target)[source]#

Set target object var and reset others from last episode.

Parameters:

primary_target – The primary target for the learning module/Monty system to recognize (e.g. the object the agent begins on, or an important object in the environment; NB that a learning module can also correctly classify a “stepwise_target”, corresponding to the object that it is currently on, while it is attempting to classify the primary_target).

propose_goal_states() list[GoalState][source]#

Return the goal-states proposed by this LM’s GSG.

Only returned if the LM/GSG was stepped, otherwise returns empty list.

receive_votes(vote_data)[source]#

Remove object ids that come in from the votes.

Parameters:

vote_data – set of objects that other LMs excluded from possible matches

reset()[source]#

NOTE: currently not used in public interface.

send_out_vote()[source]#

Send out list of objects that are not possible matches.

By sending out the negative matches we avoid the problem that every LM needs to know about the same objects. We could think of this as more of an inhibitory signal (I know it can’t be this object so you all don’t need to check that anymore).

Returns:

Set of objects that are not possible matches.
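
The negative-voting idea behind send_out_vote and receive_votes can be illustrated with plain sets. This is a sketch under stated assumptions, not the GraphLM code: the standalone functions and their arguments (known_objects, possible_matches) are hypothetical stand-ins for state that the LM would hold internally.

```python
def send_out_vote(known_objects: set, possible_matches: set) -> set:
    """Return the objects this LM knows about but has ruled out."""
    return known_objects - possible_matches


def receive_votes(possible_matches: set, vote_data: set) -> set:
    """Remove objects that other LMs have excluded from the possible matches."""
    # Objects in vote_data that this LM never knew about are simply ignored,
    # which is why LMs do not need to share the same set of known objects.
    return possible_matches - vote_data


# LM A has ruled out "spoon"; LM B drops it without needing to know "bowl".
vote = send_out_vote({"mug", "bowl", "spoon"}, {"mug", "bowl"})
remaining = receive_votes({"mug", "spoon", "fork"}, vote)
```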

set_detected_object(terminal_state)[source]#

Set the current graph ID.

If we didn’t recognize the object this will be new_object{n} where n is len(graph_memory) + 1. Otherwise it is the id of the graph that we recognized. If we timed out it is None and we will not update the graph memory.

set_experiment_mode(mode)[source]#

Set LM and GM mode to train or eval.

set_individual_ts(terminal_state)[source]#
state_dict()[source]#

Get the full state dict for logging and saving.

Returns:

Full state dict for logging and saving.

update_terminal_condition()[source]#

Check if we have reached a terminal condition for this episode.

Returns:

Terminal state of the LM.

class GraphMemory(graph_delta_thresholds=None, k=None)[source]#

Bases: LMMemory

General GraphMemory that stores & manipulates GraphObjectModel instances.

You can think of the GraphMemory as a library of object models with a librarian managing them. The books are GraphObjectModel instances. The LearningModule classes access the information stored in the books and can request books to be added to the library.

Subclasses are DisplacementGraphMemory, FeatureGraphMemory and EvidenceGraphMemory.

__init__(graph_delta_thresholds=None, k=None)[source]#

Initialize a graph memory structure. This can then be filled with graphs.

Parameters:
  • k – integer k as in KNN, used for creating edges between observations

  • graph_delta_thresholds – thresholds for determining if two observations are sufficiently different to both be added to the object model.

Examples:

graph_memory = GraphMemory()
graph_memory._add_graph_to_memory(cup_model, "cup")
graph_memory.reset()  # Call at beginning of episode

get_all_models_in_memory()[source]#

Return models stored in memory.

get_feature_array(graph_id)[source]#
get_feature_order(graph_id)[source]#
get_features_at_node(graph_id, input_channel, node_id, feature_keys=None)[source]#

Get features at a specific node in the graph.

Parameters:
  • graph_id – Name of graph.

  • input_channel – Input channel.

  • node_id – Node ID of the node to get features from. Can also be an array of node IDs to return an array of features.

  • feature_keys – Feature keys.

Returns:

Dict of features at this node.

TODO: look into getting node_id > graph.x.shape[0] (by 1)

get_graph(graph_id, input_channel=None)[source]#

Return graph from graph memory.

Parameters:
  • graph_id – id of graph to retrieve

  • input_channel

    ?

Raises:

ValueError – If input_channel is defined, not “first”, and not in the graph

get_graph_node_ids(graph_id, input_channel)[source]#
get_initial_hypotheses()[source]#
get_input_channels_in_graph(graph_id)[source]#
get_locations_in_graph(graph_id, input_channel)[source]#
get_memory_ids()[source]#

Get list of all objects in memory.

Returns:

List of all objects in memory.

get_num_nodes_in_graph(graph_id, input_channel=None)[source]#

Get number of nodes in graph.

If input_channel is None, return sum over all input channels for this object.

Returns:

Number of nodes in graph.

initialize_feature_arrays()[source]#
load_state_dict(state_dict)[source]#

Load graphs from state dict and add to memory.

memory_consolidation()[source]#

Is here just as a placeholder.

This could be a function that cleans up graphs in memory to make more efficient use of their nodes by spacing them out evenly along the approximated object surface. It could be something that happens during sleep. During clean up, similar graphs could also be merged.

Q: Should we implement something like this?

remove_graph_from_memory(graph_id)[source]#
state_dict()[source]#

Return state_dict.

update_memory(locations, features, graph_id, object_location_rel_body, location_rel_model, object_rotation, object_scale)[source]#

Determine how to update memory and call corresponding function.

class MontyForGraphMatching(*args, **kwargs)[source]#

Bases: MontyBase

General Monty model for recognizing objects using graphs.

__init__(*args, **kwargs)[source]#

Initialize and reset LM.

check_if_any_lms_updated()[source]#

True if any LM received sensory information on the current episode step.

Returns:

True if any LM received sensory information on the current episode step, False otherwise.

check_terminal_conditions()[source]#

Check if all LMs have reached a terminal state.

This could be no_match, match, or time_out. If all LMs have reached one of these states, end the episode.

Currently the episode just ends if
  • min_lms_match lms have reached “match”

  • all lms have reached “no_match”

  • We have exceeded max_total_steps

Note

In the future we may want to allow ending an episode when all states are either match or no_match. Right now, the match lms will have to convince the no_match lms of their detected object and pose for the episode to end which may be more difficult if not all LMs know about all objects.

Returns:

True if all LMs have reached a terminal state, False otherwise.
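
The three ending conditions listed above can be expressed as a small predicate. This is an illustrative sketch only: the function name, the use of None for an LM that has not yet reached a terminal state, and passing the step counters as arguments are assumptions made for the example.

```python
from typing import List, Optional


def episode_should_end(
    terminal_states: List[Optional[str]],
    min_lms_match: int,
    total_steps: int,
    max_total_steps: int,
) -> bool:
    """Hypothetical check mirroring the three ending conditions listed above."""
    # Condition 1: at least min_lms_match LMs have reached "match".
    num_match = sum(state == "match" for state in terminal_states)
    # Condition 2: every LM has reached "no_match".
    all_no_match = bool(terminal_states) and all(
        state == "no_match" for state in terminal_states
    )
    # Condition 3: the step budget has been exceeded.
    exceeded_steps = total_steps > max_total_steps
    return num_match >= min_lms_match or all_no_match or exceeded_steps
```

Note that a mix of "match" and "no_match" states does not end the episode in this sketch unless enough LMs match, which corresponds to the limitation described in the note above.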

deal_with_time_out()[source]#

Set LM terminal states to time_out.

load_state_dict_from_parallel(parallel_dirs, save=False)[source]#
pre_episode(primary_target, semantic_id_to_label=None)[source]#

Reset values and call sub-pre_episode functions.

reset()[source]#

Reset monty status.

send_vote_to_lm(lm, lm_id, combined_votes)[source]#

Route correct votes to a given LM.

set_is_done()[source]#

Set the model is_done flag.

Method that, for example, an experiment class can use to set the model’s is_done flag, e.g. if the total number of possible episode steps has been exceeded.

update_stats_after_vote(lm)[source]#

Add voting stats to buffer and check individual terminal condition.

LOGGING_REGISTRY = {'BASIC': <class 'tbp.monty.frameworks.loggers.graph_matching_loggers.BasicGraphMatchingLogger'>, 'DETAILED': <class 'tbp.monty.frameworks.loggers.graph_matching_loggers.DetailedGraphMatchingLogger'>, 'SELECTIVE': <class 'tbp.monty.frameworks.loggers.graph_matching_loggers.SelectiveEvidenceLogger'>, 'SILENT': <class 'tbp.monty.frameworks.loggers.exp_logger.BaseMontyLogger'>}#

tbp.monty.frameworks.models.monty_base#

class MontyBase(sensor_modules, learning_modules, motor_system: MotorSystem, sm_to_agent_dict, sm_to_lm_matrix, lm_to_lm_matrix, lm_to_lm_vote_matrix, min_eval_steps, min_train_steps, num_exploratory_steps, max_total_steps)[source]#

Bases: Monty

__init__(sensor_modules, learning_modules, motor_system: MotorSystem, sm_to_agent_dict, sm_to_lm_matrix, lm_to_lm_matrix, lm_to_lm_vote_matrix, min_eval_steps, min_train_steps, num_exploratory_steps, max_total_steps)[source]#

Initialize the base class.

Parameters:
  • sensor_modules – list of sensor modules

  • learning_modules – list of learning modules

  • motor_system (MotorSystem) – class instance that aggregates proposed motor outputs of learning modules and decides next action. Conceptually, this is the subcortical motor area. Note: EnvironmentDataLoader takes a motor_system as an argument. That motor system is the same as this one.

  • sm_to_agent_dict – dictionary mapping each sensor module id to the list of habitat agents it receives input from. This is to simulate columns with wide receptive fields that receive input from multiple sensors. TODO: Do we still need this?

  • sm_to_lm_matrix – nested array that governs which sensor modules a learning module will receive input from, such that sensor_modules[sm_to_lm_matrix[i][j]] is the jth sensor module that learning module i receives input from. For now, assume 1:1 mapping. Technically, this is a coupling matrix, but since it is sparse, the argument format is an array of arrays.

  • lm_to_lm_matrix – just like sm_to_lm_matrix, but describes coupling between learning modules where one LM’s output becomes another LM’s input. The output of an LM needs to be in the same format as its input.

  • lm_to_lm_vote_matrix – describes lateral coupling between learning modules. This matrix is used for voting. Assumes no lateral voting if None is passed.

  • min_eval_steps – Minimum number of steps required for evaluations.

  • min_train_steps – Minimum number of steps required for training.

  • num_exploratory_steps – Number of steps required by the exploratory phase.

  • max_total_steps – Maximum number of steps to run the experiment.

Raises:
  • ValueError – If sm_to_lm_matrix is not defined

  • ValueError – If the lengths of learning_modules and sm_to_lm_matrix do not match

  • ValueError – If the keys of sm_to_agent_dict do not match the sensor_module_id values of sensor_modules
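
To make the array-of-arrays coupling format and the three ValueError conditions concrete, here is a minimal sketch. It assumes that the entries of sm_to_lm_matrix are indices into the list of sensor modules; the helper names and the use of plain ID strings in place of sensor-module objects are hypothetical.

```python
def validate_coupling(sensor_module_ids, num_learning_modules,
                      sm_to_lm_matrix, sm_to_agent_dict):
    """Hypothetical re-statement of the checks listed under Raises."""
    if sm_to_lm_matrix is None:
        raise ValueError("sm_to_lm_matrix must be defined")
    if len(sm_to_lm_matrix) != num_learning_modules:
        raise ValueError("need one row of sm_to_lm_matrix per learning module")
    if set(sm_to_agent_dict) != set(sensor_module_ids):
        raise ValueError("sm_to_agent_dict keys must match the sensor module ids")


def sensor_inputs_for_lm(sensor_module_ids, sm_to_lm_matrix, lm_index):
    """Resolve row lm_index of the coupling matrix to sensor module ids."""
    # Assumption: each entry is an index into the list of sensor modules.
    return [sensor_module_ids[j] for j in sm_to_lm_matrix[lm_index]]


# A 1:1 mapping between two sensor modules and two learning modules.
ids = ["patch_0", "patch_1"]
matrix = [[0], [1]]
agents = {"patch_0": ["agent_0"], "patch_1": ["agent_0"]}
validate_coupling(ids, 2, matrix, agents)
```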

aggregate_sensory_inputs(observation)[source]#

Receive data from dataloader/env, organize on a per sensor module basis.

check_reached_max_matching_steps(max_steps)[source]#

Check if max_steps was reached and deal with time_out.

Returns:

True if max_steps was reached, False otherwise.

deal_with_time_out()[source]#

Call any functions and logging in case of a time out.

get_agent_state()[source]#

Get state of agent (dict).

Returns:

State of the agent.

get_observations(observations, sensor_module_id)[source]#

Get observations from all agents pertaining to a single sensor module.

Observations are returned in the format

    {"agent_1":
        {"sm_1":
            {"rgba": data,
             "depth": data,
             "semantic": data}
        }
        {"sm_2":
            {"rgba": data, ... }
        }
     "agent_2":
        {"sm_3":
            {"rgba": data, ... }
        }
     ...
     "agent_n":
        {"sm_k":
            {"rgba": data, ... }
        }
    }

Returns:

Observations from all agents pertaining to a single sensor module.
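
As a rough illustration of working with the agent → sensor module → modality nesting described above, the sketch below picks out the entries for one sensor module. The helper name, the explicit agent_ids argument and the shape of the return value are assumptions for this example and may differ from what MontyBase.get_observations actually returns.

```python
def select_sensor_observations(observations, sensor_module_id, agent_ids):
    """Collect the observations of one sensor module from the listed agents."""
    selected = {}
    for agent_id in agent_ids:
        # Skip agents that are missing or do not carry this sensor module.
        agent_obs = observations.get(agent_id, {})
        if sensor_module_id in agent_obs:
            selected[agent_id] = agent_obs[sensor_module_id]
    return selected


# Placeholder values stand in for the actual rgba/depth arrays.
example_observations = {
    "agent_1": {"sm_1": {"rgba": 1, "depth": 2}, "sm_2": {"rgba": 3}},
    "agent_2": {"sm_3": {"rgba": 4}},
}
sm_1_observations = select_sensor_observations(
    example_observations, "sm_1", ["agent_1"]
)
```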

load_state_dict(state_dict)[source]#

Take a state dict as an argument and set state for monty and children.

pass_features_directly_to_motor_system(observation)[source]#

Pass features directly to motor system without stepping LMs.

post_episode()[source]#

Recursively call post_episode on child classes.

pre_episode()[source]#

Recursively call pre_episode on child classes.

reset_episode_steps()[source]#
set_done()[source]#
set_experiment_mode(mode)[source]#

Set the experiment mode.

Update state variables based on which method (train or evaluate) is being called at the experiment level.

state_dict()[source]#

Return a serializable dict with everything needed to save/load monty.

step(observation)[source]#

Take a matching, exploratory, or custom user-defined step.

Step taken depends on the value of self.step_type.

switch_to_exploratory_step()[source]#
switch_to_matching_step()[source]#
update_step_counters()[source]#
LOGGING_REGISTRY = {'TEST': <class 'tbp.monty.frameworks.loggers.exp_logger.TestLogger'>}#
property exceeded_min_steps#
property is_done#

Return bool to tell the experiment if we are done with this episode.

property is_motor_only_step#
property min_steps#
property step_type_count#

tbp.monty.frameworks.models.motor_policies#

exception ObjectNotVisible[source]#

Bases: RuntimeError

Error raised when the object is not visible.

class BasePolicy(rng, action_sampler_args: Dict, action_sampler_class: Type[ActionSampler], agent_id: str, switch_frequency, file_name=None, file_names_per_episode=None)[source]#

Bases: MotorPolicy

__init__(rng, action_sampler_args: Dict, action_sampler_class: Type[ActionSampler], agent_id: str, switch_frequency, file_name=None, file_names_per_episode=None)[source]#

Initialize a base policy.

Parameters:
  • rng – Random number generator to use

  • action_sampler_args (Dict) – arguments for the ActionSampler

  • action_sampler_class (Type[ActionSampler]) – The ActionSampler to use

  • agent_id (str) – The agent ID

  • switch_frequency – float in [0,1], how frequently to change actions when using sticky actions

  • file_name – Path to file with predefined actions. Defaults to None.

  • file_names_per_episode – ?. Defaults to None.
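
One plausible reading of switch_frequency with sticky actions is sketched below: with probability switch_frequency a new action is sampled, otherwise the previous action is repeated. This is an assumption about the intended behaviour, not the BasePolicy code; the function name is hypothetical and Python's random.Random stands in for the rng argument.

```python
import random


def next_sticky_action(rng, last_action, allowed_actions, switch_frequency):
    """Repeat last_action unless a switch is drawn (hypothetical helper)."""
    # rng.random() is in [0, 1), so switch_frequency=1.0 always resamples
    # and switch_frequency=0.0 never does (once there is a last action).
    if last_action is None or rng.random() < switch_frequency:
        return rng.choice(allowed_actions)
    return last_action


rng = random.Random(0)
actions = ["look_up", "look_down", "turn_left", "turn_right"]
first_action = next_sticky_action(rng, None, actions, switch_frequency=0.5)
```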

dynamic_call(_state: MotorSystemState | None = None) Action | None[source]#

Return a random action.

The MotorSystemState is ignored.

Parameters:

_state (MotorSystemState | None) – The current state of the motor system. Defaults to None. Unused.

Return type:

Action | None

Returns:

A random action.

get_agent_state(state: MotorSystemState) AgentState[source]#

Get agent state (dict).

Note

Assumes we only have one agent.

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

AgentState

Returns:

Agent state.

get_random_action(action: Action) Action[source]#

Returns random action sampled from allowable actions.

Enables expanding the action space of the base policy with actions that we don’t necessarily want to randomly sample

Return type:

Action

is_motor_only_step(state: MotorSystemState) bool[source]#

Check if the current step is a motor-only step.

TODO: This information is currently stored in motor system state, but should be stored in the policy state instead as it is tracking policy state, not motor system state. This will remove MotorSystemState param.

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

bool

Returns:

True if the current step is a motor-only step, False otherwise.

load_state_dict(state_dict)[source]#
post_action(action: Action | None, _: MotorSystemState | None = None) None[source]#

This post action hook will automatically be called at the end of __call__.

TODO: Remove state parameter as it is only used to serialize the state in state.convert_motor_state() and should be done within the motor system.

Parameters:
  • action (Action | None) – The action to process the hook for.

  • state – The current state of the motor system. Defaults to None.

Return type:

None

post_episode()[source]#

Post episode hook.

pre_episode()[source]#

Pre episode hook.

predefined_call() Action[source]#

Use this method when actions are predefined.

Return type:

Action

Returns:

The action to take.

set_experiment_mode(mode: Literal['train', 'eval']) None[source]#

Sets the experiment mode.

Parameters:

mode (Literal['train', 'eval']) – The experiment mode to set.

Return type:

None

state_dict()[source]#
property last_action: Action#

Returns the last action taken by the motor policy.

class GetGoodView(desired_object_distance: float, good_view_percentage: float, multiple_objects_present: bool, sensor_id: str, target_semantic_id: int, allow_translation: bool = True, max_orientation_attempts: int = 1, **kwargs) None[source]#

Bases: PositioningProcedure

Positioning procedure to get a good view of the object before an episode.

Used to position the distant agent so that it finds the initial view of an object at the beginning of an episode with respect to a given sensor (the surface agent is positioned using the TouchObject positioning procedure instead). Also currently used by the distant agent after a “jump” has been initialized by a model-based policy.

First, the agent is moved towards the target object until the object fills a minimum percentage (given by good_view_percentage) of the sensor’s field of view or the closest point of the object is less than desired_object_distance from the sensor. This makes sure that big and small objects all fill a similar amount of space in the sensor’s field of view. Otherwise small objects may be too small to perform saccades or the sensor ends up inside of big objects. This step is performed by default but can be skipped by setting allow_translation=False.

Second, the agent will then be oriented towards the object so that the sensor’s central pixel is on-object. In the case of multi-object experiments, (i.e., when multiple_objects_present=True), there is an additional orientation step performed prior to the translational movement step.
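
The stopping rule for the translation step can be sketched as follows. This is a simplified illustration with hypothetical helper names; it assumes good_view_percentage is expressed as a fraction in [0, 1] and that the semantic image is a 2D grid of integer IDs.

```python
def on_object_fraction(semantic_image, target_semantic_id):
    """Fraction of pixels in a 2D semantic image that show the target object."""
    pixels = [pixel for row in semantic_image for pixel in row]
    return sum(pixel == target_semantic_id for pixel in pixels) / len(pixels)


def has_good_view(fraction_on_object, closest_point_distance,
                  good_view_percentage, desired_object_distance):
    """Stop moving forward once either criterion from the text is met."""
    fills_enough = fraction_on_object >= good_view_percentage
    close_enough = closest_point_distance < desired_object_distance
    return fills_enough or close_enough


# Three of four pixels are on the target object (semantic ID 1).
fraction = on_object_fraction([[0, 1], [1, 1]], target_semantic_id=1)
```

The second criterion is what keeps the sensor from ending up inside large objects: the agent stops approaching once it is within desired_object_distance, even if the object does not yet fill the requested share of the view.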

__init__(desired_object_distance: float, good_view_percentage: float, multiple_objects_present: bool, sensor_id: str, target_semantic_id: int, allow_translation: bool = True, max_orientation_attempts: int = 1, **kwargs) None[source]#

Initialize the GetGoodView policy.

Parameters:
  • desired_object_distance (float) – The desired distance to the object.

  • good_view_percentage (float) – The percentage of the sensor that should be filled with the object.

  • multiple_objects_present (bool) – Whether there are multiple objects in the scene.

  • sensor_id (str) – The ID of the sensor to use for positioning.

  • target_semantic_id (int) – The semantic ID of the target object.

  • allow_translation (bool) – Whether to allow movement toward the object via the motor system’s move_close_enough method. If False, only orienting movements are performed. Defaults to True.

  • max_orientation_attempts (int) – The maximum number of orientation attempts allowed before giving up and truncating the procedure indicating that the sensor is not on the target object.

  • **kwargs – Additional keyword arguments.

compute_look_amounts(relative_location: numpy.ndarray, state: MotorSystemState | None = None) Tuple[float, float][source]#

Compute the amount to look down and left given a relative location.

This function computes the amount needed to look down and left in order for the sensor to be aimed at the target. The returned amounts are relative to the agent’s current position and rotation. Looking up and right is done by returning negative amounts.

TODO: Test whether this function works when the agent is facing in the positive z-direction. It may be fine, but there were some adjustments to accommodate the z-axis positive direction pointing opposite the body’s initial orientation (e.g., using negative z in left_amount = -np.degrees(np.arctan2(x_rot, -z_rot))).

Parameters:
  • relative_location (ndarray) – the x,y,z coordinates of the target with respect to the sensor.

  • state (Optional[MotorSystemState]) – The current state of the motor system. Defaults to None.

Return type:

Tuple[float, float]

Returns:

down_amount: Amount to look down (degrees). left_amount: Amount to look left (degrees).
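
The geometry can be sketched with the standard library alone. The left_amount expression follows the formula quoted in the TODO above; the down_amount expression and the frame convention (target already expressed in the sensor's frame, +x to the right, +y up, -z straight ahead) are assumptions for this illustration, and the function name is hypothetical.

```python
import math


def look_amounts(x: float, y: float, z: float):
    """Return (down_amount, left_amount) in degrees for a target at (x, y, z).

    Assumes the target is already expressed in the sensor's frame with
    +x right, +y up and -z pointing straight ahead.
    """
    # Horizontal angle, as in the formula quoted in the TODO above.
    left_amount = -math.degrees(math.atan2(x, -z))
    # Vertical angle relative to the horizontal distance (assumed form).
    horizontal_distance = math.hypot(x, z)
    down_amount = -math.degrees(math.atan2(y, horizontal_distance))
    return down_amount, left_amount
```

Under these conventions a target below the line of sight gives a positive down_amount, and a target to the right gives a negative left_amount, matching the statement that looking up and right is expressed with negative amounts.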

find_location_to_look_at(sem3d_obs: numpy.ndarray, image_shape: Tuple[int, int], state: MotorSystemState | None = None) numpy.ndarray[source]#

Find the location to look at in the observation.

Takes in a semantic 3D observation and returns an x,y,z location.

The location is on the object and surrounded by pixels that are also on the object. This is done by smoothing the on_object image and then taking the maximum of this smoothed image.

Parameters:
  • sem3d_obs (ndarray) – The location of each pixel and the semantic ID associated with that location.

  • image_shape (Tuple[int, int]) – The shape of the camera image.

  • state (Optional[MotorSystemState]) – The current state of the motor system. Defaults to None.

Return type:

ndarray

Returns:

The x,y,z coordinates of the target with respect to the sensor.
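
The "smooth, then take the maximum" idea can be illustrated in pure Python. This sketch swaps in a simple box filter with zero padding (the smoothing kernel actually used by the method is not stated here) and returns a pixel index rather than an x,y,z location; the function name and the radius parameter are hypothetical.

```python
def find_pixel_to_look_at(on_object, radius=1):
    """Return the (row, col) whose neighbourhood is most on-object."""
    rows, cols = len(on_object), len(on_object[0])
    window = (2 * radius + 1) ** 2
    best_score, best_pixel = -1.0, None
    for r in range(rows):
        for c in range(cols):
            total = 0
            for dr in range(-radius, radius + 1):
                for dc in range(-radius, radius + 1):
                    rr, cc = r + dr, c + dc
                    # Pixels outside the image count as off-object.
                    if 0 <= rr < rows and 0 <= cc < cols:
                        total += on_object[rr][cc]
            score = total / window
            # Strict comparison keeps the first pixel among equal maxima.
            if score > best_score:
                best_score, best_pixel = score, (r, c)
    return best_pixel


mask = [
    [0, 0, 0, 0, 1],
    [0, 1, 1, 1, 0],
    [0, 1, 1, 1, 0],
    [0, 1, 1, 1, 0],
    [0, 0, 0, 0, 0],
]
target_pixel = find_pixel_to_look_at(mask)
```

Smoothing is what steers the result away from the isolated on-object pixel in the corner and towards the middle of the larger on-object region.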

is_on_target_object(observation: Mapping) bool[source]#

Check if a sensor is on the target object.

Parameters:

observation (Mapping) – The observation to use for positioning.

Return type:

bool

Returns:

Whether the sensor is on the target object.

move_close_enough(observation: Mapping) Action | None[source]#

Move closer to the object until we are close enough.

Parameters:

observation (Mapping) – The observation to use for positioning.

Return type:

Action | None

Returns:

The next action to take, or None if we are already close enough to the object.

Raises:

ValueError – If the object is not visible.

orient_to_object(observation: Mapping, state: MotorSystemState | None = None) List[Action][source]#

Rotate sensors so that they are centered on the object using the view finder.

The view finder needs to be in the same position as the sensor patch and the object needs to be somewhere in the view finder’s view.

Parameters:
  • observation (Mapping) – The observation to use for positioning.

  • state (Optional[MotorSystemState]) – The current state of the motor system. Defaults to None.

Return type:

List[Action]

Returns:

A list of actions of length two composed of actions needed to get us onto the target object.

positioning_call(observation: Mapping, state: MotorSystemState | None = None) PositioningProcedureResult[source]#

Return a list of actions to position the agent in the scene.

TODO: When this becomes a PositioningProcedure it can be a __call__ method.

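Parameters:
  • observation (Mapping) – The observation to use for positioning.

  • state (Optional[MotorSystemState]) – The current state of the motor system. Defaults to None.

Return type: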

PositioningProcedureResult

Returns:

Any actions to take, whether the procedure succeeded, whether the procedure terminated, and whether the procedure truncated.

sensor_rotation_relative_to_world(state: MotorSystemState) Any[source]#

Derives the positioning sensor’s rotation relative to the world.

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

Any

Returns:

The positioning sensor’s rotation relative to the world.

class InformedPolicy(min_perc_on_obj, good_view_percentage, desired_object_distance, use_goal_state_driven_actions=False, **kwargs)[source]#

Bases: BasePolicy, JumpToGoalStateMixin

Policy that takes observation as input.

Extension of BasePolicy that allows for taking the observation into account for action selection. Currently it uses the percentage of the observation that is on the object to reverse the last action if it is below min_perc_on_obj.

Additionally, this policy discourages taking the reverse of the previous action if we are still on the object.

guiding_sensors#

List of sensors that are used to calculate the percentage on object. When using multiple sensors or a visualization sensor we may want to ignore some when determining whether we need to move back.

min_perc_on_obj#

What percentage of the observation needs to be on the object to sample a new action. Otherwise the previous action is reversed to get back on the object. TODO: Not used anywhere?

__init__(min_perc_on_obj, good_view_percentage, desired_object_distance, use_goal_state_driven_actions=False, **kwargs)[source]#

Initialize policy.

Parameters:
  • min_perc_on_obj – Minimum percentage of patch that needs to be on the object. If under this amount, reverse the previous action to get the patch back on the object.

  • good_view_percentage – What percentage of the view finder’s perception should be filled with the object. (If less, move closer.)

  • desired_object_distance – How far away should the agent be from the object in view; for the distant-agent, this is used to establish a maximum allowable distance of the object; note for the surface agent, this is used with every set of traversal steps to ensure we remain close to the surface

  • use_goal_state_driven_actions – Whether to enable the motor system to make use of the JumpToGoalStateMixin, which attempts to “jump” (i.e. teleport) the agent to a specified goal state.

  • **kwargs – Additional keyword arguments.

dynamic_call(state: MotorSystemState | None = None) Action | None[source]#

Return the next action to take.

This requires self.processed_observations to be updated at every step in the Monty class. self.processed_observations contains the features extracted by the sensor module for the guiding sensor (patch).

Parameters:

state (MotorSystemState | None) – The current state of the motor system. Defaults to None.

Return type:

Action | None

Returns:

The action to take.

fixme_undo_last_action() LookDown | LookUp | TurnLeft | TurnRight | MoveForward | MoveTangentially[source]#

Returns an action that undoes last action for supported actions.

Previous InformedPolicy.dynamic_call() implementation when not on object:

action, amount = (last_action, -last_amount)

This implementation duplicates the functionality and the implicit assumption in the code and configurations that InformedPolicy is working with one of the following actions:
  • LookUp

  • LookDown

  • TurnLeft

  • TurnRight

Additionally, this implementation adds support for:
  • MoveForward

  • MoveTangentially

Additional support for the above two actions is due to -last_amount working for these actions as well. This maintains the same code functionality during this refactoring.

For other actions, an error is raised explicitly (see Raises).

Raises:

TypeError – If the last action is not supported

TODO These instance checks are undesirable and should be removed in the future. I am using these for now to express the implicit assumptions in the code. An Action.undo of some sort would be a better solution, however it is not yet clear to me what to do for actions that do not support undo.
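The negation approach described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the dataclasses `LookUp`, `MoveForward`, and `SetYaw`, the `_AMOUNT_FIELD` table, and the `undo` helper are hypothetical stand-ins, not the tbp.monty action classes, and each supported action is assumed to carry a single signed amount.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class LookUp:  # hypothetical stand-in, not the tbp.monty class
    rotation_degrees: float


@dataclass(frozen=True)
class MoveForward:  # hypothetical stand-in, not the tbp.monty class
    distance: float


@dataclass(frozen=True)
class SetYaw:  # hypothetical action for which negation is not an "undo"
    rotation_degrees: float


# Hypothetical table: which field holds the signed amount of each action.
_AMOUNT_FIELD = {LookUp: "rotation_degrees", MoveForward: "distance"}


def undo(last_action):
    """Return a copy of the action with its amount negated."""
    field_name = _AMOUNT_FIELD.get(type(last_action))
    if field_name is None:
        raise TypeError(f"Cannot undo action: {last_action!r}")
    negated = -getattr(last_action, field_name)
    return replace(last_action, **{field_name: negated})
```

A lookup table is used here instead of a chain of instance checks purely to keep the sketch short; it does not address the open question in the TODO about actions that cannot be undone.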

post_action(action: Action | None, state: MotorSystemState | None = None) None[source]#

This post action hook will automatically be called at the end of __call__.

TODO: Remove state parameter as it is only used to serialize the state in state.convert_motor_state() and should be done within the motor system.

Parameters:
  • action (Action | None) – The action to process the hook for.

  • state (MotorSystemState | None) – The current state of the motor system. Defaults to None.

Return type:

None

pre_episode()[source]#

Pre episode hook.

class JumpToGoalStateMixin None[source]#

Bases: object

Convert driving goal state to an action in Habitat-compatible coordinates.

Motor policy that enables us to take in a driving goal state for the motor agent, and specify the action in Habitat-compatible coordinates that must be taken to move there.

__init__() None[source]#
derive_habitat_goal_state()[source]#

Derive the Habitat-compatible goal state.

Take the current driving goal state (in CMP format), and derive the corresponding Habitat compatible goal-state to pass through the Embodied Dataloader.

Returns:
  • target_loc – Target location.

  • target_quat – Target quaternion.

pre_episode()[source]#
set_driving_goal_state(goal_state)[source]#

Specify the goal-state that the motor-actuator will attempt to satisfy.

class MotorPolicy None[source]#

Bases: ABC

The abstract scaffold for motor policies.

__init__() None[source]#
abstract dynamic_call(state: MotorSystemState | None = None) Action | None[source]#

Use this method when actions are not predefined.

Parameters:

state (MotorSystemState | None) – The current state of the motor system. Defaults to None.

Return type:

Action | None

Returns:

The action to take.

abstract post_action(action: Action | None, state: MotorSystemState | None = None) None[source]#

This post action hook will automatically be called at the end of __call__.

TODO: Remove state parameter as it is only used to serialize the state in state.convert_motor_state() and should be done within the motor system.

Parameters:
  • action (Action | None) – The action to process the hook for.

  • state (MotorSystemState | None) – The current state of the motor system. Defaults to None.

Return type:

None

abstract post_episode() None[source]#

Post episode hook.

Return type:

None

abstract pre_episode() None[source]#

Pre episode hook.

Return type:

None

abstract predefined_call() Action[source]#

Use this method when actions are predefined.

Return type:

Action

Returns:

The action to take.

abstract set_experiment_mode(mode: Literal['train', 'eval']) None[source]#

Sets the experiment mode.

Parameters:

mode (Literal['train', 'eval']) – The experiment mode to set.

Return type:

None

abstract property last_action: Action#

Returns the last action taken by the motor policy.

class NaiveScanPolicy(fixed_amount, **kwargs)[source]#

Bases: InformedPolicy

Policy that just moves left and right along the object.

__init__(fixed_amount, **kwargs)[source]#

Initialize policy.

check_cycle_action()[source]#

Makes sure we move in a spiral.

This method switches the current action if steps_per_action was reached. Additionally it increments steps_per_action after the second and fourth action to make sure paths don’t overlap.

The resulting spiral path corresponds to 1x left, 1x up, 2x right, 2x down, 3x left, 3x up, 4x right, 4x down, …
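The run lengths listed above (1x left, 1x up, 2x right, 2x down, ...) can be reproduced with a small generator. This is a minimal sketch, assuming string direction labels; `spiral_directions` is a hypothetical helper and not part of NaiveScanPolicy.

```python
from itertools import cycle, islice


def spiral_directions():
    """Yield directions for an outward spiral: 1x left, 1x up, 2x right, ..."""
    steps_per_action = 1
    for index, direction in enumerate(cycle(["left", "up", "right", "down"])):
        for _ in range(steps_per_action):
            yield direction
        # Lengthen the run after every second action so paths do not overlap.
        if index % 2 == 1:
            steps_per_action += 1


# The first 12 moves cover the runs 1, 1, 2, 2, 3, 3.
first_moves = list(islice(spiral_directions(), 12))
```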

dynamic_call(_state: MotorSystemState | None = None) Action[source]#

Return the next action in the spiral being executed.

The MotorSystemState is ignored.

Parameters:

_state (Optional[MotorSystemState]) – The current state of the motor system. Defaults to None. Unused.

Return type:

Action

Returns:

The action to take.

Raises:

StopIteration – If the spiral has completed.

pre_episode()[source]#

Pre episode hook.

class PositioningProcedure(rng, action_sampler_args: Dict, action_sampler_class: Type[ActionSampler], agent_id: str, switch_frequency, file_name=None, file_names_per_episode=None)[source]#

Bases: BasePolicy

Positioning procedure to position the agent in the scene.

TODO: Remove from MotorPolicy hierarchy and refactor to standalone PositioningProcedure hierarchy when they get separated.

The positioning_call method should be repeatedly called until the procedure result indicates that the procedure has terminated or truncated.

static depth_at_center(agent_id: str, observation: Any, sensor_id: str) float[source]#

Determine the depth of the central pixel for the sensor.

Parameters:
  • agent_id (str) – The ID of the agent to use.

  • observation (Any) – The observation to use.

  • sensor_id (str) – The ID of the sensor to use.

Return type:

float

Returns:

The depth of the central pixel for the sensor.

abstract positioning_call(observation: Mapping, state: MotorSystemState | None = None) PositioningProcedureResult[source]#

Return a list of actions to position the agent in the scene.

TODO: When this becomes a PositioningProcedure it can be a __call__ method.

Parameters:
Return type:

PositioningProcedureResult

Returns:

Any actions to take, whether the procedure succeeded, whether the procedure terminated, and whether the procedure truncated.

class PositioningProcedureResult(actions: List[Action] = <factory>, success: bool = False, terminated: bool = False, truncated: bool = False) None[source]#

Bases: object

Result of a positioning procedure.

For more on the terminated/truncated terminology, see https://farama.org/Gymnasium-Terminated-Truncated-Step-API.

__init__(actions: List[Action] = <factory>, success: bool = False, terminated: bool = False, truncated: bool = False) None#
actions: List[Action]#

Actions to take.

success: bool = False#

Whether the procedure succeeded in its positioning goal.

terminated: bool = False#

Whether the procedure reached a terminal state with success or failure.

truncated: bool = False#

Whether the procedure was truncated due to a limit on the number of attempts or other criteria.
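The calling pattern implied by these fields (call repeatedly until the result is terminated or truncated) might look like the sketch below. `Result` only mirrors the field names documented above, and `ToyProcedure` and `run` are hypothetical; integer depths are used to keep the example exact.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Result:  # hypothetical mirror of the PositioningProcedureResult fields
    actions: List[str] = field(default_factory=list)
    success: bool = False
    terminated: bool = False
    truncated: bool = False


class ToyProcedure:
    """Hypothetical procedure: step forward until a target depth is reached."""

    def __init__(self, depth: int, target: int, max_attempts: int):
        self.depth = depth
        self.target = target
        self.attempts_left = max_attempts

    def positioning_call(self) -> Result:
        if self.depth <= self.target:
            return Result(success=True, terminated=True)
        if self.attempts_left == 0:
            return Result(truncated=True)  # gave up: attempt limit reached
        self.attempts_left -= 1
        self.depth -= 1
        return Result(actions=["move_forward"])


def run(procedure: ToyProcedure) -> Result:
    """Call the procedure until it reports terminated or truncated."""
    taken: List[str] = []
    while True:
        result = procedure.positioning_call()
        taken.extend(result.actions)
        if result.terminated or result.truncated:
            return Result(taken, result.success, result.terminated, result.truncated)
```

For instance, `run(ToyProcedure(depth=5, target=2, max_attempts=10))` collects three forward moves and reports success, while lowering `max_attempts` to 2 yields a truncated, unsuccessful result.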

class SurfacePolicy(alpha, min_perc_on_obj=0.25, good_view_percentage=0.5, **kwargs)[source]#

Bases: InformedPolicy

Policy class for a surface-agent.

i.e. an agent that moves to and follows the surface of an object. Includes functions for moving along an object based on its surface normal.

__init__(alpha, min_perc_on_obj=0.25, good_view_percentage=0.5, **kwargs)[source]#

Initialize policy.

Parameters:
  • min_perc_on_obj – Minimum percentage of patch that needs to be on the object. If under this amount, reverse the previous action to get the patch back on the object.

  • good_view_percentage – Percentage of the view finder's observation that should be filled with the object. (If less, move closer.) TODO M: since the surface agent does not use get_good_view, consider removing this parameter

  • alpha – to what degree should the move_tangentially direction be the same as the last step or totally random? 0~same as before, 1~random walk

  • **kwargs – Additional keyword arguments.

dynamic_call(state: MotorSystemState | None = None) OrientHorizontal | OrientVertical | MoveTangentially | MoveForward | None[source]#

Return the next action to take.

This requires self.processed_observations to be updated at every step in the Monty class. self.processed_observations contains the features extracted by the sensor module for the guiding sensor (patch).

Parameters:

state (Optional[MotorSystemState]) – The current state of the motor system. Defaults to None.

Return type:

OrientHorizontal | OrientVertical | MoveTangentially | MoveForward | None

Returns:

The action to take.

Raises:

ObjectNotVisible – If the object is not visible.

get_inverse_agent_rot(state: MotorSystemState)[source]#

Get the inverse rotation of the agent’s current orientation.

Used to transform poses of e.g. surface normals or principal curvature from global coordinates into the coordinate frame of the agent.

To intuit why we apply the inverse, imagine e.g. a surface normal with the same pose as the agent; in the agent’s reference frame, this should have the identity pose, which is obtained by transforming the original pose by the inverse.

Parameters:

state (MotorSystemState) – The current state of the motor system.

Returns:

Inverse quaternion rotation.
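The intuition above can be checked numerically with plain tuples. This is a sketch under assumptions: quaternions are written as (w, x, y, z), the helpers `quat_multiply` and `quat_inverse` are hypothetical, and the example agent pose is made up; the actual method presumably relies on a quaternion library instead.

```python
import math


def quat_multiply(a, b):
    """Hamilton product of two quaternions given as (w, x, y, z)."""
    aw, ax, ay, az = a
    bw, bx, by, bz = b
    return (
        aw * bw - ax * bx - ay * by - az * bz,
        aw * bx + ax * bw + ay * bz - az * by,
        aw * by - ax * bz + ay * bw + az * bx,
        aw * bz + ax * by - ay * bx + az * bw,
    )


def quat_inverse(q):
    """Inverse of a unit quaternion, which equals its conjugate."""
    w, x, y, z = q
    return (w, -x, -y, -z)


# Hypothetical agent pose: rotated 90 degrees about the y-axis.
half_angle = math.radians(90) / 2
agent_rot = (math.cos(half_angle), 0.0, math.sin(half_angle), 0.0)

# A pose identical to the agent's, expressed in the agent's own frame,
# should come out as the identity rotation (1, 0, 0, 0).
pose_in_agent_frame = quat_multiply(quat_inverse(agent_rot), agent_rot)
```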

get_next_action(state: MotorSystemState) OrientHorizontal | OrientVertical | MoveTangentially | MoveForward | None[source]#

Retrieve next action from a cycle of four actions.

  • First move forward to touch the object at the right distance

  • Then orient toward the normal along direction 1

  • Then orient toward the normal along direction 2

  • Then move tangentially along the object surface

  • Then start over

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

OrientHorizontal | OrientVertical | MoveTangentially | MoveForward | None

Returns:

Next action in the cycle.
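One way to picture the cycle is as a lookup from the last action to the next one. The sketch below is an assumption-laden illustration: the string labels and `next_action` are hypothetical, and mapping "direction 1" to the horizontal orientation and "direction 2" to the vertical one is inferred from the action names in the return type, not confirmed by the description.

```python
# Hypothetical labels standing in for the four action types in the cycle.
NEXT_ACTION = {
    "move_forward": "orient_horizontal",
    "orient_horizontal": "orient_vertical",
    "orient_vertical": "move_tangentially",
    "move_tangentially": "move_forward",  # start over
}


def next_action(last_action: str) -> str:
    """Return the action that follows last_action in the four-step cycle."""
    return NEXT_ACTION[last_action]


# Walk one full cycle starting from a forward move.
sequence = ["move_forward"]
for _ in range(4):
    sequence.append(next_action(sequence[-1]))
```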

horizontal_distances(rotation_degrees: float) Tuple[float, float][source]#

Compute the horizontal and forward distances to move to.

Compensate for a given rotation of a certain angle.

Parameters:

rotation_degrees (float) – The angle to rotate by

Returns:
  • move_left_distance – The left distance to move.

  • move_forward_distance – The forward distance to move.

orienting_angle_from_normal(orienting: str, state: MotorSystemState) float[source]#

Compute turn angle to face the object.

Based on the surface normal, compute the angle that the agent needs to turn in order to be oriented directly toward the object

Parameters:
  • orienting (str) – “horizontal” or “vertical”

  • state (MotorSystemState) – The current state of the motor system.

Return type:

float

Returns:

degrees that the agent needs to turn

post_action(action: Action, state: MotorSystemState | None = None) None[source]#

Temporary SurfacePolicy post_action to distinguish types of last action.

Once TouchObject positioning procedure exists, it will not run through the Monty step loop and will not register any touch object actions as last action.

Currently, when SurfacePolicy.dynamic_call resumes, it sees the last action of a touch object, which is always MoveForward. As such, the SurfacePolicy resumes by always taking the OrientHorizontal action.

When the TouchObject positioning procedure is complete, the SurfacePolicy will never see the TouchObject actions, so when it resumes, the last action will be whatever the last action the SurfacePolicy took.

For now, we specifically track only the SurfacePolicy actions in the last_surface_policy_action attribute, in order to prepare the code for TouchObject positioning procedure.

Parameters:
  • action (Action) – The action that was just taken.

  • state (MotorSystemState | None) – The current state of the motor system. Defaults to None.

Return type:

None

# TODO: Remove this once TouchObject positioning procedure is implemented

pre_episode()[source]#

Pre episode hook.

tangential_direction(state: MotorSystemState) Tuple[float, float, float][source]#

Set the direction of the action to be a direction 0 - 2pi.

  • start at 0 (go up) in the reference frame of the agent; i.e. based on the standard initialization of an agent, this will be up from the floor. To implement this convention, the theta is offset by 90 degrees when finding our x and y translations, i.e. such that theta of 0 results in moving up by 1 (y), and right by 0 (x), rather than vice-versa

  • random action -pi - +pi is given by (rand() - 0.5) * 2pi

  • These are combined and weighted by the alpha parameter

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

Tuple[float, float, float]

Returns:

Direction of the action
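The alpha weighting described above might be sketched as a linear blend of the previous heading and a freshly sampled one. The linear form and the helper name `next_tangential_angle` are assumptions; only the sampling expression (rand() - 0.5) * 2pi and the meaning of alpha (0 for same as before, 1 for random walk) come from the description.

```python
import math
import random


def next_tangential_angle(previous_angle, alpha, rng):
    """Blend the previous heading with a random heading in [-pi, pi)."""
    random_angle = (rng.random() - 0.5) * 2 * math.pi
    # alpha = 0 keeps the previous heading; alpha = 1 is a pure random walk.
    return previous_angle * (1 - alpha) + random_angle * alpha


rng = random.Random(0)
same_as_before = next_tangential_angle(0.3, alpha=0.0, rng=rng)
random_walk = next_tangential_angle(0.3, alpha=1.0, rng=rng)
```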

touch_object(raw_observation, view_sensor_id: str, state: MotorSystemState) MoveForward | OrientHorizontal | OrientVertical[source]#

The surface agent’s policy for moving onto an object for sensing it.

Like the distant agent’s get_good_view, this is called at the beginning of every episode, and after a “jump” has been initialized by a model-based policy. In addition, it can be called when the surface agent cannot sense the object, e.g. because it has fallen off its surface.

Currently uses the raw observations returned from the viewfinder via the dataloader, and not the extracted features from the sensor module. TODO M refactor this so that all sensory processing is done in the sensor module.

If we aren’t on the object, try first systematically orienting left around a point, then orienting down, and finally random orientations along the surface of a fixed sphere.

Parameters:
  • raw_observation – The raw observation from the simulator.

  • view_sensor_id (str) – The ID of the viewfinder sensor.

  • state (MotorSystemState) – The current state of the motor system.

Return type:

MoveForward | OrientHorizontal | OrientVertical

Returns:

Action to take.

vertical_distances(rotation_degrees: float) Tuple[float, float][source]#

Compute the down and forward distances to move to.

Compensate for a given rotation of a certain angle.

Parameters:

rotation_degrees (float) – The angle to rotate by

Returns:
  • move_down_distance – The down distance to move.

  • move_forward_distance – The forward distance to move.

class SurfacePolicyCurvatureInformed(alpha, pc_alpha, max_pc_bias_steps, min_general_steps, min_heading_steps, **kwargs)[source]#

Bases: SurfacePolicy

Policy class for a more intelligent surface-agent.

Includes additional functions for moving along an object based on the direction of principal curvature (PC).

A general summary of the policy is that the agent will start following PC directions as soon as these are well defined. This will initially be the minimal curvature; it will follow these for as long as they are defined, and until reaching a certain number of steps (max_pc_bias_steps), before then changing to follow maximal curvature. This process continues to alternate, as long as the PC directions are well defined.

If PC are not meaningfully defined (which is often the case), then the agent uses standard momentum to take a step in a direction similar to the previous step (weighted by the alpha parameter); it will do this for a minimum number of steps (min_general_steps) before it will consider using PC information again.

If the agent is taking a step that will bring it to a previously visited location according to its estimates, then it will attempt to correct for this and choose another direction; after finding a new heading, the agent performs a minimum number of steps (min_heading_steps) before it will check again for a conflicting heading.

The main other event that can occur is that PCs are defined, but these are predominantly in the z-direction (relative to the agent); in that case, the PC-defined heading is ignored on that step, and a standard, momentum-based step is taken; such z-defined PC directions tend to occur on e.g. the rim of cups and are presumably due to issues with the agent’s re-orientation steps vs. noisiness of the PCs defined by the surface.

TODO update this method to accept more general notions of directions-of-variance (rather than strictly principal curvature), such that it can be applied in more abstract spaces

__init__(alpha, pc_alpha, max_pc_bias_steps, min_general_steps, min_heading_steps, **kwargs)[source]#

Initialize policy.

Parameters:
  • alpha – to what degree should the move_tangentially direction be the same as the last step or totally random? 0~same as before, 1~random walk; used when not following principal curvature (PC)

  • pc_alpha – the degree to which the moving average of the PC direction should be based on the history vs. the most recent step

  • max_pc_bias_steps – the number of steps to take following a particular PC direction (i.e. maximum or minimal curvature) before switching to the other type of PC direction

  • min_general_steps – the number of normal momentum steps that must be taken after the agent has stopped following principal curvature, and before it will follow PC again; if this is too low, then the agent has the habit of moving quite noisily as it keeps attempting to follow PC, but if it is too high, then PC directions are not used to their fullest

  • min_heading_steps – when not following PC directions, the minimum number of steps to take before we check that we’re not heading to a previously visited location; again, this should be sufficiently high that we don’t bounce around noisily, but also low enough that we avoid revisiting locations

  • **kwargs – Additional keyword arguments.

attempt_conflict_resolution(vec_copy)[source]#

Try to define direction vector that avoids revisiting previous locations.

avoid_revisiting_locations(state: MotorSystemState, conflict_divisor=3, max_steps=100)[source]#

Avoid revisiting locations.

Check if the new proposed location direction is already pointing to somewhere we’ve visited before; if not, we can use the initially proposed movement.

If there is a conflict, we select a new heading that avoids this; this is achieved by iteratively searching for a heading that does not conflict with any previously visited locations.

Parameters:
  • conflict_divisor – The amount pi is divided by to determine that a current heading will be too close to a previously visited location; this is an initial value that will be dynamically adjusted. Defaults to 3.

  • max_steps – Maximum iterations of the search to perform to try to find a non-conflicting heading. Defaults to 100.

  • state (MotorSystemState) – The current state of the motor system.

Note that while the policy might have “unrealistic” access to information about its location in the environment, this could easily be replaced by relative locations based on the first sensation.

Finally, note that in many situations, revisiting locations can be a good thing (e.g. re-anchoring given noisy path-integration), so we may want to activate/deactivate this as necessary (TODO).

TODO separate out avoid_revisiting_locations as its own mixin so that it can be used more broadly

check_for_flipped_pc()[source]#

Check for arbitrarily flipped PC direction.

Do a quick check to see if the previous PC heading has been arbitrarily flipped, in which case, flip it back. With any luck, this will allow us to automatically pass avoid_revisiting_locations and thereby continue using PCs where relevant.

check_for_preference_change()[source]#

Flip the preference for the min or max PC after a certain number of steps.

This way, we can more quickly explore different “parts” of an object, rather than just persistently following e.g. the rim of a cup. By default, there is always an initial bias for the smallest principal curvature.

TODO can eventually combine with a hypothesis-testing policy that encourages exploration of unvisited parts of the object, rather than relying on this simple counter-heuristic
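The counter heuristic described above can be illustrated with a small tracker. This is a sketch only: `PreferenceTracker` and `record_step` are hypothetical names, and the exact step at which the real policy flips may differ; the initial bias toward the minimal curvature and the role of max_pc_bias_steps follow the description.

```python
class PreferenceTracker:
    """Hypothetical counter that alternates between min and max PC."""

    def __init__(self, max_pc_bias_steps: int):
        self.max_pc_bias_steps = max_pc_bias_steps
        self.preference = "min"  # initial bias toward the smallest curvature
        self.steps = 0

    def record_step(self) -> None:
        """Count one PC-guided step and flip the preference when due."""
        self.steps += 1
        if self.steps >= self.max_pc_bias_steps:
            self.preference = "max" if self.preference == "min" else "min"
            self.steps = 0


tracker = PreferenceTracker(max_pc_bias_steps=3)
history = []
for _ in range(7):
    history.append(tracker.preference)
    tracker.record_step()
```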

conflict_check(rotated_locs, ii)[source]#

Check for conflict in the current heading.

Target location needs to be similar and we need to have a similar surface normal to discount the current proposed heading; if surface normals are significantly different, then we are likely on a different surface, in which case passing by nearby previous points is no longer problematic.

Note that when executing failed hypothesis-testing jumps, we can have multiple instances of the same location in our history; this will result in get_angle_beefed_up returning infinity, i.e. we don’t worry about avoiding our current location

Returns:

True if there is a conflict, False otherwise.

determine_pc_for_use()[source]#

Determine the principal curvature to use for our heading.

Use magnitude (ignoring negatives), as well as the current direction preference.

Returns:

Principal curvature to use.

pc_moving_average()[source]#

Calculate a moving average of the principal curvature direction.

The moving average should be consistent even on curved surfaces as the directions will be in the reference frame of the agent (which has rotated itself to align with the surface normal)

perform_pc_guided_step(state: MotorSystemState) Tuple[float, float, float][source]#

Inform steps to take using defined directions of principal curvature.

Use the defined directions of principal curvature to inform (ideally a series) of steps along the appropriate direction.

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

Tuple[float, float, float]

Returns:

Direction of the action

perform_standard_tang_step(state: MotorSystemState) Tuple[float, float, float][source]#

Perform a standard tangential step across the object.

This is in contrast to, for example, being guided by principal curvatures.

Note this is still more “intelligent” than the tangential step of the baseline surface-agent policy, because it also attempts to avoid revisiting old locations

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

Tuple[float, float, float]

Returns:

Direction of the action

pre_episode()[source]#

Pre episode hook.

reset_pc_buffers()[source]#

Reset counters and other variables.

We’ve just left a series of PC-defined trajectories (i.e. entered a region of undefined PCs), or had to select a new heading in order to avoid revisiting old locations. As such, appropriately reset counters and other variables.

Note we do not reset tangential_angle or the vector, such that this information can still be used by e.g. momentum on the next step to keep us going generally forward

tangential_direction(state: MotorSystemState) Tuple[float, float, float][source]#

Set the direction of action to be a direction 0 - 2pi.

This controls the move_tangential action.
  • start at 0 (go up in the reference frame of the agent, i.e. based on where it is facing), with the actual orientation determined via either principal curvature, or a random step weighted by momentum

Tangential movements are the primary means of progressively exploring an object’s surface

Parameters:

state (MotorSystemState) – The current state of the motor system.

Return type:

Tuple[float, float, float]

Returns:

Direction of the action

update_action_details()[source]#

Store information for later logging.

This stores information that details elements of the policy or observations relevant to policy decisions.

E.g. if model-free policy has been unable to find a path that avoids revisiting old locations, an LM might use this information to inform a particular action (TODO not yet implemented, and NOTE that any modelling should ultimately be located in the learning module(s), not in motor systems)

update_tangential_reps(vec_form=None, angle_form=None)[source]#

Update the angle and vector representation of a tangential heading.

Angle and vector representations are stored as self.tangential_angle and self.tangential_vec, respectively.

Ensures the two representations are always consistent. Further ensures that, because these movements are tangential, the heading is defined in the plane, relative to the agent (i.e. movement along x and y only), and any inadvertent movement along the z-axis relative to the agent is eliminated. The user should supply either the vector form or the (Euler) angle form in radians that will define the new representations.

enforce_pi_bounds(theta)[source]#

Enforce an orientation to be bounded between - pi and + pi.

Returns:

Angle in radians.
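A common way to bound an angle in this manner is to pass it through atan2. The helper `wrap_to_pi` below is a hypothetical sketch of that technique and is not necessarily how enforce_pi_bounds is implemented.

```python
import math


def wrap_to_pi(theta: float) -> float:
    """Wrap an angle in radians into the interval [-pi, pi]."""
    return math.atan2(math.sin(theta), math.cos(theta))


wrapped_high = wrap_to_pi(3 * math.pi / 2)   # equivalent to -pi / 2
wrapped_low = wrap_to_pi(-3 * math.pi / 2)   # equivalent to +pi / 2
unchanged = wrap_to_pi(0.25)
```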

get_perc_on_obj_semantic(semantic_obs, semantic_id=0)[source]#

Get the percentage of pixels in the observation that land on the target object.

If a semantic ID is provided, then only pixels on the target object are counted; otherwise, pixels on any object are counted.

This uses the semantic image, where each pixel is associated with a semantic ID that is unique for each object, and always >0.

Parameters:
  • semantic_obs – Semantic image observation.

  • semantic_id – Semantic ID of the target object.

Returns:

perc_on_obj – Percentage of pixels on the object.
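The counting rule can be sketched on a nested list standing in for the semantic image. Several points are assumptions: the helper name `perc_on_obj`, treating the default `semantic_id=0` as "no ID provided" (plausible since object IDs are always >0), and returning a fraction in [0, 1] rather than a value out of 100.

```python
def perc_on_obj(semantic_obs, semantic_id=0):
    """Fraction of pixels on the target object (or on any object if ID is 0)."""
    pixels = [value for row in semantic_obs for value in row]
    if semantic_id == 0:
        # No specific target: count every pixel that belongs to some object.
        on_object = sum(1 for value in pixels if value > 0)
    else:
        on_object = sum(1 for value in pixels if value == semantic_id)
    return on_object / len(pixels)


# Toy 2x2 semantic image: background (0), object 1, and object 2 twice.
toy_obs = [[0, 1], [2, 2]]
any_object = perc_on_obj(toy_obs)
only_object_2 = perc_on_obj(toy_obs, semantic_id=2)
```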

projected_angle_from_vec(vector)[source]#

Determine the rotation about a z-axis (pointing “up”).

Note that because of the convention for moving along the y when theta=0, the typical order of arguments is swapped from calculating the standard atan2 (https://en.wikipedia.org/wiki/Atan2).

Returns:

Angle in radians.

projected_vec_from_angle(angle)[source]#

Determine the vector in the plane defined by an orientation around the z-axis.

Takes angle in radians, bound between -pi : pi.

This continues the convention started in the original surface-agent policy that a theta of 0 should correspond to a movement of 1 in the y direction, and 0 in the x direction, rather than vice-versa; this convention is implemented by the np.pi/2 offsets.

Returns:

Vector in the plane defined by an orientation around the z-axis.
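The convention shared by projected_angle_from_vec and projected_vec_from_angle (theta of 0 means a unit step along y) can be sketched as a round trip. The helper names `vec_from_angle` and `angle_from_vec` are hypothetical, and the exact expressions are an assumption consistent with the swapped atan2 arguments and the pi/2 offsets mentioned above.

```python
import math


def vec_from_angle(angle: float):
    """Planar vector for a heading, where angle = 0 points along +y."""
    return (math.cos(angle - math.pi / 2), math.sin(angle + math.pi / 2), 0.0)


def angle_from_vec(vector) -> float:
    """Heading of a planar vector; note atan2 takes (x, y), not (y, x)."""
    return math.atan2(vector[0], vector[1])


up = vec_from_angle(0.0)             # approximately (0, 1, 0)
right = vec_from_angle(math.pi / 2)  # approximately (1, 0, 0)
round_trip = angle_from_vec(vec_from_angle(-math.pi / 3))
```

Because cos(theta - pi/2) equals sin(theta) and sin(theta + pi/2) equals cos(theta), feeding the x component first into atan2 recovers the original angle.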

read_action_file(file: str) List[Action][source]#

Load a file with one action per line.

Parameters:

file (str) – name of file to load

Return type:

List[Action]

Returns:

List of actions

theta_change(a, b)[source]#

Determine the min, signed change in orientation between two angles in radians.

Returns:

Signed change in orientation.
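A minimal sketch of a minimum signed angular difference is shown below. It assumes both inputs already lie within [-pi, pi] and that the sign convention is a minus b; `signed_angle_change` is a hypothetical name, and the actual convention of theta_change is not stated in the description.

```python
import math


def signed_angle_change(a: float, b: float) -> float:
    """Smallest signed difference a - b for angles within [-pi, pi]."""
    difference = a - b
    # Going the "long way round" is replaced by the shorter opposite turn.
    if difference < -math.pi:
        difference += 2 * math.pi
    elif difference > math.pi:
        difference -= 2 * math.pi
    return difference


small_turn = signed_angle_change(0.5, 0.2)
across_boundary = signed_angle_change(3.0, -3.0)
```

In the second call the naive difference is 6 radians, but crossing the ±pi boundary the other way is shorter, so the result is 6 - 2pi (roughly -0.28).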

write_action_file(actions: List[Action], file: str) None[source]#

Write a list of actions to a file, one per line.

Should be readable by read_action_file.

Parameters:
  • actions (List[Action]) – list of actions

  • file (str) – path to file to save actions to

Return type:

None

tbp.monty.frameworks.models.motor_system#

class MotorSystem(policy: MotorPolicy, state: MotorSystemState | None = None) None[source]#

Bases: object

The basic motor system implementation.

__init__(policy: MotorPolicy, state: MotorSystemState | None = None) None[source]#

Initialize the motor system with a motor policy.

Parameters:
post_episode() None[source]#

Post episode hook.

Return type:

None

pre_episode() None[source]#

Pre episode hook.

Return type:

None

set_experiment_mode(mode: Literal['train', 'eval']) None[source]#

Sets the experiment mode.

Parameters:

mode (Literal['train', 'eval']) – The experiment mode.

Return type:

None

property last_action: Action#

Returns the last action taken by the motor system.

tbp.monty.frameworks.models.motor_system_state#

class AgentState(*args, **kwargs)[source]#

Bases: dict

The proprioceptive state of an agent.

TODO: Change into dataclass

position: Any#

The agent’s position relative to some global reference frame.

rotation: Any#

The agent’s rotation relative to some global reference frame.

sensors: Dict[str, SensorState]#

The proprioceptive state of the agent’s sensors.

class MotorSystemState[source]#

Bases: Dict[str, Any]

The state of the motor system.

TODO: Currently, ProprioceptiveState can be cast to MotorSystemState since MotorSystemState is a generic dictionary. In the future, make ProprioceptiveState a param on MotorSystemState to more clearly distinguish between the two.

convert_motor_state() dict[source]#

Convert the motor state into something that can be pickled/saved to JSON.

i.e. substitute vector and quaternion objects; note e.g. copy.deepcopy does not work.

TODO ?clean this up with a recursive algorithm, or use BufferEncoder in buffer.py

Return type:

dict

Returns:

Copy of the motor state.
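The kind of substitution described above can be illustrated with a toy state. Everything here is hypothetical: `Vec3` stands in for a vector object, `to_serializable` is an illustrative helper, and the nested keys are invented; the sketch only shows why custom objects must be replaced by plain lists before the state can be written as JSON.

```python
import json
from dataclasses import dataclass


@dataclass
class Vec3:  # hypothetical stand-in for a vector object in the state
    x: float
    y: float
    z: float


def to_serializable(value):
    """Return a copy in which Vec3 objects are replaced by plain lists."""
    if isinstance(value, Vec3):
        return [value.x, value.y, value.z]
    if isinstance(value, dict):
        return {key: to_serializable(item) for key, item in value.items()}
    return value


# Hypothetical state layout: agent -> position and per-sensor positions.
state = {
    "agent_id_0": {
        "position": Vec3(0.0, 1.5, 0.0),
        "sensors": {"patch": {"position": Vec3(0.0, 0.0, 0.0)}},
    }
}
encoded = json.dumps(to_serializable(state))
```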

class ProprioceptiveState[source]#

Bases: Dict[str, AgentState]

The proprioceptive state of the motor system.

TODO: Change into dataclass

class SensorState(*args, **kwargs)[source]#

Bases: dict

The proprioceptive state of a sensor.

TODO: Change into dataclass

position: Any#

The sensor’s position relative to the agent.

rotation: Any#

The sensor’s rotation relative to the agent.

tbp.monty.frameworks.models.no_reset_evidence_matching#

class MontyForNoResetEvidenceGraphMatching(*args, **kwargs)[source]#

Bases: MontyForEvidenceGraphMatching

Monty class for unsupervised inference without explicit episode resets.

This variant of MontyForEvidenceGraphMatching is designed for unsupervised inference experiments where objects may change dynamically without any reset signal. Unlike standard experiments, this class avoids resetting Monty’s internal state (e.g., hypothesis space, evidence scores) between episodes.

This setup better reflects real-world conditions, where object boundaries are ambiguous and no supervisory signal is available to indicate when a new object appears. Only minimal state — such as step counters and termination flags — is reset to prevent buffers from accumulating across objects. Additionally, Monty is currently forced to switch to Matching state. Evaluation of unsupervised inference is performed over a fixed number of matching steps per object.

Intended for evaluation-only runs using pre-trained models, with Monty remaining in the matching phase throughout.

__init__(*args, **kwargs)[source]#

Initialize and reset LM.

pre_episode(primary_target, semantic_id_to_label=None)[source]#

Reset values and call sub-pre_episode functions.

class NoResetEvidenceGraphLM(*args, **kwargs)[source]#

Bases: TheoreticalLimitLMLoggingMixin, EvidenceGraphLM

__init__(*args, **kwargs)[source]#
reset() None[source]#

Reset evidence count and other variables.

Return type:

None

tbp.monty.frameworks.models.object_model#

exception GridTooSmallError[source]#

Bases: Exception

Exception raised when grid is too small to fit all observations.

class GraphObjectModel(object_id)[source]#

Bases: ObjectModel

Object model class that represents object as graphs.

__init__(object_id)[source]#

Initialize the object model.

Parameters:

object_id – id of the object

add_ppf_to_graph()[source]#

Add point pair features to graph edges.

build_model(locations, features, k_n, graph_delta_thresholds)[source]#

Build graph from locations and features sorted into grids.

get_values_for_feature(feature)[source]#
set_graph(graph)[source]#

Set self._graph property with given graph (i.e. from pretraining).

update_model(locations, features, location_rel_model, object_location_rel_body, object_rotation, object_scale=1)[source]#

Add new locations and features into grids and rebuild graph.

property edge_attr#
property edge_index#
property feature_ids_in_graph#
property feature_mapping#
property norm#
property num_nodes#
property pos#
property x#
class GridObjectModel(object_id, max_nodes, max_size, num_voxels_per_dim)[source]#

Bases: GraphObjectModel

Model of an object and all its functions.

This model has the same basic functionality as the NumpyGraph models used in older LM versions. On top of that we now have a grid representation of the object that constrains the model size and resolution. Additionally, this model class implements a lot of functionality that was previously implemented in the graph_utils.py file.

TODO: General cleanups that require more changes in other code
  • remove node_ids from input_channels and have as graph attribute

  • remove .norm as attribute and store as feature instead?

__init__(object_id, max_nodes, max_size, num_voxels_per_dim)[source]#

Initialize a grid object model.

Parameters:
  • object_id – id of the object

  • max_nodes – maximum number of nodes in the graph. Will be k in k winner voxels with highest observation count.

  • max_size – maximum size of the object in meters. Defines the size of objects that can be represented and how locations are mapped into voxels.

  • num_voxels_per_dim – number of voxels per dimension in the model's grids. Defines the resolution of the model.
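To make the roles of max_size, num_voxels_per_dim and max_nodes concrete, here is a minimal sketch of how locations could be binned into voxels and how the k most-observed voxels could be selected. The helper names (location_to_voxel, k_winner_voxels) and the assumption that the grid is centered on the origin are illustrative only and are not taken from the GridObjectModel source.

```python
from collections import Counter


def location_to_voxel(location, max_size, num_voxels_per_dim):
    """Map a 3D location in meters to integer voxel indices.

    Assumption (not from the Monty source): the grid is centered on the
    origin and spans max_size meters along each axis.
    Returns None when the location falls outside the grid.
    """
    voxel_size = max_size / num_voxels_per_dim
    index = []
    for coord in location:
        i = int((coord + max_size / 2) // voxel_size)
        if i < 0 or i >= num_voxels_per_dim:
            return None
        index.append(i)
    return tuple(index)


def k_winner_voxels(locations, max_nodes, max_size, num_voxels_per_dim):
    """Return up to max_nodes voxels with the highest observation count."""
    counts = Counter()
    for loc in locations:
        voxel = location_to_voxel(loc, max_size, num_voxels_per_dim)
        if voxel is not None:  # observations outside the grid are dropped here
            counts[voxel] += 1
    return [voxel for voxel, _ in counts.most_common(max_nodes)]
```

In this sketch a larger num_voxels_per_dim gives finer resolution for the same max_size, while max_nodes caps how many voxels survive into the graph.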

build_model(locations, features)[source]#

Build graph from locations and features sorted into grids.

find_nearest_neighbors(search_locations, num_neighbors, return_distance=False)[source]#

Find nearest neighbors in graph for list of search locations.

Note

This currently uses a kd-tree search. In the future we may consider doing this directly by indexing the grids. An initial implementation of grid indexing does not seem to be faster than the kd-tree search (~5-10x slower), although searching directly in the grid would remove the cost of building the tree. TODO: Investigate this further.

Returns:

If return_distance is True, return distances. Otherwise, return indices of nearest neighbors.
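The return convention described above can be illustrated with a small brute-force sketch. This is not the kd-tree search the method uses; brute_force_nearest_neighbors is a hypothetical stand-in that takes the graph node locations as an explicit argument and only mirrors the documented return_distance behavior.

```python
import math


def brute_force_nearest_neighbors(
    graph_locations, search_locations, num_neighbors, return_distance=False
):
    """For each search location, find the num_neighbors closest graph nodes.

    Hypothetical helper: a linear scan replaces the kd tree for clarity.
    Returns distances if return_distance is True, otherwise node indices.
    """
    results = []
    for query in search_locations:
        # Rank every node by Euclidean distance to the query point.
        ranked = sorted(
            (math.dist(query, node), idx)
            for idx, node in enumerate(graph_locations)
        )[:num_neighbors]
        if return_distance:
            results.append([dist for dist, _ in ranked])
        else:
            results.append([idx for _, idx in ranked])
    return results
```

A linear scan costs O(N) per query, which is why a tree or direct grid indexing becomes attractive as the number of nodes grows.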

set_graph(graph)[source]#

Set self._graph property and convert input graph to right format.

update_model(locations, features, location_rel_model, object_location_rel_body, object_rotation)[source]#

Add new locations and features into grids and rebuild graph.

tbp.monty.frameworks.models.sensor_modules#

class DefaultMessageNoise(noise_params: dict[str, Any], rng)[source]#

Bases: MessageNoise

__init__(noise_params, rng)[source]#
add_noise_to_feat_value(feat_name, feat_val)[source]#
class FeatureChangeFilter(delta_thresholds: dict[str, Any])[source]#

Bases: StateFilter

__init__(delta_thresholds)[source]#
reset()[source]#

Reset buffer and is_exploring flag.

class HabitatObservation(*args, **kwargs)[source]#

Bases: dict

depth: ndarray#
rgba: ndarray#
semantic_3d: ndarray#
sensor_frame_data: ndarray#
world_camera: ndarray#
class HabitatObservationProcessor(features: list[str], sensor_module_id: str, pc1_is_pc2_threshold=10, surface_normal_method=SurfaceNormalMethod.TLS, weight_curvature=True, is_surface_sm=False) None[source]#

Bases: object

Processes Habitat observations into a Cortical Message.

__init__(features, sensor_module_id, pc1_is_pc2_threshold=10, surface_normal_method=SurfaceNormalMethod.TLS, weight_curvature=True, is_surface_sm=False)[source]#

Initializes the HabitatObservationProcessor.

Parameters:
  • features – List of features to extract.

  • pc1_is_pc2_threshold – Maximum difference between pc1 and pc2 to be classified as being roughly the same (ignore curvature directions). Defaults to 10.

  • sensor_module_id – ID of sensor module.

  • surface_normal_method – Method to use for surface normal extraction. Defaults to TLS.

  • weight_curvature – Whether to use the weighted implementation for principal curvature extraction (True) or unweighted (False). Defaults to True.

  • is_surface_sm – Surface SMs do not require that the central pixel is “on object” in order to process the observation (i.e., extract features). Defaults to False.
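As a rough illustration of what pc1_is_pc2_threshold controls, the sketch below treats the two principal curvatures as "roughly the same" when their absolute difference does not exceed the threshold. The function name is_pose_fully_defined and the exact comparison are assumptions made for illustration; the processor's actual logic may differ.

```python
def is_pose_fully_defined(pc1, pc2, pc1_is_pc2_threshold=10):
    """Return True when the curvature directions are meaningful.

    Assumption: if the principal curvatures differ by no more than the
    threshold, the curvature directions are ambiguous and are ignored.
    """
    return abs(pc1 - pc2) > pc1_is_pc2_threshold
```

On a sphere-like patch, for example, pc1 and pc2 are nearly equal, so a check like this would mark the pose as not fully defined.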

process(observation: HabitatObservation) tuple[State, HabitatObservationProcessorTelemetry][source]#

Processes observation.

Parameters:

observation – Habitat observation.

Returns:

Cortical Message.

CURVATURE_FEATURES = ['principal_curvatures', 'principal_curvatures_log', 'gaussian_curvature', 'mean_curvature', 'gaussian_curvature_sc', 'mean_curvature_sc', 'curvature_for_TM']#
POSSIBLE_FEATURES = ['on_object', 'object_coverage', 'min_depth', 'mean_depth', 'rgba', 'hsv', 'pose_vectors', 'principal_curvatures', 'principal_curvatures_log', 'pose_fully_defined', 'gaussian_curvature', 'mean_curvature', 'gaussian_curvature_sc', 'mean_curvature_sc', 'curvature_for_TM', 'coords_for_TM']#
class HabitatObservationProcessorTelemetry(processed_obs: State, visited_loc: Any, visited_normal: Any | None) None[source]#

Bases: object

__init__(processed_obs: State, visited_loc: Any, visited_normal: Any | None) None#
processed_obs: State#
visited_loc: Any#
visited_normal: Any | None#
class HabitatSM(rng, sensor_module_id: str, features: list[str], save_raw_obs: bool = False, pc1_is_pc2_threshold: int = 10, noise_params: dict[str, Any] | None = None, is_surface_sm: bool = False, delta_thresholds: dict[str, Any] | None = None) None[source]#

Bases: SensorModule

Sensor Module that turns Habitat camera obs into features at locations.

Takes in camera rgba and depth input and calculates locations from this. It also extracts features which are currently: on_object, rgba, surface_normal, curvature.

__init__(rng, sensor_module_id, features, save_raw_obs=False, pc1_is_pc2_threshold=10, noise_params=None, is_surface_sm=False, delta_thresholds=None)[source]#

Initialize Sensor Module.

Parameters:
  • rng – Random number generator.

  • sensor_module_id – Name of sensor module.

  • features – Which features to extract. In [on_object, rgba, surface_normal, principal_curvatures, curvature_directions, gaussian_curvature, mean_curvature]

  • save_raw_obs – Whether to save raw sensory input for logging.

  • pc1_is_pc2_threshold – Maximum difference between pc1 and pc2 to be classified as being roughly the same (ignore curvature directions). Defaults to 10.

  • noise_params – Dictionary of noise amount for each feature.

  • is_surface_sm – Surface SMs do not require that the central pixel is “on object” in order to process the observation (i.e., extract features). Defaults to False.

  • delta_thresholds – If given, a FeatureChangeFilter will be used to check whether the current state’s features are significantly different from the previous with tolerances set according to delta_thresholds. Defaults to None.

Note

When using feature at location matching with graphs, surface_normal and on_object need to be in the list of features.

Note

gaussian_curvature and mean_curvature should be used together to contain the same information as principal_curvatures.
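The delta_thresholds parameter can be pictured with a simplified change filter. The class below is a hypothetical sketch that handles only scalar features; it is not the FeatureChangeFilter implementation, which may treat vector-valued features and distances differently.

```python
class SimpleChangeFilter:
    """Flag an observation when a scalar feature changes significantly.

    Hypothetical sketch: delta_thresholds maps a feature name to the
    tolerated absolute change since the last accepted observation.
    """

    def __init__(self, delta_thresholds):
        self.delta_thresholds = delta_thresholds
        self.last_features = None

    def reset(self):
        # Forget the reference observation, e.g. at the start of an episode.
        self.last_features = None

    def has_changed(self, features):
        # The first observation after a reset is always accepted.
        if self.last_features is None:
            self.last_features = dict(features)
            return True
        for name, threshold in self.delta_thresholds.items():
            if abs(features[name] - self.last_features[name]) > threshold:
                self.last_features = dict(features)
                return True
        return False
```

Note that the reference observation only moves when a change is accepted, so slow drift eventually crosses the threshold instead of being ignored forever.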

pre_episode()[source]#

Reset buffer and is_exploring flag.

state_dict()[source]#

Return a serializable dict with this sensor module’s state.

Includes everything needed to save/load this sensor module.

step(data)[source]#

Turn raw observations into dict of features at location.

Parameters:

data – Raw observations.

Returns:

State with features and morphological features. Noise may be added. use_state flag may be set.

update_state(state)[source]#

Update information about the sensor's location and rotation.

class MessageNoise(*args, **kwargs)[source]#

Bases: Protocol

__init__(*args, **kwargs)#
class PassthroughStateFilter(*args, **kwargs)[source]#

Bases: StateFilter

reset() None[source]#
Return type:

None

class Probe(rng, sensor_module_id: str, save_raw_obs: bool)[source]#

Bases: SensorModule

A probe that can be inserted into Monty in place of a sensor module.

It will track raw observations for logging, and can be used by experiments for positioning procedures, visualization, etc.

What distinguishes a probe from a sensor module is that it does not process observations and does not emit a Cortical Message.

__init__(rng, sensor_module_id: str, save_raw_obs: bool)[source]#

Initialize the probe.

Parameters:
  • rng – Random number generator. Unused.

  • sensor_module_id (str) – Name of sensor module.

  • save_raw_obs (bool) – Whether to save raw sensory input for logging.

pre_episode()[source]#

Reset buffer and is_exploring flag.

state_dict()[source]#

Return a serializable dict with this sensor module’s state.

Includes everything needed to save/load this sensor module.

step(data)[source]#

Called on each step.

Parameters:

data – Sensor observations

update_state(state)[source]#

Update information about the sensor's location and rotation.

class SnapshotTelemetry[source]#

Bases: object

Keeps track of raw observation snapshot telemetry.

__init__()[source]#
raw_observation(raw_observation, rotation: quaternion.quaternion, position: numpy.ndarray)[source]#

Record a snapshot of a raw observation and its pose information.

Parameters:
  • raw_observation – Raw observation.

  • rotation (quaternion) – Rotation of the sensor.

  • position (ndarray) – Position of the sensor.

reset()[source]#

Reset the snapshot telemetry.

state_dict() dict[str, list[np.ndarray]][source]#

Returns recorded raw observation snapshots.

Returns:

Dictionary containing a list of raw observations in raw_observations and a list of pose information for each observation in sm_properties.
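The record/reset/state_dict cycle can be sketched as follows. SnapshotRecorder is a hypothetical stand-in, and the keys used inside each sm_properties entry (sm_rotation, sm_location) are assumptions; only the top-level keys raw_observations and sm_properties come from the description above.

```python
class SnapshotRecorder:
    """Collect raw observations together with the sensor pose at capture."""

    def __init__(self):
        self.reset()

    def reset(self):
        # Start from empty buffers, e.g. at the beginning of an episode.
        self._raw_observations = []
        self._sm_properties = []

    def raw_observation(self, raw_observation, rotation, position):
        self._raw_observations.append(raw_observation)
        # Inner key names are assumed for illustration.
        self._sm_properties.append(
            {"sm_rotation": rotation, "sm_location": position}
        )

    def state_dict(self):
        return {
            "raw_observations": self._raw_observations,
            "sm_properties": self._sm_properties,
        }
```

Both lists grow in lockstep, so entry i of sm_properties describes the pose at which entry i of raw_observations was captured.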

class StateFilter(*args, **kwargs)[source]#

Bases: Protocol

__init__(*args, **kwargs)#
reset() None[source]#
Return type:

None

class SurfaceNormalMethod(value)[source]#

Bases: Enum

An enumeration.

NAIVE = 'naive'#

Naive

OLS = 'OLS'#

Ordinary Least-Squares

TLS = 'TLS'#

Total Least-Squares
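Because the members carry string values, a method can be looked up from a configuration string with standard Enum semantics. The sketch below re-declares the enum locally, mirroring the documented members, so that it runs without Monty installed; parse_method is a hypothetical helper.

```python
from enum import Enum


class SurfaceNormalMethod(Enum):
    """Local mirror of the documented members, for illustration only."""

    NAIVE = "naive"
    OLS = "OLS"
    TLS = "TLS"


def parse_method(name):
    """Look up a member by its string value; raises ValueError if unknown."""
    return SurfaceNormalMethod(name)
```

Lookup by value is case-sensitive, so "naive" resolves to NAIVE while "tls" raises ValueError.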

no_message_noise(state: State) State[source]#

No noise function.

Return type:

State

Returns:

State with no noise added.

tbp.monty.frameworks.models.states#

class GoalState(location: numpy.ndarray | None, morphological_features: Dict[str, Any] | None, non_morphological_features: Dict[str, Any] | None, confidence: float, use_state: bool, sender_id: str, sender_type: str, goal_tolerances: Dict[str, Any] | None, info: Dict[str, Any] | None = None)[source]#

Bases: State

Specialization of State for goal states with null (None) values allowed.

Specialized form of state that still adheres to the cortical messaging protocol, but can have null (None) values associated with the location and morphological features.

Used by goal-state generators (GSGs) to communicate goal states to other GSGs, and to motor actuators.

The state variables generally have the same meaning as for the base State class, and they represent the target values for the receiving system. Thus if a goal-state specifies a particular object ID (non-morphological feature) in a particular pose (location and morphological features), then the receiving system should attempt to achieve that state.

Note however that for the goal-state, the confidence corresponds to the conviction with which a GSG believes that the current goal-state should be acted upon. Float bound in [0.0, 1.0].

__init__(location: numpy.ndarray | None, morphological_features: Dict[str, Any] | None, non_morphological_features: Dict[str, Any] | None, confidence: float, use_state: bool, sender_id: str, sender_type: str, goal_tolerances: Dict[str, Any] | None, info: Dict[str, Any] | None = None)[source]#

Initialize a goal state.

Parameters:
  • location (Optional[ndarray]) – the location to move to in global/body-centric coordinates, or None if the location is not specified as part of the goal state. For example, this may be a point on an object’s surface or a location nearby from which a sensor would have a good view of the target point.

  • morphological_features (Optional[Dict[str, Any]]) – dictionary of morphological features or None. For example, it may include pose vectors, whether the pose is fully defined, etc.

  • non_morphological_features (Optional[Dict[str, Any]]) – a dictionary containing non-morphological features at the target location or None.

  • confidence (float) – a float between 0 and 1 representing the confidence in the goal state.

  • use_state (bool) – a boolean indicating whether the goal state should be used.

  • sender_id (str) – the ID of the sender of the goal state (e.g., “LM_0”).

  • sender_type (str) – the type of sender of the goal state (e.g., “GSG”).

  • goal_tolerances (Optional[Dict[str, Any]]) – Dictionary of tolerances that GSGs use when determining whether the current state of the LM matches the driving goal-state or None. As such, a GSG can send a goal state with more or less strict tolerances if certain elements of the state (e.g. the location of a mug vs its orientation) are more or less important.

  • info (Optional[Dict[str, Any]]) – Optional metadata for logging purposes.

class State(location, morphological_features, non_morphological_features, confidence, use_state, sender_id, sender_type)[source]#

Bases: object

State class used as message packages passed in Monty using CMP.

The cortical messaging protocol (CMP) is used to pass messages between Monty components and makes sure we can easily set up arbitrary configurations of them. This class makes it easier to define the CMP in one place and defines the content and structure of messages passed between Monty components. It also contains some helper functions to access and modify the message content.

States are represented in this format but can be interpreted by the receiver in different ways:

  • Observed states: states output by sensor modules

  • Hypothesized states: states output by learning modules

  • Goal states: motor output of learning modules

location#

3D vector representing the location of the state

morphological_features#

dictionary of morphological features. Should include pose_vectors of shape (3,3) and pose_fully_defined (bool).

non_morphological_features#

dictionary of non-morphological features.

confidence#

confidence in the state. In range [0,1].

use_state#

boolean indicating whether the state should be used or not.

sender_id#

string identifying the sender of the state.

sender_type#

string identifying the type of sender. Can be “SM” or “LM”.

__init__(location, morphological_features, non_morphological_features, confidence, use_state, sender_id, sender_type)[source]#

Initialize a state.

get_curvature_directions()[source]#

Return the curvature direction vectors.

Raises:

ValueError – If self.sender_type is not SM

get_feature_by_name(feature_name)[source]#
get_nth_pose_vector(pose_vector_index)[source]#

Return the nth pose vector.

When self.sender_type == “SM”, the first pose vector is the surface normal and the second and third are the curvature directions. When self.sender_type == “LM”, the pose vectors correspond to the rotation of the object relative to the model learned of it.

get_on_object()[source]#

Return whether we think we are on the object or not.

This is currently used in the policy to stay on the object.

get_pose_vectors()[source]#

Return the pose vectors.

get_surface_normal()[source]#

Return the surface normal vector.

Raises:

ValueError – If self.sender_type is not SM
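The pose-vector accessors can be summarized with a stripped-down stand-in. MiniState is hypothetical and keeps only what is needed to show the documented convention: for SM states the first pose vector is the surface normal and the other two are the curvature directions, and the SM-only accessors raise ValueError otherwise.

```python
class MiniState:
    """Minimal stand-in for State, limited to pose-vector access."""

    def __init__(self, pose_vectors, sender_type):
        # pose_vectors: three 3D vectors, i.e. shape (3, 3).
        self.morphological_features = {"pose_vectors": pose_vectors}
        self.sender_type = sender_type

    def get_nth_pose_vector(self, pose_vector_index):
        return self.morphological_features["pose_vectors"][pose_vector_index]

    def get_surface_normal(self):
        if self.sender_type != "SM":
            raise ValueError("surface normal is only defined for SM states")
        return self.get_nth_pose_vector(0)

    def get_curvature_directions(self):
        if self.sender_type != "SM":
            raise ValueError("curvature directions are only defined for SM states")
        return self.get_nth_pose_vector(1), self.get_nth_pose_vector(2)
```

For an LM state the same three vectors are still reachable through get_nth_pose_vector, but they describe the object's rotation rather than local surface geometry.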

set_displacement(displacement, ppf=None)[source]#

Add displacement (represented as dict) to state.

TODO S: Add this to state or in another place?

transform_morphological_features(translation=None, rotation=None)[source]#

Apply translation and/or rotation to morphological features.

encode_goal_state(goal_state: GoalState) Dict[str, Any][source]#

Encode a goal state into a dictionary.

Parameters:

goal_state (GoalState) – The goal state to encode.

Return type:

Dict[str, Any]

Returns:

A dictionary containing the goal state’s attributes.
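As an illustration of encoding a goal state with null fields, the sketch below uses a small dataclass and dataclasses.asdict. MiniGoalState, encode_mini_goal_state and to_json are hypothetical names; the confidence check in __post_init__ is an assumption based on the documented [0.0, 1.0] bound, and the real encoder may convert arrays or other types differently.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class MiniGoalState:
    """Stand-in for GoalState using plain lists instead of numpy arrays."""

    location: Optional[List[float]]
    morphological_features: Optional[Dict[str, Any]]
    non_morphological_features: Optional[Dict[str, Any]]
    confidence: float
    use_state: bool
    sender_id: str
    sender_type: str
    goal_tolerances: Optional[Dict[str, Any]]
    info: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Assumed validation; the documented bound is [0.0, 1.0].
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError("confidence must be in [0.0, 1.0]")


def encode_mini_goal_state(goal_state):
    """Return the goal state's attributes as a plain dictionary."""
    return asdict(goal_state)


def to_json(goal_state):
    """Serialize the encoded goal state; None values become JSON null."""
    return json.dumps(encode_mini_goal_state(goal_state))
```

A goal state that only names a target object, with no location or pose, therefore encodes cleanly with null entries for the unspecified fields.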