tbp.monty.frameworks.models#

tbp.monty.frameworks.models.abstract_monty_classes#

class AgentObservations[source]#

Bases: Dict[SensorID, SensorObservation]

Observations from an agent.

class GoalGenerator[source]#

Bases: object

Generate goals that other learning modules and motor-systems will attempt.

Generate goals potentially (in the case of LMs) by outputting their own sub-goals. Provides a mechanism for implementing hierarchical action policies that are informed by world models/hypotheses.

abstract output_goals() list[Goal][source]#

Return output goals.

abstract set_driving_goal()[source]#

Set the driving goal.

e.g., from a human operator or a high-level LM.

abstract step(ctx: RuntimeContext, observations: Observations)[source]#

Called on each step of the LM to which the GSG belongs.

class LMMemory(*args, **kwargs)[source]#

Bases: Snapshotable

Like a long-term memory storing all the knowledge an LM has.

abstract load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

abstract state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

abstract update_memory(observations)[source]#

Update models stored in memory given new observation & classification.

class LearningModule(*args, **kwargs)[source]#

Bases: RuntimeLearningModule, Snapshotable, ExperimentLearningModule

abstract exploratory_step(ctx: RuntimeContext, percepts: Sequence[Message]) None[source]#

Model building step called inside of monty._step_learning_modules.

Parameters:
Return type:

None

abstract fixme_reset_ground_truth(primary_target=None) None[source]#

Reset internal state based on ground truth.

Parameters:

primary_target – The primary target for the learning module to recognize.

Return type:

None

abstract fixme_update_ground_truth() None[source]#

Update internal state based on ground truth.

Return type:

None

abstract get_output() Message | None[source]#

Return learning module output (same format as input).

Return type:

Message | None

abstract load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

abstract matching_step(ctx: RuntimeContext, percepts: Sequence[Message]) None[source]#

Matching / inference step called inside of monty._step_learning_modules.

Parameters:
Return type:

None

abstract propose_goals() list[Goal][source]#

Return the goals proposed by this LM’s GSG if they exist.

Returns:

A collection of proposed Goals.

abstract receive_votes(votes: Collection[Any]) None[source]#

Process inbound voting data.

TODO: Use Message type for votes rather than ad-hoc data?

Parameters:

votes (Collection[Any]) – A collection of votes from other learning modules.

Return type:

None

abstract reset_stm() None[source]#

Reset short-term memory buffer.

Do things like reset buffers or possible_matches before training.

Return type:

None

abstract send_out_vote() Any[source]#

This method defines what data are sent to other learning modules.

TODO: Use Message type for votes rather than ad-hoc data?

Return type:

Any

Returns:

This learning module’s voting data.

abstract set_experiment_mode(mode: ExperimentMode) None[source]#

Set the experiment mode.

Update state variables based on which method (train or evaluate) is being called at the experiment level.

Parameters:

mode (ExperimentMode) – The experiment mode.

Return type:

None

abstract state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

abstract update_ltm_from_stm() None[source]#

Update long-term memory from short-term memory buffer.

Return type:

None

class Monty(*args, **kwargs)[source]#

Bases: ExperimentMonty, RuntimeMonty, Snapshotable

abstract aggregate_sensory_inputs(ctx: RuntimeContext, observations: Observations, proprioceptive_state: ProprioceptiveState) None[source]#

Receive data from environment, organize on a per sensor module basis.

Parameters:
Return type:

None

abstract fixme_set_ground_truth(primary_target: dict[str, Any] | None = None, semantic_id_to_label: dict[SemanticID, str] | None = None) None[source]#

Provide ground truth from experiment supervision.

Parameters:
  • primary_target – Optional primary target to recognize.

  • semantic_id_to_label – Optional mapping from IDs to labels.

abstract is_done() bool[source]#

Return True if the model has reached a terminal condition.

Return type:

bool

abstract load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

abstract motor_only_step(ctx: RuntimeContext, observations: Observations, proprioceptive_state: ProprioceptiveState) list[Action][source]#

Take a step of the sensors and motor system only.

This skips stepping the learning modules.

Parameters:
  • ctx – The runtime context.

  • observations – The observations from the environment.

  • proprioceptive_state – The proprioceptive state from the environment.

Returns:

The actions to take.

abstract reset() None[source]#

Reset the internal state of this Monty model.

Return type:

None

abstract set_experiment_mode(mode: ExperimentMode) None[source]#

Set the experiment mode.

Update state variables based on which method (train or evaluate) is being called at the experiment level.

Parameters:

mode (ExperimentMode) – The experiment mode.

Return type:

None

abstract state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

abstract step(ctx: RuntimeContext, observations: Observations, proprioceptive_state: ProprioceptiveState) list[Action][source]#

Take a matching, exploratory, or custom user-defined step.

Step taken depends on the value of self.step_type.

Parameters:
  • ctx – The runtime context.

  • observations – The observations from the environment.

  • proprioceptive_state – The proprioceptive state from the environment.

Returns:

The actions to take.

abstract update_ltm() None[source]#

Transfer short-term buffer to long-term memory.

Return type:

None

class ObjectModel[source]#

Bases: object

Model of an object. Is stored in Memory and used by LM.

abstract build_model(observations)[source]#

Build a new model.

abstract update_model(observations)[source]#

Update an existing model with new observations.

class Observations[source]#

Bases: Dict[AgentID, AgentObservations]

Observations from the environment.

class RuntimeContext(rng: numpy.random.RandomState, suppress_runtime_errors: bool = False) None[source]#

Bases: object

Monty’s runtime context.

The RuntimeContext carries runtime-scoped values used throughout Monty.

rng#

The random number generator.

suppress_runtime_errors#

Whether to suppress runtime errors. Runtime errors can be raised when goal is None or invalid. When in an experimental mode, we want to raise runtime errors by default. When in a production mode, we want to suppress runtime errors by default. Currently, we run a lot of experiments, so the current default is to raise runtime errors.

__init__(rng: numpy.random.RandomState, suppress_runtime_errors: bool = False) None#
rng: RandomState#
suppress_runtime_errors: bool = False#
class RuntimeLearningModule(*args, **kwargs)[source]#

Bases: Protocol

Monty runtime interface to a Learning Module.

__init__(*args, **kwargs)#
exploratory_step(ctx: RuntimeContext, percepts: Sequence[Message]) None[source]#

Model building step called inside of monty._step_learning_modules.

Parameters:
Return type:

None

get_output() Message | None[source]#

Return learning module output (same format as input).

Return type:

Message | None

matching_step(ctx: RuntimeContext, percepts: Sequence[Message]) None[source]#

Matching / inference step called inside of monty._step_learning_modules.

Parameters:
Return type:

None

propose_goals() Sequence[Goal][source]#

Return the goals proposed by this LM’s GSG if they exist.

Return type:

Sequence[Goal]

Returns:

A collection of proposed Goals.

receive_votes(votes: Collection[Any]) None[source]#

Process inbound voting data.

TODO: Use Message type for votes rather than ad-hoc data?

Parameters:

votes (Collection[Any]) – A collection of votes from other learning modules.

Return type:

None

send_out_vote() Any[source]#

This method defines what data are sent to other learning modules.

TODO: Use Message type for votes rather than ad-hoc data?

Return type:

Any

Returns:

This learning module’s voting data.

class SensorModule(*args, **kwargs)[source]#

Bases: RuntimeSensorModule, ExperimentSensorModule

propose_goals() list[Goal][source]#

Return the goals proposed by this Sensor Module.

Returns:

A sequence of proposed Goals.

abstract reset() None[source]#

Reset the internal state of this Sensor Module.

Return type:

None

abstract state_dict() Mapping[str, Any][source]#
Return type:

Mapping[str, Any]

abstract step(ctx: RuntimeContext, observation: SensorObservation, motor_only_step: bool = False) Message | None[source]#

Execute a time-step for the Sensor Module.

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • observation (SensorObservation) – Sensor observation.

  • motor_only_step (bool) – Whether the current step is a motor-only step.

Return type:

Message | None

Returns:

An optional percept with features and morphological features.

abstract update_state(agent: AgentState) None[source]#

Update the proprioceptive state for this Sensor Module.

Parameters:

agent (AgentState) – The proprioceptive state of this sensor module’s Agent.

Return type:

None

class SensorObservation(*args, **kwargs)[source]#

Bases: dict

Observations from a sensor.

cam_to_world: float64#
depth: float64#
pixel_loc: float64#
raw: uint8#
rgba: uint8#
semantic: int_#
semantic_3d: int_#
sensor_frame_data: int_#

tbp.monty.frameworks.models.buffer#

class BufferEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#

Bases: JSONEncoder

Encoder to turn the buffer into a JSON compliant format.

default(obj: Any) Any[source]#

Turn non-compliant types into a JSON-compliant format.

Parameters:

obj (Any) – The object to turn into a JSON-compliant format.

Return type:

Any

Returns:

The JSON-compliant data.

classmethod register(obj_type: type, encoder: Callable | type[json.JSONEncoder]) None[source]#

Register an encoder.

Parameters:
  • obj_type – The type to associate with encoder.

  • encoder – A function or JSONEncoder class that converts objects of type obj_type into JSON-compliant data.

