Source code for tbp.monty.frameworks.models.motor_policies
# Copyright 2025-2026 Thousand Brains Project
# Copyright 2021-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from __future__ import annotations
import abc
import copy
import json
import logging
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
import numpy as np
import quaternion as qt
from scipy.spatial.transform import Rotation as rot # noqa: N813
from tbp.monty.context import RuntimeContext
from tbp.monty.frameworks.actions.action_samplers import ActionSampler
from tbp.monty.frameworks.actions.actions import (
Action,
ActionJSONDecoder,
LookDown,
LookUp,
MoveForward,
MoveTangentially,
OrientHorizontal,
OrientVertical,
TurnLeft,
TurnRight,
VectorXYZ,
)
from tbp.monty.frameworks.agents import AgentID
from tbp.monty.frameworks.environments.positioning_procedures import (
PositioningProcedure,
)
from tbp.monty.frameworks.models.abstract_monty_classes import Observations
from tbp.monty.frameworks.models.motor_system_state import AgentState, MotorSystemState
from tbp.monty.frameworks.models.states import State
from tbp.monty.frameworks.sensors import SensorID
from tbp.monty.frameworks.utils.spatial_arithmetics import get_angle_beefed_up
from tbp.monty.frameworks.utils.transform_utils import scipy_to_numpy_quat
if TYPE_CHECKING:
from os import PathLike
__all__ = [
"BasePolicy",
"InformedPolicy",
"JumpToGoalStateMixin",
"MotorPolicy",
"NaiveScanPolicy",
"ObjectNotVisible",
"SurfacePolicy",
"SurfacePolicyCurvatureInformed",
]
logger = logging.getLogger(__name__)
@dataclass
class MotorPolicyResult:
"""Result of a motor policy."""
actions: list[Action] = field(default_factory=list)
motor_only_step: bool = False
[docs]class MotorPolicy(abc.ABC):
"""The abstract scaffold for motor policies."""
[docs] @abc.abstractmethod
def get_agent_state(self, state: MotorSystemState) -> AgentState:
"""Get agent state.
Args:
state: The current state of the motor system.
Returns:
Agent state.
"""
pass
[docs] @abc.abstractmethod
def load_state_dict(self, state_dict: dict[str, Any]) -> None:
"""Take a state dict as an argument and set state for policy."""
pass
[docs] @abc.abstractmethod
def state_dict(self) -> dict[str, Any]:
"""Return a serializable dict with everything needed to save/load policy."""
pass
@abc.abstractmethod
def __call__(
self,
ctx: RuntimeContext,
observations: Observations,
state: MotorSystemState | None = None,
) -> MotorPolicyResult:
"""Invoke motor policy to determine the next actions to take.
Args:
ctx: The runtime context.
observations: The observations from the environment.
state: The current state of the motor system.
Defaults to None.
Returns:
The motor policy result.
"""
pass
[docs]class BasePolicy(MotorPolicy):
[docs] def __init__(
self,
action_sampler: ActionSampler,
agent_id: AgentID,
):
"""Initialize a base policy.
Args:
action_sampler: The ActionSampler to use
agent_id: The agent ID
"""
super().__init__()
self.agent_id = agent_id
self.action_sampler = action_sampler
def __call__(
self,
ctx: RuntimeContext,
observations: Observations, # noqa: ARG002
state: MotorSystemState | None = None, # noqa: ARG002
) -> MotorPolicyResult:
"""Return a motor policy result containing a random action.
The MotorSystemState is ignored.
Args:
ctx: The runtime context.
observations: The observations from the environment.
state: The current state of the motor system.
Defaults to None. Unused.
Returns:
A MotorPolicyResult that contains a random action.
"""
return MotorPolicyResult([self.action_sampler.sample(self.agent_id, ctx.rng)])
###
# Other required abstract methods, methods called by Monty or Environment Interface
###
[docs] def get_agent_state(self, state: MotorSystemState) -> AgentState:
"""Get agent state.
Note:
Assumes we only have one agent.
Args:
state: The current state of the motor system.
Returns:
Agent state.
"""
return state[self.agent_id]
class PredefinedPolicy(MotorPolicy):
"""Policy that follows an action sequence read from file.
Cycles through the actions in the file indefinitely.
"""
@staticmethod
def read_action_file(file_name: PathLike) -> list[Action]:
"""Load a file with one action per line.
Args:
file_name: name of file to load
Returns:
List of actions
"""
file = Path(file_name).expanduser()
with file.open() as f:
file_read = f.read()
lines = [line.strip() for line in file_read.split("\n") if line.strip()]
return [
cast("Action", json.loads(line, cls=ActionJSONDecoder)) for line in lines
]
def __init__(
self,
agent_id: AgentID,
file_name: PathLike,
) -> None:
self.agent_id = agent_id
self.action_list: list[Action] = PredefinedPolicy.read_action_file(file_name)
self.episode_step = 0
self.use_goal_state_driven_actions = False
def __call__(
self,
ctx: RuntimeContext, # noqa: ARG002
observations: Observations, # noqa: ARG002
state: MotorSystemState | None = None, # noqa: ARG002
) -> MotorPolicyResult:
actions = [self.action_list[self.episode_step % len(self.action_list)]]
self.episode_step += 1
return MotorPolicyResult(actions)
def get_agent_state(self, state: MotorSystemState) -> AgentState:
return state[self.agent_id]
def pre_episode(self) -> None:
self.episode_step = 0
def state_dict(self) -> dict[str, Any]:
return {"episode_step": self.episode_step}
def load_state_dict(self, state_dict):
self.episode_step = state_dict["episode_step"]
[docs]class JumpToGoalStateMixin:
"""Convert driving goal state to an action in Habitat-compatible coordinates.
Motor policy that enables us to take in a driving goal state for the motor agent,
and specify the action in Habitat-compatible coordinates that must be taken
to move there.
"""
[docs] def set_driving_goal_state(self, goal_state):
"""Specify the goal-state that the motor-actuator will attempt to satisfy."""
self.driving_goal_state = goal_state
[docs] def derive_habitat_goal_state(self):
"""Derive the Habitat-compatible goal state.
Take the current driving goal state (in CMP format), and derive the
corresponding Habitat compatible goal-state to pass through the Embodied
Environment Interface.
Returns:
target_loc: Target location.
target_quat: Target quaternion.
"""
if self.driving_goal_state is not None:
target_loc = self.driving_goal_state.location
target_agent_vec = self.driving_goal_state.morphological_features[
"pose_vectors"
][0]
yaw_angle = math.atan2(-target_agent_vec[0], -target_agent_vec[2])
pitch_angle = math.asin(target_agent_vec[1])
# Should rotate by pitch degrees around x, and by yaw degrees around y (and
# no change about z, which would correspond to roll)
scipy_combined_orientation = rot.from_euler(
"xyz",
[pitch_angle, yaw_angle, 0],
degrees=False,
)
target_quat = scipy_to_numpy_quat(scipy_combined_orientation.as_quat())
# Reset driving goal state and await further inputs
self.set_driving_goal_state(None)
return target_loc, target_quat
return None, None
[docs]class InformedPolicy(BasePolicy, JumpToGoalStateMixin):
"""Policy that takes observation as input.
Extension of BasePolicy that allows for taking the observation into account for
action selection. Uses processed_observations.get_on_object() to decide whether to
reverse the last action when the patch is off the object.
Attributes:
guiding_sensors: List of sensors that are used to calculate the percentage
on object. When using multiple sensors or a visualization sensor we may
want to ignore some when determining whether we need to move back.
"""
[docs] def __init__(
self,
use_goal_state_driven_actions=False,
**kwargs,
) -> None:
"""Initialize policy.
Args:
use_goal_state_driven_actions: Whether to enable the motor system to make
use of the JumpToGoalStateMixin, which attempts to "jump" (i.e.
teleport) the agent to a specified goal state.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
self.use_goal_state_driven_actions = use_goal_state_driven_actions
if self.use_goal_state_driven_actions:
JumpToGoalStateMixin.__init__(self)
# Observations after passing through sensor modules.
# Are updated in Monty step method.
self._processed_observations = None
self._undo_action: Action | None = None
@property
def processed_observations(self) -> State | None:
return self._processed_observations
@processed_observations.setter
def processed_observations(self, percept: State | None) -> None:
self._processed_observations = percept
[docs] def pre_episode(self) -> None:
self._processed_observations = None
self._undo_action = None
if self.use_goal_state_driven_actions:
JumpToGoalStateMixin.pre_episode(self)
return super().pre_episode()
def __call__(
self,
ctx: RuntimeContext,
observations: Observations, # noqa: ARG002
state: MotorSystemState | None = None, # noqa: ARG002
) -> MotorPolicyResult:
"""Return a motor policy result containing the next actions to take.
This requires self.processed_observations to be updated at every step
in the Monty class. self.processed_observations contains the features
extracted by the sensor module for the guiding sensor (patch).
Args:
ctx: The runtime context.
observations: The observations from the environment.
state: The current state of the motor system.
Defaults to None.
Returns:
A MotorPolicyResult that contains the actions to take.
"""
if self.processed_observations.get_on_object():
action = self.action_sampler.sample(self.agent_id, ctx.rng)
self._undo_action = self.fixme_undo_last_action(action)
return MotorPolicyResult([action])
if self._undo_action is not None:
action = self._undo_action
self._undo_action = self.fixme_undo_last_action(action)
return MotorPolicyResult([action])
return MotorPolicyResult([])
[docs] def fixme_undo_last_action(
self,
last_action: Action,
) -> LookDown | LookUp | TurnLeft | TurnRight | MoveForward | MoveTangentially:
"""Returns an action that undoes last action for supported actions.
This implementation duplicates the functionality and the implicit
assumption in the code and configurations that InformedPolicy is working
with one of the following actions:
- LookUp
- LookDown
- TurnLeft
- TurnRight
- MoveForward
- MoveTangentially
For other actions, raise ValueError explicitly.
Raises:
TypeError: If the last action is not supported
TODO These instance checks are undesirable and should be removed in the future.
I am using these for now to express the implicit assumptions in the code.
An Action.undo of some sort would be a better solution, however it is not
yet clear to me what to do for actions that do not support undo.
"""
if isinstance(last_action, LookDown):
return LookDown(
agent_id=last_action.agent_id,
rotation_degrees=-last_action.rotation_degrees,
constraint_degrees=last_action.constraint_degrees,
)
if isinstance(last_action, LookUp):
return LookUp(
agent_id=last_action.agent_id,
rotation_degrees=-last_action.rotation_degrees,
constraint_degrees=last_action.constraint_degrees,
)
if isinstance(last_action, TurnLeft):
return TurnLeft(
agent_id=last_action.agent_id,
rotation_degrees=-last_action.rotation_degrees,
)
if isinstance(last_action, TurnRight):
return TurnRight(
agent_id=last_action.agent_id,
rotation_degrees=-last_action.rotation_degrees,
)
if isinstance(last_action, MoveForward):
return MoveForward(
agent_id=last_action.agent_id,
distance=-last_action.distance,
)
if isinstance(last_action, MoveTangentially):
return MoveTangentially(
agent_id=last_action.agent_id,
distance=-last_action.distance,
# Same direction, negative distance
direction=last_action.direction,
)
raise TypeError(f"Invalid action: {last_action}")
[docs]class NaiveScanPolicy(InformedPolicy):
"""Policy that just moves left and right along the object."""
[docs] def __init__(
self,
fixed_amount,
**kwargs,
):
"""Initialize policy."""
# Mostly use version of InformedPolicy to get the good view in the beginning
# TODO: maybe separate this out.
super().__init__(**kwargs)
# Specify this specific action space, otherwise it doesn't work
self._naive_scan_actions = [
LookUp(agent_id=self.agent_id, rotation_degrees=fixed_amount),
TurnLeft(agent_id=self.agent_id, rotation_degrees=fixed_amount),
LookDown(agent_id=self.agent_id, rotation_degrees=fixed_amount),
TurnRight(agent_id=self.agent_id, rotation_degrees=fixed_amount),
]
self.fixed_amount = fixed_amount
self.steps_per_action = 1
self.current_action_id = 0
self.step_on_action = 0
def __call__(
self,
ctx: RuntimeContext, # noqa: ARG002
observations: Observations, # noqa: ARG002
state: MotorSystemState | None = None, # noqa: ARG002
) -> MotorPolicyResult:
"""Return a motor policy result containing the next actions in the spiral.
The MotorSystemState is ignored.
Args:
ctx: The runtime context.
observations: The observations from the environment.
state: The current state of the motor system.
Defaults to None. Unused.
Returns:
A MotorPolicyResult that contains the actions to take.
Raises:
StopIteration: If the spiral has completed.
"""
if self.steps_per_action * self.fixed_amount >= 90:
# Raise "StopIteration" to notify the environment interface we need to stop
# the experiment.
# TODO: We used to use iterators, which would automatically handle
# StopIteration. This is no longer the case, so we need to find a
# better way to handle policy declaring episode termination.
# It feels like an experimental concern inside a runtime policy.
raise StopIteration
self.check_cycle_action()
self.step_on_action += 1
return MotorPolicyResult([self._naive_scan_actions[self.current_action_id]])
[docs] def pre_episode(self) -> None:
super().pre_episode()
self.steps_per_action = 1
self.current_action_id = 0
self.step_on_action = 0
[docs] def check_cycle_action(self):
"""Makes sure we move in a spiral.
This method switches the current action if steps_per_action was reached.
Additionally it increments steps_per_action after the second and forth action
to make sure paths don't overlap.
_ _ _ _
| _ _ |
| |_ | |
|_ _ _| |
corresponds to
1x left, 1x up,
2x right, 2x down,
3x left, 3x up,
4x right, 4x down,
...
"""
if self.step_on_action == self.steps_per_action:
self.current_action_id += 1
self.step_on_action = 0
if self.current_action_id == 2:
self.steps_per_action += 1
elif self.current_action_id == 4:
self.current_action_id = 0
self.steps_per_action += 1
[docs]class SurfacePolicy(InformedPolicy):
"""Policy class for a surface-agent.
i.e. an agent that moves to and follows the surface of an object. Includes
functions for moving along an object based on its surface normal.
"""
[docs] def __init__(
self,
alpha,
desired_object_distance=0.025,
**kwargs,
) -> None:
"""Initialize policy.
Args:
desired_object_distance: Distance to maintain from the surface; used for
touch_object and move-forward. Defaults to 0.025.
alpha: to what degree should the move_tangentially direction be the
same as the last step or totally random? 0~same as before, 1~random walk
**kwargs: ?
"""
super().__init__(**kwargs)
self.tangential_angle = 0
self.alpha = alpha
self.desired_object_distance = desired_object_distance
self.attempting_to_find_object: bool = False
self.last_surface_policy_action: Action | None = None
[docs] def pre_episode(self) -> None:
self.tangential_angle = 0
self.touch_search_amount = 0 # Track how many rotations the agent has made
# along the horizontal plane searching for an object; when this reaches 360,
# try searching along the vertical plane, or for 720, performing a random
# search
self.last_surface_policy_action = None
return super().pre_episode()
def _touch_object(
self,
ctx: RuntimeContext,
observations: Observations,
view_sensor_id: SensorID,
state: MotorSystemState,
) -> MoveForward | OrientHorizontal | OrientVertical:
"""The surface agent's policy for moving onto an object for sensing it.
Like the distant agent's get_good_view, this is called at the beginning
of every episode, and after a "jump" has been initialized by a
model-based policy. In addition, it can be called when the surface agent
cannot sense the object, e.g. because it has fallen off its surface.
Currently uses the observations returned from the viewfinder via the
environment interface, and not the extracted features from the sensor module.
TODO M refactor this so that all sensory processing is done in the sensor
module.
If we aren't on the object, try first systematically orienting left around
a point, then orienting down, and finally random orientations along the surface
of a fixed sphere.
Args:
ctx: The runtime context.
observations: The observations from the environment.
view_sensor_id: The ID of the viewfinder sensor.
state: The current state of the motor system.
Returns:
Action to take.
"""
# If the viewfinder sees the object within range, then move to it
depth_at_center = PositioningProcedure.depth_at_center(
agent_id=self.agent_id,
observations=observations,
sensor_id=view_sensor_id,
)
if depth_at_center < 1.0:
distance = (
depth_at_center
- self.desired_object_distance
- state[self.agent_id].sensors[view_sensor_id].position[2]
)
logger.debug(f"Move to touch visible object, forward by {distance}")
self.attempting_to_find_object = False
return MoveForward(agent_id=self.agent_id, distance=distance)
logger.debug("Surface policy searching for object...")
self.attempting_to_find_object = True
# Helpful to conceptualize these movements by considering a unit circle,
# scaled by the radius distance_from_center
# This image may be useful for intuition:
# https://en.wikipedia.org/wiki/Exsecant#/media/File:Circle-trig6.svg
# Rotate about a circle centered in fron of the agent's current
# position; TODO decide how to deal with "coliding with"/entering an
# object; ?could just rotate about a point on which the sensor is present
# Currently as a heuristic we rotate about a point 4x the desired distance;
# as this is typically 2.5cm, this would mean a circle with radius 10cm
distance_from_center = self.desired_object_distance * 4
rotation_degrees = 30 # 30 degrees at a time; note this amount is also used
# to eventually re-orient ourselves back to the original central point;
# TODO may want to consider trying smaller step-sizes; will
# be less efficient, but may be important for smaller/distant objects that
# otherwise get missed
if self.touch_search_amount >= 720:
# Perform a random upward or downward movement along the surface of a
# sphere, with its centre fixed 10 cm in front of the agent
logger.debug("Trying random search for object")
if ctx.rng.uniform() < 0.5:
orientation = "vertical"
logger.debug("Orienting vertically")
else:
orientation = "horizontal"
logger.debug("Orienting horizontally")
rotation_degrees = ctx.rng.uniform(-180, 180)
logger.debug(f"Random orientation amount is : {rotation_degrees}")
elif self.touch_search_amount >= 360 and self.touch_search_amount < 720:
logger.debug("Trying vertical search for object")
orientation = "vertical"
else:
logger.debug("Trying horizontal search for object")
orientation = "horizontal"
# Move tangentally to the point we're facing, resulting in the agent's
# orienation about the central point changing as specified by rotation_deg,
# but now where the agent is no longer on the edge of the circle
move_lat_amount = np.tan(np.radians(rotation_degrees)) * distance_from_center
# The lateral movement will be down relative to the agent's facing position
# in the case of orient vertical, and left in the case of orient horizontal
# The below calculates the exsecant, which provides the necessary
# movement to bring the agent back to the same radius around the central
# point
move_forward_amount = distance_from_center * (
(1 - np.cos(np.radians(rotation_degrees)))
/ np.cos(np.radians(rotation_degrees))
)
self.touch_search_amount += rotation_degrees # Accumulate total rotations
# for touch-search
return (
OrientVertical(
agent_id=self.agent_id,
rotation_degrees=rotation_degrees,
down_distance=move_lat_amount,
forward_distance=move_forward_amount,
)
if orientation == "vertical"
else OrientHorizontal(
agent_id=self.agent_id,
rotation_degrees=rotation_degrees,
left_distance=move_lat_amount,
forward_distance=move_forward_amount,
)
)
def __call__(
self,
ctx: RuntimeContext,
observations: Observations,
state: MotorSystemState | None = None,
) -> MotorPolicyResult:
"""Return a motor policy result containing the next actions to take.
This requires self.processed_observations to be updated at every step
in the Monty class. self.processed_observations contains the features
extracted by the sensor module for the guiding sensor (patch).
Args:
ctx: The runtime context.
observations: The observations from the environment.
state: The current state of the motor system.
Defaults to None.
Returns:
A MotorPolicyResult that contains the actions to take.
"""
# Check if we have poor visualization of the object
if (
self.processed_observations.get_feature_by_name("object_coverage") < 0.1
or self.attempting_to_find_object
):
logger.debug(
"Object coverage of only "
+ str(
self.processed_observations.get_feature_by_name("object_coverage")
)
)
logger.debug(f"Attempting to find object: {self.attempting_to_find_object}")
logger.debug("Initiating attempts to touch object")
assert state is not None
action = self._touch_object(
ctx,
observations,
# TODO: Eliminate this hardcoded sensor ID
view_sensor_id=SensorID("view_finder"),
state=state,
)
return MotorPolicyResult(actions=[action], motor_only_step=True)
# Reset touch_object search state so that the next time we fall off the object,
# we will try to find the object using its full repertoire of actions.
self.touch_search_amount = 0
if self.last_surface_policy_action is None:
logger.debug(
"Object coverage good at initialization: "
+ str(
self.processed_observations.get_feature_by_name("object_coverage")
)
)
# In this case, we are on the first action, but the object view is already
# good; therefore initialize the cycle of actions as if we had just
# moved forward (e.g. to get a good view)
self.last_surface_policy_action = self.action_sampler.sample_move_forward(
self.agent_id, ctx.rng
)
next_action = self.get_next_action(ctx, state)
# Out of the four actions in the
# MoveForward->OrientHorizontal->OrientVertical->MoveTangentially "subroutine"
# of self.get_next_action(...), we only want to send data to the learning module
# after taking the OrientVertical action. The other three actions in the cycle
# are motor-only to keep the surface agent on the object.
motor_only_step = (
next_action is not None and next_action.name != OrientVertical.action_name()
)
self.last_surface_policy_action = next_action
actions: list[Action] = [] if next_action is None else [next_action]
return MotorPolicyResult(actions, motor_only_step=motor_only_step)
def _orient_horizontal(self, state: MotorSystemState) -> OrientHorizontal:
"""Orient the agent horizontally.
Args:
state: The current state of the motor system.
Returns:
OrientHorizontal action.
"""
rotation_degrees = self.orienting_angle_from_normal(
orienting="horizontal", state=state
)
left_distance, forward_distance = self.horizontal_distances(rotation_degrees)
return OrientHorizontal(
agent_id=self.agent_id,
rotation_degrees=rotation_degrees,
left_distance=left_distance,
forward_distance=forward_distance,
)
def _orient_vertical(self, state: MotorSystemState) -> OrientVertical:
"""Orient the agent vertically.
Args:
state: The current state of the motor system.
Returns:
OrientVertical action.
"""
rotation_degrees = self.orienting_angle_from_normal(
orienting="vertical", state=state
)
down_distance, forward_distance = self.vertical_distances(rotation_degrees)
return OrientVertical(
agent_id=self.agent_id,
rotation_degrees=rotation_degrees,
down_distance=down_distance,
forward_distance=forward_distance,
)
def _move_tangentially(
self, ctx: RuntimeContext, state: MotorSystemState
) -> MoveTangentially:
"""Move tangentially along the object surface.
Args:
ctx: The runtime context.
state: The current state of the motor system.
Returns:
MoveTangentially action.
"""
action = self.action_sampler.sample_move_tangentially(self.agent_id, ctx.rng)
# be careful if you're falling off the object!
if self.processed_observations.get_feature_by_name("object_coverage") < 0.2:
# Scale the step size by how small the object coverage is
# Reduces situations where e.g. change in sensor resolution causes agent
# to fall off the object
action.distance = action.distance / (
4 / self.processed_observations.get_feature_by_name("object_coverage")
)
logger.debug(f"Very close to edge so only moving by {action.distance}")
elif self.processed_observations.get_feature_by_name("object_coverage") < 0.75:
action.distance = action.distance / 4
logger.debug(f"Near edge so only moving by {action.distance}")
action.direction = self.tangential_direction(ctx, state)
return action
def _move_forward(self) -> MoveForward:
"""Move forward to touch the object at the right distance.
Returns:
MoveForward action.
"""
return MoveForward(
agent_id=self.agent_id,
distance=(
self.processed_observations.get_feature_by_name("min_depth")
- self.desired_object_distance
),
)
[docs] def get_next_action(
self, ctx: RuntimeContext, state: MotorSystemState
) -> OrientHorizontal | OrientVertical | MoveTangentially | MoveForward | None:
"""Retrieve next action from a cycle of four actions.
First move forward to touch the object at the right distance
Then orient toward the normal along direction 1
Then orient toward the normal along direction 2
Then move tangentially along the object surface
Then start over
Args:
ctx: The runtime context.
state: The current state of the motor system.
Returns:
Next action in the cycle.
"""
last_action = self.last_surface_policy_action
if isinstance(last_action, MoveForward):
return self._orient_horizontal(state)
if isinstance(last_action, OrientHorizontal):
return self._orient_vertical(state)
if isinstance(last_action, OrientVertical):
return self._move_tangentially(ctx, state)
if isinstance(last_action, MoveTangentially):
# orient around object if it's not centered in view
if not self.processed_observations.get_on_object():
return self._orient_horizontal(state)
# move to the desired_object_distance if it is in view
return self._move_forward()
return None
[docs] def tangential_direction(
self, ctx: RuntimeContext, state: MotorSystemState
) -> VectorXYZ:
"""Set the direction of the action to be a direction 0 - 2pi.
- start at 0 (go up) in the reference frame of the agent; i.e. based on
the standard initialization of an agent, this will be up from the floor.
To implement this convention, the theta is offset by 90 degrees when
finding our x and y translations, i.e. such that theta of 0 results in
moving up by 1 (y), and right by 0 (x), rather than vice-versa
- random action -pi - +pi is given by (rand() - 0.5) * 2pi
- These are combined and weighted by the alpha parameter
Args:
ctx: The runtime context.
state: The current state of the motor system.
Returns:
Direction of the action
"""
new_target_direction = (ctx.rng.rand() - 0.5) * 2 * np.pi
self.tangential_angle = (
self.tangential_angle * (1 - self.alpha) + new_target_direction * self.alpha
)
direction = qt.rotate_vectors(
state[self.agent_id].rotation,
[
np.cos(self.tangential_angle - np.pi / 2),
np.sin(self.tangential_angle + np.pi / 2),
0,
],
# NB the np.pi/2 offset implements the convention that theta=0 should
# result in moving up (i.e. along y), rather than right (i.e. along x),
# while maintaing the standard association of cos(theta) with the
# x-coordinate and sin(theta) with the y-coordinate of space
)
return tuple(direction)
[docs] def horizontal_distances(self, rotation_degrees: float) -> tuple[float, float]:
"""Compute the horizontal and forward distances to move to.
Compensate for a given rotation of a certain angle.
Args:
rotation_degrees: The angle to rotate by
Returns:
move_left_distance: The left distance to move
move_forward_distance: The forward distance to move
"""
rotation_radians = np.radians(rotation_degrees)
depth = self.processed_observations.non_morphological_features["mean_depth"]
move_left_distance = np.tan(rotation_radians) * depth
move_forward_distance = (
depth * (1 - np.cos(rotation_radians)) / np.cos(rotation_radians)
)
return move_left_distance, move_forward_distance
[docs] def vertical_distances(self, rotation_degrees: float) -> tuple[float, float]:
"""Compute the down and forward distances to move to.
Compensate for a given rotation of a certain angle.
Args:
rotation_degrees: The angle to rotate by
Returns:
move_down_distance: The down distance to move
move_forward_distance: The forward distance to move
"""
rotation_radians = np.radians(rotation_degrees)
depth = self.processed_observations.non_morphological_features["mean_depth"]
move_down_distance = np.tan(rotation_radians) * depth
move_forward_distance = (
depth * (1 - np.cos(rotation_radians)) / np.cos(rotation_radians)
)
return move_down_distance, move_forward_distance
[docs] def get_inverse_agent_rot(self, state: MotorSystemState):
"""Get the inverse rotation of the agent's current orientation.
Used to transform poses of e.g. surface normals or principle curvature from
global coordinates into the coordinate frame of the agent.
To intuit why we apply the inverse, imagine an e.g. surface normal with the same
pose as the agent; in the agent's reference frame, this should have the
identity pose, which will be acquired by transforming the original pose by the
inverse
Args:
state: The current state of the motor system.
Returns:
Inverse quaternion rotation.
"""
# Note that quaternion format is [w, x, y, z]
[w, x, y, z] = qt.as_float_array(state[self.agent_id].rotation)
# Note that scipy.spatial.transform.Rotation (v1.10.0) format is [x, y, z, w]
[x, y, z, w] = rot.from_quat([x, y, z, w]).inv().as_quat()
return qt.quaternion(w, x, y, z)
[docs] def orienting_angle_from_normal(
self, orienting: str, state: MotorSystemState
) -> float:
"""Compute turn angle to face the object.
Based on the surface normal, compute the angle that the agent needs
to turn in order to be oriented directly toward the object
Args:
orienting: `"horizontal" or "vertical"`
state: The current state of the motor system.
Returns:
degrees that the agent needs to turn
"""
original_surface_normal = self.processed_observations.get_surface_normal()
inverse_quaternion_rotation = self.get_inverse_agent_rot(state)
rotated_surface_normal = qt.rotate_vectors(
inverse_quaternion_rotation, original_surface_normal
)
x, y, z = rotated_surface_normal
if orienting == "horizontal":
return -np.degrees(np.arctan(x / z)) if z != 0 else -np.sign(x) * 90.0
if orienting == "vertical":
return -np.degrees(np.arctan(y / z)) if z != 0 else -np.sign(y) * 90.0
[docs]class SurfacePolicyCurvatureInformed(SurfacePolicy):
"""Policy class for a more intelligent surface-agent.
Includes additional functions for moving along an object based on the direction
of principle curvature (PC).
A general summary of the policy is that the agent will start following PC directions
as soon as these are well defined. This will initially be the minimal curvature; it
will follow these for as long as they are defined, and until reaching a certain
number of steps (max_pc_bias_steps), before then changing to follow maximal
curvature. This process continues to alternate, as long as the PC directions are
well defined.
If PC are not meaningfully defined (which is often the case), then the agent uses
standard momentum to take a step in a direction similar to the previous step
(weighted by the alpha parameter); it will do this for a minimum number of steps
(min_general_steps) before it will consider using PC information again.
If the agent is taking a step that will bring it to a previously visited location
according to its estimates, then it will attempt to correct for this and choose
another direction; after finding a new heading, the agent performs a minimum number
of steps (min_heading_steps) before it will check again for a conflicting heading.
The main other event that can occur is that PCs are defined, but these are
predominantly in the z-direction (relative to the agent); in that case, the
PC-defined heading is ignored on that step, and a standard, momentum-based step is
taken; such z-defined PC directions tend to occur on e.g. the rim of cups and
are presumably due to issues with the agents re-orientation steps vs. noisiness
of the PCs defined by the surface
TODO update this method to accept more general notions of directions-of-variance
(rather than strictly principal curvature), such that it can be applied in more
abstract spaces
"""
[docs] def __init__(
self,
alpha,
pc_alpha,
max_pc_bias_steps,
min_general_steps,
min_heading_steps,
**kwargs,
):
"""Initialize policy.
Args:
alpha: to what degree should the move_tangentially direction be the
same as the last step or totally random? 0~same as before, 1~random
walk; used when not following principal curvature (PC)
pc_alpha: the degree to which the moving average of the PC direction
should be based on the history vs. the most recent step
max_pc_bias_steps: the number of steps to take following a particular PC
direction (i.e. maximum or minimal curvature) before switching to the
other type of PC direction
min_general_steps: the number of normal momentum steps that must be taken
after the agent has stopped following principal curvature, and before it
will follow PC again; if this is too low, then the agent has the habit
of moving quite noisily as it keeps attempting to follow PC, but if it
is too high, then PC directions are not used to their fullest
min_heading_steps: when not following PC directions, the minimum number
of steps to take before we check that we're not heading to a previously
visited location; again, this should be sufficiently high that we don't
bounce around noisily, but also low enough that we avoid revisiting
locations
**kwargs: Additional keyword arguments.
"""
super().__init__(alpha, **kwargs)
self.pc_alpha = pc_alpha
# == Threshold variables that determine policy behaviour
self.max_pc_bias_steps = max_pc_bias_steps
self.min_general_steps = min_general_steps
self.min_heading_steps = min_heading_steps
self.tangent_locs = []
self.tangent_norms = []
[docs] def pre_episode(self) -> None:
super().pre_episode()
# == Variables for representing heading ==
# We represent it both in angular and vector form as under different settings,
# one or the other will be leveraged
self.tangential_angle = None
self.tangential_vec = None
# == Boolean variables determining action decisions ==
self.min_dir_pref = True # Initialize direction preference for following the
# minimal principal curvature direction
# Token to indicate whether we have been following PC-guided
# trajectories; useful for handling how we act when PC's are no longer
# defined
self.using_pc_guide = False
# == Counter variables determining action decisions ==
self.ignoring_pc_counter = self.min_general_steps # How many steps we have
# taken without PC-guidance; for the *first* time we encounter an informative
# PC, we should ideally always consider it, therefore initialize to the minimum
# required for beginning to follow PC
self.following_heading_counter = 0 # Used to ensure that for normal tangential
# steps, we follow a particular heading for a while before trying to select
# a new one that avoids previous locations
self.following_pc_counter = 0 # How long we've been following PC for; used to
# keep track of when we should shift from following minimal to maximal curvature
self.continuous_pc_steps = 0 # How many continuous steps in a row we have
# taken for the PC (i.e. without entering a region with un-defined PCs); this
# counter does *not* influence whether we follow minimal or maximal curvature;
# instead, it helps us determine whether we have just got new PC information,
# and therefore enables us the possibility of flipping the PC direction so as
# to avoid previous locations
# == Variables for estimating moving average of principal curvature direction ==
self.prev_angle = None # Used to accumulate previous directions
# along a particular principle curvature, for e.g. moving average
# == Variables to help us avoid revisiting previously observed locations ==
self.tangent_locs = [] # Receive visited locations from sensor module,
# specifically those associated with prev. tangential movements; helps us avoid
# revisiting old locations; note we thus ignore locations associated with
# the surface-agent-policy's re-orientation movements
self.tangent_norms = [] # As for tangent_locs; helpful for distinguishing
# locations as being on different surfaces
# == Logging variables ==
# Store detailed information about actions taken; useful for both visualization,
# and potentially informing e.g. an LM to intervene where certain policy
# decisions are failing
if not hasattr(self, "action_details"):
# TODO M clean up where we log action information for motor system
# Ideally as much as as possible should be with buffer following refactor
self.action_details = dict(
pc_heading=[], # "min", "max", or "no"; indicates the type of curvature
# the agent is following
avoidance_heading=[], # True or False, whether the agent is taking a
# new heading to avoid previously visited points
z_defined_pc=[], # None, and otherwise the principle curvature and
# surface normal directions when the PC direction is predominantly in
# z-axis of the agent (i.e. pointing towards/away from the agent rather
# than being orthogonal)
)
else:
self.action_details.update(
pc_heading=[],
avoidance_heading=[],
z_defined_pc=[],
)
@property
def processed_observations(self) -> State | None:
return self._processed_observations
@processed_observations.setter
def processed_observations(self, percept: State | None) -> None:
self._processed_observations = percept
if percept is None:
return
if self.last_surface_policy_action is None:
return
if self.last_surface_policy_action.name == "orient_vertical":
# Only append locations associated with performing a tangential
# action, rather than some form of corrective movement; these
# movements are performed immediately after "orient_vertical"
self.tangent_locs.append(
percept.location,
)
if "pose_vectors" in percept.morphological_features:
self.tangent_norms.append(
percept.morphological_features["pose_vectors"][0]
)
else:
self.tangent_norms.append(None)
[docs] def update_action_details(self):
"""Store informaton for later logging.
This stores information that details elements of the policy or observations
relevant to policy decisions.
E.g. if model-free policy has been unable to find a path that avoids
revisiting old locations, an LM might use this information to inform a
particular action (TODO not yet implemented, and NOTE that any modelling
should ultimately be located in the learning module(s), not in motor
systems)
"""
if self.using_pc_guide:
if self.min_dir_pref:
self.action_details["pc_heading"].append("min")
else:
self.action_details["pc_heading"].append("max")
else:
self.action_details["pc_heading"].append("no")
self.action_details["avoidance_heading"].append(self.setting_new_heading)
if self.pc_is_z_defined:
# Note for logging we save the orientations in the global reference frame,
# however whether the PC is z-defined is relative to the agent and its
# orientation
# TODO: This value doesn't seem to be used anywhere
self.action_details["z_defined_pc"].append(
[
self.processed_observations.get_surface_normal(),
self.processed_observations.get_curvature_directions(),
]
)
else:
self.action_details["z_defined_pc"].append(None)
[docs] def tangential_direction(
self,
ctx: RuntimeContext,
state: MotorSystemState,
) -> VectorXYZ:
"""Set the direction of action to be a direction 0 - 2pi.
This controls the move_tangential action
- start at 0 (go up in the reference frame of the agent, i.e. based on
where it is facing), with the actual orientation determined via either
principal curvature, or a random step weighted by momentum
Tangential movements are the primary means of progressively exploring
an object's surface
Args:
ctx: The runtime context.
state: The current state of the motor system.
Returns:
Direction of the action
"""
# Reset booleans tracking z-axis PC directions and new headings
self.pc_is_z_defined = False
self.setting_new_heading = False
logger.debug("Input-driven tangential movement")
if (self.processed_observations.get_feature_by_name("pose_fully_defined")) and (
self.ignoring_pc_counter >= self.min_general_steps
): # Principal curvatures are defined, and counter for a min number of
# general steps is satisfied
tang_movement = self.perform_pc_guided_step(ctx, state)
# Note this may fail if the PC guidance directs us back towards
# a point we have previously experienced, in which case we revert
# to a standard tangential movement (as below)
# Use standard momentum (alpha weighting) if we've not found an area of
# reliable principle curvature
else:
# Check whether we've just left a series of PC-defined trajectories
# (i.e. entering a region of undefined PCs)
if self.using_pc_guide:
logger.debug("First movement after exiting PC-guided sequence")
self.reset_pc_buffers()
else:
# Set PC-guidance flag; note the other elements of reset_pc_buffers
# should/need not be reset
self.using_pc_guide = False
self.ignoring_pc_counter += 1
tang_movement = self.perform_standard_tang_step(ctx, state)
# Save detailed information about tangential steps
self.update_action_details()
return tang_movement
[docs] def perform_pc_guided_step(
self, ctx: RuntimeContext, state: MotorSystemState
) -> VectorXYZ:
"""Inform steps to take using defined directions of principal curvature.
Use the defined directions of principal curvature to inform (ideally a
series) of steps along the appropriate direction.
Args:
ctx: The runtime context.
state: The current state of the motor system.
Returns:
Direction of the action
"""
logger.debug("Attempting step with PC guidance")
self.using_pc_guide = True
self.check_for_preference_change()
pc_for_use = self.determine_pc_for_use() # Get the index for the PC we will
# follow
pc_dirs = self.processed_observations.get_curvature_directions()
# Select the PC directions for use
selected_pc_dir = pc_dirs[pc_for_use]
# Rotate the tangential vector to be in the coordinate frame of the sensory
# agent (rather than the global reference frame of the environment)
inverse_quaternion_rotation = self.get_inverse_agent_rot(state)
rotated_form = qt.rotate_vectors(inverse_quaternion_rotation, selected_pc_dir)
# Before updating the representation and removing z-axis direction, check
# for movements defined in the z-axis
if int(np.argmax(np.abs(rotated_form))) == 2:
# TODO decide if in cases where the PC is defined in the z-direction
# relative to the agent, it might actually make sense to still follow it,
# i.e. as it should representa a movement tangential to the surface, and
# suggests we're not oriented with the surface normal as expected; NB this
# would need to be tested with the action-space of the surface-agent
self.pc_is_z_defined = True
logger.debug("Warning: PC is predominantly defined in z-direction")
logger.debug("Skipping PC-guided move; using standard tangential movement")
# Revert to a standard tangential step if we get to this stage
# and the PC is predominantly defined in the z-direction; note that this
# step will be weighted by the standard momentum, integrating the previous
# movement, as we want to move in a consistent heading
# Note that we *don't* re-set the PC buffers, because with any luck,
# the PC axes will be better defined on the next step; it was also found in
# practice that resetting the PC buffers (i.e. quitting entirely to
# the PC-guided actions) occasionally resulted in longer, more noisy
# inference
return self.perform_standard_tang_step(ctx, state)
self.update_tangential_reps(vec_form=rotated_form)
# Before attempting conflict avoidance, check if the PC direction itself
# appears to have been arbitrarily flipped
self.check_for_flipped_pc()
# Check if new heading is necessary
self.avoid_revisiting_locations(ctx, state=state)
# If we are abandoning following PC directions, return the heading that was
# found in the avoid_revisiting_locations search
if self.setting_new_heading:
# We've just found ourselves with a bad heading while following
# principle curvatures, so break out of this; also reset the following
# heading iter for standard tangential steps to make sure we move
# sufficiently far from this area
self.reset_pc_buffers()
self.following_heading_counter = 0
return tuple(
qt.rotate_vectors(state[self.agent_id].rotation, self.tangential_vec)
)
# Otherwise our heading is good; we continue and use our original heading (or
# it's negative flip) to inform the PC heading
logger.debug("We have a good PC heading")
# Use a moving average of previous principal curvature directions to result in a
# smoother path through areas where this shifts slightly
if self.prev_angle is None:
self.prev_angle = self.tangential_angle
else:
# Update tangential_vec using the moving average of the principle curvature
self.pc_moving_average()
self.following_pc_counter += 1
self.continuous_pc_steps += 1
return tuple(
qt.rotate_vectors(state[self.agent_id].rotation, self.tangential_vec)
)
[docs] def perform_standard_tang_step(
self, ctx: RuntimeContext, state: MotorSystemState
) -> VectorXYZ:
"""Perform a standard tangential step across the object.
This is in contrast to, for example, being guided by principal curvatures.
Note this is still more "intelligent" than the tangential step of the baseline
surface-agent policy, because it also attempts to avoid revisiting old locations
Args:
ctx: The runtime context.
state: The current state of the motor system.
Returns:
Direction of the action
"""
logger.debug("Standard tangential movement")
# Select new movement, equivalent to alpha-weighted steps in the standard
# informed surface-agent policy
new_target_direction = (ctx.rng.rand() - 0.5) * 2 * np.pi
if self.tangential_angle is not None:
# Use alternative momentum calculation (vs. original SurfacePolicy
# implementation)that does not suffer from discontinuous jump if new and
# old theta are approximately -pi and +pi
# NB the original surface-agent policy has not been updated as that method
# appears to work as a "functional bug", where with an e.g. alpha=0.1, we
# will *rarely* take a step in an unexpected direction, helping reduce the
# chance of doing a continuous loop over an object with no exploration
diff_amount = theta_change(self.tangential_angle, new_target_direction)
blended_angle = self.tangential_angle + self.alpha * diff_amount
blended_angle = enforce_pi_bounds(blended_angle)
self.update_tangential_reps(angle_form=blended_angle)
else:
# On first movements, just use the initial direction
self.update_tangential_reps(angle_form=new_target_direction)
if self.following_heading_counter >= self.min_heading_steps:
# Every now and then, check our heading is not in violation of prev. points,
# and otherwise update it; thus, if
# we need to avoid a certain direction, we will ignore momentum on this
# particular iteration, continuing it on the next step
self.avoid_revisiting_locations(ctx, state=state)
# Note the value for self.tangential_vec and self.tangential_angle is
# updated by avoid_revisiting_locations (if necessary)
if self.setting_new_heading:
logger.debug("Ignoring momentum to avoid prev. locations")
self.following_heading_counter = (
0 # Reset this counter, so that we continue on the new heading
)
elif not self.setting_new_heading:
# Either due to the original heading being fine, or due to timing out
# in the search, we continue with our original heading
pass
self.following_heading_counter += 1
return tuple(
qt.rotate_vectors(
state[self.agent_id].rotation,
self.tangential_vec,
)
)
[docs] def update_tangential_reps(self, vec_form=None, angle_form=None):
"""Update the angle and vector representation of a tangential heading.
Angle and vector representations are stored as self.tangential_angle and
self.tangential_vec, respectively.
Ensures the two representations are always consistent. Further ensures that,
because these movements are tangential, it will be defined in the plane,
relative to the agent (i.e. movement along x and y only), and any inadvertant
movement along the z-axis relative to the agent will be eliminated. User should
supply *either* the vector form or the (Euler) angle form in radians that will
define the new representations.
"""
logger.debug("Updating tangential representations")
assert (vec_form is None) != (
angle_form is None # Logical xor
), "Please provide one format for updating the tangential representation"
if vec_form is not None:
self.tangential_angle = projected_angle_from_vec(vec_form)
self.tangential_vec = projected_vec_from_angle(self.tangential_angle)
elif angle_form is not None:
self.tangential_vec = projected_vec_from_angle(angle_form)
self.tangential_angle = projected_angle_from_vec(self.tangential_vec)
logger.debug(f"Angular form {self.tangential_angle}; vector form:")
logger.debug(self.tangential_vec)
[docs] def reset_pc_buffers(self):
"""Reset counters and other variables.
We've just left a series of PC-defined trajectories (i.e. entered a region
of undefined PCs), or had to select a new heading in order to avoid revisiting
old locations. As such, appropriately reset counters and other variables.
Note we do not reset tangential_angle or the vector, such that this information
can still be used by e.g. momentum on the next step to keep us going generally
forward
"""
self.ignoring_pc_counter = 0 # Reset counter as we've just started
# ignoring the PC after a period of movements
self.using_pc_guide = False # We're not using PC directions
self.continuous_pc_steps = 0 # Note we don't reset self.following_pc_counter
# because we want to keep our bias of PC heading for when we re-enter
# a region where PC is defined again
# Reset variables used for determining moving average estimate of the PC
# direction
self.prev_angle = None
[docs] def check_for_preference_change(self):
"""Flip the preference for the min or max PC after a certain number of steps.
This way, we can more quickly explore different "parts" of an object, rather
than just persistently following e.g. the rim of a cup. By default, there is
always an initial bias for the smallest principal curvature.
TODO can eventually combine with a hypothesis-testing policy that encourages
exploration of unvisited parts of the object, rather than relying on this
simple counter-heuristic
"""
if self.following_pc_counter == self.max_pc_bias_steps:
logger.debug("Changing preference for pc-type.")
logger.debug(f"Previous pref. for min-directions: {self.min_dir_pref}")
self.min_dir_pref = not (self.min_dir_pref)
self.following_pc_counter = 0 # Reset to allow multiple steps in
# new chosen direction
self.continuous_pc_steps = 0 # Reset so that we can flip the new PC
# direction if necessary (i.e. if there is a risk of revisiting old
# locations)
# Reset moving average of principle curvature directions, as this is going
# to be orthogonal to previous estimates
self.prev_angle = None
logger.debug(f"Updated preference: {self.min_dir_pref}")
[docs] def determine_pc_for_use(self):
"""Determine the principal curvature to use for our heading.
Use magnitude (ignoring negatives), as well as the current direction
preference.
Returns:
Principal curvature to use.
"""
absolute_pcs = np.abs(
self.processed_observations.get_feature_by_name("principal_curvatures")
)
if self.min_dir_pref: # Follow minimal curvature direction
return np.argmin(absolute_pcs)
return np.argmax(absolute_pcs)
[docs] def avoid_revisiting_locations(
self,
ctx: RuntimeContext,
state: MotorSystemState,
conflict_divisor=3,
max_steps=100,
):
"""Avoid revisiting locations.
Check if the new proposed location direction is already pointing to somewhere
we've visited before; if not, we can use the initially proposed movement.
If there is a conflict, we select a new heading that avoids this; this is
achieved by iteratively searching for a heading that does not conflict with any
previously visited locations.
Args:
ctx: The runtime context.
conflict_divisor: The amount pi is divided by to determine that a current
heading will be too close to a previously visited location; this is an
initial value that will be dynamically adjusted. Defaults to 3.
max_steps: Maximum iterations of the search to perform to try to find a
non-conflicting heading. Defaults to 100.
state: The current state of the motor system.
Note that while the policy might have "unrealistic" access to information about
it's location in the environment, this could easily be replaced by relative
locations based on the first sensation
Finally, note that in many situations, revisiting locations can be a good thing
(e.g. re-anchoring given noisy path-integration), so we may want to activate
/inactivate this as necessary (TODO)
TODO separate out avoid_revisiting_locations as its own mixin so that it can
be used more broadly
"""
self.conflict_divisor = conflict_divisor
self.max_steps = max_steps
# Backup the original tangential vector
vec_copy = copy.copy(self.tangential_vec)
if len(self.tangent_locs) > 0: # Only relevant if prev. locations visited
inverse_quaternion_rotation = self.get_inverse_agent_rot(state)
current_loc = self.tangent_locs[-1]
logger.debug("Checking we don't head for prev. locations")
logger.debug(f"Current location: {current_loc}")
# Previous locations relative to current one (i.e. centered)
adjusted_prev_locs = np.array(self.tangent_locs) - current_loc
# Further adjust the relative locations based on the reference frame of the
# moving SM-agent; we are going to look for conflicts by comparing these
# locations to the headings (also in the reference frame of the agent) that
# we might take
# TODO could vectorize this
rotated_locs = qt.rotate_vectors(
inverse_quaternion_rotation, adjusted_prev_locs
)
# Until we have not found a direction that we can guarentee is
# in a new heading, continue to attempt new directions
searching_for_heading = True
self.first_attempt = True # On the first attempt to fix the direction
# heading, simply flip it's orientation; this will often resolve issues
# with principal curvature spontaneously swapping direction
self.setting_new_heading = False # If the heading isn't already good, or
# *(-1) to fix an arbitrarily flipped PC direction is not sufficient,
# then set this to True and abandon following PCs, reverting to a standard
# momentum operation
self.search_counter = 0 # Eventually, abort the search if we exceed a
# threshold; additionally, as this increases, we periodically decrease the
# angle we are concerned with for conflicts, making it more likely that
# we at least avoid being nearby to previous points
while searching_for_heading:
conflicts = False # Assume False until evidence otherwise
logger.debug(f"Number of locations to check : {len(rotated_locs) - 1}")
# Check all prev. locations for conflict; TODO could vectorize this
for ii in range(
max(0, len(rotated_locs) - 50), len(rotated_locs) - 1
): # Ignore current point, and points before the last 50 sensations
# Check if there is actually a conflict
on_conflict = self.conflict_check(rotated_locs, ii)
if on_conflict:
conflicts = True # Keep track of the fact that we've found
# a conflicting direction that we need to deal with
logger.debug("Angle is low, so re-orienting")
logger.debug(f"Inducing location from sensation {ii}")
logger.debug(f"Currently on sensation {len(rotated_locs)}")
self.attempt_conflict_resolution(ctx, vec_copy)
if not conflicts: # We have a valid heading
searching_for_heading = False
logger.debug("The final direction from conflict checking:")
logger.debug(self.tangential_vec)
self.update_tangential_reps(vec_form=self.tangential_vec)
elif self.search_counter >= self.max_steps:
logger.debug("Abandoning search, no directions without conflict")
logger.debug("Therefore using original headng")
self.update_tangential_reps(vec_form=vec_copy)
return
else:
# Search continues, but occasionally narrow the region in which we
# look for conflicts, making it easier to select a path "out"
self.search_counter += 1
if self.search_counter % 20 == 0:
self.conflict_divisor += 1
logger.debug(
f"Updating conflict divisor: {self.conflict_divisor}"
)
[docs] def conflict_check(self, rotated_locs, ii):
"""Check for conflict in the current heading.
Target location needs to be similar *and* we need to have a similar surface
normal to discount the current proposed heading; if surface normals are
significantly different, then we are likely on a different surface, in which
case passing by nearby previous points is no longer problematic.
Note that when executing failed hypothesis-testing jumps, we can have multiple
instances of the same location in our history; this will result in
get_angle_beefed_up returning infinity, i.e. we don't worry about avoiding our
current location
Returns:
True if there is a conflict, False otherwise.
"""
assert np.linalg.norm(rotated_locs[-1]) == 0, "Should be centered to 0"
return (
(
get_angle_beefed_up(self.tangential_vec, rotated_locs[ii])
<= np.pi / self.conflict_divisor
)
and (
# Heuristic of np.pi / 4 for surface normals is that sides of
# a cube will therefore be different surfaces, but not necessarily
# points along e.g. a bowl's curved underside
get_angle_beefed_up(self.tangent_norms[-1], self.tangent_norms[ii])
<= np.pi / 4
)
and (
# Only consider points that are relatively close by (2.5 cm) in
# Euclidean space; note we are comparing to an origin of 0,0,0
# Re. choice of 2.5 cm, tangential steps are generally of size 0.004
# (4mm), so this seems like a reasonable estimate
np.linalg.norm(rotated_locs[ii], ord=2) <= 0.025
)
)
[docs] def attempt_conflict_resolution(self, ctx: RuntimeContext, vec_copy) -> None:
"""Try to define direction vector that avoids revisiting previous locations.
Args:
ctx: The runtime context.
vec_copy: The vector to copy.
"""
if self.first_attempt and self.using_pc_guide and self.continuous_pc_steps == 0:
# On the first PC-guided step in a series, try to choose a direction
# along the PC that is away from previous locations
# Note first-attempt refers to trying to fix a bad PC
# heading with a flip first; otherwise we have to abort
# following PC
# Note that when not concerned with principal curvatures,
# we do not try this initial flip
self.update_tangential_reps(vec_form=np.array(vec_copy) * -1)
logger.debug("Flipping the PC direction to avoid prev. locations.")
self.first_attempt = False
else:
logger.debug("Trying a new random heading")
self.setting_new_heading = True # PC, if it was being followed, will
# be abandoned
# Progressively rotate in the plane, depending on how
# long we've been searching, and starting with a small
# possible rotation
rot_limits = np.clip(0.1 + self.search_counter / self.max_steps, 0, 1)
rot_val = ctx.rng.uniform(-rot_limits, rot_limits) * np.pi
plane_rot = rot.from_euler("xyz", (0.0, 0.0, rot_val), degrees=False)
# Note we apply the rotation to the original vector
self.update_tangential_reps(vec_form=plane_rot.apply(vec_copy))
[docs] def check_for_flipped_pc(self):
"""Check for arbitrarily flipped PC direction.
Do a quick check to see if the previous PC heading has been arbitrarily
flipped, in which case, flip it back. With any luck, this will allow us to
automatically pass avoid_revisiting_locations and thereby continue using PCs
where relevant.
"""
if (
self.prev_angle is not None
and abs(theta_change(self.prev_angle, self.tangential_angle + np.pi))
<= np.pi / 4
):
logger.debug(
"Evidence that PC direction arbitrarily flipped, flipping back"
)
self.update_tangential_reps(vec_form=np.array(self.tangential_vec) * -1)
[docs] def pc_moving_average(self):
"""Calculate a moving average of the principal curvature direction.
The moving average should be consistent even on curved surfaces as the
directions will be in the reference frame of the agent (which has rotated itself
to align with the surface normal)
"""
logger.debug("Applying momentum to PC direction estimate")
# Calculate a weighted average in such a way that ensures we can handle
# the circular nature of angles (i.e. that +pi and -pi are adjacent)
diff_amount = theta_change(self.prev_angle, self.tangential_angle)
blended_angle = self.tangential_angle + self.pc_alpha * diff_amount
blended_angle = enforce_pi_bounds(blended_angle)
self.update_tangential_reps(angle_form=blended_angle)
# Store action taken for estimating future movements
self.prev_angle = self.tangential_angle
def theta_change(a, b):
"""Determine the min, signed change in orientation between two angles in radians.
Returns:
Signed change in orientation.
"""
min_sep = a - b
# If an angle has "looped" round, correct for this
return enforce_pi_bounds(min_sep)
def enforce_pi_bounds(theta):
"""Enforce an orientation to be bounded between - pi and + pi.
Returns:
Angle in radians.
"""
while theta > np.pi:
theta -= np.pi * 2
while theta < -np.pi:
theta += np.pi * 2
return theta
def projected_angle_from_vec(vector):
"""Determine the rotation about a z-axis (pointing "up").
Note that because of the convention for moving along the y when theta=0, the
typical order of arguments is swapped from calculating the standard
atan2 (https://en.wikipedia.org/wiki/Atan2).
Returns:
Angle in radians.
"""
test_thetas = [-np.pi, -np.pi / 2, -0.1, 0, 0.1, np.pi / 2, np.pi]
# Double check that we correctly recover the theta over possible range
for test_theta in test_thetas:
test_vec = projected_vec_from_angle(test_theta)
recovered = math.atan2(test_vec[0], test_vec[1])
assert abs(test_theta - recovered) < 0.01, (
f"Issue with angle recovery for : {test_theta} vs. {recovered}"
)
return math.atan2(vector[0], vector[1])
def projected_vec_from_angle(angle):
"""Determine the vector in the plane defined by an orientation around the z-axis.
Takes angle in radians, bound between -pi : pi.
This continues the convention started in the original surface-agent policy that a
theta of 0 should correspond to a movement of 1 in the y direction, and 0 in the x
direction, rather than vice-versa; this convention is implemented by the np.pi/2
offsets.
Returns:
Vector in the plane defined by an orientation around the z-axis.
"""
assert abs(angle) < np.pi + 0.01, f"-pi : +pi bound angles only : {angle}"
return [np.cos(angle - np.pi / 2), np.sin(angle + np.pi / 2), 0]