# Source code for tbp.monty.frameworks.models.motor_policies

# Copyright 2025 Thousand Brains Project
# Copyright 2021-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

from __future__ import annotations

import abc
import copy
import json
import logging
import math
import os
from dataclasses import dataclass, field
from typing import (
    Any,
    Dict,
    List,
    Literal,
    Mapping,
    Optional,
    Tuple,
    Type,
    cast,
)

import numpy as np
import quaternion as qt
import scipy.ndimage
from scipy.spatial.transform import Rotation as rot  # noqa: N813

from tbp.monty.frameworks.actions.action_samplers import ActionSampler
from tbp.monty.frameworks.actions.actions import (
    Action,
    ActionJSONDecoder,
    ActionJSONEncoder,
    LookDown,
    LookUp,
    MoveForward,
    MoveTangentially,
    OrientHorizontal,
    OrientVertical,
    SetAgentPose,
    SetSensorRotation,
    TurnLeft,
    TurnRight,
    VectorXYZ,
)
from tbp.monty.frameworks.models.motor_system_state import AgentState, MotorSystemState
from tbp.monty.frameworks.utils.spatial_arithmetics import get_angle_beefed_up
from tbp.monty.frameworks.utils.transform_utils import scipy_to_numpy_quat


class MotorPolicy(abc.ABC):
    """Abstract interface that all motor policies implement.

    A policy produces one ``Action`` per invocation, either dynamically from
    the current motor-system state or by replaying a predefined sequence,
    depending on the ``is_predefined`` flag.
    """

    def __init__(self) -> None:
        # By default, actions are generated dynamically rather than replayed.
        self.is_predefined = False

    @abc.abstractmethod
    def dynamic_call(self, state: Optional[MotorSystemState] = None) -> Action:
        """Produce the next action when actions are not predefined.

        Args:
            state (Optional[MotorSystemState]): The current state of the motor
                system. Defaults to None.

        Returns:
            (Action): The action to take.
        """

    @property
    @abc.abstractmethod
    def last_action(self) -> Action:
        """The most recent action taken by this policy."""

    @abc.abstractmethod
    def post_action(
        self, action: Action, state: Optional[MotorSystemState] = None
    ) -> None:
        """Hook invoked automatically at the end of ``__call__``.

        TODO: Remove state parameter as it is only used to serialize the state
        in state.convert_motor_state() and should be done within the motor
        system.

        Args:
            action (Action): The action that was just selected.
            state (Optional[MotorSystemState]): The current state of the motor
                system. Defaults to None.
        """

    @abc.abstractmethod
    def post_episode(self) -> None:
        """Hook invoked after each episode."""

    @abc.abstractmethod
    def pre_episode(self) -> None:
        """Hook invoked before each episode."""

    @abc.abstractmethod
    def predefined_call(self) -> Action:
        """Produce the next action from a predefined sequence.

        Returns:
            (Action): The action to take.
        """

    @abc.abstractmethod
    def set_experiment_mode(self, mode: Literal["train", "eval"]) -> None:
        """Set the experiment mode.

        Args:
            mode (Literal["train", "eval"]): The experiment mode to set.
        """

    def __call__(self, state: Optional[MotorSystemState] = None) -> Action:
        """Dispatch to the predefined or the dynamic action path.

        Args:
            state (Optional[MotorSystemState]): The current state of the motor
                system. Defaults to None.

        Returns:
            (Action): The action to take.
        """
        action = (
            self.predefined_call() if self.is_predefined else self.dynamic_call(state)
        )
        self.post_action(action, state)
        return action
class BasePolicy(MotorPolicy):
    def __init__(
        self,
        rng,
        action_sampler_args: Dict,
        action_sampler_class: Type[ActionSampler],
        agent_id: str,
        switch_frequency,
        file_name=None,
        file_names_per_episode=None,
    ):
        """Initialize a base policy.

        Args:
            rng: Random number generator to use
            action_sampler_args: arguments for the ActionSampler
            action_sampler_class: The ActionSampler to use
            agent_id: The agent ID
            switch_frequency: float in [0,1], how frequently to change actions
                when using sticky actions
            file_name: Path to file with predefined actions. Defaults to None.
            file_names_per_episode: Mapping of episode number to action file.
                Defaults to None.
        """
        super().__init__()

        ###
        # Define instance attributes
        ###
        self.rng = rng
        self.agent_id = agent_id
        self.action_sampler = action_sampler_class(rng=self.rng, **action_sampler_args)
        self.action_sequence = []
        self.timestep = 0
        self.episode_step = 0
        self.episode_count = 0
        self.switch_frequency = float(switch_frequency)
        # Ensure our first action only samples from those that can be random.
        self.action = self.get_random_action(self.action_sampler.sample(self.agent_id))

        ###
        # Load data for predefined actions and amounts if specified
        ###
        self.is_predefined = False
        self.file_names_per_episode = None
        self.action_list = []
        # file_names_per_episode augments, rather than replaces, file_name so
        # existing uses of file_name keep working.
        if file_names_per_episode is not None:
            self.file_names_per_episode = file_names_per_episode
            # The per-episode lookup only happens in post_episode, so the first
            # episode's file must be selected here.
            file_name = file_names_per_episode[0]
            self.is_predefined = True
        if file_name is not None:
            self.action_list = read_action_file(file_name)
            self.is_predefined = True

    ###
    # Methods that define behavior of __call__
    ###

    def dynamic_call(self, _state: Optional[MotorSystemState] = None) -> Action:
        """Return a random action; the MotorSystemState is ignored.

        Args:
            _state (Optional[MotorSystemState]): The current state of the motor
                system. Defaults to None. Unused.

        Returns:
            (Action): A random action.
        """
        return self.get_random_action(self.action)

    def get_random_action(self, action: Action) -> Action:
        """Return a random action sampled from the allowable actions.

        Enables expanding the action space of the base policy with actions
        that we don't necessarily want to randomly sample.
        """
        while True:
            # With probability switch_frequency, draw a fresh action.
            if self.rng.rand() < self.switch_frequency:
                action = self.action_sampler.sample(self.agent_id)
            # Pose-setting actions are never returned as random actions.
            if not isinstance(action, (SetAgentPose, SetSensorRotation)):
                return action

    def predefined_call(self) -> Action:
        """Return the next action from the predefined list, wrapping around."""
        cursor = self.episode_step % len(self.action_list)
        return self.action_list[cursor]

    def post_action(self, action: Action, _: Optional[MotorSystemState] = None) -> None:
        """Record the action and advance the step counters."""
        self.action = action
        self.timestep += 1
        self.episode_step += 1
        self.action_sequence.append([action])

    def pre_episode(self):
        """Reset per-episode bookkeeping."""
        self.episode_step = 0
        self.action_sequence = []

    def post_episode(self):
        """Advance the episode counter and load the next action file, if any."""
        self.episode_count += 1
        if self.file_names_per_episode is None:
            return
        if self.episode_count in self.file_names_per_episode:
            next_file = self.file_names_per_episode[self.episode_count]
            self.action_list = read_action_file(next_file)

    ###
    # Other required abstract methods, methods called by Monty or Dataloader
    ###

    def get_agent_state(self, state: MotorSystemState) -> AgentState:
        """Get agent state (dict).

        Note: Assumes we only have one agent.

        Args:
            state (MotorSystemState): The current state of the motor system.

        Returns:
            (AgentState): Agent state.
        """
        return state[self.agent_id]

    def is_motor_only_step(self, state: MotorSystemState):
        """Check if the current step is a motor-only step.

        TODO: This information is currently stored in motor system state, but
        should be stored in the policy state instead as it is tracking policy
        state, not motor system state. This will remove MotorSystemState param.

        Args:
            state (MotorSystemState): The current state of the motor system.

        Returns:
            bool: True if the current step is a motor-only step, False
            otherwise.
        """
        agent_state = self.get_agent_state(state)
        flag_present = "motor_only_step" in agent_state.keys()
        return bool(flag_present and agent_state["motor_only_step"])

    @property
    def last_action(self) -> Action:
        """The most recently recorded action."""
        return self.action

    def state_dict(self):
        """Serialize the policy's step counters."""
        return {"timestep": self.timestep, "episode_step": self.episode_step}

    def load_state_dict(self, state_dict):
        """Restore the policy's step counters from a state dict."""
        self.timestep = state_dict["timestep"]
        self.episode_step = state_dict["episode_step"]

    def set_experiment_mode(self, mode: Literal["train", "eval"]) -> None:
        """No-op: the base policy does not distinguish train from eval."""
        pass
class JumpToGoalStateMixin:
    """Convert a driving goal state into Habitat-compatible coordinates.

    Mixin that accepts a driving goal state for the motor agent and derives
    the action, in Habitat-compatible coordinates, that must be taken to move
    there.
    """

    def __init__(self) -> None:
        self.driving_goal_state = None

    def pre_episode(self):
        """Clear any goal left over from the previous episode."""
        self.set_driving_goal_state(None)

    def set_driving_goal_state(self, goal_state):
        """Specify the goal-state that the motor-actuator will attempt to satisfy."""
        self.driving_goal_state = goal_state

    def derive_habitat_goal_state(self):
        """Derive the Habitat-compatible goal state.

        Take the current driving goal state (in CMP format), and derive the
        corresponding Habitat-compatible goal-state to pass through the
        Embodied Dataloader.

        Returns:
            target_loc: Target location (or None if no goal is set).
            target_quat: Target quaternion (or None if no goal is set).
        """
        goal = self.driving_goal_state
        if goal is None:
            return None, None

        target_loc = goal.location
        target_agent_vec = goal.morphological_features["pose_vectors"][0]
        yaw_angle = math.atan2(-target_agent_vec[0], -target_agent_vec[2])
        pitch_angle = math.asin(target_agent_vec[1])
        # Rotate by pitch about x and by yaw about y; roll (about z) stays 0.
        scipy_combined_orientation = rot.from_euler(
            "xyz",
            [pitch_angle, yaw_angle, 0],
            degrees=False,
        )
        target_quat = scipy_to_numpy_quat(scipy_combined_orientation.as_quat())
        # Consume the goal and await further inputs.
        self.set_driving_goal_state(None)
        return target_loc, target_quat
@dataclass
class PositioningProcedureResult:
    """Result of a positioning procedure.

    For more on the terminated/truncated terminology, see
    https://farama.org/Gymnasium-Terminated-Truncated-Step-API.
    """

    # Actions to take.
    actions: List[Action] = field(default_factory=list)
    # Whether the procedure succeeded in its positioning goal.
    success: bool = False
    # Whether the procedure reached a terminal state with success or failure.
    terminated: bool = False
    # Whether the procedure was truncated due to a limit on the number of
    # attempts or other criteria.
    truncated: bool = False
class PositioningProcedure(BasePolicy):
    """Positioning procedure to position the agent in the scene.

    TODO: Remove from MotorPolicy hierarchy and refactor to standalone
    PositioningProcedure hierarchy when they get separated.

    Callers should invoke positioning_call repeatedly until the returned
    result indicates that the procedure has terminated or truncated.
    """

    @abc.abstractmethod
    def positioning_call(
        self,
        observation: Mapping,
        state: Optional[MotorSystemState] = None,
    ) -> PositioningProcedureResult:
        """Return a list of actions to position the agent in the scene.

        TODO: When this becomes a PositioningProcedure it can be a __call__
        method.

        Args:
            observation (Mapping): The observation to use for positioning.
            state (Optional[MotorSystemState]): The current state of the motor
                system.

        Returns:
            (PositioningProcedureResult): Any actions to take, whether the
            procedure succeeded, whether the procedure terminated, and whether
            the procedure truncated.
        """