Raises:

TypeError – If encoder is not a JSONEncoder subclass or a callable.

classmethod unregister(obj_type: type) None[source]#

Unregister an encoder.

Parameters:

obj_type (type) – The type to unregister the encoder for.

Return type:

None

class FeatureAtLocationBuffer[source]#

Bases: object

Buffer which stores features at locations coming into one LM. Also stores stats.

Used for building graph models and logging detailed stats about an episode. The location buffer is also used to calculate displacements.

__init__()[source]#

Initialize buffer dicts for locations, features, displacements and stats.

add_overall_stats(stats)[source]#

Add overall episode stats to self.stats.

append(percepts: Sequence[Message]) None[source]#

Add a list of percepts to the buffer. Must be features at locations.

TODO S: Store messages instead of list of percepts? A provisional version of this is implemented below, as the GSG uses messages for computations.

Return type:

None

append_input_percepts(input_percept: Message) None[source]#
Return type:

None

current_displacement() numpy.typing.NDArray.numpy.float64[source]#

Get the current displacement.

Return type:

float64

Returns:

The current displacement.

current_ppf() numpy.typing.NDArray.numpy.float64[source]#

Get the current ppf.

Return type:

float64

Returns:

The current ppf.

first_displacement_len() float[source]#

Get length of first observed displacement.

Use for scale in DisplacementLM.

Return type:

float

Returns:

The length of the first observed displacement.

get_all_features_on_object()[source]#

Get all observed features that were on the object.

Like in get_all_locations_on_object the feature arrays should have the length of np.where(self.on_object) and contain all features that were observed when at least one of the input channels was on the object. However, we also check for each feature whether its input channel indicated on_object=True and if not set its value to nan.

Note

Since all inputs to an LM should have overlapping receptive fields, there should be no difference between the two at the moment (except due to noisy estimates of on_object).

Returns:

All observed features that were on the object.

get_all_input_percepts() list[Message][source]#

Get all the percept inputs that the buffer’s parent LM has observed.

Returns:

All the percept inputs that the buffer’s parent LM has observed.

get_all_locations_on_object(input_channel=None)[source]#

Get all observed locations that were on the object.

Returns:

All observed locations that were on the object.

get_buffer_len_by_channel(input_channel)[source]#

Return the number of observations stored for that input channel.

get_current_features(keys)[source]#

Get the current value of a specific feature.

Returns:

The current features.

get_current_location(input_channel)[source]#

Get the current location.

Note

May have to add on_object check at some point.

Returns:

The current location.

get_current_pose(input_channel)[source]#

Get currently sensed location and orientation.

Returns:

The currently sensed location and orientation.

get_currently_on_object()[source]#

Check whether last sensation was on object.

Returns:

Whether the last sensation was on object.

get_first_sensory_input_channel()[source]#

Get name of first sensory (coming from SM) input channel in buffer.

Returns:

The name of the first sensory (coming from SM) input channel in buffer.

Raises:

ValueError – If no sensory channels are found in the buffer.

get_infos_for_graph_update()[source]#

Return all stored infos required to update a graph in memory.

get_last_obs_processed()[source]#

Check whether last sensation was processed by LM.

Returns:

Whether the last sensation was processed by the LM.

get_matching_step_when_output_goal_set()[source]#

Return matching step when last goal was generated.

Return the LM matching step associated with the last time a goal was generated.

get_num_goals_generated()[source]#

Return number of goals generated by the LM’s GSG since episode start.

Note: use the length, not the sum.

get_num_matching_steps()[source]#

Return number of matching steps performed by this LM since episode start.

Returns:

The number of matching steps performed by this LM since episode start.

get_num_observations_on_object()[source]#

Get the number of steps where at least 1 input was on the object.

Returns:

The number of steps where at least 1 input was on the object.

get_num_steps_post_output_goal_generated()[source]#

Return number of steps since last output goal.

Return the number of Monty-matching steps that have elapsed since the last time an output goal was generated.

get_previous_input_percepts()[source]#

Get previous percept inputs received by the buffer’s parent LM.

i.e. in the last time step.

Returns:

The previous percept inputs.

nth_displacement(n: int) numpy.typing.NDArray.numpy.float64[source]#

Get the nth displacement.

Parameters:

n (int) – Index of the displacement to retrieve.

Return type:

float64

Returns:

The nth displacement.

reset()[source]#

Reset the buffer.

set_individual_ts(object_id, pose)[source]#

Update self.stats with the individual LMs terminal state.

update_last_stats_entry(stats)[source]#

Use this to overwrite last entry (for example after voting).

update_stats(stats, update_time=True, append=True, init_list=True)[source]#

Update statistics for this step in the episode.

tbp.monty.frameworks.models.displacement_matching#

class DisplacementGraphLM(k=None, match_attribute=None, tolerance=0.001, use_relative_len=False, graph_delta_thresholds=None)[source]#

Bases: GraphLM

Learning module that uses displacement stored in graphs to recognize objects.

__init__(k=None, match_attribute=None, tolerance=0.001, use_relative_len=False, graph_delta_thresholds=None)[source]#

Initialize Learning Module.

Parameters:
  • k – How many nearest neighbors should nodes in graphs connect to.

  • match_attribute – Which displacement to use for matching. Should be in [‘displacement’, ‘PPF’].

  • tolerance – How close does an observed displacement have to be to a stored displacement to be considered a match. Defaults to 0.001

  • use_relative_len – Whether to scale the displacements to achieve scale invariance. Only possible when using PPF.

  • graph_delta_thresholds – Thresholds used to compare nodes in the graphs being learned, and thereby whether to include a new point or not. By default, we only consider the distance between points, using a threshold of 0.001 (determined in remove_close_points). Can also specify thresholds based on e.g. surface normal angle difference, or principal curvature magnitude difference.

get_object_rotation(sensed_displacements, model_displacements, get_reverse_r=False)[source]#

Calculate the rotation between two sets of displacement vectors.

Parameters:
  • sensed_displacements – The displacements that were sensed.

  • model_displacements – The displacements in the model that were matched to the sensed displacements.

  • get_reverse_r – Whether to get the rotation that turns the model such that it would produce the sensed_displacements (False) or the rotation needed to turn the sensed_displacements into the model displacements.

Returns:

The rotation in Euler angles, as a matrix, and as a Rotation object.

get_object_scale(sensed_displacement, model_displacement)[source]#

Calculate the objects scale given sensed and model displacements.

Returns:

The scale of the object.

get_unique_pose_if_available(object_id)[source]#

Compute (location, rotation, scale) of object.

If we are sure about where on the object we are compare the sensed displacements to the observed displacements to calculate the pose, else return None.

Returns:

The pose and scale of the object.

reset()[source]#

Reset possible matches for paths on objects.

class DisplacementGraphMemory(match_attribute, *args, **kwargs)[source]#

Bases: GraphMemory

Graph memory that stores graphs with displacements as edges.

__init__(match_attribute, *args, **kwargs)[source]#

Initialize Graph memory.

get_initial_hypotheses()[source]#
load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

tbp.monty.frameworks.models.feature_location_matching#

class FeatureGraphLM(max_match_distance: float, tolerances: dict[str, dict[str, Any]], path_similarity_threshold: float = 0.1, pose_similarity_threshold: float = 0.35, required_symmetry_evidence: int = 5, graph_delta_thresholds: dict[str, dict[str, Any]] | None = None, initial_possible_poses: str = 'informed', umbilical_num_poses: int = 8) None[source]#

Bases: GraphLM

Learning module that uses features at locations to recognize objects.

__init__(max_match_distance, tolerances, path_similarity_threshold=0.1, pose_similarity_threshold=0.35, required_symmetry_evidence=5, graph_delta_thresholds=None, initial_possible_poses='informed', umbilical_num_poses=8)[source]#

Initialize Learning Module.

Parameters:
  • max_match_distance – Maximum distance of a tested and stored location to be matched.

  • tolerances – How much can each observed feature deviate from the stored features to still be considered a match.

  • graph_delta_thresholds – Thresholds used to compare nodes in the graphs being learned, and thereby whether to include a new point or not. By default, we only consider the distance between points, using a threshold of 0.001 (determined in remove_close_points). Can also specify thresholds based on e.g. surface normal angle difference, or principal curvature magnitude difference.

  • path_similarity_threshold – How similar do paths have to be considered the same in the terminal condition check.

  • pose_similarity_threshold – difference between two poses to be considered unique when checking for the terminal condition (in radians).

  • required_symmetry_evidence – number of steps with unchanged possible poses to classify an object as symmetric and go into terminal condition.

  • initial_possible_poses – initial possible poses that should be tested for. In [“uniform”, “informed”, list]. default = “informed”.

  • umbilical_num_poses – Number of samples rotations in the direction of the plane perpendicular to the surface normal.

get_object_rotation(graph_id, get_reverse_r=False)[source]#

Get the rotation of an object from the possible poses if resolved.

This first checks whether we have recognized a unique pose of the object or if a symmetry is detected. If one of the two is true it returns the unique rotation(s), otherwise returns None.

