Source code for tbp.monty.frameworks.models.goal_state_generation

# Copyright 2025 Thousand Brains Project
# Copyright 2023-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

import logging

import numpy as np

from tbp.monty.frameworks.models.abstract_monty_classes import GoalStateGenerator
from tbp.monty.frameworks.models.states import GoalState
from tbp.monty.frameworks.utils.communication_utils import get_state_from_channel


class GraphGoalStateGenerator(GoalStateGenerator):
    """Generate sub-goal states until the received goal state is achieved.

    A component associated with each learning module that receives a high-level goal
    state, and generates sub-goal states until the received goal state is achieved.

    Generated goal-states are received by either:
    i) other learning modules, which may model world-objects (e.g. a mug), or may
    model internal systems (e.g. the agent's robotic limb)
    ii) motor actuators, in which case they represent simpler, primitive goal-states
    for the actuator to achieve (e.g. location and orientation of an actuator-sensor
    pair)

    As well as the high-level, "driving" goal-state, generated goal-states can also
    be conditioned on other information, such as the LM's current most-likely
    hypothesis, and the structure of known object models (i.e. information local to
    the LM).

    Note all goal-states conform to the State-class cortical messaging protocol (CMP).
    """

    def __init__(self, parent_lm, goal_tolerances=None, **kwargs) -> None:
        """Initialize the GSG.

        Args:
            parent_lm: The learning-module class instance that the GSG is embedded
                within.
            goal_tolerances: The tolerances for each attribute of the goal-state that
                can be used by the GSG when determining whether a goal-state is
                achieved. These are not necessarily the same as an LM's tolerances
                used for matching, as here we are evaluating whether a goal-state is
                achieved.
            **kwargs: Additional keyword arguments. Unused.
        """
        self.parent_lm = parent_lm

        if goal_tolerances is None:
            self.goal_tolerances = dict(
                location=0.015,  # distance in meters
            )
        else:
            self.goal_tolerances = goal_tolerances

        self.reset()
        self.set_driving_goal_state(self._generate_none_goal_state())

    # =============== Public Interface Functions ===============
    # ------------------ Getters & Setters ---------------------

    def reset(self):
        """Reset any stored attributes of the GSG."""
        self.set_driving_goal_state(self._generate_none_goal_state())
        self._set_output_goal_state(self._generate_none_goal_state())

        self.parent_lm.buffer.update_stats(
            dict(
                matching_step_when_output_goal_set=[],
                goal_state_achieved=[],
            ),
            update_time=False,
            append=False,
            init_list=False,
        )

    def set_driving_goal_state(self, received_goal_state):
        """Receive a new high-level goal to drive this goal-state-generator (GSG).

        If none is provided, the goal-state generator should default to pursuing a
        goal-state of high confidence, with no other attributes of the state
        specified; in other words, it attempts to reduce uncertainty about the LM's
        output (object ID and pose, whatever these may be).

        TODO M: Currently GSGs always use the default, however future work will
        implement hierarchical action policies/GSGs, as well as the ability to
        specify a top goal-state by the experimenter.

        TODO M: we currently just use "None" as a placeholder for the default
        goal-state
        > plan: set the default driving goal-state to a meaningful, non-None value
        that is compatible with the current method for checking convergence of an LM,
        such that achieving the driving goal-state can be used as a test for Monty
        convergence. This might be something like the below.
        """
        # if received_goal_state is None:
        #     # The current default goal-state, which is to reduce uncertainty; this
        #     # is defined by having a high confidence in the goal-state, and an
        #     # arbitrary single object ID.
        #     self.driving_goal_state = GoalState(
        #         location=None,
        #         morphological_features=None,
        #         non_morphological_features={
        #             "object_id": "*",  # Match any object so long as it is described
        #             # by a single ID
        #             "location_rel_model": None,
        #         },
        #         confidence=1.0,  # Should have high confidence
        #         use_state=False,
        #         sender_id=self.parent_lm.learning_module_id,
        #         sender_type="GSG",
        #         goal_tolerances=None,
        #     )
        # else:
        self.driving_goal_state = received_goal_state

    def get_output_goal_state(self):
        """Retrieve the output goal-state of the GSG.

        This is the goal-state projected to other LMs' GSGs and/or motor-actuators.

        Returns:
            Output goal-state of the GSG.
        """
        return self.output_goal_state

    # ------------------- Main Algorithm -----------------------

    def step_gsg(self, observations):
        """Step the GSG.

        Check whether the GSG's output and driving goal-states are achieved, and
        generate a new output goal-state if necessary.
        """
        output_goal_achieved = self._check_output_goal_state_achieved(observations)

        self._update_gsg_logging(output_goal_achieved)

        # If driving goal-state achieved, from this LM's perspective, other LMs/GSGs
        # need not do anything
        # TODO M re-introduce when replacing the current check for convergence with
        # achievement of the driving goal-state
        # if self._check_driving_goal_state_achieved():
        #     self._set_output_goal_state(
        #         new_goal_state=self._generate_none_goal_state()
        #     )
        # else:
        #     # Below code block
        if self._check_need_new_output_goal(output_goal_achieved):
            self._set_output_goal_state(
                new_goal_state=self._generate_goal_state(observations)
            )
        elif self._check_keep_current_output_goal():
            pass
        else:
            self._set_output_goal_state(
                new_goal_state=self._generate_none_goal_state()
            )

    # ======================= Private ==========================
    # ------------------- Main Algorithm -----------------------

    def _generate_none_goal_state(self):
        """Return a None-type goal state.

        A None-type goal state specifies nothing other than high confidence.

        NOTE currently we just use a None value, however in the future we might
        specify a GoalState object with a None value for the location, morphological
        features, etc., or some variation of this.
        """
        return None

    def _generate_goal_state(self, observations):
        """Generate a new goal-state to send out to other LMs and/or motor actuators.

        Given the driving goal-state, and information from the parent LM of the GSG
        (including the current observations), generate a new goal-state to send out
        to other LMs and/or motor actuators.

        Note the output goal-state is in a common, body-centered frame of reference,
        as for voting, such that different modules can mutually communicate.

        This version is a base place-holder method that just returns a None
        goal-state, and does not actually make use of observations or the driving
        goal-state.

        Returns:
            A None-type goal state.
        """
        return self._generate_none_goal_state()

    def _check_states_different(
        self,
        state_a,
        state_b,
        diff_tolerances,
    ) -> bool:
        """Check whether two states are different.

        States need to be different by only one feature/dimension to be considered
        different.

        When checking whether the states are different, a dictionary of tolerances
        must be passed; in the GSG class, these are typically the GSG's own default
        goal-tolerances, but specific tolerances can also be passed along with a
        goal-state itself (i.e. achieve this goal-state, to within these tolerance
        bounds).

        Note: If a state is undefined (None), we define a difference as unmeaningful
        and therefore return False. Similarly, for any feature of a state (or
        dimension of a feature) that is undefined (None or NaN), we do not return any
        difference along that dimension.

        TODO M consider making this a utility function, as it might be useful in e.g.
        the LM itself as well. However, the significant presence of None/NaN values
        in goal states may mean we want to take a different approach.

        Returns:
            Whether the states are different.
        """
        if state_a is None or state_b is None:
            return False

        states_different = False

        for tolerance_key, tolerance_val in diff_tolerances.items():
            # TODO M implement feature comparisons for other State features (e.g.
            # confidence)
            # TODO M consider using the LM's lm.tolerances for the default values of
            # diff_tolerances
            if tolerance_key == "location":
                if state_a.location is not None and state_b.location is not None:
                    distance = np.linalg.norm(state_a.location - state_b.location)

            elif tolerance_key == "pose_vectors":
                if (
                    state_a.morphological_features is not None
                    and state_b.morphological_features is not None
                ):
                    raise NotImplementedError(
                        "TODO M implement pose-vector comparisons that handle"
                        " symmetry of objects"
                    )
                    # TODO M consider using an angular distance instead of Euclidean
                    # when we actually begin making use of this feature; try to ensure
                    # this handles symmetry conditions e.g. flipped principal
                    # curvature directions.
                    distance = self._euc_dist_ignoring_nan(
                        state_a.morphological_features["pose_vectors"],
                        state_b.morphological_features["pose_vectors"],
                    )

            states_different = distance > tolerance_val

            if states_different:
                return states_different

        return states_different

    def _check_driving_goal_state_achieved(self) -> bool:
        """Check if parent LM's output state is close enough to driving goal-state.

        TODO M Move some of the checks for convergence here

        Returns:
            Whether the parent LM's output state is close enough to the driving
            goal-state.
        """
        if self.driving_goal_state.goal_tolerances is None:
            # When not specified by the incoming driving goal-state, use the GSG's own
            # default matching tolerances
            diff_tolerances = self.goal_tolerances

        return self._check_states_different(
            self.parent_lm.get_output(), self.driving_goal_state, diff_tolerances
        )

    def _check_output_goal_state_achieved(self, observations) -> bool:
        """Check if the output goal-state was achieved.

        Check whether the information entering the LM suggests that the output goal
        state of the GSG was achieved. Recall that the output goal is the one sent by
        this GSG to other LMs and motor-actuators to be achieved.

        Note:
            In the future we might use feedback from a receiving system that is not
            "sensory input" (i.e. does not inform the graph building of this parent
            LM); instead, such feedback could include the state of an LM that
            controls a motor system (such as a hand model LM), or the state of a
            motor-actuator (akin to proprioceptive feedback); in this case, we could
            directly compare the output goal-state to the state received via this
            feedback, to determine whether the goal-state was likely achieved. This
            input would likely come from a separate channel (similar to voting).

            Finally, note that this information could be complementary to feedback
            from the sensory input and our sensory predictions, as in some cases we
            might have no proprioceptive feedback, while in other cases we might have
            no sensory input (e.g. blindfolded); alignment or mismatch between these
            two could form useful signals for learning policies and object behaviors.

        Returns:
            Whether the output goal-state was achieved.
        """
        if self.output_goal_state is not None:
            goal_achieved = self._check_input_matches_sensory_prediction(observations)
            return goal_achieved
        else:
            return False

    def _check_input_matches_sensory_prediction(self, observations):
        """Check whether the input matches the sensory prediction.

        Here the sensory prediction is simply that the input state has changed, as
        when the motor-system attempts to achieve a goal-state and fails (e.g. due to
        collision with another object), it moves back to the original position.

        Note that there can still be some difference even when a goal-state failed,
        as the feature-change-SM and motor-only steps can result in the agent moving
        after it has returned to its original position. Furthermore, there may not
        always be a difference if the agent did "succeed", if the goal-state it
        wanted to achieve happened to be very close to its original position. Thus
        this is an approximate method.

        TODO M implement also using the target goal-state and internal model to
        predict a specific input state, and then compare to that to determine not
        just whether a movement took place, but whether the agent moved to a
        particular point on a particular object.

        Returns:
            Whether the input matches the sensory prediction.
        """
        sensor_channel_name = self.parent_lm.buffer.get_first_sensory_input_channel()

        current_sensory_input = get_state_from_channel(
            states=observations, channel_name=sensor_channel_name
        )

        prev_input_states = self.parent_lm.buffer.get_previous_input_states()
        if prev_input_states is not None:
            previous_sensory_input = get_state_from_channel(
                states=prev_input_states,
                channel_name=sensor_channel_name,
            )
        else:
            previous_sensory_input = None
        # NB if there is no history of inputs, get_previous_input_states returns None,
        # in which case _check_states_different will return False, and we return
        # goal_achieved as False, as we cannot meaningfully evaluate whether this
        # occurred

        input_changed = self._check_states_different(
            current_sensory_input,
            previous_sensory_input,
            diff_tolerances=self.goal_tolerances,
        )

        return input_changed

    def _check_need_new_output_goal(self, output_goal_achieved) -> bool:
        """Determine whether the GSG should generate a new output goal-state.

        In the base version, this is True if the output goal was achieved, suggesting
        we should move on to the next goal.

        Returns:
            Whether the GSG should generate a new output goal-state.
        """
        if output_goal_achieved:
            return True
        else:
            return False

    def _check_keep_current_output_goal(self) -> bool:
        """Should we keep our current goal?

        If we don't need a new goal, determine whether we should keep our current
        goal (as opposed to outputting no goal at all).

        Returns:
            Whether we should keep our current goal.
        """
        return True

    def _euc_dist_ignoring_nan(self, a, b):
        """Euclidean distance between two arrays, ignoring NaN values.

        Take the Euclidean distance between two arrays, but only measuring the
        distance where both arrays have non-NaN values.

        Args:
            a: First array
            b: Second array

        Returns:
            Euclidean distance between the two arrays, ignoring NaN values; if all
            values are NaN, return 0

        TODO M consider making a general utility function
        """
        assert a.shape == b.shape, "Arrays must be of the same shape"

        mask = ~np.isnan(a) & ~np.isnan(b)

        # If the mask is empty, return 0 (i.e. there is no meaningful distance
        # between the two vectors)
        if len(mask) == 0:
            return 0
        else:
            return np.linalg.norm(a[mask] - b[mask])

    # ------------------ Getters, Setters & Logging ---------------------

    def _set_output_goal_state(self, new_goal_state):
        """Set the output goal-state of the GSG."""
        self.output_goal_state = new_goal_state

    def _update_gsg_logging(self, output_goal_achieved: bool):
        """Update any logging information (stored in the parent LM's buffer).

        Update any logging information (stored in the parent LM's buffer), such as
        the matching step on which an output goal-state was output.
        """
        # Only consider the output goal-state achieved for the purpose of logging
        # when it is meaningful (i.e. not None)
        if self.output_goal_state is not None:
            # Subtract 1 as the goal-state was actually set (and potentially achieved)
            # on the previous step; we are simply first checking it now
            match_step = self.parent_lm.buffer.get_num_matching_steps() - 1

            self.parent_lm.buffer.update_stats(
                dict(
                    matching_step_when_output_goal_set=match_step,
                    goal_state_achieved=output_goal_achieved,
                ),
                update_time=False,
                append=True,
                init_list=True,
            )

class EvidenceGoalStateGenerator(GraphGoalStateGenerator):
    """Generator of goal states for an evidence-based graph LM.

    GSG specifically set up for generating goal states for an evidence-based graph
    LM, which can therefore leverage the hypothesis-testing action policy. This
    policy uses hypotheses about the most likely objects, as well as knowledge of
    their structure from long-term memory, to propose test-points that should
    efficiently disambiguate the ID or pose of the object the agent is currently
    observing.

    TODO M separate out the hypothesis-testing policy (which is one example of a
    model-based policy) from the GSG, which is the system that is capable of
    leveraging a variety of model-based policies.
    """

    def __init__(
        self,
        parent_lm,
        goal_tolerances=None,
        elapsed_steps_factor=10,
        min_post_goal_success_steps=np.infty,
        x_percent_scale_factor=0.75,
        desired_object_distance=0.03,
        **kwargs,
    ) -> None:
        """Initialize the Evidence GSG.

        Args:
            parent_lm: The learning-module class instance that the GSG is embedded
                within (see GraphGoalStateGenerator).
            goal_tolerances: The tolerances used when determining whether a
                goal-state is achieved (see GraphGoalStateGenerator).
            elapsed_steps_factor: Factor that considers the number of elapsed steps
                as a possible condition for initiating a hypothesis-testing goal
                state; should be set to an integer reflecting a number of steps. In
                general, when we have taken a number of non-goal-state-driven steps
                greater than elapsed_steps_factor, then this is an indication to
                initiate a hypothesis-testing goal-state. In addition however, we can
                multiply elapsed_steps_factor by an exponentially increasing
                wait-factor, such that we use longer and longer intervals as the
                experiment continues. Defaults to 10.
            min_post_goal_success_steps: Number of necessary steps for a hypothesis
                goal-state to be considered. Unlike elapsed_steps_factor, this is a
                *necessary* criterion for us to generate a new hypothesis-testing
                goal-state. For example, if set to 5, then the agent must take 5
                non-hypothesis-testing steps before it can even consider generating a
                new hypothesis-testing goal-state. Infinity by default, resulting in
                no use of the hypothesis-testing policy (desirable for unit tests
                etc.). Defaults to np.infty.
            x_percent_scale_factor: Scale of the x-percent threshold used to decide
                when to focus on pose rather than determining object ID; in
                particular, this is used to determine whether the top object is
                sufficiently more likely (based on MLH evidence) than the second MLH
                object to warrant focusing on disambiguating the pose of the first;
                should be bounded between 0:1.0. If x_percent_scale_factor=1.0, then
                we will wait until the standard x-percent threshold is exceeded,
                equivalent to the LM converging to a single object, but not a pose.
                If it is <1.0, then we will start testing the pose of the MLH object
                even before we are entirely certain about its ID. Defaults to 0.75.
            desired_object_distance: The desired distance between the agent and the
                object, which is used to determine whether the agent is close enough
                to the object to consider it "achieved". Note this need not be the
                same as the one specified for the motor-system (e.g. the
                surface-policy), as we may want to aim for an initially farther
                distance, while the surface-policy may want to stay quite close to
                the object. Defaults to 0.03.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(parent_lm, goal_tolerances, **kwargs)

        self.elapsed_steps_factor = elapsed_steps_factor
        self.min_post_goal_success_steps = min_post_goal_success_steps
        self.x_percent_scale_factor = x_percent_scale_factor
        self.desired_object_distance = desired_object_distance

    # ======================= Public ==========================
    # ------------------- Main Algorithm -----------------------

    def reset(self):
        """Reset additional parameters specific to the Evidence GSG."""
        super().reset()

        self.focus_on_pose = False  # Whether the jump should be executed to focus on
        # distinguishing between possible poses of the current MLH object, rather
        # than trying to distinguish different possible object IDs.
        self.wait_factor = 1  # Initial value; scales how long to wait before the
        # next jump attempt
        self.prev_top_mlhs = None  # Store the top two object hypothesis IDs from
        # previous hypothesis-testing actions; used to track when these have changed,
        # and therefore a possible reason to initiate another hypothesis-testing
        # action.
        # TODO M consider moving to buffer.

    # ======================= Private ==========================
    # ------------------- Main Algorithm -----------------------

    def _generate_goal_state(self, observations) -> list:
        """Use the hypothesis-testing policy to generate a goal-state.

        The goal-state will rapidly disambiguate the pose and/or ID of the object the
        LM is currently observing.

        Returns:
            A goal-state for the motor system.
        """
        # Determine where we want to test in the MLH graph
        target_loc_id, target_separation = self._compute_graph_mismatch()

        # Get pose information for the target point
        target_info = self._get_target_loc_info(target_loc_id)

        # Estimate how important this goal-state will be for the Monty-system as a
        # whole
        goal_confidence = self._compute_goal_confidence(
            lm_output_confidence=self.parent_lm.get_output().confidence,
            separation=target_separation,
        )

        # Compute the goal-state (for the motor-actuator)
        motor_goal_state = self._compute_goal_state_for_target_loc(
            observations,
            target_info,
            goal_confidence=goal_confidence,
        )

        return motor_goal_state

    def _compute_graph_mismatch(self):
        """Propose a point for the model to test.

        The aim is to propose a point for the model to test, with the aim of
        performing object pose and ID recognition, by looking at the graph of the
        most likely object, and comparing it to the graph of the second most likely
        object. If there is a local mismatch in the graphs (e.g. the presence of a
        handle in one and not the other), this should return the necessary
        coordinates to move there.

        Returns the index of the point in the graph of the most likely object that
        should be tested, along with the size of the L2 separation of this point to
        the nearest point in the graph of the second most likely object.

        --- Some Details ---
        Part of this method transforms the graph of the most likely object into the
        reference frame in which the 2nd most-likely object was *learned*. We perform
        this operation because when comparing the two point-clouds (using the points
        of the most-likely object as queries), we can then re-use the KDTree already
        constructed for the 2nd most-likely object, hence the importance of being in
        that reference frame.

        TODO M eventually can try looking at more objects, or flipping the MLH object
        - e.g. if the two most likely are the mug and one of the handle-less cups,
        then depending on the order in which we compare them, we may not actually
        identify the handle as a good candidate to test (i.e. if one graph is
        entirely a subset of the other graph). We could use the returned separation
        distance to estimate which of these approaches would be better.
        - i.e. imagine the 2nd MLH is a mug with a handle, and the MLH has no handle;
        when checking all the points for the handleless mug, there will always be
        nearby points
        - Re. implementing this: could start with the MLH as the query points,
        looking for points with minimal neighbors in the 2nd most likely graph; if we
        found too many neighbors in a given radius (threshold dependent), this
        suggests the 1st MLH graph is a sub-graph of the 2nd MLH; therefore, check
        whether the 2nd graph has any points with few neighbors in the first; if
        still many neighbors, this could then serve as a learning signal to merge the
        graphs? (at least at some levels of hierarchy)
        --> NB merging should use a "picky" graph-building method to ensure we don't
        just double the number of points in the graph unnecessarily

        TODO M consider adding a factor so that we ensure our testing spot is also
        far away from any previously visited locations (at least according to the MLH
        path), including our current location.

        Returns:
            The index of the point in the model to test, along with its separation
            from the nearest point in the second most-likely graph.
        """
        logging.debug("Proposing an evaluation location based on graph mismatch")

        top_id, second_id = self.parent_lm.get_top_two_mlh_ids()

        top_mlh = self.parent_lm.get_mlh_for_object(top_id)
        # Determine the second most-likely object for saving to history, even if we
        # are going to focus on pose mismatch
        second_mlh_object = self.parent_lm.get_mlh_for_object(second_id)

        top_mlh_graph = self.parent_lm.get_graph(top_id, input_channel="first").pos

        if self.focus_on_pose:
            # Overwrite the second most likely hypothesis with the second most likely
            # *pose* of the most-likely object
            second_id = top_id
            _, second_mlh = self.parent_lm.get_top_two_pose_hypotheses_for_graph_id(
                top_id
            )
        else:
            second_mlh = second_mlh_object

        # == Corrective transformation ==
        # Fully correct the origin and rotation of the top object's graph so it is in
        # the same reference frame as the second object's graph was learned
        # Note the graph of the second most likely object is already in the same
        # reference frame (i.e. the one from learning) used when constructing the
        # KDTree

        # Convert to environmental coordinates, and normalize by current MLH location
        # TODO M refactor this into a function that can be reapplied to arbitrary
        # graphs
        # Note the MLH rotation is the rotation required to match a displacement to
        # a model, so it is the *inverse* of e.g. the ground-truth rotation
        # TODO M: See if apply_rf_transform_to_points could be used here
        rotated_graph = top_mlh["rotation"].inv().apply(top_mlh_graph)
        current_mlh_location = top_mlh["rotation"].inv().apply(top_mlh["location"])
        top_mlh_graph = rotated_graph - current_mlh_location

        # Convert from environmental coordinates to the learned coordinates of the
        # 2nd object
        # Thus we don't need to invert the stored rotation, as we would like to
        # actually apply the inverse form.
        top_mlh_graph = (
            second_mlh["rotation"].apply(top_mlh_graph) + second_mlh["location"]
        )

        # Perform the same transformation to the estimated location (sanity check)
        transformed_current_loc = top_mlh["rotation"].inv().apply(
            top_mlh["location"]
        ) - top_mlh["rotation"].inv().apply(top_mlh["location"])
        transformed_current_loc = (
            second_mlh["rotation"].apply(transformed_current_loc)
            + second_mlh["location"]
        )

        assert np.all(transformed_current_loc == second_mlh["location"]), (
            "Graph transformation to 2nd object reference frame not returning"
            " correct transformed location"
        )

        # Perform kdtree search to identify the point with the most distant
        # nearest-neighbor
        # Note we ultimately want the target location to be one on the most likely
        # graph, so we pass the top-MLH graph in as the query points
        radius_node_dists = self.parent_lm.get_graph(
            second_id, input_channel="first"
        ).find_nearest_neighbors(
            top_mlh_graph,
            num_neighbors=1,
            return_distance=True,
        )

        target_loc_id = np.argmax(radius_node_dists)
        target_loc_separation = np.max(radius_node_dists)

        self.prev_top_mlhs = [top_mlh, second_mlh_object]

        return target_loc_id, target_loc_separation

    def _get_target_loc_info(self, target_loc_id):
        """Given a target location ID, get the target location and pose vectors.

        Note: Currently assumes we are computing with the MLH graph.

        Returns:
            A dictionary containing the hypothesis to test, and the target location
            and point-normal of the target point on the object.
        """
        mlh = self.parent_lm.get_current_mlh()
        mlh_id = mlh["graph_id"]
        target_object = self.parent_lm.get_graph(mlh_id)

        sensor_channel_name = self.parent_lm.buffer.get_first_sensory_input_channel()
        target_graph = target_object[sensor_channel_name]

        target_loc = target_graph.pos[target_loc_id]

        pn_mapping = target_graph.feature_mapping["pose_vectors"]
        target_pn = target_graph.x[target_loc_id, pn_mapping[0] : pn_mapping[0] + 3]

        target_info = {
            "hypothesis_to_test": mlh,
            "target_loc": target_loc,
            "target_pn": target_pn,
        }

        return target_info

    def _compute_goal_confidence(
        self, lm_output_confidence, separation, space_size=1.0, confidence_weighting=0.1
    ):
        """Calculate the confidence of the goal-state.

        The confidence is based on e.g. the separation in hypothesis-space between
        the two MLHs, and the confidence associated with the MLH classification of
        the parent LM.

        Currently just returns the confidence of the parent LM, but TODO M implement
        a more sophisticated function.

        TODO M How to normalize the displacement? Could put it through a sigmoid that
        is perhaps scaled by the size of the object? Could divide by e.g. the size of
        the object to make it likely to be <1, and then clip it; that way any subtle
        differences between LMs are likely to be preserved, i.e. rather than them all
        clipping to 1.0; can then just make sure this value is weighted heavily
        compared to confidence when computing the overall strength of the goal-state.
        - size of the object could be estimated from the minimum and maximum corners
        - or use the max size of the graph --> Note this doesn't account for the
          actual size of the object, and these grid-models are not currently used

        Returns:
            The confidence of the goal-state.
        """
        # Provisional implementation:
        # squashed_displacement = np.clip(separation / space_size, 0, 1)
        # goal_confidence = squashed_displacement + confidence_weighting
        # * lm_output_confidence
        goal_confidence = lm_output_confidence

        return goal_confidence
""" # Determine the displacement, and therefore the environmental target location, # that we will use sensor_channel_name = self.parent_lm.buffer.get_first_sensory_input_channel() sensory_input = get_state_from_channel( states=observations, channel_name=sensor_channel_name ) displacement = ( target_info["target_loc"] - target_info["hypothesis_to_test"]["location"] ) object_rot = target_info["hypothesis_to_test"]["rotation"].inv() # MLH rotation # is stored as the rotation needed to convert a displacement to the object pose, # so the *object pose* is given by its inverse # Rotate the displacement; note we're converting from an *internal* object frame # of reference, to the global frame of reference; thus, we rotate not by the # inverse, but by the actual object orientation. rotated_disp = object_rot.apply(displacement) # The target location on the object's surface in global/body-centric coordinates proposed_surface_loc = sensory_input.location + rotated_disp # Rotate the learned point normal (which was commited to memory assuming a # default 0,0,0 orientation of the object) target_pn_rotated = object_rot.apply(target_info["target_pn"]) # Scale the point-normal by the desired distance x1.5 (i.e. so that we start # a bit further away from the object; we will separately move forward if we # are indeed facing it) surface_displacement = target_pn_rotated * self.desired_object_distance * 1.5 target_loc = proposed_surface_loc + surface_displacement motor_goal_state = GoalState( location=np.array(target_loc), morphological_features={ # Note the hypothesis-testing policy does not specify the roll of the # agent, because this is not relevant to the task "pose_vectors": np.array( [ (-1) * target_pn_rotated, [np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan], ] ), "pose_fully_defined": None, "on_object": 1, }, non_morphological_features=None, confidence=goal_confidence, use_state=True, sender_id=self.parent_lm.learning_module_id, sender_type="GSG", goal_tolerances=None, ) # TODO M consider also using the below sensor-predicted state as an additional # evalaution of how much we have achieved our goal, i.e. consistent with the # object we thought we were on; could have more detailed information using the # internal object model # sensor_predicted_state = GoalState( # location=np.array(proposed_surface_loc), # morphological_features={ # # Note the hypothesis-testing policy does not specify the roll of the # # agent, because this is not relevant to the task # "pose_vectors": np.array( # [ # target_pn_rotated, # [np.nan, np.nan, np.nan], # [np.nan, np.nan, np.nan], # ] # ), # "pose_fully_defined": None, # "on_object": 1, # }, # non_morphological_features=None, # confidence=goal_confidence, # use_state=True, # sender_id=self.parent_lm.learning_module_id, # sender_type="GSG", # goal_tolerances=None, # ) return motor_goal_state def _check_need_new_output_goal(self, output_goal_achieved) -> bool: """Determine whether the GSG should generate a new output goal-state. Unlike the base version, success in achieving the goal-state is not an indication to need a new goal, because we should now be exploring a new part of the hypothesis-space, and so want to stay there for some time. Returns: Whether the GSG should generate a new output goal-state. """ if output_goal_achieved: return False else: return self._check_conditions_for_hypothesis_test() def _check_keep_current_output_goal(self) -> bool: """Determine whether the GSG should keep the current goal-state. 
Hypothesis-testing actions should be executed as one-off attempts, lest we get stuck in a loop of trying to achieve the same goal that is impossible (e.g. due to collision with objects). Returns: Whether the GSG should keep the current goal-state. Always returns False. """ return False def _check_conditions_for_hypothesis_test(self): """Check if good chance to discriminate between conflicting object IDs or poses. Evaluates possible conditions for performing a hypothesis-guided action for pose and object ID determination, i.e. determines whether there is a good chance of discriminating between conflicting object IDs or poses. The schedule is designed to balance descriminating the pose and objects as efficiently as possible; TODO M future work can use the schedule conditions as primitives and use RL or evolutionary algorithms to optimize the relevant parameters. TODO M each of the below conditions could be their own method; could then pass a set of keys which we iterate through, and thereby quickly test as a hyper-parameter which of these are worth keeping, and which of these we should get rid of. Returns: Whether there's a good chance to discriminate between conflicting object IDs or poses. """ num_elapsed_steps = self._get_num_steps_post_output_goal_generated() self.focus_on_pose = False # Default if num_elapsed_steps <= self.min_post_goal_success_steps: # Exceeding this threshold is necessary to consider a jump return False # === Collect additional information that will be used to check conditions # for initializing a jump === top_id, second_id = self.parent_lm.get_top_two_mlh_ids() if top_id == second_id: # If we only know (i.e. have learned) about one object, we can focus on pose # In this case, get_top_two_mlh_ids returns the same IDs for top_id and # second_id self.focus_on_pose = True return True # Used to check if pose for top MLH has changed top_mlh = self.parent_lm.get_current_mlh() # If the MLH evidence is significantly above the second MLH (where "significant" # is determined by x_percent_scale_factor below), then focus on descriminating # its pose on some (random) occasions; always focus on pose if we've convereged # to one object # TODO M update so that not accessing private methods here; part of 2nd phase # of refactoring pm_base_thresh = self.parent_lm._threshold_possible_matches() pm_smaller_thresh = self.parent_lm._threshold_possible_matches( x_percent_scale_factor=self.x_percent_scale_factor ) if ( len(pm_smaller_thresh) == 1 and (self.parent_lm.rng.uniform() <= 0.5) ) or len(pm_base_thresh) == 1: # We always focus on pose if there is just 1 possible match - if we are part # of the way towards being certain about the ID # (len(pm_smaller_thresh) == 1), then we sometimes (hence the randomness) # focus on pose. logging.debug( "Hypothesis jump indicated: One object more likely, focusing on pose" ) self.focus_on_pose = True return True # If the identities or *order* (i.e. which one is most likely) # of the top two MLH changes, perform a new jump test, as this # is a reasonable heuristic for us having a new interesting # place to test # TODO when optimizing, consider using np.any rather than np.all, i.e. as long # as there is any change in the top two MLH elif self.prev_top_mlhs is not None and np.all( [ self.prev_top_mlhs[0]["graph_id"], self.prev_top_mlhs[1]["graph_id"], ] != [top_id, second_id] ): logging.debug( "Hypothesis jump indicated: change or shuffle in top-two MLH IDs" ) return True # If the most-likely pose of the top object has changed (e.g. 
we didn't find # the mug handle following a previous jump, and have therefore eliminated a # pose), then we are likely to gain new information by performing another jump # TODO expand this to handle change in translationm/location pose as well # TODO add a parameter that specifies the angle between the two poses above # which we consider it a new pose (rather than it needing to be identical) elif self.prev_top_mlhs is not None and np.all( top_mlh["rotation"].as_euler("xyz") != self.prev_top_mlhs[0]["rotation"].as_euler("xyz") ): logging.debug( "Hypothesis jump indicated: change in most-likely rotation of MLH" ) return True # Otherwise, if a sufficient number of steps have elapsed, # still perform a jump; note however that this threshold exponentially # increases, so that we avoid continuously returning to the same location elif num_elapsed_steps % (self.wait_factor * self.elapsed_steps_factor) == 0: logging.debug( "Hypothesis jump indicated: sufficient steps elapsed with no jump" ) self.wait_factor *= 2 return True else: return False def _get_num_steps_post_output_goal_generated(self): """Number of steps since last output goal-state. Returns: The number of Monty-matching steps that have elapsed since the last time an output goal-state was generated. """ return self.parent_lm.buffer.get_num_steps_post_output_goal_generated()
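
# --- Illustrative sketch (not part of the library) ---
# The fallback condition in _check_conditions_for_hypothesis_test triggers a jump
# whenever the number of steps since the last output goal is a multiple of
# wait_factor * elapsed_steps_factor, and doubles wait_factor after each such jump.
# The hypothetical helper below replays that schedule in isolation, assuming the
# elapsed-step counter restarts whenever a goal is generated and that no other jump
# condition fires first; it exists only to show how the intervals grow.
def example_fallback_jump_schedule(total_steps, elapsed_steps_factor=10):
    wait_factor = 1
    steps_since_last_goal = 0
    jump_steps = []
    for step in range(1, total_steps + 1):
        steps_since_last_goal += 1
        if steps_since_last_goal % (wait_factor * elapsed_steps_factor) == 0:
            jump_steps.append(step)
            wait_factor *= 2  # wait longer and longer between fallback jumps
            steps_since_last_goal = 0  # counter restarts once a new goal is generated
    return jump_steps


if __name__ == "__main__":
    # With the defaults this prints [10, 30, 70]: the wait between fallback jumps
    # doubles each time (10 steps, then 20, then 40).
    print(example_fallback_jump_schedule(100))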