class GetGoodView(PositioningProcedure):
    """Positioning procedure to get a good view of the object before an episode.

    Used to position the distant agent so that it finds the initial view of an
    object at the beginning of an episode with respect to a given sensor (the
    surface agent is positioned using the TouchObject positioning procedure
    instead). Also currently used by the distant agent after a "jump" has been
    initialized by a model-based policy.

    First, the agent is moved towards the target object until the object fills
    a minimum percentage (given by `good_view_percentage`) of the sensor's
    field of view or the closest point of the object is less than
    `desired_object_distance` from the sensor. This makes sure that big and
    small objects all fill a similar amount of space in the sensor's field of
    view. Otherwise small objects may be too small to perform saccades or the
    sensor ends up inside of big objects. This step is performed by default but
    can be skipped by setting `allow_translation=False`.

    Second, the agent will then be oriented towards the object so that the
    sensor's central pixel is on-object. In the case of multi-object
    experiments (i.e., when `multiple_objects_present=True`), there is an
    additional orientation step performed prior to the translational movement
    step.
    """

    def __init__(
        self,
        desired_object_distance: float,
        good_view_percentage: float,
        multiple_objects_present: bool,
        sensor_id: str,
        target_semantic_id: int,
        allow_translation: bool = True,
        max_orientation_attempts: int = 1,
        **kwargs,
    ) -> None:
        """Initialize the GetGoodView policy.

        Args:
            desired_object_distance (float): The desired distance to the
                object.
            good_view_percentage (float): The percentage of the sensor that
                should be filled with the object.
            multiple_objects_present (bool): Whether there are multiple
                objects in the scene.
            sensor_id (str): The ID of the sensor to use for positioning.
            target_semantic_id (int): The semantic ID of the target object.
            allow_translation (bool): Whether to allow movement toward the
                object via the move_close_enough method. If False, only
                orienting movements are performed. Defaults to True.
            max_orientation_attempts (int): The maximum number of orientation
                attempts allowed before giving up and truncating the
                procedure, indicating that the sensor is not on the target
                object.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self._desired_object_distance = desired_object_distance
        self._good_view_percentage = good_view_percentage
        self._multiple_objects_present = multiple_objects_present
        self._sensor_id = sensor_id
        self._target_semantic_id = target_semantic_id
        self._allow_translation = allow_translation
        self._max_orientation_attempts = max_orientation_attempts
        # Orientation attempts made so far; compared against the max in
        # positioning_call to decide when to truncate.
        self._num_orientation_attempts = 0

    def compute_look_amounts(
        self, relative_location: np.ndarray, state: Optional[MotorSystemState] = None
    ) -> Tuple[float, float]:
        """Compute the amount to look down and left given a relative location.

        This function computes the amount needed to look down and left in
        order for the sensor to be aimed at the target. The returned amounts
        are relative to the agent's current position and rotation. Looking up
        and right is done by returning negative amounts.

        TODO: Test whether this function works when the agent is facing in the
        positive z-direction. It may be fine, but there were some adjustments
        to accommodate the z-axis positive direction pointing opposite the
        body's initial orientation (e.g., using negative `z` in
        `left_amount = -np.degrees(np.arctan2(x_rot, -z_rot)))`.

        Args:
            relative_location: the x,y,z coordinates of the target with
                respect to the sensor.
            state (Optional[MotorSystemState]): The current state of the motor
                system. Defaults to None.

        Returns:
            down_amount: Amount to look down (degrees).
            left_amount: Amount to look left (degrees).
        """
        sensor_rotation_rel_world = self.sensor_rotation_relative_to_world(state)

        # Invert the sensor rotation and apply it to the relative location,
        # expressing the target in the sensor's own frame.
        w, x, y, z = qt.as_float_array(sensor_rotation_rel_world)
        rotation = rot.from_quat([x, y, z, w])
        rotated_location = rotation.inv().apply(relative_location)

        # Calculate the necessary rotation amounts and convert them to degrees.
        x_rot, y_rot, z_rot = rotated_location
        left_amount = -np.degrees(np.arctan2(x_rot, -z_rot))
        distance_horiz = np.sqrt(x_rot**2 + z_rot**2)
        down_amount = -np.degrees(np.arctan2(y_rot, distance_horiz))

        return down_amount, left_amount

    def find_location_to_look_at(
        self,
        sem3d_obs: np.ndarray,
        image_shape: Tuple[int, int],
        state: Optional[MotorSystemState] = None,
    ) -> np.ndarray:
        """Find the location to look at in the observation.

        Takes in a semantic 3D observation and returns an x,y,z location. The
        location is on the object and surrounded by pixels that are also on
        the object. This is done by smoothing the on_object image and then
        taking the maximum of this smoothed image.

        Args:
            sem3d_obs (np.ndarray): The location of each pixel and the
                semantic ID associated with that location.
            image_shape (Tuple[int, int]): The shape of the camera image.
            state (Optional[MotorSystemState]): The current state of the motor
                system. Defaults to None.

        Returns:
            (np.ndarray): The x,y,z coordinates of the target with respect to
            the sensor.
        """
        sem3d_obs_image = sem3d_obs.reshape((image_shape[0], image_shape[1], 4))
        on_object_image = sem3d_obs_image[:, :, 3]
        if not self._multiple_objects_present:
            # Single-object scenes: treat every on-object pixel as the target.
            on_object_image[on_object_image > 0] = self._target_semantic_id
        on_object_image = on_object_image == self._target_semantic_id
        on_object_image = on_object_image.astype(float)
        # TODO add unit test that we make sure find_location_to_look_at
        # functions as expected, which can otherwise break if e.g.
        # on_object_image is passed as an int or boolean rather than float
        kernel_size = on_object_image.shape[0] // 16
        smoothed_on_object_image = scipy.ndimage.gaussian_filter(
            on_object_image, kernel_size, mode="constant"
        )
        # Masking by on_object_image restricts the argmax to pixels that are
        # actually on the object.
        idx_loc_to_look_at = np.argmax(smoothed_on_object_image * on_object_image)
        idx_loc_to_look_at = np.unravel_index(idx_loc_to_look_at, on_object_image.shape)
        location_to_look_at = sem3d_obs_image[
            idx_loc_to_look_at[0], idx_loc_to_look_at[1], :3
        ]
        camera_location = self.get_agent_state(state)["sensors"][
            f"{self._sensor_id}.depth"
        ]["position"]
        agent_location = self.get_agent_state(state)["position"]
        # Get the location of the object relative to sensor.
        relative_location = location_to_look_at - (camera_location + agent_location)
        return relative_location

    def is_on_target_object(self, observation: Mapping) -> bool:
        """Check if a sensor is on the target object.

        Args:
            observation (Mapping): The observation to use for positioning.

        Returns:
            bool: Whether the sensor is on the target object.
        """
        # Reconstruct the 2D semantic/surface map embedded in 'semantic_3d'.
        image_shape = observation[self.agent_id][self._sensor_id]["depth"].shape[0:2]
        semantic_3d = observation[self.agent_id][self._sensor_id]["semantic_3d"]
        semantic = semantic_3d[:, 3].reshape(image_shape).astype(int)
        if not self._multiple_objects_present:
            semantic[semantic > 0] = self._target_semantic_id

        # Check if the central pixel is on the target object.
        y_mid, x_mid = image_shape[0] // 2, image_shape[1] // 2
        on_target_object = semantic[y_mid, x_mid] == self._target_semantic_id

        return on_target_object

    def move_close_enough(self, observation: Mapping) -> Action | None:
        """Move closer to the object until we are close enough.

        Args:
            observation (Mapping): The observation to use for positioning.

        Returns:
            (Action | None): The next action to take, or None if we are
            already close enough to the object.

        Raises:
            ValueError: If the object is not visible.
        """
        # Reconstruct 2D semantic map.
        depth_image = observation[self.agent_id][self._sensor_id]["depth"]
        semantic_3d = observation[self.agent_id][self._sensor_id]["semantic_3d"]
        semantic_image = semantic_3d[:, 3].reshape(depth_image.shape).astype(int)
        if not self._multiple_objects_present:
            semantic_image[semantic_image > 0] = self._target_semantic_id

        points_on_target_obj = semantic_image == self._target_semantic_id
        n_points_on_target_obj = points_on_target_obj.sum()

        # For multi-object experiments, handle the possibility that object is
        # no longer visible.
        if self._multiple_objects_present and n_points_on_target_obj == 0:
            logging.debug("Object not visible, cannot move closer")
            return None

        if n_points_on_target_obj > 0:
            closest_point_on_target_obj = np.min(depth_image[points_on_target_obj])
            logging.debug(
                "closest target object point: " + str(closest_point_on_target_obj)
            )
        else:
            raise ValueError(
                "May be initializing experiment with no visible target object"
            )

        perc_on_target_obj = get_perc_on_obj_semantic(
            semantic_image, semantic_id=self._target_semantic_id
        )
        logging.debug("% on target object: " + str(perc_on_target_obj))

        # Also calculate closest point on *any* object so that we don't get too
        # close and clip into objects; NB that any object will have a semantic
        # ID > 0
        points_on_any_obj = semantic_image > 0
        closest_point_on_any_obj = np.min(depth_image[points_on_any_obj])
        logging.debug("closest point on any object: " + str(closest_point_on_any_obj))

        if perc_on_target_obj < self._good_view_percentage:
            if closest_point_on_target_obj > self._desired_object_distance:
                if self._multiple_objects_present and (
                    closest_point_on_any_obj < self._desired_object_distance / 4
                ):
                    logging.debug(
                        "Getting too close to other objects, not moving forward."
                    )
                    return None
                else:
                    logging.debug("Moving forward")
                    return MoveForward(agent_id=self.agent_id, distance=0.01)
            else:
                logging.debug("Close enough.")
                return None
        else:
            logging.debug("Enough percent visible.")
            return None

    def orient_to_object(
        self, observation: Mapping, state: Optional[MotorSystemState] = None
    ) -> List[Action]:
        """Rotate sensors so that they are centered on the object using the view finder.

        The view finder needs to be in the same position as the sensor patch
        and the object needs to be somewhere in the view finder's view.

        Args:
            observation (Mapping): The observation to use for positioning.
            state (Optional[MotorSystemState]): The current state of the motor
                system. Defaults to None.

        Returns:
            (List[Action]): A list of actions of length two composed of
            actions needed to get us onto the target object.
        """
        # Reconstruct 2D semantic map.
        depth_image = observation[self.agent_id][self._sensor_id]["depth"]
        obs_dim = depth_image.shape[0:2]
        sem3d_obs = observation[self.agent_id][self._sensor_id]["semantic_3d"]
        sem_obs = sem3d_obs[:, 3].reshape(obs_dim).astype(int)
        if not self._multiple_objects_present:
            sem_obs[sem_obs > 0] = self._target_semantic_id

        logging.debug("Searching for object")

        relative_location = self.find_location_to_look_at(
            sem3d_obs,
            image_shape=obs_dim,
            state=state,
        )
        down_amount, left_amount = self.compute_look_amounts(
            relative_location, state=state
        )
        return [
            LookDown(agent_id=self.agent_id, rotation_degrees=down_amount),
            TurnLeft(agent_id=self.agent_id, rotation_degrees=left_amount),
        ]

    def positioning_call(
        self,
        observation: Mapping,
        state: Optional[MotorSystemState] = None,
    ) -> PositioningProcedureResult:
        # Multi-object scenes: orient first so the subsequent translation
        # moves toward the intended target.
        if self._multiple_objects_present:
            on_target_object = self.is_on_target_object(observation)
            if not on_target_object:
                return PositioningProcedureResult(
                    actions=self.orient_to_object(observation, state)
                )
        if self._allow_translation:
            action = self.move_close_enough(observation)
            if action is not None:
                logging.debug("Moving closer to object.")
                return PositioningProcedureResult(actions=[action])
        on_target_object = self.is_on_target_object(observation)
        # Keep orienting until on-object or out of attempts.
        if (
            not on_target_object
            and self._num_orientation_attempts < self._max_orientation_attempts
        ):
            self._num_orientation_attempts += 1
            return PositioningProcedureResult(
                actions=self.orient_to_object(observation, state)
            )
        if on_target_object:
            return PositioningProcedureResult(success=True, terminated=True)
        else:
            return PositioningProcedureResult(truncated=True)

    def sensor_rotation_relative_to_world(self, state: MotorSystemState) -> Any:
        """Derives the positioning sensor's rotation relative to the world.

        Args:
            state (MotorSystemState): The current state of the motor system.

        Returns:
            (Any): The positioning sensor's rotation relative to the world.
        """
        agent_state = self.get_agent_state(state)
        # Retrieve agent's rotation relative to the world.
        agent_rotation = agent_state["rotation"]
        # Retrieve sensor's rotation relative to the agent.
        sensor_rotation = agent_state["sensors"][f"{self._sensor_id}.depth"]["rotation"]
        # Derive sensor's rotation relative to the world.
        return agent_rotation * sensor_rotation
class InformedPolicy(BasePolicy, JumpToGoalStateMixin):
    """Policy that takes observation as input.

    Extension of BasePolicy that allows for taking the observation into
    account for action selection. Currently it uses the percentage of the
    observation that is on the object to reverse the last action if it is
    below min_perc_on_obj. Additionally, this policy discourages taking the
    reverse of the previous action if we are still on the object.

    Attributes:
        guiding_sensors: List of sensors that are used to calculate the
            percentage on object. When using multiple sensors or a
            visualization sensor we may want to ignore some when determining
            whether we need to move back.
        min_perc_on_obj: How much percent of the observation needs to be on
            the object to sample a new action. Otherwise the previous action
            is reversed to get back on the object. TODO: Not used anywhere?
    """

    def __init__(
        self,
        min_perc_on_obj,
        good_view_percentage,
        desired_object_distance,
        use_goal_state_driven_actions=False,
        **kwargs,
    ):
        """Initialize policy.

        Args:
            min_perc_on_obj: Minimum percentage of patch that needs to be on
                the object. If under this amount, reverse the previous action
                to get the patch back on the object.
            good_view_percentage: How much percent of the view finder
                perception should be filled with the object. (If less, move
                closer)
            desired_object_distance: How far away should the agent be from the
                object in view; for the distant-agent, this is used to
                establish a maximum allowable distance of the object; note for
                the surface agent, this is used with every set of traversal
                steps to ensure we remain close to the surface
            use_goal_state_driven_actions: Whether to enable the motor system
                to make use of the JumpToGoalStateMixin, which attempts to
                "jump" (i.e. teleport) the agent to a specified goal state.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.min_perc_on_obj = min_perc_on_obj
        self.good_view_percentage = good_view_percentage
        self.desired_object_distance = desired_object_distance
        self.use_goal_state_driven_actions = use_goal_state_driven_actions
        if self.use_goal_state_driven_actions:
            JumpToGoalStateMixin.__init__(self)
        # Observations after passing through sensor modules.
        # Are updated in Monty step method.
        self.processed_observations = None