Parameters:
  • graph_id – The object to check poses for.

  • get_reverse_r – Whether to get the rotation that turns the model such that it would produce the sensed_displacements (False) or the rotation needed to turn the sensed_displacements into the model displacements.

Returns:

The rotation of the object if we know it.

get_unique_pose_if_available(object_id)[source]#

Get the pose of an object if we know it.

Scale is not implemented.

Returns:

The pose of the object if we know it.

receive_votes(vote_data)[source]#

Use votes to remove objects and poses from possible matches.

NOTE: Add object back into possible matches if majority of other modules

think it is correct? Could help with dealing with noise but may also prevent LMs from narrowing down quickly. Since we are not working with this LM anymore, we probably won’t add that.

Parameters:

vote_data – positive and negative votes on object IDs + positive votes for locations and rotations on the object.

reset()[source]#

Reset initial hypotheses.

send_out_vote()[source]#

Send out list of objects that are not possible matches.

By sending out the negative matches we avoid the problem that every LM needs to know about the same objects. We could think of this as more of an inhibitory signal (I know it can’t be this object so you all don’t need to check that anymore).

Returns:

List of objects that are not possible matches.

NUM_OTHER_LMS = 4#
class FeatureGraphMemory(graph_delta_thresholds)[source]#

Bases: GraphMemory

Graph memory that matches objects by using features at locations.

__init__(graph_delta_thresholds)[source]#

Initialize Graph Memory.

Parameters:

graph_delta_thresholds – Thresholds used to compare nodes in the graphs being learned, and thereby whether to include a new point or not.

get_initial_hypotheses()[source]#
get_nodes_with_matching_features(graph_id, features, list_of_lists=False) tuple[list, list][source]#

Get only nodes with matching features.

Get a reduced list of nodes that includes only nodes with features that match the features dict passed here

Parameters:
  • graph_id – The graph descriptor e.g. ‘mug’

  • features – The observed features to be matched

  • list_of_lists – should each location in the list be embedded in its own list (useful for some downstream operations)

  • False. (Defaults to) –

Returns:

The reduced lists of ids / locs.

get_rotation_features_at_node(graph_id, node_id, channel)[source]#

Get the rotation features at a node in the graph.

Returns:

The rotation features at a node in the graph.

tbp.monty.frameworks.models.goal_generation#

exception ParentLMNotProvided[source]#

Bases: AttributeError

Parent LM wasn’t provided to a GoalGenerator.

Error raised when a parent learning module is accessed before it is provided to a goal generator.

class EvidenceGoalGenerator(goal_tolerances=None, elapsed_steps_factor=10, min_post_goal_success_steps=numpy.inf, x_percent_scale_factor=0.75, desired_object_distance=0.03, wait_growth_multiplier=2, **kwargs) None[source]#

Bases: GraphGoalGenerator

Generator of Goals for an evidence-based graph LM.

GSG specifically set up for generating Goals for an evidence-based graph LM, which can therefore leverage the hypothesis-testing action policy. This policy uses hypotheses about the most likely objects, as well as knowledge of their structure from long-term memory, to propose test-points that should efficiently disambiguate the ID or pose of the object the agent is currently observing.

TODO M separate out the hypothesis-testing policy (which is one example of a model-based policy), from the GSG, which is the system that is capable of leveraging a variety of model-based policies.

__init__(goal_tolerances=None, elapsed_steps_factor=10, min_post_goal_success_steps=numpy.inf, x_percent_scale_factor=0.75, desired_object_distance=0.03, wait_growth_multiplier=2, **kwargs) None[source]#

Initialize the Evidence GSG.

Parameters:
  • parent_lm

    ?

  • goal_tolerances

    ?

  • elapsed_steps_factor – Factor that considers the number of elapsed steps as a possible condition for initiating a hypothesis-testing Goal; should be set to an integer reflecting a number of steps. In general, when we have taken number of non-Goal-driven steps greater than elapsed_steps_factor, then this is an indication to initiate a hypothesis-testing Goal. In addition however, we can multiply elapsed_steps_factor by an exponentially increasing wait-factor, such that we use longer and longer intervals as the experiment continues. Defaults to 10.

  • min_post_goal_success_steps – Number of necessary steps for a hypothesis Goal to be considered. Unlike elapsed_steps_factor, this is a necessary criteria for us to generate a new hypothesis-testing Goal. For example, if set to 5, then the agent must take 5 non-hypothesis-testing steps before it can even consider generating a new hypothesis-testing Goal. Infinity by default, resulting in no use of the hypothesis-testing policy (desirable for unit tests etc.). Defaults to np.infty.

  • x_percent_scale_factor – Scale x-percent threshold to decide when to focus on pose rather than determining object ID; in particular, this is used to determine whether the top object is sufficiently more likely (based on MLH evidence) than the second MLH object to warrant focusing on disambiguating the pose of the first; should be bounded between 0:1.0. If x_percent_scale_factor=1.0, then will wait until the standard x-percent threshold is exceeded, equivalent to the LM converging to a single object, but not a pose. If it is <1.0, then we will start testing pose of the MLH object even before we are entirely certain about its ID. Defaults to 0.75.

  • desired_object_distance – The desired distance between the agent and the object, which is used to determine whether the agent is close enough to the object to consider it “achieved”. Note this need not be the same as the one specified for the motor-system (e.g. the surface-policy), as we may want to aim for an initially farther distance, while the surface-policy may want to stay quite close to the object. Defaults to 0.03.

  • wait_growth_multiplier – Multiplier used to increase the wait_factor, which in turn controls how long to wait before the next jump attempt.

  • **kwargs – Additional keyword arguments.

reset()[source]#

Reset additional parameters specific to the Evidence GSG.

class GraphGoalGenerator(goal_tolerances=None, **_kwargs) None[source]#

Bases: GoalGenerator

Generate sub-Goals until the received Goal is achieved.

A component (embedded in a learning module) that receives a high-level Goal and generates sub-Goals until that Goal is achieved.

Generated Goals are received by either:

i) other learning modules, which may model world objects (e.g., a mug) or internal systems (e.g., the agent’s robotic limb) ii) motor actuators, in which case they represent simpler, primitive Goals for the actuator to achieve (e.g., the location and orientation of an actuator-sensor pair)

Alongside the high-level driving Goal, generated sub-Goals can also be conditioned on other information such as the LM’s current most-likely hypothesis and the structure of known object models (i.e., information local to the LM).

Note that all Goals conform to the Cortical Messaging Protocol (CMP).

__init__(goal_tolerances=None, **_kwargs) None[source]#

Initialize the GSG.

Note: the GSG is not fully initialized until the parent_lm is set by the owner of the GSG. This step is separated out to allow for dependency injection.

Parameters:
  • goal_tolerances – The tolerances for each attribute of the Goal that can be used by the GSG when determining whether a Goal is achieved. These are not necessarily the same as an LM’s tolerances used for matching, as here we are evaluating whether a Goal is achieved.

  • **kwargs – Additional keyword arguments. Unused.

output_goals() list[Goal][source]#

Retrieve the output Goals of the GSG.

This is the Goal projected to other LMs’ GSGs and/or motor actuators.

Returns:

Output Goals of the GSG if it exists, otherwise empty list.

reset()[source]#

Reset any stored attributes of the GSG.

set_driving_goal(goal)[source]#

Receive a new high-level Goal to drive this Goal Generator (GSG).

If none is provided, the GSG should default to pursuing a high confidence Goal, with no other attributes of the message specified; in other words, it attempts to reduce uncertainty about the LM’s output (object ID and pose, whatever these may be).

TODO M: Currently GSGs always use the default, however future work will implement hierarchical action policies/GSGs, as well as the ability to specify a top Goal by the experimenter.

TODO M: we currently just use “None” as a placeholder for the default Goal. > plan : set the default driving Goal to a meaningful, non-None value that is compatible with the current method for checking convergence of an LM, such that achieving the driving Goal can be used as a test for Monty convergence. This might be something like the below.

step(ctx: RuntimeContext, observations)[source]#

Step the GSG.

Check whether the GSG’s output and driving Goals are achieved, and generate a new output Goal if necessary.

property parent_lm: GraphLM#

tbp.monty.frameworks.models.graph_matching#

class GraphLM(initialize_base_modules=True) None[source]#

Bases: LearningModule

General Learning Module that contains a graph memory.

__init__(initialize_base_modules=True) None[source]#

Initialize general Learning Module based on graphs.

Parameters:
  • rng – The random number generator.

  • initialize_base_modules – Provides option to not intialize the base modules if more specialized versions will be initialized in child LMs. Defaults to True.

add_lm_processing_to_buffer_stats(lm_processed)[source]#

Update the buffer stats with whether the LM processed an observation.

Add boolean of whether the LM processed an observation on this particular episode step.

Parameters:

lm_processed – Boolean of whether the LM processed an observation on this particular episode step

collect_stats_to_save()[source]#

Get all stats that this LM should store in the buffer for logging.

Returns:

Stats to store in the buffer.

exploratory_step(ctx: RuntimeContext, percepts: Sequence[Message]) None[source]#

Step without trying to recognize object (updating possible matches).

Return type:

None

fixme_reset_ground_truth(primary_target: dict[str, Any] | None = None) None[source]#

