Source code for tbp.monty.frameworks.models.salience.motor_policy
# Copyright 2026 Thousand Brains Project
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from __future__ import annotations
import logging
import numpy as np
from tbp.monty.cmp import Goal, Message
from tbp.monty.context import RuntimeContext
from tbp.monty.experiment.motor_system import ExperimentMotorSystem
from tbp.monty.frameworks.actions.actions import Action, LookUp, TurnLeft
from tbp.monty.frameworks.agents import AgentID
from tbp.monty.frameworks.models.abstract_monty_classes import Observations
from tbp.monty.frameworks.models.motor_policies import (
MotorPolicy,
MotorPolicyResult,
NoGoalProvided,
)
from tbp.monty.frameworks.models.motor_system_state import (
AgentState,
MotorSystemState,
SensorState,
)
from tbp.monty.frameworks.sensors import SensorID
from tbp.monty.geometry import Rotation
from tbp.monty.memento import Memento
logger = logging.getLogger(__name__)
[docs]class GoalCollocatedWithSensor(RuntimeError):
"""Raised when a goal is collocated with a sensor."""
pass
[docs]class LookAtGoal(MotorPolicy):
"""A policy that looks at a target.
This class assumes a system similar to a 2-DOF gimbal in which the "outer" part
can yaw left/right about the y-axis and the "inner" part can pitch up/down about
the x-axis. This setup is typical of our distant agent in which the agent
turns left/right and sensor mounted to it looks up/down.
Note that this code only uses TurnLeft and LookUp. Turning right or looking down
are performed using negative degrees with TurnLeft and LookUp, respectively.
"""
[docs] def __init__(
self,
agent_id: AgentID,
sensor_id: SensorID,
) -> None:
"""Initialize the look at policy.
Args:
agent_id: The agent ID
sensor_id: The sensor ID
"""
self._agent_id = agent_id
self._sensor_id = sensor_id
[docs] def reset(self, motor_system: ExperimentMotorSystem) -> None:
pass
[docs] def load_state_dict(self, memento: Memento) -> None:
self._agent_id = memento["agent_id"]
self._sensor_id = memento["sensor_id"]
[docs] def state_dict(self) -> Memento:
return {
"agent_id": self._agent_id,
"sensor_id": self._sensor_id,
}
def __call__(
self,
ctx: RuntimeContext,
observations: Observations, # noqa: ARG002
state: MotorSystemState,
percept: Message, # noqa: ARG002
goal: Goal | None,
) -> MotorPolicyResult:
"""Invoke motor policy to determine the next actions to take.
Args:
ctx: The runtime context.
observations: The observations from the environment.
state: The current state of the motor system.
percept: The percept from (as of this writing) the first sensor
module.
goal: The goal to look at (in world reference frame).
Returns:
The motor policy result.
Raises:
NoGoalProvided: If no goal is provided.
GoalCollocatedWithSensor: If the goal is collocated with the sensor.
"""
if goal is None:
if ctx.suppress_runtime_errors:
logger.warning("No goal provided")
return MotorPolicyResult([])
raise NoGoalProvided
# Collect necessary agent and sensor pose information.
agent_state: AgentState = state[self._agent_id]
agent_pos_rel_world = agent_state.position
agent_rot_rel_world = Rotation.from_quat(
[
agent_state.rotation.w,
agent_state.rotation.x,
agent_state.rotation.y,
agent_state.rotation.z,
]
)
sensor_state: SensorState = agent_state.sensors[self._sensor_id]
sensor_rot_rel_agent = Rotation.from_quat(
[
sensor_state.rotation.w,
sensor_state.rotation.x,
sensor_state.rotation.y,
sensor_state.rotation.z,
]
)
# Get the target location in world and agent coordinates.
target_rel_world = goal.location
target_rel_agent = agent_rot_rel_world.apply(
target_rel_world - agent_pos_rel_world,
inverse=True,
)
# Check that the goal is not collocated with the sensor.
sensor_pos_rel_agent = np.array(sensor_state.position)
target_rel_sensor = sensor_rot_rel_agent.apply(
target_rel_agent - sensor_pos_rel_agent,
inverse=True,
)
if np.isclose(np.linalg.norm(target_rel_sensor), 0.0):
if ctx.suppress_runtime_errors:
logger.warning("Goal is collocated with sensor")
return MotorPolicyResult([])
raise GoalCollocatedWithSensor
# Compute the target's azimuth, relative to the agent. This value is used to
# compute the yaw action to be performed by the agent.
agent_yaw = -np.arctan2(target_rel_agent[0], -target_rel_agent[2])
# Compute the target's elevation, relative to the agent. Then subtract the
# sensor's current pitch to get a pitch delta effective for the sensor. This
# value is used to compute the look up/down action which must be performed
# by the sensor mounted to the agent.
target_pitch_rel_agent = np.arctan2(
target_rel_agent[1], np.hypot(target_rel_agent[0], target_rel_agent[2])
)
sensor_pitch_rel_agent = sensor_rot_rel_agent.as_euler("xyz")[0]
sensor_pitch = target_pitch_rel_agent - sensor_pitch_rel_agent
# Create actions to return to the the motor system.
actions: list[Action] = [
TurnLeft(agent_id=self._agent_id, rotation_degrees=np.degrees(agent_yaw)),
LookUp(agent_id=self._agent_id, rotation_degrees=np.degrees(sensor_pitch)),
]
return MotorPolicyResult(actions)