def pre_episode(self):
    """Run pre-episode hooks.

    Clears the driving goal state (when goal-state-driven actions are in use)
    before delegating to the base policy's per-episode reset.
    """
    if self.use_goal_state_driven_actions:
        JumpToGoalStateMixin.pre_episode(self)
    return super().pre_episode()
def get_depth_at_center(self, raw_observation, view_sensor_id, initial_pose=True):
    """Determine the depth of the central pixel.

    Method primarily used by surface-agent, but also by distant agent after
    performing a hypothesis-testing jump; determines the depth of the central
    pixel, to inform whether the object is visible at all.

    Args:
        raw_observation: Raw observations from the dataloader.
        view_sensor_id: The ID of the view sensor.
        initial_pose: Whether we are checking depth-at-center as part of the
            first start of an experiment; if using get_depth_at_center to
            check for the observation after e.g. a hypothesis-testing jump,
            then we don't want to throw an error if the object is nowhere to
            be seen; instead, we simply move back.

    Returns:
        Depth at the center of the view sensor.
    """
    depth_map = raw_observation[self.agent_id][view_sensor_id]["depth"]
    rows, cols = depth_map.shape[0], depth_map.shape[1]
    depth_at_center = depth_map[rows // 2, cols // 2]
    if initial_pose:
        assert depth_at_center > 0, (
            "Object must be initialized such that "
            "agent can visualize it by moving forward"
        )
        # TODO investigate - I think this may have always been passing in the
        # original surface-agent policy implementation because the surface
        # sensor clips at 1.0, so even if the object isn't strictly visible
        # (or we're inside the object) there's a risk we have initialized
        # without the object in view
    return depth_at_center
### # Methods that define behavior of __call__ ###
def dynamic_call(self, state: Optional[MotorSystemState] = None) -> Action:
    """Return the next action to take.

    This requires self.processed_observations to be updated at every step in
    the Monty class. self.processed_observations contains the features
    extracted by the sensor module for the guiding sensor (patch).

    Args:
        state (Optional[MotorSystemState]): The current state of the motor
            system. Defaults to None.

    Returns:
        (Action): The action to take.
    """
    if self.processed_observations.get_on_object():
        # Still on the object: fall back to the base policy's random action.
        return super().dynamic_call(state)
    # Off the object: back up by undoing the previous action.
    return self.fixme_undo_last_action()
def fixme_undo_last_action(self):
    """Returns an action that undoes the last action, for supported actions.

    Previous InformedPolicy.dynamic_call() implementation when not on object:

        action, amount = (last_action, -last_amount)

    This implementation duplicates that functionality and the implicit
    assumption in the code and configurations that InformedPolicy is working
    with one of the following actions:
      - LookUp
      - LookDown
      - TurnLeft
      - TurnRight

    Additionally, this implementation adds support for:
      - MoveForward
      - MoveTangentially

    Support for the above two actions is due to `-last_amount` working for
    these actions as well. This maintains the same code functionality during
    this refactoring. For other actions, raise TypeError explicitly.

    Raises:
        TypeError: If the last action is not supported

    TODO These instance checks are undesirable and should be removed in the
    future. I am using these for now to express the implicit assumptions in
    the code. An Action.undo of some sort would be a better solution, however
    it is not yet clear to me what to do for actions that do not support undo.
    """
    prev = self.last_action
    if isinstance(prev, LookDown):
        return LookDown(
            agent_id=prev.agent_id,
            rotation_degrees=-prev.rotation_degrees,
            constraint_degrees=prev.constraint_degrees,
        )
    if isinstance(prev, LookUp):
        return LookUp(
            agent_id=prev.agent_id,
            rotation_degrees=-prev.rotation_degrees,
            constraint_degrees=prev.constraint_degrees,
        )
    if isinstance(prev, TurnLeft):
        return TurnLeft(
            agent_id=prev.agent_id,
            rotation_degrees=-prev.rotation_degrees,
        )
    if isinstance(prev, TurnRight):
        return TurnRight(
            agent_id=prev.agent_id,
            rotation_degrees=-prev.rotation_degrees,
        )
    if isinstance(prev, MoveForward):
        return MoveForward(
            agent_id=prev.agent_id,
            distance=-prev.distance,
        )
    if isinstance(prev, MoveTangentially):
        # Same direction, negative distance.
        return MoveTangentially(
            agent_id=prev.agent_id,
            distance=-prev.distance,
            direction=prev.direction,
        )
    raise TypeError(f"Invalid action: {prev}")
def post_action(
    self, action: Action, state: Optional[MotorSystemState] = None
) -> None:
    """Record the action, advance step counters, and log a state snapshot."""
    self.action = action
    self.timestep += 1
    self.episode_step += 1
    # Serialize the motor state (if provided) alongside the action.
    snapshot = state.convert_motor_state() if state else None
    self.action_sequence.append([action, snapshot])
### # Methods called by Dataloader and associated helper methods ###
def move_close_enough(
    self,
    raw_observation: Mapping,
    view_sensor_id: str,
    target_semantic_id: int,
    multiple_objects_present: bool,
) -> Tuple[Action | None, bool]:
    """At beginning of episode move close enough to the object.

    Uses the raw observations returned from the dataloader and not the
    extracted features from the sensor module.

    Args:
        raw_observation: The raw observations from the dataloader
        view_sensor_id: The ID of the view sensor
        target_semantic_id: The semantic ID of the primary target object in
            the scene.
        multiple_objects_present: Whether there are multiple objects present
            in the scene. If so, we do additional checks to make sure we
            don't get too close to these when moving forward

    Returns:
        Tuple[Action | None, bool]: The next action to take and whether the
        episode is done.

    Raises:
        ValueError: If the object is not visible
    """
    # Reconstruct 2D semantic map.
    depth_image = raw_observation[self.agent_id][view_sensor_id]["depth"]
    semantic_3d = raw_observation[self.agent_id][view_sensor_id]["semantic_3d"]
    semantic_image = semantic_3d[:, 3].reshape(depth_image.shape).astype(int)
    if not multiple_objects_present:
        # Single-object scenes: treat every on-object pixel as the target.
        semantic_image[semantic_image > 0] = target_semantic_id
    points_on_target_obj = semantic_image == target_semantic_id
    n_points_on_target_obj = points_on_target_obj.sum()
    # For multi-object experiments, handle the possibility that object is no
    # longer visible.
    if multiple_objects_present and n_points_on_target_obj == 0:
        logging.debug("Object not visible, cannot move closer")
        return None, True
    if n_points_on_target_obj > 0:
        closest_point_on_target_obj = np.min(depth_image[points_on_target_obj])
        logging.debug(
            "closest target object point: " + str(closest_point_on_target_obj)
        )
    else:
        raise ValueError(
            "May be initializing experiment with no visible target object"
        )
    perc_on_target_obj = get_perc_on_obj_semantic(
        semantic_image, semantic_id=target_semantic_id
    )
    logging.debug("% on target object: " + str(perc_on_target_obj))
    # Also calculate closest point on *any* object so that we don't get too
    # close and clip into objects; NB that any object will have a semantic
    # ID > 0
    points_on_any_obj = semantic_image > 0
    closest_point_on_any_obj = np.min(depth_image[points_on_any_obj])
    logging.debug("closest point on any object: " + str(closest_point_on_any_obj))
    if perc_on_target_obj < self.good_view_percentage:
        if closest_point_on_target_obj > self.desired_object_distance:
            if multiple_objects_present and (
                closest_point_on_any_obj < self.desired_object_distance / 4
            ):
                logging.debug(
                    "getting too close to other objects, not moving forward"
                )
                return None, True  # done
            else:
                logging.debug("move forward")
                return MoveForward(agent_id=self.agent_id, distance=0.01), False
        else:
            logging.debug("close enough")
            return None, True  # done
    else:
        logging.debug("Enough percent visible")
        return None, True  # done
def orient_to_object(
    self,
    raw_observation: Mapping,
    sensor_id: str,
    target_semantic_id: int,
    multiple_objects_present: bool,
    state: MotorSystemState,
) -> List[Action]:
    """Rotate sensors so that they are centered on the object using a view finder.

    The view finder needs to be in the same position as the sensor patch and the
    object needs to be somewhere in the view finder's view.

    Args:
        raw_observation: raw observations of the view finder
        sensor_id: view finder id (str)
        target_semantic_id: the integer corresponding to the semantic ID of the
            target object that we will try to fixate on
        multiple_objects_present: whether there are multiple objects present in
            the scene.
        state (MotorSystemState): The current state of the motor system.

    Returns:
        A list of two actions (a LookDown followed by a TurnLeft) whose
        rotation amounts will center the sensors on the target object.
    """
    # Reconstruct 2D semantic map.
    depth_image = raw_observation[self.agent_id][sensor_id]["depth"]
    obs_dim = depth_image.shape[0:2]
    sem3d_obs = raw_observation[self.agent_id][sensor_id]["semantic_3d"]
    sem_obs = sem3d_obs[:, 3].reshape(obs_dim).astype(int)
    # NOTE(review): `sem_obs` is never used below — `astype` returns a copy, so
    # this mutation does not reach `sem3d_obs`, and find_location_to_look_at
    # re-derives the same mask internally. This block appears redundant.
    if not multiple_objects_present:
        sem_obs[sem_obs > 0] = target_semantic_id
    logging.debug("Searching for object")
    relative_location = self.find_location_to_look_at(
        sem3d_obs,
        image_shape=obs_dim,
        target_semantic_id=target_semantic_id,
        multiple_objects_present=multiple_objects_present,
        sensor_id=sensor_id,
        state=state,
    )
    # Convert the sensor-relative target location into look-down / turn-left
    # rotation amounts.
    down_amount, left_amount = self.compute_look_amounts(
        relative_location, sensor_id, state=state
    )
    return [
        LookDown(agent_id=self.agent_id, rotation_degrees=down_amount),
        TurnLeft(agent_id=self.agent_id, rotation_degrees=left_amount),
    ]
def compute_look_amounts(
    self,
    relative_location: np.ndarray,
    sensor_id: str,
    state: MotorSystemState,
) -> Tuple[float, float]:
    """Compute the amount to look down and left given a relative location.

    The returned rotation amounts, applied from the agent's current pose,
    would aim the sensor directly at the target.

    TODO: Test whether this function works when the agent is facing in the
    positive z-direction. It may be fine, but there were some adjustments to
    accommodate the z-axis positive direction pointing opposite the body's
    initial orientation (e.g., using negative `z` in
    `left_amount = -np.degrees(np.arctan2(x_rot, -z_rot)))`.

    Args:
        relative_location: the x,y,z coordinates of the target with respect
            to the sensor.
        sensor_id: the ID of the sensor used to produce the relative location.
        state (MotorSystemState): The current state of the motor system.

    Returns:
        down_amount: Amount to look down (degrees).
        left_amount: Amount to look left (degrees).
    """
    agent_state = self.get_agent_state(state)
    # Sensor rotation relative to the world = agent rotation (world frame)
    # composed with the sensor rotation (agent frame).
    world_sensor_rotation = (
        agent_state["rotation"]
        * agent_state["sensors"][f"{sensor_id}.depth"]["rotation"]
    )
    # quaternion package order is [w, x, y, z]; scipy expects [x, y, z, w].
    w, x, y, z = qt.as_float_array(world_sensor_rotation)
    to_sensor_frame = rot.from_quat([x, y, z, w]).inv()
    # Express the target location in the sensor's own frame.
    x_rel, y_rel, z_rel = to_sensor_frame.apply(relative_location)

    # Horizontal bearing (negative z because the sensor looks down -z).
    left_amount = -np.degrees(np.arctan2(x_rel, -z_rel))
    # Elevation relative to the horizontal ground-plane distance.
    horizontal_distance = np.sqrt(x_rel**2 + z_rel**2)
    down_amount = -np.degrees(np.arctan2(y_rel, horizontal_distance))
    return down_amount, left_amount
def find_location_to_look_at(
    self,
    sem3d_obs: np.ndarray,
    image_shape: Tuple[int, int],
    target_semantic_id: int,
    multiple_objects_present: bool,
    sensor_id: str,
    state: MotorSystemState,
) -> np.ndarray:
    """Takes in a semantic 3D observation and returns an x,y,z location.

    The chosen location is on the target object and surrounded by pixels that
    are also on the object: the on-object mask is Gaussian-smoothed and the
    maximum of (smoothed * mask) is selected.

    Args:
        sem3d_obs: the location of each pixel and the semantic ID associated
            with that location
        image_shape: the shape of the camera image
        target_semantic_id: the semantic ID of the target object we'd like to
            saccade on to
        multiple_objects_present: whether there are multiple objects present
            in the scene.
        sensor_id: the ID of the sensor to use for the search. Used for
            computing the relative location of the new target.
        state (MotorSystemState): The current state of the motor system.

    Returns:
        relative_location: the x,y,z coordinates of the target with respect
        to the sensor.
    """
    sem3d_image = sem3d_obs.reshape((image_shape[0], image_shape[1], 4))
    semantic_channel = sem3d_image[:, :, 3]
    if not multiple_objects_present:
        # NB: semantic_channel is a view, so this writes through to the
        # caller's sem3d_obs as well.
        semantic_channel[semantic_channel > 0] = target_semantic_id
    # Float mask is required here; smoothing an int/bool mask would truncate.
    # TODO add unit test that find_location_to_look_at functions as expected,
    # which can otherwise break if e.g. on_object_image is passed as an int or
    # boolean rather than float.
    on_object = (semantic_channel == target_semantic_id).astype(float)
    sigma = on_object.shape[0] // 16
    smoothed = scipy.ndimage.gaussian_filter(on_object, sigma, mode="constant")
    # Best pixel: on the object AND maximally surrounded by on-object pixels.
    flat_index = np.argmax(smoothed * on_object)
    row, col = np.unravel_index(flat_index, on_object.shape)
    look_at_point = sem3d_image[row, col, :3]

    camera_location = self.get_agent_state(state)["sensors"][
        f"{sensor_id}.depth"
    ]["position"]
    agent_location = self.get_agent_state(state)["position"]
    # Express the chosen world location relative to the sensor.
    return look_at_point - (camera_location + agent_location)
def get_sensors_perc_on_obj(self, observation: Mapping) -> float:
    """Calculate how much percent of the sensor is on the object.

    Averages the per-sensor on-object pixel percentage over all guiding
    sensors. Currently assumes there is only one agent.

    Returns:
        perc_on_obj: Percentage of pixels on the object.
    """
    n_sensors = len(self.guiding_sensors)
    return sum(
        get_perc_on_obj(observation[self.agent_id][sensor_id]["rgba"]) / n_sensors
        for sensor_id in self.guiding_sensors
    )
def is_on_target_object(
    self,
    observation: Mapping,
    sensor_id: str,
    target_semantic_id: int,
    multiple_objects_present: bool,
) -> bool:
    """Check if a sensor is on the target object.

    The check inspects the semantic label of the central pixel of the sensor's
    reconstructed semantic map.

    Args:
        observation (Mapping): The raw observations from the dataloader.
        sensor_id (str): The sensor to check.
        target_semantic_id (int): The semantic ID of the target object.
        multiple_objects_present (bool): Whether there are multiple objects
            present in the scene.

    Returns:
        bool: Whether the sensor is on the target object.
    """
    sensor_data = observation[self.agent_id][sensor_id]
    # Rebuild the 2D semantic/surface map embedded in 'semantic_3d'.
    rows, cols = sensor_data["depth"].shape[0:2]
    semantic_map = (
        sensor_data["semantic_3d"][:, 3].reshape((rows, cols)).astype(int)
    )
    if not multiple_objects_present:
        # Single-object scene: every labeled pixel belongs to the target.
        semantic_map[semantic_map > 0] = target_semantic_id
    # The sensor counts as "on target" when the central pixel is.
    center_label = semantic_map[rows // 2, cols // 2]
    return center_label == target_semantic_id
class NaiveScanPolicy(InformedPolicy):
    """Policy that just moves left and right along the object."""

    def __init__(
        self,
        fixed_amount,
        **kwargs,
    ):
        """Initialize policy.

        Args:
            fixed_amount: Degrees to rotate per step; also used as the step
                size of the scanning spiral.
            **kwargs: Forwarded to InformedPolicy (mostly used to obtain the
                good view at the start of an episode).
        """
        # Mostly use version of InformedPolicy to get the good view in the
        # beginning.
        # TODO: maybe separate this out. Don't need to specify reverse_actions
        # or min_perc_on_obj for that.
        super(NaiveScanPolicy, self).__init__(**kwargs)

        # The fixed action cycle (up, left, down, right) traced by the spiral.
        self._naive_scan_actions = [
            LookUp(agent_id=self.agent_id, rotation_degrees=fixed_amount),
            TurnLeft(agent_id=self.agent_id, rotation_degrees=fixed_amount),
            LookDown(agent_id=self.agent_id, rotation_degrees=fixed_amount),
            TurnRight(agent_id=self.agent_id, rotation_degrees=fixed_amount),
        ]
        self.fixed_amount = fixed_amount
        # Spiral bookkeeping: how many steps each arm takes, which action is
        # active, and how far into the current arm we are.
        self.steps_per_action = 1
        self.current_action_id = 0
        self.step_on_action = 0

    ###
    # Methods that define behavior of __call__
    ###

    def dynamic_call(self, _state: Optional[MotorSystemState] = None) -> Action:
        """Return the next action in the spiral being executed.

        The MotorSystemState is ignored.

        Args:
            _state (Optional[MotorSystemState]): The current state of the
                motor system. Defaults to None. Unused.

        Returns:
            (Action): The action to take.

        Raises:
            StopIteration: If the spiral has completed.
        """
        if self.steps_per_action * self.fixed_amount >= 90:
            # Raise "StopIteration" to notify the dataloader we need to stop
            # the experiment. This exception is automatically handled by any
            # python loop statements using iterators.
            # See https://docs.python.org/3/library/exceptions.html#StopIteration
            raise StopIteration()
        self.check_cycle_action()
        next_action = self._naive_scan_actions[self.current_action_id]
        self.step_on_action += 1
        return next_action

    def pre_episode(self):
        """Reset the spiral to its initial arm at the start of each episode."""
        super().pre_episode()
        self.steps_per_action = 1
        self.current_action_id = 0
        self.step_on_action = 0

    def check_cycle_action(self):
        """Makes sure we move in a spiral.

        Switches to the next action once steps_per_action has been reached,
        and grows steps_per_action after the second and fourth action so
        consecutive arms of the spiral don't overlap:

         _ _ _ _
        |  _ _  |
        | |_  | |
        |_ _ _| |

        corresponds to 1x left, 1x up, 2x right, 2x down, 3x left, 3x up,
        4x right, 4x down, ...
        """
        if self.step_on_action != self.steps_per_action:
            return
        self.current_action_id += 1
        self.step_on_action = 0
        if self.current_action_id == 2:
            # Halfway through the cycle: lengthen the arm.
            self.steps_per_action += 1
        elif self.current_action_id == 4:
            # Cycle complete: wrap around and lengthen the arm again.
            self.current_action_id = 0
            self.steps_per_action += 1
class SurfacePolicy(InformedPolicy):
    """Policy class for a surface-agent.

    i.e. an agent that moves to and follows the surface of an object. Includes
    functions for moving along an object based on its surface normal.
    """

    def __init__(
        self,
        alpha,
        min_perc_on_obj=0.25,
        good_view_percentage=0.5,
        **kwargs,
    ):
        """Initialize policy.

        Args:
            alpha: to what degree should the move_tangentially direction be
                the same as the last step or totally random? 0~same as before,
                1~random walk
            min_perc_on_obj: Minimum percentage of patch that needs to be on
                the object. If under this amount, reverse the previous action
                to get the patch back on the object.
            good_view_percentage: How much percent of the view finder
                perception should be filled with the object. (If less, move
                closer)
                TODO M : since surface agent does not use get_good_view, can
                consider removing this parameter
            **kwargs: Forwarded to InformedPolicy.
        """
        super().__init__(min_perc_on_obj, good_view_percentage, **kwargs)

        self.action = None  # No action taken yet this episode
        self.tangential_angle = 0  # Heading used for tangential steps
        self.alpha = alpha
def pre_episode(self):
    """Reset per-episode surface-policy state, then defer to the parent.

    Resets the tangential heading and the first action of the episode, and
    zeroes the touch-search rotation counter: it accumulates how far the agent
    has rotated on the horizontal plane while searching for an object — at 360
    the search switches to the vertical plane, and at 720 to random search.
    """
    self.tangential_angle = 0
    self.action = None  # Reset the first action for every episode
    self.touch_search_amount = 0
    return super().pre_episode()
def touch_object(
    self, raw_observation, view_sensor_id: str, state: MotorSystemState
) -> Action:
    """The surface agent's policy for moving onto an object for sensing it.

    Like the distant agent's get_good_view, this is called at the beginning of
    every episode, and after a "jump" has been initialized by a model-based
    policy. In addition, it can be called when the surface agent cannot sense
    the object, e.g. because it has fallen off its surface.

    Currently uses the raw observations returned from the viewfinder via the
    dataloader, and not the extracted features from the sensor module.
    TODO M refactor this so that all sensory processing is done in the sensor
    module.

    If we aren't on the object, try first systematically orienting left around
    a point, then orienting down, and finally random orientations along the
    surface of a fixed sphere.

    Args:
        raw_observation: The raw observation from the simulator.
        view_sensor_id: The ID of the viewfinder sensor.
        state: The current state of the motor system.

    Returns:
        Action to take.
    """
    # If the viewfinder sees the object within range, then move to it
    depth_at_center = self.get_depth_at_center(raw_observation, view_sensor_id)
    if depth_at_center < 1.0:
        # FIX: index the motor state with this policy's own agent_id rather
        # than the hard-coded "agent_id_0", for consistency with the rest of
        # the class (e.g. tangential_direction) and correctness in
        # configurations where the agent has a different id.
        distance = (
            depth_at_center
            - self.desired_object_distance
            - state[self.agent_id]["sensors"][f"{view_sensor_id}.depth"][
                "position"
            ][2]
        )
        logging.debug(f"Move to touch visible object, forward by {distance}")
        self.action = MoveForward(agent_id=self.agent_id, distance=distance)
        return self.action

    logging.debug("Surface policy searching for object...")

    # Helpful to conceptualize these movements by considering a unit circle,
    # scaled by the radius distance_from_center
    # This image may be useful for intuition:
    # https://en.wikipedia.org/wiki/Exsecant#/media/File:Circle-trig6.svg

    # Rotate about a circle centered in front of the agent's current
    # position; TODO decide how to deal with "colliding with"/entering an
    # object; ?could just rotate about a point on which the sensor is present
    # Currently as a heuristic we rotate about a point 4x the desired distance;
    # as this is typically 2.5cm, this would mean a circle with radius 10cm
    distance_from_center = self.desired_object_distance * 4

    rotation_degrees = 30  # 30 degrees at a time; note this amount is also
    # used to eventually re-orient ourselves back to the original central
    # point;
    # TODO may want to consider trying smaller step-sizes; will
    # be less efficient, but may be important for smaller/distant objects that
    # otherwise get missed

    if self.touch_search_amount >= 720:
        # Perform a random upward or downward movement along the surface of a
        # sphere, with its centre fixed 10 cm in front of the agent
        logging.debug("Trying random search for object")
        if self.rng.uniform() < 0.5:
            orientation = "vertical"
            logging.debug("Orienting vertically")
        else:
            orientation = "horizontal"
            logging.debug("Orienting horizontally")
        rotation_degrees = self.rng.uniform(-180, 180)
        logging.debug(f"Random orientation amount is : {rotation_degrees}")
    elif 360 <= self.touch_search_amount < 720:
        logging.debug("Trying vertical search for object")
        orientation = "vertical"
    else:
        logging.debug("Trying horizontal search for object")
        orientation = "horizontal"

    # Move tangentially to the point we're facing, resulting in the agent's
    # orientation about the central point changing as specified by
    # rotation_degrees, but now where the agent is no longer on the edge of
    # the circle
    move_lat_amount = np.tan(np.radians(rotation_degrees)) * distance_from_center
    # The lateral movement will be down relative to the agent's facing position
    # in the case of orient vertical, and left in the case of orient horizontal

    # The below calculates the exsecant, which provides the necessary
    # movement to bring the agent back to the same radius around the central
    # point
    move_forward_amount = distance_from_center * (
        (1 - np.cos(np.radians(rotation_degrees)))
        / np.cos(np.radians(rotation_degrees))
    )

    # Accumulate total rotations for touch-search
    self.touch_search_amount += rotation_degrees

    if orientation == "vertical":
        self.action = OrientVertical(
            agent_id=self.agent_id,
            rotation_degrees=rotation_degrees,
            down_distance=move_lat_amount,
            forward_distance=move_forward_amount,
        )
    else:
        self.action = OrientHorizontal(
            agent_id=self.agent_id,
            rotation_degrees=rotation_degrees,
            left_distance=move_lat_amount,
            forward_distance=move_forward_amount,
        )
    return self.action
### # Methods that define behavior of __call__ ###
def dynamic_call(self, state: Optional[MotorSystemState] = None) -> Action:
    """Return the next action to take.

    This requires self.processed_observations to be updated at every step in
    the Monty class. self.processed_observations contains the features
    extracted by the sensor module for the guiding sensor (patch).

    Args:
        state (Optional[MotorSystemState]): The current state of the motor
            system. Defaults to None.

    Returns:
        (Action): The action to take.
    """
    coverage = self.processed_observations.get_feature_by_name("object_coverage")
    if coverage < 0.1:
        # Poor visualization of the object: returning None triggers the
        # attempt to (re-)touch the object. This is determined by some logic
        # in embodied_data.py, in particular the next method of
        # InformedEnvironmentDataLoader.
        logging.debug("Object coverage of only " + str(coverage))
        logging.debug("Initiating attempts to touch object")
        return None
    if self.action is None:
        # First action of the episode but the object view is already good;
        # initialize the cycle of actions as if we had just moved forward
        # (e.g. to get a good view).
        logging.debug("Object coverage good at initialization: " + str(coverage))
        self.action = self.action_sampler.sample_move_forward(self.agent_id)
    return self.get_next_action(state)
def _orient_horizontal(self, state: MotorSystemState) -> Action:
    """Build an OrientHorizontal action that faces the agent toward the surface.

    Args:
        state (MotorSystemState): The current state of the motor system.

    Returns:
        OrientHorizontal action.
    """
    degrees = self.orienting_angle_from_normal(orienting="horizontal", state=state)
    left_dist, forward_dist = self.horizontal_distances(degrees)
    return OrientHorizontal(
        agent_id=self.agent_id,
        rotation_degrees=degrees,
        left_distance=left_dist,
        forward_distance=forward_dist,
    )

def _orient_vertical(self, state: MotorSystemState) -> Action:
    """Build an OrientVertical action that faces the agent toward the surface.

    Args:
        state (MotorSystemState): The current state of the motor system.

    Returns:
        OrientVertical action.
    """
    degrees = self.orienting_angle_from_normal(orienting="vertical", state=state)
    down_dist, forward_dist = self.vertical_distances(degrees)
    return OrientVertical(
        agent_id=self.agent_id,
        rotation_degrees=degrees,
        down_distance=down_dist,
        forward_distance=forward_dist,
    )

def _move_tangentially(self, state: MotorSystemState) -> Action:
    """Move tangentially along the object surface.

    The step size is shrunk near the object's edge (low object coverage) to
    reduce the chance of falling off the object, e.g. when a change in sensor
    resolution makes the patch cover less of the surface.

    Args:
        state (MotorSystemState): The current state of the motor system.

    Returns:
        MoveTangentially action.
    """
    tangential_step = self.action_sampler.sample_move_tangentially(self.agent_id)
    coverage = self.processed_observations.get_feature_by_name("object_coverage")
    if coverage < 0.2:
        # Very close to the edge: scale the step size by how small the
        # coverage is.
        tangential_step.distance = tangential_step.distance / (4 / coverage)
        logging.debug(
            f"Very close to edge so only moving by {tangential_step.distance}"
        )
    elif coverage < 0.75:
        # Near the edge: take a conservative quarter-length step.
        tangential_step.distance = tangential_step.distance / 4
        logging.debug(f"Near edge so only moving by {tangential_step.distance}")
    tangential_step.direction = self.tangential_direction(state)
    return tangential_step

def _move_forward(self) -> Action:
    """Move forward to touch the object at the right distance.

    Returns:
        MoveForward action.
    """
    return MoveForward(
        agent_id=self.agent_id,
        distance=(
            self.processed_observations.get_feature_by_name("min_depth")
            - self.desired_object_distance
        ),
    )
def get_next_action(self, state: MotorSystemState):
    """Retrieve next action from a cycle of four actions.

    The cycle is: move forward to touch the object at the right distance,
    orient toward the normal along direction 1, orient toward the normal
    along direction 2, move tangentially along the object surface, repeat.

    Args:
        state (MotorSystemState): The current state of the motor system.

    Returns:
        Next action in the cycle, or None if no observations have been
        processed yet or the previous action type is unrecognized.
    """
    # TODO: is this check necessary?
    if not hasattr(self, "processed_observations"):
        return None
    previous = self.last_action
    if isinstance(previous, MoveForward):
        return self._orient_horizontal(state)
    if isinstance(previous, OrientHorizontal):
        return self._orient_vertical(state)
    if isinstance(previous, OrientVertical):
        return self._move_tangentially(state)
    if isinstance(previous, MoveTangentially):
        if not self.processed_observations.get_on_object():
            # Orient around the object if it's not centered in view.
            return self._orient_horizontal(state)
        # Otherwise move to the desired_object_distance.
        return self._move_forward()
    # Implicitly returns None for any other action type.
def tangential_direction(self, state: MotorSystemState) -> VectorXYZ:
    """Set the direction of the action to be a direction 0 - 2pi.

    - start at 0 (go up) in the reference frame of the agent; i.e. based on
      the standard initialization of an agent, this will be up from the floor.
      To implement this convention, the theta is offset by 90 degrees when
      finding our x and y translations, i.e. such that theta of 0 results in
      moving up by 1 (y), and right by 0 (x), rather than vice-versa
    - random action -pi - +pi is given by (rand() - 0.5) * 2pi
    - These are combined and weighted by the alpha parameter

    Args:
        state: The current state of the motor system.

    Returns:
        VectorXYZ: direction of the action
    """
    # Propose a uniformly random heading in (-pi, pi).
    new_target_direction = (self.rng.rand() - 0.5) * 2 * np.pi
    # Blend the previous heading with the random proposal: alpha=0 keeps the
    # old heading (pure momentum), alpha=1 is a pure random walk.
    self.tangential_angle = (
        self.tangential_angle * (1 - self.alpha) + new_target_direction * self.alpha
    )
    # Rotate the agent-frame unit vector into world coordinates using the
    # agent's current rotation quaternion.
    direction = qt.rotate_vectors(
        state[self.agent_id]["rotation"],
        [
            # cos(theta - pi/2) == sin(theta); sin(theta + pi/2) == cos(theta),
            # so theta=0 yields the vector (0, 1, 0), i.e. straight up.
            np.cos(self.tangential_angle - np.pi / 2),
            np.sin(self.tangential_angle + np.pi / 2),
            0,
        ],
        # NB the np.pi/2 offset implements the convention that theta=0 should
        # result in moving up (i.e. along y), rather than right (i.e. along
        # x), while maintaining the standard association of cos(theta) with
        # the x-coordinate and sin(theta) with the y-coordinate of space
    )
    return direction
def horizontal_distances(self, rotation_degrees: float) -> Tuple[float, float]:
    """Compute the left and forward distances compensating a horizontal turn.

    Given a rotation of `rotation_degrees`, returns the lateral and forward
    translations that keep the sensed surface point at the same depth.

    Args:
        rotation_degrees: The angle to rotate by

    Returns:
        move_left_distance: The left distance to move
        move_forward_distance: The forward distance to move
    """
    theta = np.radians(rotation_degrees)
    depth = self.processed_observations.non_morphological_features["mean_depth"]
    # Lateral offset of the fixated point after turning by theta.
    move_left_distance = np.tan(theta) * depth
    # Exsecant term: forward motion restoring the original viewing radius.
    move_forward_distance = depth * (1 - np.cos(theta)) / np.cos(theta)
    return move_left_distance, move_forward_distance
def vertical_distances(self, rotation_degrees: float) -> Tuple[float, float]:
    """Compute the down and forward distances compensating a vertical turn.

    Given a rotation of `rotation_degrees`, returns the downward and forward
    translations that keep the sensed surface point at the same depth.

    Args:
        rotation_degrees: The angle to rotate by

    Returns:
        move_down_distance: The down distance to move
        move_forward_distance: The forward distance to move
    """
    theta = np.radians(rotation_degrees)
    depth = self.processed_observations.non_morphological_features["mean_depth"]
    # Vertical offset of the fixated point after turning by theta.
    move_down_distance = np.tan(theta) * depth
    # Exsecant term: forward motion restoring the original viewing radius.
    move_forward_distance = depth * (1 - np.cos(theta)) / np.cos(theta)
    return move_down_distance, move_forward_distance
def get_inverse_agent_rot(self, state: MotorSystemState):
    """Get the inverse rotation of the agent's current orientation.

    Used to transform poses of e.g. point normals or principal curvature from
    global coordinates into the agent's coordinate frame. Intuition: a point
    normal with the same pose as the agent should have the identity pose in
    the agent's frame, which is exactly what transforming by the inverse
    yields.

    Args:
        state (MotorSystemState): The current state of the motor system.

    Returns:
        Inverse quaternion rotation.
    """
    # quaternion package order is [w, x, y, z].
    w, x, y, z = qt.as_float_array(state[self.agent_id]["rotation"])
    # scipy.spatial.transform.Rotation (v1.10.0) order is [x, y, z, w].
    inv_x, inv_y, inv_z, inv_w = rot.from_quat([x, y, z, w]).inv().as_quat()
    return qt.quaternion(inv_w, inv_x, inv_y, inv_z)
def orienting_angle_from_normal(
    self, orienting: str, state: MotorSystemState
) -> float:
    """Compute turn angle to face the object.

    Based on the point normal, compute the angle that the agent needs to turn
    in order to be oriented directly toward the object.

    Args:
        orienting (str): `"horizontal" or "vertical"`
        state (MotorSystemState): The current state of the motor system.

    Returns:
        degrees that the agent needs to turn (None for any other `orienting`
        value).
    """
    world_normal = self.processed_observations.get_point_normal()
    # Express the normal in the agent's own reference frame.
    agent_frame_inverse = self.get_inverse_agent_rot(state)
    x, y, z = qt.rotate_vectors(agent_frame_inverse, world_normal)
    # NOTE(review): np.arctan (not arctan2) is used, so the result is always
    # in (-90, 90) regardless of the sign of z — presumably intentional given
    # the agent faces the surface; confirm before changing.
    if orienting == "horizontal":
        # Fall back to +/-90 degrees when the normal has no z component.
        return -np.degrees(np.arctan(x / z)) if z != 0 else -np.sign(x) * 90.0
    if orienting == "vertical":
        return -np.degrees(np.arctan(y / z)) if z != 0 else -np.sign(y) * 90.0
### # Helper functions that can be used by multiple classes ###
def read_action_file(file: str) -> List[Action]:
    """Load a file with one JSON-encoded action per line.

    Blank (or whitespace-only) lines are skipped.

    Args:
        file: name of file to load

    Returns:
        List[Action]: list of actions
    """
    path = os.path.expanduser(file)
    with open(path, "r") as f:
        raw_lines = f.read().split("\n")
    stripped_lines = (line.strip() for line in raw_lines)
    return [
        cast(Action, json.loads(line, cls=ActionJSONDecoder))
        for line in stripped_lines
        if line
    ]
def write_action_file(actions: List[Action], file: str) -> None:
    """Write a list of actions to a file, one JSON line per action.

    Should be readable by read_action_file.

    Args:
        actions: list of actions
        file: path to file to save actions to
    """
    encoded = [json.dumps(action, cls=ActionJSONEncoder) for action in actions]
    with open(file, "w") as f:
        for line in encoded:
            f.write(line + "\n")
def get_perc_on_obj(rgba_obs):
    """Get the percentage of pixels in the observation that land on the object.

    A pixel counts as "on the object" when its RGB sum is non-zero. This uses
    the RGBA image, which means it only works with one object in an empty
    void. With multiple objects this would need to be solved differently.

    Generalized to any H x W image: the original implementation assumed a
    square image (``res = shape[0]``; ``reshape((res * res, 4))``), which
    raised for rectangular observations. Flattening with ``reshape((-1, 4))``
    is identical for square inputs and correct for rectangular ones.

    Args:
        rgba_obs: RGBA image observation of shape (H, W, 4).

    Returns:
        perc_on_obj: Percentage of pixels on the object.
    """
    pixels = rgba_obs.reshape((-1, 4))
    # Sum RGB channels per pixel; any non-zero color means "on object".
    rgb_sum = np.sum(pixels[:, :3], axis=1)
    return np.sum(rgb_sum > 0) / pixels.shape[0]
def get_perc_on_obj_semantic(semantic_obs, semantic_id=0):
    """Get the percentage of pixels in the observation that land on the target object.

    If a semantic ID is provided, then only pixels on the target object are
    counted; otherwise, pixels on any object are counted. This uses the
    semantic image, where each pixel is associated with a semantic ID that is
    unique for each object, and always >0.

    Args:
        semantic_obs: Semantic image observation.
        semantic_id: Semantic ID of the target object.

    Returns:
        perc_on_obj: Percentage of pixels on the object.
    """
    total_pixels = semantic_obs.shape[0] * semantic_obs.shape[1]
    if semantic_id == 0:
        # Count pixels on any object (all semantic IDs are >= 1).
        on_object_count = np.sum(semantic_obs >= 1)
    else:
        # Count only pixels on the target (e.g. primary target) object.
        on_object_count = np.sum(semantic_obs == semantic_id)
    return on_object_count / total_pixels
class SurfacePolicyCurvatureInformed(SurfacePolicy):
    """Policy class for a more intelligent surface-agent.

    Includes additional functions for moving along an object based on the
    direction of principal curvature (PC).

    A general summary of the policy is that the agent will start following PC
    directions as soon as these are well defined. This will initially be the
    minimal curvature; it will follow these for as long as they are defined,
    and until reaching a certain number of steps (max_pc_bias_steps), before
    then changing to follow maximal curvature. This process continues to
    alternate, as long as the PC directions are well defined.

    If PC are not meaningfully defined (which is often the case), then the
    agent uses standard momentum to take a step in a direction similar to the
    previous step (weighted by the alpha parameter); it will do this for a
    minimum number of steps (min_general_steps) before it will consider using
    PC information again.

    If the agent is taking a step that will bring it to a previously visited
    location according to its estimates, then it will attempt to correct for
    this and choose another direction; after finding a new heading, the agent
    performs a minimum number of steps (min_heading_steps) before it will
    check again for a conflicting heading.

    The main other event that can occur is that PCs are defined, but these are
    predominantly in the z-direction (relative to the agent); in that case,
    the PC-defined heading is ignored on that step, and a standard,
    momentum-based step is taken; such z-defined PC directions tend to occur
    on e.g. the rim of cups and are presumably due to issues with the agent's
    re-orientation steps vs. noisiness of the PCs defined by the surface.

    TODO update this method to accept more general notions of
    directions-of-variance (rather than strictly principal curvature), such
    that it can be applied in more abstract spaces
    """

    def __init__(
        self,
        alpha,
        pc_alpha,
        max_pc_bias_steps,
        min_general_steps,
        min_heading_steps,
        **kwargs,
    ):
        """Initialize policy.

        Args:
            alpha: to what degree should the move_tangentially direction be
                the same as the last step or totally random? 0~same as before,
                1~random walk; used when not following principal curvature (PC)
            pc_alpha: the degree to which the moving average of the PC
                direction should be based on the history vs. the most recent
                step
            max_pc_bias_steps: the number of steps to take following a
                particular PC direction (i.e. maximal or minimal curvature)
                before switching to the other type of PC direction
            min_general_steps: the number of normal momentum steps that must
                be taken after the agent has stopped following principal
                curvature, and before it will follow PC again; if this is too
                low, then the agent has the habit of moving quite noisily as
                it keeps attempting to follow PC, but if it is too high, then
                PC directions are not used to their fullest
            min_heading_steps: when not following PC directions, the minimum
                number of steps to take before we check that we're not heading
                to a previously visited location; again, this should be
                sufficiently high that we don't bounce around noisily, but
                also low enough that we avoid revisiting locations
            **kwargs: Additional keyword arguments.
        """
        super().__init__(alpha, **kwargs)

        self.pc_alpha = pc_alpha

        # == Threshold variables that determine policy behaviour ==
        self.max_pc_bias_steps = max_pc_bias_steps
        self.min_general_steps = min_general_steps
        self.min_heading_steps = min_heading_steps
[docs] def pre_episode(self): super().pre_episode() # == Variables for representing heading == # We represent it both in angular and vector form as under different settings, # one or the other will be leveraged self.tangential_angle = None self.tangential_vec = None # == Boolean variables determining action decisions == self.min_dir_pref = True # Initialize direction preference for following the # minimal principal curvature direction # Token to indicate whether we have been following PC-guided # trajectories; useful for handling how we act when PC's are no longer # defined self.using_pc_guide = False # == Counter variables determining action decisions == self.ignoring_pc_counter = self.min_general_steps # How many steps we have # taken without PC-guidance; for the *first* time we encounter an informative # PC, we should ideally always consider it, therefore initialize to the minimum # required for beginning to follow PC self.following_heading_counter = 0 # Used to ensure that for normal tangential # steps, we follow a particular heading for a while before trying to select # a new one that avoids previous locations self.following_pc_counter = 0 # How long we've been following PC for; used to # keep track of when we should shift from following minimal to maximal curvature self.continuous_pc_steps = 0 # How many continuous steps in a row we have # taken for the PC (i.e. without entering a region with un-defined PCs); this # counter does *not* influence whether we follow minimal or maximal curvature; # instead, it helps us determine whether we have just got new PC information, # and therefore enables us the possibility of flipping the PC direction so as # to avoid previous locations # == Variables for estimating moving average of principal curvature direction == self.prev_angle = None # Used to accumulate previous directions # along a particular principle curvature, for e.g. 
moving average # == Variables to help us avoid revisiting previously observed locations == self.tangent_locs = [] # Receive visited locations from sensor module, # specifically those associated with prev. tangential movements; helps us avoid # revisiting old locations; note we thus ignore locations associated with # the surface-agent-policy's re-orientation movements self.tangent_norms = [] # As for tangent_locs; helpful for distinguishing # locations as being on different surfaces # == Logging variables == # Store detailed information about actions taken; useful for both visualization, # and potentially informing e.g. an LM to intervene where certain policy # decisions are failing if not hasattr(self, "action_details"): # TODO M clean up where we log action information for motor system # Ideally as much as as possible should be with buffer following refactor self.action_details = dict( pc_heading=[], # "min", "max", or "no"; indicates the type of curvature # the agent is following avoidance_heading=[], # True or False, whether the agent is taking a # new heading to avoid previously visited points z_defined_pc=[], # None, and otherwise the principle curvature and # point normal directions when the PC direction is predominantly in # z-axis of the agent (i.e. pointing towards/away from the agent rather # than being orthogonal) ) else: self.action_details.update( pc_heading=[], avoidance_heading=[], z_defined_pc=[], )
[docs] def update_action_details(self): """Store informaton for later logging. This stores information that details elements of the policy or observations relevant to policy decisions. E.g. if model-free policy has been unable to find a path that avoids revisiting old locations, an LM might use this information to inform a particular action (TODO not yet implemented, and NOTE that any modelling should ultimately be located in the learning module(s), not in motor systems) """ if self.using_pc_guide: if self.min_dir_pref: self.action_details["pc_heading"].append("min") else: self.action_details["pc_heading"].append("max") else: self.action_details["pc_heading"].append("no") self.action_details["avoidance_heading"].append(self.setting_new_heading) if self.pc_is_z_defined: # Note for logging we save the orientations in the global reference frame, # however whether the PC is z-defined is relative to the agent and its # orientation # TODO: This value doesn't seem to be used anywhere self.action_details["z_defined_pc"].append( [ self.processed_observations.get_point_normal(), self.processed_observations.get_curvature_directions(), ] ) else: self.action_details["z_defined_pc"].append(None)
[docs] def tangential_direction(self, state: MotorSystemState) -> VectorXYZ: """Set the direction of action to be a direction 0 - 2pi. This controls the move_tangential action - start at 0 (go up in the reference frame of the agent, i.e. based on where it is facing), with the actual orientation determined via either principal curvature, or a random step weighted by momentum Tangential movements are the primary means of progressively exploring an object's surface Args: state: The current state of the motor system. Returns: VectorXYZ: direction of the action """ # Reset booleans tracking z-axis PC directions and new headings self.pc_is_z_defined = False self.setting_new_heading = False logging.debug("Input-driven tangential movement") if (self.processed_observations.get_feature_by_name("pose_fully_defined")) and ( self.ignoring_pc_counter >= self.min_general_steps ): # Principal curvatures are defined, and counter for a min number of # general steps is satisfied tang_movement = self.perform_pc_guided_step(state) # Note this may fail if the PC guidance directs us back towards # a point we have previously experienced, in which case we revert # to a standard tangential movement (as below) # Use standard momentum (alpha weighting) if we've not found an area of # reliable principle curvature else: # Check whether we've just left a series of PC-defined trajectories # (i.e. entering a region of undefined PCs) if self.using_pc_guide: logging.debug("First movement after exiting PC-guided sequence") self.reset_pc_buffers() else: # Set PC-guidance flag; note the other elements of reset_pc_buffers # should/need not be reset self.using_pc_guide = False self.ignoring_pc_counter += 1 tang_movement = self.perform_standard_tang_step(state) # Save detailed information about tangential steps self.update_action_details() return tang_movement
    def perform_pc_guided_step(self, state: MotorSystemState):
        """Inform steps to take using defined directions of principal curvature.

        Use the defined directions of principal curvature to inform (ideally
        a series) of steps along the appropriate direction.

        Args:
            state (MotorSystemState): The current state of the motor system.

        Returns:
            VectorXYZ: direction of the action
        """
        logging.debug("Attempting step with PC guidance")

        self.using_pc_guide = True

        # Possibly flip the min/max curvature preference (counter-based)
        self.check_for_preference_change()

        # Get the index for the PC we will follow
        pc_for_use = self.determine_pc_for_use()

        pc_dirs = self.processed_observations.get_curvature_directions()

        # Select the PC directions for use
        selected_pc_dir = pc_dirs[pc_for_use]

        # Rotate the tangential vector to be in the coordinate frame of the
        # sensory agent (rather than the global reference frame of the
        # environment)
        inverse_quaternion_rotation = self.get_inverse_agent_rot(state)
        rotated_form = qt.rotate_vectors(inverse_quaternion_rotation, selected_pc_dir)

        # Before updating the representation and removing z-axis direction,
        # check for movements defined in the z-axis (index 2 dominates)
        if int(np.argmax(np.abs(rotated_form))) == 2:
            # TODO decide if in cases where the PC is defined in the
            # z-direction relative to the agent, it might actually make sense
            # to still follow it, i.e. as it should represent a movement
            # tangential to the surface, and suggests we're not oriented with
            # the point-normal as expected; NB this would need to be tested
            # with the action-space of the surface-agent
            self.pc_is_z_defined = True
            logging.debug("Warning: PC is predominantly defined in z-direction")
            logging.debug("Skipping PC-guided move; using standard tangential movement")

            # Revert to a standard tangential step if we get to this stage
            # and the PC is predominantly defined in the z-direction; note
            # that this step will be weighted by the standard momentum,
            # integrating the previous movement, as we want to move in a
            # consistent heading
            alternative_movement = self.perform_standard_tang_step(state)

            # Note that we *don't* re-set the PC buffers, because with any
            # luck, the PC axes will be better defined on the next step; it
            # was also found in practice that resetting the PC buffers (i.e.
            # quitting entirely to the PC-guided actions) occasionally
            # resulted in longer, more noisy inference
            return alternative_movement

        self.update_tangential_reps(vec_form=rotated_form)

        # Before attempting conflict avoidance, check if the PC direction
        # itself appears to have been arbitrarily flipped
        self.check_for_flipped_pc()

        # Check if a new heading is necessary to avoid revisiting locations
        self.avoid_revisiting_locations(state=state)

        # If we are abandoning following PC directions, return the heading
        # that was found in the avoid_revisiting_locations search
        if self.setting_new_heading:
            # We've just found ourselves with a bad heading while following
            # principle curvatures, so break out of this; also reset the
            # following heading iter for standard tangential steps to make
            # sure we move sufficiently far from this area
            self.reset_pc_buffers()
            self.following_heading_counter = 0

            # NOTE(review): assumes a single agent keyed "agent_id_0" in the
            # motor system state — confirm against MotorSystemState
            return qt.rotate_vectors(
                state["agent_id_0"]["rotation"], self.tangential_vec
            )

        # Otherwise our heading is good; we continue and use our original
        # heading (or its negative flip) to inform the PC heading
        logging.debug("We have a good PC heading")

        # Use a moving average of previous principal curvature directions to
        # result in a smoother path through areas where this shifts slightly
        if self.prev_angle is None:
            self.prev_angle = self.tangential_angle
        else:
            # Update tangential_vec using the moving average of the principle
            # curvature
            self.pc_moving_average()

        self.following_pc_counter += 1
        self.continuous_pc_steps += 1

        return qt.rotate_vectors(state["agent_id_0"]["rotation"], self.tangential_vec)
[docs] def perform_standard_tang_step(self, state: MotorSystemState): """Perform a standard tangential step across the object. This is in contrast to, for example, being guided by principal curvatures. Note this is still more "intelligent" than the tangential step of the baseline surface-agent policy, because it also attempts to avoid revisiting old locations Args: state (MotorSystemState): The current state of the motor system. Returns: VectorXYZ: direction of the action """ logging.debug("Standard tangential movement") # Select new movement, equivalent to alpha-weighted steps in the standard # informed surface-agent policy new_target_direction = (self.rng.rand() - 0.5) * 2 * np.pi if self.tangential_angle is not None: # Use alternative momentum calculation (vs. original SurfacePolicy # implementation)that does not suffer from discontinuous jump if new and # old theta are approximately -pi and +pi # NB the original surface-agent policy has not been updated as that method # appears to work as a "functional bug", where with an e.g. alpha=0.1, we # will *rarely* take a step in an unexpected direction, helping reduce the # chance of doing a continuous loop over an object with no exploration diff_amount = theta_change(self.tangential_angle, new_target_direction) blended_angle = self.tangential_angle + self.alpha * diff_amount blended_angle = enforce_pi_bounds(blended_angle) self.update_tangential_reps(angle_form=blended_angle) else: # On first movements, just use the initial direction self.update_tangential_reps(angle_form=new_target_direction) if self.following_heading_counter >= self.min_heading_steps: # Every now and then, check our heading is not in violation of prev. 
points, # and otherwise update it; thus, if # we need to avoid a certain direction, we will ignore momentum on this # particular iteration, continuing it on the next step self.avoid_revisiting_locations(state=state) # Note the value for self.tangential_vec and self.tangential_angle is # updated by avoid_revisiting_locations (if necessary) if self.setting_new_heading: logging.debug("Ignoring momentum to avoid prev. locations") self.following_heading_counter = ( 0 # Reset this counter, so that we continue on the new heading ) elif not self.setting_new_heading: # Either due to the original heading being fine, or due to timing out # in the search, we continue with our original heading pass self.following_heading_counter += 1 return qt.rotate_vectors( state["agent_id_0"]["rotation"], self.tangential_vec, )
[docs] def update_tangential_reps(self, vec_form=None, angle_form=None): """Update the angle and vector representation of a tangential heading. Angle and vector representations are stored as self.tangential_angle and self.tangential_vec, respectively. Ensures the two representations are always consistent. Further ensures that, because these movements are tangential, it will be defined in the plane, relative to the agent (i.e. movement along x and y only), and any inadvertant movement along the z-axis relative to the agent will be eliminated. User should supply *either* the vector form or the (Euler) angle form in radians that will define the new representations. """ logging.debug("Updating tangential representations") assert (vec_form is None) != ( angle_form is None # Logical xor ), "Please provide one format for updating the tangential representation" if vec_form is not None: self.tangential_angle = projected_angle_from_vec(vec_form) self.tangential_vec = projected_vec_from_angle(self.tangential_angle) elif angle_form is not None: self.tangential_vec = projected_vec_from_angle(angle_form) self.tangential_angle = projected_angle_from_vec(self.tangential_vec) logging.debug(f"Angular form {self.tangential_angle}; vector form:") logging.debug(self.tangential_vec)
[docs] def reset_pc_buffers(self): """Reset counters and other variables. We've just left a series of PC-defined trajectories (i.e. entered a region of undefined PCs), or had to select a new heading in order to avoid revisiting old locations. As such, appropriately reset counters and other variables. Note we do not reset tangential_angle or the vector, such that this information can still be used by e.g. momentum on the next step to keep us going generally forward """ self.ignoring_pc_counter = 0 # Reset counter as we've just started # ignoring the PC after a period of movements self.using_pc_guide = False # We're not using PC directions self.continuous_pc_steps = 0 # Note we don't reset self.following_pc_counter # because we want to keep our bias of PC heading for when we re-enter # a region where PC is defined again # Reset variables used for determining moving average estimate of the PC # direction self.prev_angle = None
[docs] def check_for_preference_change(self): """Flip the preference for the min or max PC after a certain number of steps. This way, we can more quickly explore different "parts" of an object, rather than just persistently following e.g. the rim of a cup. By default, there is always an initial bias for the smallest principal curvature. TODO can eventually combine with a hypothesis-testing policy that encourages exploration of unvisited parts of the object, rather than relying on this simple counter-heuristic """ if self.following_pc_counter == self.max_pc_bias_steps: logging.debug("Changing preference for pc-type.") logging.debug(f"Previous pref. for min-directions: {self.min_dir_pref}") self.min_dir_pref = not (self.min_dir_pref) self.following_pc_counter = 0 # Reset to allow multiple steps in # new chosen direction self.continuous_pc_steps = 0 # Reset so that we can flip the new PC # direction if necessary (i.e. if there is a risk of revisiting old # locations) # Reset moving average of principle curvature directions, as this is going # to be orthogonal to previous estimates self.prev_angle = None logging.debug(f"Updated preference: {self.min_dir_pref}")
[docs] def determine_pc_for_use(self): """Determine the principal curvature to use for our heading. Use magnitude (ignoring negatives), as well as the current direction preference. Returns: Principal curvature to use. """ absolute_pcs = np.abs( self.processed_observations.get_feature_by_name("principal_curvatures") ) if self.min_dir_pref: # Follow minimal curvature direction return np.argmin(absolute_pcs) else: return np.argmax(absolute_pcs)
    def avoid_revisiting_locations(
        self,
        state: MotorSystemState,
        conflict_divisor=3,
        max_steps=100,
    ):
        """Avoid revisiting locations.

        Check if the new proposed location direction is already pointing to
        somewhere we've visited before; if not, we can use the initially
        proposed movement. If there is a conflict, we select a new heading
        that avoids this; this is achieved by iteratively searching for a
        heading that does not conflict with any previously visited locations.

        Args:
            state (MotorSystemState): The current state of the motor system.
                Note that while the policy might have "unrealistic" access to
                information about its location in the environment, this could
                easily be replaced by relative locations based on the first
                sensation.
            conflict_divisor: The amount pi is divided by to determine that a
                current heading will be too close to a previously visited
                location; this is an initial value that will be dynamically
                adjusted. Defaults to 3.
            max_steps: Maximum iterations of the search to perform to try to
                find a non-conflicting heading. Defaults to 100.

        Note that in many situations, revisiting locations can be a good
        thing (e.g. re-anchoring given noisy path-integration), so we may
        want to activate/inactivate this as necessary (TODO).

        TODO separate out avoid_revisiting_locations as its own mixin so that
        it can be used more broadly
        """
        self.conflict_divisor = conflict_divisor
        self.max_steps = max_steps

        # Backup the original tangential vector
        vec_copy = copy.copy(self.tangential_vec)

        if len(self.tangent_locs) > 0:  # Only relevant if prev. locations visited
            inverse_quaternion_rotation = self.get_inverse_agent_rot(state)

            current_loc = self.tangent_locs[-1]

            logging.debug("Checking we don't head for prev. locations")
            logging.debug(f"Current location: {current_loc}")

            # Previous locations relative to current one (i.e. centered)
            adjusted_prev_locs = np.array(self.tangent_locs) - current_loc

            # Further adjust the relative locations based on the reference
            # frame of the moving SM-agent; we are going to look for
            # conflicts by comparing these locations to the headings (also in
            # the reference frame of the agent) that we might take
            # TODO could vectorize this
            rotated_locs = qt.rotate_vectors(
                inverse_quaternion_rotation, adjusted_prev_locs
            )

            # Until we have not found a direction that we can guarentee is
            # in a new heading, continue to attempt new directions
            searching_for_heading = True

            self.first_attempt = True  # On the first attempt to fix the
            # direction heading, simply flip its orientation; this will often
            # resolve issues with principal curvature spontaneously swapping
            # direction

            self.setting_new_heading = False  # If the heading isn't already
            # good, or *(-1) to fix an arbitrarily flipped PC direction is
            # not sufficient, then set this to True and abandon following
            # PCs, reverting to a standard momentum operation

            self.search_counter = 0  # Eventually, abort the search if we
            # exceed a threshold; additionally, as this increases, we
            # periodically decrease the angle we are concerned with for
            # conflicts, making it more likely that we at least avoid being
            # nearby to previous points

            while searching_for_heading:
                conflicts = False  # Assume False until evidence otherwise

                logging.debug(f"Number of locations to check : {len(rotated_locs) - 1}")

                # Check all prev. locations for conflict; TODO could
                # vectorize this
                for ii in range(
                    max(0, len(rotated_locs) - 50), len(rotated_locs) - 1
                ):
                    # Ignore current point, and points before the last 50
                    # sensations

                    # Check if there is actually a conflict
                    on_conflict = self.conflict_check(rotated_locs, ii)

                    if on_conflict:
                        conflicts = True  # Keep track of the fact that we've
                        # found a conflicting direction that we need to deal
                        # with
                        logging.debug("Angle is low, so re-orienting")
                        logging.debug(f"Inducing location from sensation {ii}")
                        logging.debug(f"Currently on sensation {len(rotated_locs)}")

                        # Mutates self.tangential_vec/angle with a candidate
                        # replacement heading
                        self.attempt_conflict_resolution(vec_copy)

                if not conflicts:
                    # We have a valid heading
                    searching_for_heading = False
                    logging.debug("The final direction from conflict checking:")
                    logging.debug(self.tangential_vec)
                    self.update_tangential_reps(vec_form=self.tangential_vec)

                elif self.search_counter >= self.max_steps:
                    logging.debug("Abandoning search, no directions without conflict")
                    logging.debug("Therefore using original headng")
                    # Restore the original (backed-up) heading
                    self.update_tangential_reps(vec_form=vec_copy)
                    return None

                else:
                    # Search continues, but occasionally narrow the region in
                    # which we look for conflicts, making it easier to select
                    # a path "out"
                    self.search_counter += 1
                    if self.search_counter % 20 == 0:
                        self.conflict_divisor += 1
                        logging.debug(
                            f"Updating conflict divisor: {self.conflict_divisor}"
                        )
[docs] def conflict_check(self, rotated_locs, ii): """Check for conflict in the current heading. Target location needs to be similar *and* we need to have a similar point normal to discount the current proposed heading; if point normals are significantly different, then we are likely on a different surface, in which case passing by nearby previous points is no longer problematic. Note that when executing failed hypothesis-testing jumps, we can have multiple instances of the same location in our history; this will result in get_angle_beefed_up returning infinity, i.e. we don't worry about avoiding our current location Returns: True if there is a conflict, False otherwise. """ assert np.linalg.norm(rotated_locs[-1]) == 0, "Should be centered to 0" if ( ( get_angle_beefed_up(self.tangential_vec, rotated_locs[ii]) <= np.pi / self.conflict_divisor ) and ( # Heuristic of np.pi / 4 for point-normals is that sides of # a cube will therefore be different surfaces, but not necessarily # points along e.g. a bowl's curved underside get_angle_beefed_up(self.tangent_norms[-1], self.tangent_norms[ii]) <= np.pi / 4 ) and ( # Only consider points that are relatively close by (2.5 cm) in # Euclidean space; note we are comparing to an origin of 0,0,0 # Re. choice of 2.5 cm, tangential steps are generally of size 0.004 # (4mm), so this seems like a reasonable estimate np.linalg.norm(rotated_locs[ii], ord=2) <= 0.025 ) ): return True else: return False
[docs] def attempt_conflict_resolution(self, vec_copy): """Try to define direction vector that avoids revisiting previous locations.""" if self.first_attempt and self.using_pc_guide and self.continuous_pc_steps == 0: # On the first PC-guided step in a series, try to choose a direction # along the PC that is away from previous locations # Note first-attempt refers to trying to fix a bad PC # heading with a flip first; otherwise we have to abort # following PC # Note that when not concerned with principal curvatures, # we do not try this initial flip self.update_tangential_reps(vec_form=np.array(vec_copy) * -1) logging.debug("Flipping the PC direction to avoid prev. locations.") self.first_attempt = False else: logging.debug("Trying a new random heading") self.setting_new_heading = True # PC, if it was being followed, will # be abandoned # Progressively rotate in the plane, depending on how # long we've been searching, and starting with a small # possible rotation rot_limits = np.clip(0.1 + self.search_counter / self.max_steps, 0, 1) rot_val = self.rng.uniform(-rot_limits, rot_limits) * np.pi plane_rot = rot.from_euler("xyz", (0.0, 0.0, rot_val), degrees=False) # Note we apply the rotation to the original vector self.update_tangential_reps(vec_form=plane_rot.apply(vec_copy))
[docs] def check_for_flipped_pc(self): """Check for arbitrarily flipped PC direction. Do a quick check to see if the previous PC heading has been arbitrarily flipped, in which case, flip it back. With any luck, this will allow us to automatically pass avoid_revisiting_locations and thereby continue using PCs where relevant. """ if self.prev_angle is not None: if ( abs(theta_change(self.prev_angle, self.tangential_angle + np.pi)) <= np.pi / 4 ): logging.debug( "Evidence that PC direction arbitrarily flipped, flipping back" ) self.update_tangential_reps(vec_form=np.array(self.tangential_vec) * -1)
[docs] def pc_moving_average(self): """Calculate a moving average of the principal curvature direction. The moving average should be consistent even on curved surfaces as the directions will be in the reference frame of the agent (which has rotated itself to align with the point normal) """ logging.debug("Applying momentum to PC direction estimate") # Calculate a weighted average in such a way that ensures we can handle # the circular nature of angles (i.e. that +pi and -pi are adjacent) diff_amount = theta_change(self.prev_angle, self.tangential_angle) blended_angle = self.tangential_angle + self.pc_alpha * diff_amount blended_angle = enforce_pi_bounds(blended_angle) self.update_tangential_reps(angle_form=blended_angle) # Store action taken for estimating future movements self.prev_angle = self.tangential_angle
[docs]def theta_change(a, b): """Determine the min, signed change in orientation between two angles in radians. Returns: Signed change in orientation. """ min_sep = a - b # If an angle has "looped" round, correct for this min_sep = enforce_pi_bounds(min_sep) return min_sep
[docs]def enforce_pi_bounds(theta): """Enforce an orientation to be bounded between - pi and + pi. Returns: Angle in radians. """ while theta > np.pi: theta -= np.pi * 2 while theta < -np.pi: theta += np.pi * 2 return theta
[docs]def projected_angle_from_vec(vector): """Determine the rotation about a z-axis (pointing "up"). Note that because of the convention for moving along the y when theta=0, the typical order of arguments is swapped from calculating the standard atan2 (https://en.wikipedia.org/wiki/Atan2). Returns: Angle in radians. """ test_thetas = [-np.pi, -np.pi / 2, -0.1, 0, 0.1, np.pi / 2, np.pi] # Double check that we correctly recover the theta over possible range for test_theta in test_thetas: test_vec = projected_vec_from_angle(test_theta) recovered = math.atan2(test_vec[0], test_vec[1]) assert abs(test_theta - recovered) < 0.01, ( f"Issue with angle recovery for : {test_theta} vs. {recovered}" ) return math.atan2(vector[0], vector[1])
[docs]def projected_vec_from_angle(angle): """Determine the vector in the plane defined by an orientation around the z-axis. Takes angle in radians, bound between -pi : pi. This continues the convention started in the original surface-agent policy that a theta of 0 should correspond to a movement of 1 in the y direction, and 0 in the x direction, rather than vice-versa; this convention is implemented by the np.pi/2 offsets. Returns: Vector in the plane defined by an orientation around the z-axis. """ assert abs(angle) < np.pi + 0.01, f"-pi : +pi bound angles only : {angle}" return [np.cos(angle - np.pi / 2), np.sin(angle + np.pi / 2), 0]