Set target object var and reset others from last episode.

Parameters:

primary_target – The primary target for the learning module/ Monty system to recognize (e.g. the object the agent begins on, or an important object in the environment; NB that a learning module can also correctly classify a “stepwise_target”, corresponding to the object that it is currently on, while it is attempting to classify the primary_target)

fixme_update_ground_truth() None[source]#

If training, update ground truth.

Return type:

None

get_all_known_object_ids() list[str][source]#

Get the IDs of all object models stored in memory.

Returns:

IDs of all object models stored in memory.

get_graph(model_id, input_channel=None)[source]#

Get learned graph from graph memory.

Note

May generalize this in the future to get_object_model which doesn’t have to be a graph but currently a lot of code expects a graph to be returned so this name is more meaningful.

Returns:

Graph.

get_input_channels_in_graph(model_id)[source]#

Get input channels stored for a graph in graph memory.

Returns:

Input channels stored for a graph in graph memory.

get_object_scale(_object_id)[source]#

Get object scale. TODO: implement solution for detecting scale.

Returns:

1

get_output() Message | None[source]#

Return the output of the learning module.

Is currently only implemented for the evidence LM since the other LM versions do not have a notion of MLH and therefore can’t produce an output until the last step of the episode.

Return type:

Message | None

get_possible_locations()[source]#
get_possible_matches() list[str][source]#

Get list of current possible objects.

TODO: Maybe make this private -> check terminal condition

Returns:

List of current possible objects.

get_possible_paths() dict[str, Any][source]#

Return possible paths for each object.

This is used for logging/plotting and to check if we know where on the object we are.

Returns:

Possible paths for each object.

get_possible_poses(as_euler: bool = True) dict[str, Any][source]#

Return possible poses for each object (for logging).

Possible poses are narrowed down in the feature matching version. When using displacements or PPF this is empty.

Returns:

Possible poses for each object.

get_unique_pose_if_available(object_id)[source]#

Return a 7d pose array if pose is uniquely identified.

This method should return a 7d pose array containing the detected object location, rotation and scale if the pose is uniquely identified. If not, it should contain None. This is used in the Monty class to determine whether we have reached a terminal state.

Returns:

7d pose array or None.

load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

matching_step(ctx: RuntimeContext, percepts: Sequence[Message]) None[source]#

Update the possible matches given an observation.

Return type:

None

propose_goals() list[Goal][source]#

Return the goals proposed by this LM’s GSG.

Only returned if the LM/GSG was stepped, otherwise returns empty list.

receive_votes(votes: Collection[Any]) None[source]#

Remove object ids that come in from the votes.

Parameters:

votes (Collection[Any]) – set of objects that other LMs excluded from possible matches

Return type:

None

reset()[source]#

Reset initial hypotheses.

TODO integrate this into reset_stm and/or fixme_reset_ground_truth?

reset_stm() None[source]#

Reset short-term memory buffer.

Return type:

None

send_out_vote() Any[source]#

Send out list of objects that are not possible matches.

By sending out the negative matches we avoid the problem that every LM needs to know about the same objects. We could think of this as more of an inhibitory signal (I know it can’t be this object so you all don’t need to check that anymore).

Return type:

Any

Returns:

Set of objects that are not possible matches.

set_detected_object(terminal_state)[source]#

Set the current graph ID.

If we didn’t recognize the object this will be new_object{n} where n is len(graph_memory) + 1. Otherwise it is the id of the graph that we recognized. If we timed out it is None and we will not update the graph memory.

set_experiment_mode(mode: ExperimentMode) None[source]#

Set the experiment mode.

Update state variables based on which method (train or evaluate) is being called at the experiment level.

Parameters:

mode (ExperimentMode) – The experiment mode.

Return type:

None

set_individual_ts(terminal_state)[source]#
state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

update_ltm_from_stm() None[source]#

If training, update memory from buffer.

Return type:

None

update_terminal_condition()[source]#

Check if we have reached a terminal condition for this episode.

Returns:

Terminal state of the LM.

class GraphMemory(graph_delta_thresholds=None, k=None) None[source]#

Bases: LMMemory

General GraphMemory that stores & manipulates GraphObjectModel instances.

You can think of the GraphMemory as a library of object models with a librarian managing them. The books are GraphObjectModel instances. The LearningModule classes access the information stored in the books and can request books to be added to the library.

Subclasses are DisplacementGraphMemory, FeatureGraphMemory and EvidenceGraphMemory.

__init__(graph_delta_thresholds=None, k=None) None[source]#

Initialize a graph memory structure. This can then be filled with graphs.

Parameters:
  • k – integer k as in KNN, used for creating edges between observations

  • graph_delta_thresholds – thresholds for determining if two observations are sufficiently different to both be added to the object model.

Examples:

graph_memory = GraphMemory()
graph_memory._add_graph_to_memory(cup_model, "cup")
graph_memory.reset() # Call at beginning of episode
get_all_models_in_memory()[source]#

Return models stored in memory.

get_feature_array(graph_id)[source]#
get_feature_order(graph_id)[source]#
get_features_at_node(graph_id, input_channel, node_id, feature_keys=None)[source]#

Get features at a specific node in the graph.

Parameters:
  • graph_id – Name of graph.

  • input_channel – Input channel.

  • node_id – Node ID of the node to get features from. Can also be an array of node IDs to return an array of features.

  • feature_keys – Feature keys.

Returns:

Dict of features at this node.

TODO: look into getting node_id > graph.x.shape[0] (by 1)

get_graph(graph_id, input_channel=None)[source]#

Return graph from graph memory.

Parameters:
  • graph_id – id of graph to retrieve

  • input_channel

    ?

Raises:

ValueError – If input_channel is defined, not “first”, and not in the graph

get_graph_node_ids(graph_id, input_channel)[source]#
get_initial_hypotheses()[source]#
get_input_channels_in_graph(graph_id)[source]#
get_locations_in_graph(graph_id, input_channel)[source]#
get_memory_ids()[source]#

Get list of all objects in memory.

Returns:

List of all objects in memory.

get_num_nodes_in_graph(graph_id, input_channel=None)[source]#

Get number of nodes in graph.

If input_channel is None, return sum over all input channels for this object.

Returns:

Number of nodes in graph.

initialize_feature_arrays()[source]#
load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

remove_graph_from_memory(graph_id)[source]#
state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

update_memory(locations, features, graph_id, object_location_rel_body, location_rel_model, object_rotation)[source]#

Determine how to update memory and call corresponding function.

class MontyForGraphMatching(*args, **kwargs)[source]#

Bases: MontyBase

General Monty model for recognizing objects using graphs.

__init__(*args, **kwargs)[source]#

Initialize and reset LM.

check_if_any_lms_updated()[source]#

True if any LM received sensory information on the current episode step.

Returns:

True if any LM received sensory information on the current episode step, False otherwise.

check_terminal_conditions()[source]#

Check if all LMs have reached a terminal state.

This could be no_match, match, or time_out. If all LMs have reached one of these states, end the episode.

Currently the episode just ends if
  • min_lms_match lms have reached “match”

  • all lms have reached “no_match”

  • We have exceeded max_total_steps

Note

In the future we may want to allow ending an episode when all states are either match or no_match. Right now, the match lms will have to convince the no_match lms of their detected object and pose for the episode to end which may be more difficult if not all LMs know about all objects.

Returns:

True if all LMs have reached a terminal state, False otherwise.

deal_with_time_out()[source]#

Set LM terminal states to time_out.

fixme_set_ground_truth(primary_target: dict[str, Any] | None = None, semantic_id_to_label: dict[SemanticID, str] | None = None) None[source]#

Provide ground truth from experiment supervision.

Parameters:
  • primary_target – Optional primary target to recognize.

  • semantic_id_to_label – Optional mapping from IDs to labels.

load_state_dict_from_parallel(parallel_dirs, save=False)[source]#
reset() None[source]#

Reset the internal state of this Monty model.

Return type:

None

send_vote_to_lm(lm, lm_id, combined_votes)[source]#

Route correct votes to a given LM.

set_is_done()[source]#

Set the model’s is_done flag.

Method that e.g. experiment classes can use to set the model’s flag if e.g. the total number of episode steps possible has been exceeded.

update_stats_after_vote(lm)[source]#

Add voting stats to buffer and check individual terminal condition.

LOGGING_REGISTRY: ClassVar[dict[str, type[BaseMontyLogger]]] = {'BASIC': <class 'tbp.monty.frameworks.loggers.graph_matching_loggers.BasicGraphMatchingLogger'>, 'DETAILED': <class 'tbp.monty.frameworks.loggers.graph_matching_loggers.DetailedGraphMatchingLogger'>, 'SELECTIVE': <class 'tbp.monty.frameworks.loggers.graph_matching_loggers.SelectiveEvidenceLogger'>, 'SILENT': <class 'tbp.monty.frameworks.loggers.exp_logger.BaseMontyLogger'>}#

tbp.monty.frameworks.models.monty_base#

class MontyBase(sensor_modules: Sequence[SensorModule], learning_modules: Sequence[LearningModule], motor_system: MotorSystem, sm_to_agent_dict, sm_to_lm_matrix, lm_to_lm_matrix, lm_to_lm_vote_matrix, min_eval_steps, min_train_steps, num_exploratory_steps, max_total_steps) None[source]#

