Source code for tbp.monty.frameworks.models.motor_system

# Copyright 2025-2026 Thousand Brains Project
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

from __future__ import annotations

from typing import Any

from tbp.monty.context import RuntimeContext
from tbp.monty.frameworks.actions.actions import Action
from tbp.monty.frameworks.agents import AgentID
from tbp.monty.frameworks.models.abstract_monty_classes import Observations
from tbp.monty.frameworks.models.motor_policies import MotorPolicy
from tbp.monty.frameworks.models.motor_system_state import MotorSystemState

__all__ = ["MotorSystem"]


[docs]class MotorSystem: """The basic motor system implementation."""
[docs] def __init__( self, policy: MotorPolicy, state: MotorSystemState | None = None ) -> None: """Initialize the motor system with a motor policy. Args: policy: The motor policy to use. state: The initial state of the motor system. Defaults to None. """ self._policy = policy self._state = state # For each step, we store the actions produced by the policy and the current # motor system state as a (actions, state) tuple. self._action_sequence: list[tuple[list[Action], dict[AgentID, Any] | None]] = [] # TODO: When the motor system is encapsulated within Monty, then motor_only_step # attribute should be moved to Monty itself instead. self.motor_only_step = False
@property def action_sequence(self) -> list[tuple[list[Action], dict[AgentID, Any] | None]]: return self._action_sequence
[docs] def pre_episode(self) -> None: """Pre episode hook.""" self._policy.pre_episode() self._action_sequence = []
def __call__(self, ctx: RuntimeContext, observations: Observations) -> list[Action]: """Defines the structure for __call__. Delegates to the motor policy. Args: ctx: The runtime context. observations: The observations from the environment. Returns: The action to take. """ policy_result = self._policy(ctx, observations, self._state) self.motor_only_step = policy_result.motor_only_step state_copy = self._state.convert_motor_state() if self._state else None self._action_sequence.append((policy_result.actions, state_copy)) return policy_result.actions