Bases: Monty

__init__(sensor_modules: Sequence[SensorModule], learning_modules: Sequence[LearningModule], motor_system: MotorSystem, sm_to_agent_dict, sm_to_lm_matrix, lm_to_lm_matrix, lm_to_lm_vote_matrix, min_eval_steps, min_train_steps, num_exploratory_steps, max_total_steps) None[source]#

Initialize the base class.

Parameters:
  • sensor_modules (Sequence[SensorModule]) – list of sensor modules

  • learning_modules (Sequence[LearningModule]) – list of learning modules

  • motor_system (MotorSystem) – class instance that aggregates proposed motor outputs of learning modules and decides next action. Conceptually, this is the subcortical motor area.

  • sm_to_agent_dict – dictionary mapping each sensor module id to the list of habitat agents it receives input from. This is to simulate columns with wide receptive fields that receive input from multiple sensors. TODO: Do we still need this?

  • sm_to_lm_matrix – nested array that governs which sensor modules a learning module will receive input from, such that learning_modules[sm_to_lm_matrix[i][j]] is the jth sensor module that learning module i receives input from. For now, assume 1:1 mapping. Technically, this is a coupling matrix, but since it is sparse, the argument format is an array of arrays.

  • lm_to_lm_matrix – just like sm_to_lm_matrix, but describes coupling between learning modules where one lms output becomes another lms input. The output of an lm needs to be the same format as its input.

  • lm_to_lm_vote_matrix – describes lateral coupling between learning modules. This matrix is used for voting. Assumes no lateral voting if None is passed.

  • min_eval_steps – Minimum number of steps required for evaluations.

  • min_train_steps – Minimum number of steps required for training.

  • num_exploratory_steps – Number of steps required by the exploratory phase.

  • max_total_steps – Maximum number of steps to run the experiment.

Raises:
  • ValueError – If sm_to_lm_matrix is not defined

  • ValueError – If the lengths of learning_modules and sm_to_lm_matrix do not match

  • ValueError – If the keys of sm_to_agent_dict do not match the sensor_module_id`s of `sensor_modules

aggregate_sensory_inputs(ctx: RuntimeContext, observations: Observations, proprioceptive_state: ProprioceptiveState) None[source]#

Receive data from environment, organize on a per sensor module basis.

Parameters:
Return type:

None

check_reached_max_matching_steps(max_steps)[source]#

Check if max_steps was reached and deal with time_out.

Returns:

True if max_steps was reached, False otherwise.

deal_with_time_out()[source]#

Call any functions and logging in case of a time out.

fixme_set_ground_truth(primary_target: dict[str, Any] | None = None, semantic_id_to_label: dict[SemanticID, str] | None = None) None[source]#

Provide ground truth from experiment supervision.

Parameters:
  • primary_target – Optional primary target to recognize.

  • semantic_id_to_label – Optional mapping from IDs to labels.

get_observations(observations, sensor_module_id)[source]#

Get observations from all agents pertaining to a single sensor module.

Observations are returned in the format
{“agent_1”:
{“sm_1”:
{“rgba”: data,

“depth”: data “semantic”: data}

} {“sm_2”:

{“rgba”: data, … }

“agent_2”:
{“sm_3”:

{“rgba”: data, … }

… “agent_n”:

{“sm_k”:

{“rgba”: data, … }

}

}

Returns:

Observations from all agents pertaining to a single sensor module.

load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

motor_only_step(ctx: RuntimeContext, observations: Observations, proprioceptive_state: ProprioceptiveState) list[Action][source]#

Take a step of the sensors and motor system only.

This skips stepping the learning modules.

Parameters:
  • ctx – The runtime context.

  • observations – The observations from the environment.

  • proprioceptive_state – The proprioceptive state from the environment.

Returns:

The actions to take.

reset() None[source]#

Reset the internal state of this Monty model.

Return type:

None

reset_episode_steps()[source]#
set_done() None[source]#
Return type:

None

set_experiment_mode(mode: ExperimentMode) None[source]#

Set the experiment mode.

Update state variables based on which method (train or evaluate) is being called at the experiment level.

Parameters:

mode (ExperimentMode) – The experiment mode.

Return type:

None

state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

step(ctx: RuntimeContext, observations: Observations, proprioceptive_state: ProprioceptiveState) list[Action][source]#

Take a matching, exploratory, or custom user-defined step.

Step taken depends on the value of self.step_type.

Parameters:
  • ctx – The runtime context.

  • observations – The observations from the environment.

  • proprioceptive_state – The proprioceptive state from the environment.

Returns:

The actions to take.

switch_to_exploratory_step()[source]#
switch_to_matching_step()[source]#
update_ltm() None[source]#

Transfer short-term buffer to long-term memory.

Return type:

None

update_step_counters()[source]#
LOGGING_REGISTRY: ClassVar[dict[str, type[BaseMontyLogger]]] = {'TEST': <class 'tbp.monty.frameworks.loggers.exp_logger.TestLogger'>}#
property exceeded_min_steps#
property is_done: bool#

Return True if the model has reached a terminal condition.

property is_motor_only_step: bool#
property min_steps#
property step_type_count#

tbp.monty.frameworks.models.motor_policies#

exception NoGoalProvided[source]#

Bases: RuntimeError

Raised when no goal is provided.

class BasePolicy(action_sampler: ActionSampler, agent_id: AgentID) None[source]#

Bases: MotorPolicy

__init__(action_sampler: ActionSampler, agent_id: AgentID) None[source]#

Initialize a base policy.

Parameters:
  • action_sampler (ActionSampler) – The ActionSampler to use

  • agent_id (NewType()(AgentID, str)) – The agent ID

load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

class InformedPolicy(use_goal_driven_actions=False, **kwargs) None[source]#

Bases: BasePolicy

Policy that takes observation as input.

Extension of BasePolicy that allows for taking the observation into account for action selection. Uses percept.get_on_object() to decide whether to reverse the last action when the patch is off the object.

__init__(use_goal_driven_actions=False, **kwargs) None[source]#

Initialize policy.

Parameters:
  • use_goal_driven_actions – Whether to enable the motor system to attempt to jump (i.e. teleport) the agent to a specified goal.

  • **kwargs – Additional keyword arguments.

reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

class MotorPolicy(*args, **kwargs)[source]#

Bases: RuntimeMotorPolicy, ExperimentMotorPolicy, Snapshotable, ABC

The abstract scaffold for motor policies.

abstract load_state_dict(memento: Mapping[str, Any]) None[source]#

Load the internal object state from a previously snapshot memento.

Parameters:

memento (Mapping[str, Any]) – State dict to load.

Return type:

None

abstract reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

abstract state_dict() Mapping[str, Any][source]#

Snapshot a memento representing the internal state of this object.

Return type:

Mapping[str, Any]

Returns:

State dict for logging and saving.

class NaiveScanPolicy(fixed_amount, **kwargs) None[source]#

Bases: InformedPolicy

Policy that just moves left and right along the object.

__init__(fixed_amount, **kwargs) None[source]#

Initialize policy.

check_cycle_action()[source]#

Makes sure we move in a spiral.

This method switches the current action if steps_per_action was reached. Additionally it increments steps_per_action after the second and forth action to make sure paths don’t overlap.

_ _ _ _

_ _ |
|_ | |

|_ _ _| |

corresponds to 1x left, 1x up, 2x right, 2x down, 3x left, 3x up, 4x right, 4x down, …

reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

class RuntimeMotorPolicy(*args, **kwargs)[source]#

Bases: Protocol

Monty runtime interface to a Motor Policy.

__init__(*args, **kwargs)#
class SurfacePolicy(alpha, desired_object_distance=0.025, **kwargs) None[source]#

Bases: InformedPolicy

Policy class for a surface-agent.

i.e. an agent that moves to and follows the surface of an object. Includes functions for moving along an object based on its surface normal.

__init__(alpha, desired_object_distance=0.025, **kwargs) None[source]#

Initialize policy.

Parameters:
  • desired_object_distance – Distance to maintain from the surface; used for touch_object and move-forward. Defaults to 0.025.

  • alpha – to what degree should the move_tangentially direction be the same as the last step or totally random? 0~same as before, 1~random walk

  • **kwargs

    ?

get_inverse_agent_rot(state: MotorSystemState)[source]#

Get the inverse rotation of the agent’s current orientation.

Used to transform poses of e.g. surface normals or principle curvature from global coordinates into the coordinate frame of the agent.

To intuit why we apply the inverse, imagine an e.g. surface normal with the same pose as the agent; in the agent’s reference frame, this should have the identity pose, which will be acquired by transforming the original pose by the inverse

Parameters:

state (MotorSystemState) – The current state of the motor system.

Returns:

Inverse quaternion rotation.

get_next_action(ctx: RuntimeContext, state: MotorSystemState, percept: Message) OrientHorizontal | OrientVertical | MoveTangentially | MoveForward | None[source]#

Retrieve next action from a cycle of four actions.

First move forward to touch the object at the right distance Then orient toward the normal along direction 1 Then orient toward the normal along direction 2 Then move tangentially along the object surface Then start over

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • state (MotorSystemState) – The current state of the motor system.

  • percept (Message) – The percept from (as of this writing) the first sensor module.

Return type:

OrientHorizontal | OrientVertical | MoveTangentially | MoveForward | None

Returns:

Next action in the cycle.

horizontal_distances(rotation_degrees: float, percept: Message) tuple[float, float][source]#

Compute the horizontal and forward distances to move to.

Compensate for a given rotation of a certain angle.

Parameters:
  • rotation_degrees – The angle to rotate by

  • percept – The percept from (as of this writing) the first sensor module.

Returns:

The left distance to move move_forward_distance: The forward distance to move

Return type:

move_left_distance

orienting_angle_from_normal(orienting: str, state: MotorSystemState, percept: Message) float[source]#

Compute turn angle to face the object.

Based on the surface normal, compute the angle that the agent needs to turn in order to be oriented directly toward the object

Parameters:
  • orienting (str) – “horizontal” or “vertical”

  • state (MotorSystemState) – The current state of the motor system.

  • percept (Message) – The percept from (as of this writing) the first sensor module.

Return type:

float

Returns:

degrees that the agent needs to turn

reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

tangential_direction(ctx: RuntimeContext, state: MotorSystemState, percept: Message) Tuple[float, float, float][source]#

Set the direction of the action to be a direction 0 - 2pi.

  • start at 0 (go up) in the reference frame of the agent; i.e. based on

the standard initialization of an agent, this will be up from the floor. To implement this convention, the theta is offset by 90 degrees when finding our x and y translations, i.e. such that theta of 0 results in moving up by 1 (y), and right by 0 (x), rather than vice-versa - random action -pi - +pi is given by (rand() - 0.5) * 2pi - These are combined and weighted by the alpha parameter

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • state (MotorSystemState) – The current state of the motor system.

  • percept (Message) – The percept from (as of this writing) the first sensor module.

Return type:

Tuple[float, float, float]

Returns:

Direction of the action

vertical_distances(rotation_degrees: float, percept: Message) tuple[float, float][source]#

Compute the down and forward distances to move to.

Compensate for a given rotation of a certain angle.

Parameters:
  • rotation_degrees – The angle to rotate by

  • percept – The percept from (as of this writing) the first sensor module.

Returns:

The down distance to move move_forward_distance: The forward distance to move

Return type:

move_down_distance

class SurfacePolicyCurvatureInformed(alpha, pc_alpha, max_pc_bias_steps, min_general_steps, min_heading_steps, **kwargs) None[source]#

Bases: SurfacePolicy

Policy class for a more intelligent surface-agent.

Includes additional functions for moving along an object based on the direction of principle curvature (PC).

A general summary of the policy is that the agent will start following PC directions as soon as these are well defined. This will initially be the minimal curvature; it will follow these for as long as they are defined, and until reaching a certain number of steps (max_pc_bias_steps), before then changing to follow maximal curvature. This process continues to alternate, as long as the PC directions are well defined.

If PC are not meaningfully defined (which is often the case), then the agent uses standard momentum to take a step in a direction similar to the previous step (weighted by the alpha parameter); it will do this for a minimum number of steps (min_general_steps) before it will consider using PC information again.

If the agent is taking a step that will bring it to a previously visited location according to its estimates, then it will attempt to correct for this and choose another direction; after finding a new heading, the agent performs a minimum number of steps (min_heading_steps) before it will check again for a conflicting heading.

The main other event that can occur is that PCs are defined, but these are predominantly in the z-direction (relative to the agent); in that case, the PC-defined heading is ignored on that step, and a standard, momentum-based step is taken; such z-defined PC directions tend to occur on e.g. the rim of cups and are presumably due to issues with the agents re-orientation steps vs. noisiness of the PCs defined by the surface

TODO update this method to accept more general notions of directions-of-variance (rather than strictly principal curvature), such that it can be applied in more abstract spaces

__init__(alpha, pc_alpha, max_pc_bias_steps, min_general_steps, min_heading_steps, **kwargs) None[source]#

Initialize policy.

Parameters:
  • alpha – to what degree should the move_tangentially direction be the same as the last step or totally random? 0~same as before, 1~random walk; used when not following principal curvature (PC)

  • pc_alpha – the degree to which the moving average of the PC direction should be based on the history vs. the most recent step

  • max_pc_bias_steps – the number of steps to take following a particular PC direction (i.e. maximum or minimal curvature) before switching to the other type of PC direction

  • min_general_steps – the number of normal momentum steps that must be taken after the agent has stopped following principal curvature, and before it will follow PC again; if this is too low, then the agent has the habit of moving quite noisily as it keeps attempting to follow PC, but if it is too high, then PC directions are not used to their fullest

  • min_heading_steps – when not following PC directions, the minimum number of steps to take before we check that we’re not heading to a previously visited location; again, this should be sufficiently high that we don’t bounce around noisily, but also low enough that we avoid revisiting locations

  • **kwargs – Additional keyword arguments.

attempt_conflict_resolution(ctx: RuntimeContext, vec_copy) None[source]#

Try to define direction vector that avoids revisiting previous locations.

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • vec_copy – The vector to copy.

Return type:

None

avoid_revisiting_locations(ctx: RuntimeContext, state: MotorSystemState, conflict_divisor=3, max_steps=100)[source]#

Avoid revisiting locations.

Check if the new proposed location direction is already pointing to somewhere we’ve visited before; if not, we can use the initially proposed movement.

If there is a conflict, we select a new heading that avoids this; this is achieved by iteratively searching for a heading that does not conflict with any previously visited locations.

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • conflict_divisor – The amount pi is divided by to determine that a current heading will be too close to a previously visited location; this is an initial value that will be dynamically adjusted. Defaults to 3.

  • max_steps – Maximum iterations of the search to perform to try to find a non-conflicting heading. Defaults to 100.

  • state (MotorSystemState) – The current state of the motor system.

Note that while the policy might have “unrealistic” access to information about it’s location in the environment, this could easily be replaced by relative locations based on the first sensation

Finally, note that in many situations, revisiting locations can be a good thing (e.g. re-anchoring given noisy path-integration), so we may want to activate /inactivate this as necessary (TODO)

TODO separate out avoid_revisiting_locations as its own mixin so that it can be used more broadly

check_for_flipped_pc()[source]#

Check for arbitrarily flipped PC direction.

Do a quick check to see if the previous PC heading has been arbitrarily flipped, in which case, flip it back. With any luck, this will allow us to automatically pass avoid_revisiting_locations and thereby continue using PCs where relevant.

check_for_preference_change()[source]#

Flip the preference for the min or max PC after a certain number of steps.

This way, we can more quickly explore different “parts” of an object, rather than just persistently following e.g. the rim of a cup. By default, there is always an initial bias for the smallest principal curvature.

TODO can eventually combine with a hypothesis-testing policy that encourages exploration of unvisited parts of the object, rather than relying on this simple counter-heuristic

conflict_check(rotated_locs, ii)[source]#

Check for conflict in the current heading.

Target location needs to be similar and we need to have a similar surface normal to discount the current proposed heading; if surface normals are significantly different, then we are likely on a different surface, in which case passing by nearby previous points is no longer problematic.

Note that when executing failed hypothesis-testing jumps, we can have multiple instances of the same location in our history; this will result in get_angle_beefed_up returning infinity, i.e. we don’t worry about avoiding our current location

Returns:

True if there is a conflict, False otherwise.

determine_pc_for_use(percept: Message)[source]#

Determine the principal curvature to use for our heading.

Use magnitude (ignoring negatives), as well as the current direction preference.

Parameters:

percept (Message) – The percept from (as of this writing) the first sensor module.

Returns:

Principal curvature to use.

pc_moving_average()[source]#

Calculate a moving average of the principal curvature direction.

The moving average should be consistent even on curved surfaces as the directions will be in the reference frame of the agent (which has rotated itself to align with the surface normal)

perform_pc_guided_step(ctx: RuntimeContext, state: MotorSystemState, percept: Message) Tuple[float, float, float][source]#

Inform steps to take using defined directions of principal curvature.

Use the defined directions of principal curvature to inform (ideally a series) of steps along the appropriate direction.

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • state (MotorSystemState) – The current state of the motor system.

  • percept (Message) – The percept from (as of this writing) the first sensor

Return type:

Tuple[float, float, float]

Returns:

Direction of the action

perform_standard_tang_step(ctx: RuntimeContext, state: MotorSystemState) Tuple[float, float, float][source]#

Perform a standard tangential step across the object.

This is in contrast to, for example, being guided by principal curvatures.

Note this is still more “intelligent” than the tangential step of the baseline surface-agent policy, because it also attempts to avoid revisiting old locations

Parameters:
Return type:

Tuple[float, float, float]

Returns:

Direction of the action

reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

reset_pc_buffers()[source]#

Reset counters and other variables.

We’ve just left a series of PC-defined trajectories (i.e. entered a region of undefined PCs), or had to select a new heading in order to avoid revisiting old locations. As such, appropriately reset counters and other variables.

Note we do not reset tangential_angle or the vector, such that this information can still be used by e.g. momentum on the next step to keep us going generally forward

tangential_direction(ctx: RuntimeContext, state: MotorSystemState, percept: Message) Tuple[float, float, float][source]#

Set the direction of action to be a direction 0 - 2pi.

This controls the move_tangential action
  • start at 0 (go up in the reference frame of the agent, i.e. based on

where it is facing), with the actual orientation determined via either principal curvature, or a random step weighted by momentum

Tangential movements are the primary means of progressively exploring an object’s surface

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • state (MotorSystemState) – The current state of the motor system.

  • percept (Message) – The percept from (as of this writing) the first sensor module.

Return type:

Tuple[float, float, float]

Returns:

Direction of the action

update_action_details(percept: Message) None[source]#

Store informaton for later logging.

This stores information that details elements of the policy or observations relevant to policy decisions.

E.g. if model-free policy has been unable to find a path that avoids revisiting old locations, an LM might use this information to inform a particular action (TODO not yet implemented, and NOTE that any modelling should ultimately be located in the learning module(s), not in motor systems)

Parameters:

percept (Message) – The percept from (as of this writing) the first sensor module.

Return type:

None

update_tangential_reps(vec_form=None, angle_form=None)[source]#

Update the angle and vector representation of a tangential heading.

Angle and vector representations are stored as self.tangential_angle and self.tangential_vec, respectively.

Ensures the two representations are always consistent. Further ensures that, because these movements are tangential, it will be defined in the plane, relative to the agent (i.e. movement along x and y only), and any inadvertant movement along the z-axis relative to the agent will be eliminated. User should supply either the vector form or the (Euler) angle form in radians that will define the new representations.

tbp.monty.frameworks.models.motor_policy_selectors#

class DistantPolicySelector(jump_to_goal: JumpToGoal, look_at_goal: LookAtGoal, default: MotorPolicy)[source]#

Bases: MotorPolicySelector

__init__(jump_to_goal: JumpToGoal, look_at_goal: LookAtGoal, default: MotorPolicy)[source]#
reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy Selector.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

state_dict() Mapping[str, Any][source]#
Return type:

Mapping[str, Any]

class MotorPolicySelector(*args, **kwargs)[source]#

Bases: RuntimeMotorPolicySelector, ExperimentMotorPolicySelector, Protocol

__init__(*args, **kwargs)#
class RuntimeMotorPolicySelector(*args, **kwargs)[source]#

Bases: Protocol

Monty runtime interface to a Motor Policy Selector.

__init__(*args, **kwargs)#
class SinglePolicySelector(policy: MotorPolicy)[source]#

Bases: MotorPolicySelector

__init__(policy: MotorPolicy)[source]#
reset(motor_system: ExperimentMotorSystem) None[source]#

Reset the internal state of this Motor Policy Selector.

Parameters:

motor_system (ExperimentMotorSystem) – The associated Motor System.

Return type:

None

state_dict() Mapping[str, Any][source]#
Return type:

Mapping[str, Any]

highest_confidence_goal(goals: Sequence[Goal]) Goal[source]#

Return the goal with the highest confidence.

If there are multiple goals with the same confidence, returns the first one.

Parameters:

goals (Sequence[Goal]) – A sequence of goals. Must be non-empty.

Return type:

Goal

Returns:

The goal with the highest confidence.

tbp.monty.frameworks.models.motor_system#

class MotorSystem(policy_selector: MotorPolicySelector) None[source]#

Bases: RuntimeMotorSystem, ExperimentMotorSystem

The basic motor system implementation.

__init__(policy_selector: MotorPolicySelector) None[source]#

Initialize the motor system with a motor policy.

Parameters:

policy_selector (MotorPolicySelector) – The motor policy selector to use.

reset() None[source]#

Reset the internal state of this Motor System.

Return type:

None

state_dict() Mapping[str, Any][source]#
Return type:

Mapping[str, Any]

property action_sequence: list[tuple[list[Action], dict[AgentID, Any] | None]]#
property motor_only_step: bool#

When True, suppress Learning Module processing.

class RuntimeMotorSystem(*args, **kwargs)[source]#

Bases: Protocol

Monty runtime interface to a Motor System.

__init__(*args, **kwargs)#

tbp.monty.frameworks.models.motor_system_state#

class AgentState(sensors: dict[SensorID, SensorState], position: VectorXYZ, rotation: Any) None[source]#

Bases: object

The proprioceptive state of an agent.

__init__(sensors, position, rotation)#
position: VectorXYZ#

The agent’s position.

rotation: Any#

The agent’s rotation.

sensors: dict[SensorID, SensorState]#

The proprioceptive state of the agent’s sensors.

When part of an AgentState, the SensorState’s position and rotation are relative to the agent’s position and rotation.

class MotorSystemState[source]#

Bases: Dict[AgentID, AgentState]

The state of the motor system.

TODO: Currently, ProprioceptiveState can be cast to MotorSystemState since

MotorSystemState is a generic dictionary. In the future, make ProprioceptiveState a param on MotorSystemState to more clearly distinguish between the two. These are separate from each other because ProprioceptiveState is the information returned from the environment, while MotorSystemState is that, as well as any state that the motor system needs for operation.

class ProprioceptiveState[source]#

Bases: Dict[AgentID, AgentState]

The proprioceptive state of the motor system.

When part of a ProprioceptiveState, the AgentState’s position and rotation are relative to some global reference frame.

class SensorState(position: Tuple[float, float, float], rotation: Any) None[source]#

Bases: object

The proprioceptive state of a sensor.

__init__(position: Tuple[float, float, float], rotation: Any) None#
position: Tuple[float, float, float]#

The sensor’s position.

rotation: Any#

The sensor’s rotation.

tbp.monty.frameworks.models.no_reset_evidence_matching#

class MontyForNoResetEvidenceGraphMatching(*args, **kwargs)[source]#

Bases: MontyForEvidenceGraphMatching

Monty class for unsupervised inference without explicit episode resets.

This variant of MontyForEvidenceGraphMatching is designed for unsupervised inference experiments where objects may change dynamically without any reset signal. Unlike standard experiments, this class avoids resetting Monty’s internal state (e.g., hypothesis space, evidence scores) between episodes.

This setup better reflects real-world conditions, where object boundaries are ambiguous and no supervisory signal is available to indicate when a new object appears. Only minimal state — such as step counters and termination flags — is reset to prevent buffers from accumulating across objects. Additionally, Monty is currently forced to switch to Matching state. Evaluation of unsupervised inference is performed over a fixed number of matching steps per object.

Intended for evaluation-only runs using pre-trained models, with Monty remaining in the matching phase throughout.

__init__(*args, **kwargs)[source]#

Initialize and reset LM.

fixme_set_ground_truth(primary_target: dict[str, Any] | None = None, semantic_id_to_label: dict[SemanticID, str] | None = None) None[source]#

Provide ground truth from experiment supervision.

Parameters:
  • primary_target – Optional primary target to recognize.

  • semantic_id_to_label – Optional mapping from IDs to labels.

reset() None[source]#

Reset the internal state of this Monty model.

Return type:

None

experiment_mode: ExperimentMode | None#
class NoResetEvidenceGraphLM(*args, **kwargs)[source]#

Bases: TheoreticalLimitLMLoggingMixin, EvidenceGraphLM

__init__(*args, **kwargs)[source]#
reset() None[source]#

Reset evidence count and other variables.

Return type:

None

tbp.monty.frameworks.models.object_model#

exception GridTooSmallError[source]#

Bases: Exception

Exception raised when grid is too small to fit all observations.

class GraphObjectModel(object_id)[source]#

Bases: ObjectModel

Object model class that represents object as graphs.

__init__(object_id)[source]#

Initialize the object model.

Parameters:

object_id – id of the object

add_ppf_to_graph()[source]#

Add point pair features to graph edges.

build_model(locations, features, k_n, graph_delta_thresholds)[source]#

Build graph from locations and features sorted into grids.

edge_index_between(previous_node: int, new_node: int) int | None[source]#

Return the edge index between two nodes in a graph.

Parameters:
  • previous_node (int) – Node ID of the first node in the graph.

  • new_node (int) – Node ID of the second node in the graph.

Return type:

int | None

Returns:

Edge ID between the two nodes, or None if no such edge exists.

get_values_for_feature(feature)[source]#
set_graph(graph)[source]#

Set self._graph property with given graph (i.e. from pretraining).

update_model(locations, features, location_rel_model, object_location_rel_body, object_rotation)[source]#

Add new locations and features into grids and rebuild graph.

property edge_attr#
property edge_index#
property feature_ids_in_graph#
property feature_mapping#
property norm#
property num_nodes#
property pos#
property x#
class GridObjectModel(object_id, max_nodes, max_size, num_voxels_per_dim)[source]#

Bases: GraphObjectModel

Model of an object and all its functions.

This model has the same basic functionality as the NumpyGraph models used in older LM versions. On top of that we now have a grid representation of the object that constrains the model size and resolution. Additionally, this model class implements a lot of functionality that was previously implemented in the graph_utils.py file.

TODO: General cleanups that require more changes in other code
  • remove node_ids from input_channels and have as graph attribute

  • remove .norm as attribute and store as feature instead?

__init__(object_id, max_nodes, max_size, num_voxels_per_dim)[source]#

Initialize a grid object model.

Parameters:
  • object_id – id of the object

  • max_nodes – maximum number of nodes in the graph. Will be k in k winner voxels with highest observation count.

  • max_size – maximum size of the object in meters. Defines size of objects that can be represented and how locations are mapped into voxels.

  • num_voxels_per_dim – number of voxels per dimension in the model’s grids. Defines the resolution of the model.

build_model(locations, features)[source]#

Build graph from locations and features sorted into grids.

find_nearest_neighbors(search_locations, num_neighbors, return_distance=False)[source]#

Find nearest neighbors in graph for list of search locations.

Note

This is currently using kd tree search. In the future we may consider doing this directly by indexing the grids. However, an initial implementation of this does not seem to be faster than the kd tree search (~5-10x slower). However one must consider that search directly in the grid would remove the cost of building the tree. TODO: Investigate this further.

Returns:

If return_distance is True, return distances. Otherwise, return indices of nearest neighbors.

set_graph(graph)[source]#

Set self._graph property and convert input graph to right format.

update_model(locations, features, location_rel_model, object_location_rel_body, object_rotation)[source]#

Add new locations and features into grids and rebuild graph.

tbp.monty.frameworks.models.sensor_modules#

class CameraSM(sensor_module_id: str, features: list[str], save_raw_obs: bool = False, pc1_is_pc2_threshold: int = 10, noise_params: dict[str, Any] | None = None, is_surface_sm: bool = False, delta_thresholds: dict[str, Any] | None = None) None[source]#

Bases: SensorModule

Sensor Module that turns RGBD camera observations into features at locations.

Takes in camera rgba and depth input and calculates locations from this. It also extracts features which are currently: on_object, rgba, surface_normal, curvature.

__init__(sensor_module_id, features, save_raw_obs=False, pc1_is_pc2_threshold=10, noise_params=None, is_surface_sm=False, delta_thresholds=None)[source]#

Initialize Sensor Module.

Parameters:
  • sensor_module_id – Name of sensor module.

  • features – Which features to extract. In [on_object, rgba, surface_normal, principal_curvatures, curvature_directions, gaussian_curvature, mean_curvature]

  • save_raw_obs – Whether to save raw sensory input for logging.

  • pc1_is_pc2_threshold – Maximum difference between pc1 and pc2 to be classified as being roughly the same (ignore curvature directions). Defaults to 10.

  • noise_params – Dictionary of noise amount for each feature.

  • is_surface_sm – Surface SMs do not require that the central pixel is “on object” in order to process the observation (i.e., extract features). Defaults to False.

  • delta_thresholds – If given, a FeatureChangeFilter will be used to check whether the current state’s features are significantly different from the previous with tolerances set according to delta_thresholds. Defaults to None.

Note

When using feature-at-location matching with graphs, surface_normal and on_object need to be in the list of features.

Note

gaussian_curvature and mean_curvature should be used together to preserve the same information contained in principal_curvatures.

reset() None[source]#

Reset the internal state of this Sensor Module.

Return type:

None

state_dict() Mapping[str, Any][source]#
Return type:

Mapping[str, Any]

step(ctx: RuntimeContext, observation: SensorObservation, motor_only_step: bool = False) Message[source]#

Turn raw observations into dict of features at location.

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • observation (SensorObservation) – Raw sensor observation.

  • motor_only_step (bool) – Whether the current step is a motor-only step.

Return type:

Message

Returns:

Percept with features and morphological features. Noise may be added. The use_state flag may be set.

update_state(agent: AgentState) None[source]#

Update the proprioceptive state for this Sensor Module.

Parameters:

agent (AgentState) – The proprioceptive state of this sensor module’s Agent.

Return type:

None

class DefaultMessageNoise(noise_params: dict[str, Any])[source]#

Bases: MessageNoise

__init__(noise_params)[source]#
add_noise_to_feat_value(feat_name, feat_val, rng: numpy.random.RandomState)[source]#
class FeatureChangeFilter(delta_thresholds: dict[str, Any])[source]#

Bases: PerceptFilter

__init__(delta_thresholds)[source]#
reset()[source]#

Reset buffer and is_exploring flag.

class MessageNoise(*args, **kwargs)[source]#

Bases: Protocol

__init__(*args, **kwargs)#
class NoMessageNoise(*args, **kwargs)[source]#

Bases: MessageNoise

class ObservationProcessor(features: list[str], sensor_module_id: str, pc1_is_pc2_threshold=10, surface_normal_method=SurfaceNormalMethod.TLS, weight_curvature=True, is_surface_sm=False) None[source]#

Bases: object

Processes SensorObservations into Cortical Messages.

__init__(features, sensor_module_id, pc1_is_pc2_threshold=10, surface_normal_method=SurfaceNormalMethod.TLS, weight_curvature=True, is_surface_sm=False)[source]#

Initializes the ObservationProcessor.

Parameters:
  • features – List of features to extract.

  • pc1_is_pc2_threshold – Maximum difference between pc1 and pc2 to be classified as being roughly the same (ignore curvature directions). Defaults to 10.

  • sensor_module_id – ID of sensor module.

  • surface_normal_method – Method to use for surface normal extraction. Defaults to TLS.

  • weight_curvature – Whether to use the weighted implementation for principal curvature extraction (True) or unweighted (False). Defaults to True.

  • is_surface_sm – Surface SMs do not require that the central pixel is “on object” in order to process the observation (i.e., extract features). Defaults to False.

process(observation: SensorObservation) Message[source]#

Processes observation.

Parameters:

observation (SensorObservation) – Habitat observation.

Return type:

Message

Returns:

A Percept.

CURVATURE_FEATURES: ClassVar[list[str]] = ['principal_curvatures', 'principal_curvatures_log', 'gaussian_curvature', 'mean_curvature', 'gaussian_curvature_sc', 'mean_curvature_sc', 'curvature_for_TM']#
POSSIBLE_FEATURES: ClassVar[list[str]] = ['on_object', 'object_coverage', 'min_depth', 'mean_depth', 'rgba', 'hsv', 'pose_vectors', 'principal_curvatures', 'principal_curvatures_log', 'pose_fully_defined', 'gaussian_curvature', 'mean_curvature', 'gaussian_curvature_sc', 'mean_curvature_sc', 'curvature_for_TM', 'coords_for_TM', 'edge_strength', 'coherence']#
class PassthroughPerceptFilter(*args, **kwargs)[source]#

Bases: PerceptFilter

reset() None[source]#
Return type:

None

class PerceptFilter(*args, **kwargs)[source]#

Bases: Protocol

__init__(*args, **kwargs)#
reset() None[source]#
Return type:

None

class Probe(sensor_module_id: str, save_raw_obs: bool) None[source]#

Bases: SensorModule

A probe that can be inserted into Monty in place of a sensor module.

It will track raw observations for logging, and can be used by experiments for positioning procedures, visualization, etc.

What distinguishes a probe from a sensor module is that it does not process observations and does not emit a Cortical Message.

__init__(sensor_module_id: str, save_raw_obs: bool) None[source]#

Initialize the probe.

Parameters:
  • rng – Random number generator. Unused.

  • sensor_module_id (str) – Name of sensor module.

  • save_raw_obs (bool) – Whether to save raw sensory input for logging.

reset() None[source]#

Reset the internal state of this Sensor Module.

Return type:

None

state_dict() Mapping[str, Any][source]#
Return type:

Mapping[str, Any]

step(ctx: RuntimeContext, observation: SensorObservation, motor_only_step: bool = False) None[source]#

Execute a time-step for the Sensor Module.

Parameters:
  • ctx (RuntimeContext) – The runtime context.

  • observation (SensorObservation) – Sensor observation.

  • motor_only_step (bool) – Whether the current step is a motor-only step.

Return type:

None

Returns:

An optional percept with features and morphological features.

update_state(agent: AgentState) None[source]#

Update the proprioceptive state for this Sensor Module.

Parameters:

agent (AgentState) – The proprioceptive state of this sensor module’s Agent.

Return type:

None

class SnapshotTelemetry None[source]#

Bases: object

Keeps track of raw observation snapshot telemetry.

__init__() None[source]#
raw_observation(raw_observation: SensorObservation, rotation: quaternion.quaternion, position: numpy.ndarray)[source]#

Record a snapshot of a raw observation and its pose information.

Parameters:
  • raw_observation (SensorObservation) – Raw observation.

  • rotation (quaternion) – Rotation of the sensor.

  • position (ndarray) – Position of the sensor.

reset()[source]#

Reset the snapshot telemetry.

state_dict() Mapping[str, Any][source]#

Returns recorded raw observation snapshots.

Return type:

Mapping[str, Any]

Returns:

Dictionary containing a list of raw observations in raw_observations and a list of pose information for each observation in sm_properties.

class SurfaceNormalMethod(value)[source]#

Bases: Enum

An enumeration.

NAIVE = 'naive'#

Naive

OLS = 'OLS'#

Ordinary Least-Squares

TLS = 'TLS'#

Total Least-Squares

tbp.monty.frameworks.models.two_d_sensor_module#