Source code for tbp.monty.frameworks.environments.embodied_data
# Copyright 2025 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import copy
import logging
import math
from pprint import pformat
import numpy as np
import quaternion
from torch.utils.data import Dataset
from tbp.monty.frameworks.actions.action_samplers import UniformlyDistributedSampler
from tbp.monty.frameworks.actions.actions import (
Action,
LookUp,
MoveTangentially,
SetAgentPose,
SetSensorRotation,
)
from tbp.monty.frameworks.models.motor_policies import (
GetGoodView,
InformedPolicy,
SurfacePolicy,
)
from tbp.monty.frameworks.models.motor_system import MotorSystem
from tbp.monty.frameworks.models.motor_system_state import (
MotorSystemState,
ProprioceptiveState,
)
from .embodied_environment import EmbodiedEnvironment
__all__ = [
"EnvironmentDataset",
"EnvironmentDataLoader",
"EnvironmentDataLoaderPerObject",
"InformedEnvironmentDataLoader",
"OmniglotDataLoader",
"SaccadeOnImageDataLoader",
"SaccadeOnImageFromStreamDataLoader",
]
class EnvironmentDataset(Dataset):
"""Wraps an embodied environment with a :class:`torch.utils.data.Dataset`.
TODO: Change the name of this class to reflect the interactiveness. Monty doesn't
work with static datasets; it interacts with the environment.
Attributes:
env_init_func: Callable function used to create the embodied environment. This
function should return a class implementing :class:`.EmbodiedEnvironment`
env_init_args: Arguments to `env_init_func`
n_actions_per_epoch: Number of actions per epoch. Used to determine
the number of observations this dataset will return per epoch. It can be
viewed as the dataset size.
transform: Callable (or list of callables) used to transform the observations returned by the dataset
Note:
Main idea is to separate concerns:
- dataset owns the environment and creates it at initialization
- dataset just handles the :meth:`__getitem__` method
- dataset does not handle motor activity; it just accepts an action from the
policy and uses it to look up the next observation
"""
def __init__(self, env_init_func, env_init_args, rng, transform=None):
self.rng = rng
self.transform = transform
if self.transform is not None:
for t in self.transform:
if t.needs_rng:
t.rng = self.rng
env = env_init_func(**env_init_args)
assert isinstance(env, EmbodiedEnvironment)
self.env = env
@property
def action_space(self):
return self.env.action_space
def reset(self):
observation = self.env.reset()
state = self.env.get_state()
if self.transform is not None:
observation = self.apply_transform(self.transform, observation, state)
return observation, ProprioceptiveState(state) if state else None
def apply_transform(self, transform, observation, state):
if isinstance(transform, list):
for t in transform:
observation = t(observation, state)
else:
observation = transform(observation, state)
return observation
def __getitem__(self, action: Action):
observation = self.env.step(action)
state = self.env.get_state()
if self.transform is not None:
observation = self.apply_transform(self.transform, observation, state)
return observation, ProprioceptiveState(state) if state else None
def __len__(self):
return math.inf
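# Usage sketch (not part of the original module): a minimal, hypothetical
# example of driving an EnvironmentDataset directly. `make_env` stands in for
# any factory returning an EmbodiedEnvironment; the agent id and action
# parameters are illustrative only.
def _environment_dataset_usage_sketch(make_env):
    rng = np.random.RandomState(42)
    dataset = EnvironmentDataset(
        env_init_func=make_env,  # must return an EmbodiedEnvironment
        env_init_args={},
        rng=rng,
    )
    # The first observation comes from reset(); later ones from __getitem__,
    # which is indexed by an Action rather than by an integer.
    observation, proprio_state = dataset.reset()
    action = MoveTangentially(
        agent_id="agent_id_0", distance=0.004, direction=[0.0, 1.0, 0.0]
    )
    observation, proprio_state = dataset[action]
    return observation, proprio_state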
class EnvironmentDataLoader:
"""Wraps the environment dataset with an iterator.
The observations are based on the actions returned by the `motor_system`.
The first value returned by this iterator are the observations of the
environment's initial state, subsequent observations are returned after the action
returned by `motor_system` is applied.
Attributes:
dataset: :class:`EnvironmentDataset`
motor_system: :class:`MotorSystem`
Note:
If the amount variable returned by motor_system is None, the amount used by
habitat will be the default for the actuator, e.g.
PanTiltZoomCamera.translation_step
Note:
This base class will not work on its own; use one of the subclasses defined
below.
"""
def __init__(self, dataset: EnvironmentDataset, motor_system: MotorSystem, rng):
assert isinstance(dataset, EnvironmentDataset)
if not isinstance(motor_system, MotorSystem):
raise TypeError(
f"motor_system must be an instance of MotorSystem, got {motor_system}"
)
self.dataset = dataset
self.motor_system = motor_system
self.rng = rng
self._observation, proprioceptive_state = self.dataset.reset()
self.motor_system._state = (
MotorSystemState(proprioceptive_state) if proprioceptive_state else None
)
self._action = None
self._amount = None
self._counter = 0
def __iter__(self):
# Reset the environment before iterating
self._observation, proprioceptive_state = self.dataset.reset()
self.motor_system._state = (
MotorSystemState(proprioceptive_state) if proprioceptive_state else None
)
self._action = None
self._amount = None
self._counter = 0
return self
def __next__(self):
if self._counter == 0:
# Return first observation after 'reset' before any action is applied
self._counter += 1
return self._observation
else:
action = self.motor_system()
self._action = action
self._observation, proprioceptive_state = self.dataset[action]
self.motor_system._state = (
MotorSystemState(proprioceptive_state) if proprioceptive_state else None
)
self._counter += 1
return self._observation
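# Usage sketch (not part of the original module): how an experiment loop would
# typically consume this iterator. Per __next__ above, the first item is the
# post-reset observation and every later item follows one motor-system action.
# `dataloader` is assumed to be a fully configured subclass instance.
def _dataloader_iteration_sketch(dataloader, max_steps=10):
    last_observation = None
    for step, observation in enumerate(dataloader):
        # ... pass `observation` to the sensor and learning modules here ...
        last_observation = observation
        if step + 1 >= max_steps:
            break  # the underlying dataset is effectively unbounded
    return last_observation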
class EnvironmentDataLoaderPerObject(EnvironmentDataLoader):
"""Dataloader for testing on environment with one "primary target" object.
Dataloader for testing on environment where we load one "primary target" object
at a time; in addition, one can optionally load other "distractor" objects to the
environment
Has a list of primary target objects, swapping these objects in and out for episodes
without resetting the environment. The objects are initialized with parameters such
that we can vary their location, rotation, and scale.
After the primary target is added to the environment, other distractor objects,
sampled from the same object list, can be added.
"""
def __init__(self, object_names, object_init_sampler, *args, **kwargs):
"""Initialize dataloader.
Args:
object_names: list of objects if doing a simple experiment with primary
target objects only; dict for experiments with multiple objects,
corresponding to -->
targets_list : the list of primary target objects
source_object_list : the original object list from which the primary
target objects were sampled; used to sample distractor objects
num_distractors : the number of distractor objects to add to the
environment
object_init_sampler: Function that returns dict with position, rotation,
and scale of objects when re-initializing. To keep configs
serializable, default is set to :class:`DefaultObjectInitializer`.
*args: Additional arguments passed on to the parent EnvironmentDataLoader.
**kwargs: Additional keyword arguments passed on to the parent
EnvironmentDataLoader.
See Also:
tbp.monty.frameworks.make_dataset_configs
:class:`EnvironmentDataLoaderPerObjectTrainArgs`
Raises:
TypeError: If `object_names` is not a list or dictionary
"""
super(EnvironmentDataLoaderPerObject, self).__init__(*args, **kwargs)
if isinstance(object_names, list):
self.object_names = object_names
# Return an (ordered) list of unique items:
self.source_object_list = list(dict.fromkeys(object_names))
self.num_distractors = 0
elif isinstance(object_names, dict):
# TODO when we want more advanced multi-object experiments, update these
# arguments along with the Object Initializers so that we can easily
# specify a set of primary targets and distractors, i.e. random sampling
# of the distractor objects shouldn't happen here
self.object_names = object_names["targets_list"]
self.source_object_list = list(
dict.fromkeys(object_names["source_object_list"])
)
self.num_distractors = object_names["num_distractors"]
else:
raise TypeError("Object names should be a list or dictionary")
self.create_semantic_mapping()
self.object_init_sampler = object_init_sampler
self.object_init_sampler.rng = self.rng
self.object_params = self.object_init_sampler()
self.current_object = 0
self.n_objects = len(self.object_names)
self.episodes = 0
self.epochs = 0
self.primary_target = None
def post_episode(self):
super().post_episode()
self.object_init_sampler.post_episode()
self.object_params = self.object_init_sampler()
self.cycle_object()
self.episodes += 1
def post_epoch(self):
self.epochs += 1
self.object_init_sampler.post_epoch()
self.object_params = self.object_init_sampler()
def create_semantic_mapping(self):
"""Create a unique semantic ID (positive integer) for each object.
Used by Habitat for the semantic sensor.
In addition, create a dictionary mapping back and forth between these IDs and
the corresponding name of the object
"""
assert set(self.object_names).issubset(set(self.source_object_list)), (
"Semantic mapping requires primary targets sampled from source list"
)
starting_integer = 1 # Start at 1 so that we can distinguish on-object semantic
# IDs (>0) from being off object (semantic_id == 0 in Habitat by default)
self.semantic_id_to_label = {
i + starting_integer: label
for i, label in enumerate(self.source_object_list)
}
self.semantic_label_to_id = {
label: i + starting_integer
for i, label in enumerate(self.source_object_list)
}
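# Illustrative example (hypothetical object list): with
#     source_object_list = ["mug", "banana", "bowl"]
# the mappings built above would be
#     semantic_id_to_label == {1: "mug", 2: "banana", 3: "bowl"}
#     semantic_label_to_id == {"mug": 1, "banana": 2, "bowl": 3}
# leaving semantic ID 0 reserved for "off object" in Habitat.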
def cycle_object(self):
"""Remove the previous object(s) from the scene and add a new primary target.
Also add any potential distractor objects.
"""
next_object = (self.current_object + 1) % self.n_objects
logging.info(
f"\n\nGoing from {self.current_object} to {next_object} of {self.n_objects}"
)
self.change_object_by_idx(next_object)
def change_object_by_idx(self, idx):
"""Update the primary target object in the scene based on the given index.
The given `idx` is the index of the object in the `self.object_names` list,
which should correspond to the index of the object in the `self.object_params`
list.
Also add any distractor objects if required.
Args:
idx: Index of the new object and its parameters in object_params
"""
assert idx <= self.n_objects, "idx must be <= self.n_objects"
self.dataset.env.remove_all_objects()
# Specify config for the primary target object and then add it
init_params = self.object_params.copy()
init_params.pop("euler_rotation")
if "quat_rotation" in init_params.keys():
init_params.pop("quat_rotation")
init_params["semantic_id"] = self.semantic_label_to_id[self.object_names[idx]]
# TODO clean this up with its own specific call i.e. Law of Demeter
primary_target_obj = self.dataset.env.add_object(
name=self.object_names[idx], **init_params
)
if self.num_distractors > 0:
self.add_distractor_objects(
primary_target_obj,
init_params,
primary_target_name=self.object_names[idx],
)
self.current_object = idx
self.primary_target = {
"object": self.object_names[idx],
"semantic_id": self.semantic_label_to_id[self.object_names[idx]],
**self.object_params,
}
logging.info(f"New primary target: {pformat(self.primary_target)}")
def add_distractor_objects(
self, primary_target_obj, init_params, primary_target_name
):
"""Add arbitrarily many "distractor" objects to the environment.
Args:
primary_target_obj : the Habitat object which is the primary target in
the scene
init_params (dict): parameters used to initialize the object, e.g.
orientation; for now, these are identical to the primary target
except for the object ID
primary_target_name (str): name of the primary target object
"""
# Sample distractor objects from those that are not the primary target; this
# is so that, for now, we can evaluate how well the model stays on the primary
# target object until it is classified, with no ambiguity about what final
# object it is classifying
sampling_list = [
item for item in self.source_object_list if item != primary_target_name
]
for __ in range(self.num_distractors):
new_init_params = copy.deepcopy(init_params)
new_obj_label = self.rng.choice(sampling_list)
new_init_params["semantic_id"] = self.semantic_label_to_id[new_obj_label]
# TODO clean up the **unpacking used
self.dataset.env.add_object(
name=new_obj_label,
**new_init_params,
object_to_avoid=True,
primary_target_object=primary_target_obj,
)
def reset_agent(self):
logging.debug("resetting agent------")
self._observation, proprioceptive_state = self.dataset.reset()
motor_system_state = MotorSystemState(proprioceptive_state)
self._counter = 0
# Make sure to also reset action variables when resetting agent during
# pre-episode
self._action = None
self._amount = None
motor_system_state[self.motor_system._policy.agent_id]["motor_only_step"] = (
False
)
self.motor_system._state = motor_system_state
return self._observation
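# Usage sketch (not part of the original module): the rough per-episode and
# per-epoch bookkeeping an experiment would perform with this dataloader.
# `run_one_episode` is a hypothetical callable that iterates the dataloader
# until the episode's terminal condition is reached.
def _per_object_cycling_sketch(dataloader, run_one_episode, n_epochs=1):
    for _ in range(n_epochs):
        for _ in range(dataloader.n_objects):
            run_one_episode(dataloader)
            # Resamples object parameters and swaps in the next primary
            # target (plus distractors, if configured) via cycle_object().
            dataloader.post_episode()
        dataloader.post_epoch()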
class InformedEnvironmentDataLoader(EnvironmentDataLoaderPerObject):
"""Dataloader that supports a policy which makes use of previous observation(s).
Extension of the EnvironmentDataLoader where the actions can be informed by the
observations. It passes the observation to the InformedPolicy class (which is an
extension of the BasePolicy). This policy can then make use of the observation
to decide on the next action.
Also has the following additional functionality; TODO refactor/separate these
out as appropriate:
i) this dataloader allows for early stopping by adding the set_done
method which can for example be called when the object is recognized.
ii) the motor_only_step can be set such that the sensory module can
later determine whether perceptual data should be sent to the learning module,
or just fed back to the motor policy.
iii) Handles different data-loader updates depending on whether the policy is
based on the surface-agent or distant-agent
iv) Supports hypothesis-testing "jump" policy
"""
def __init__(
self, use_get_good_view_positioning_procedure: bool = False, *args, **kwargs
):
super(InformedEnvironmentDataLoader, self).__init__(*args, **kwargs)
self._use_get_good_view_positioning_procedure = (
use_get_good_view_positioning_procedure
)
"""Feature flag to use the GetGoodView positioning procedure.
This is a temporary feature flag to allow for testing the GetGoodView
positioning procedure.
"""
def __iter__(self):
# Overwrite original because we don't want to reset agent at this stage
# (already done in pre-episode)
# TODO look into refactoring the parent __iter__ method so that we don't need
# to use this fix
return self
def __next__(self):
if self._counter == 0:
return self.first_step()
# Check if any LM's have output a goal-state (such as hypothesis-testing
# goal-state)
elif (
isinstance(self.motor_system._policy, InformedPolicy)
and self.motor_system._policy.use_goal_state_driven_actions
and self.motor_system._policy.driving_goal_state is not None
):
return self.execute_jump_attempt()
# NOTE: terminal conditions are now handled in experiment.run_episode loop
else:
self._action = self.motor_system()
# If entirely off object, use vision (i.e. view-finder)
# TODO refactor so that this check is done in the motor-policy, and we
# update the constraint separately/appropriately; i.e. the below
# code should be as general as possible
if (
isinstance(self.motor_system._policy, SurfacePolicy)
and self._action is None
):
self._action = self.motor_system._policy.touch_object(
self._observation,
view_sensor_id="view_finder",
state=self.motor_system._state,
)
self._observation, proprioceptive_state = self.dataset[self._action]
motor_system_state = MotorSystemState(proprioceptive_state)
# Check whether sensory information is just for feeding back to motor policy
# TODO refactor so that the motor policy itself is making this update
# when appropriate, not embodied_data
if (
isinstance(self.motor_system._policy, SurfacePolicy)
and self._action.name != "orient_vertical"
):
motor_system_state[self.motor_system._policy.agent_id][
"motor_only_step"
] = True
else:
motor_system_state[self.motor_system._policy.agent_id][
"motor_only_step"
] = False
self.motor_system._state = motor_system_state
self._counter += 1 # TODO clean up incrementing of counter
return self._observation
def pre_episode(self):
super().pre_episode()
if not self.dataset.env._agents[0].action_space_type == "surface_agent":
on_target_object = self.get_good_view_with_patch_refinement()
if self.num_distractors == 0:
# Only perform this check if we aren't doing multi-object experiments.
assert on_target_object, (
"Primary target must be visible at the start of the episode"
)
def first_step(self):
"""Carry out particular motor-system state updates required on the first step.
TODO: can we get rid of this by appropriately initializing motor_only_step?
Returns:
The observation from the first step.
"""
# Return first observation after 'reset' before any action is applied
self._counter += 1
# Based on current code-base self._action will always be None when
# the counter is 0
assert self._action is None, "Setting of motor_only_step may need updating"
# For first step of surface-agent policy, always bypass LM processing
# For distant-agent policy, we still process the first sensation if it is
# on the object
self.motor_system._state[self.motor_system._policy.agent_id][
"motor_only_step"
] = isinstance(self.motor_system._policy, SurfacePolicy)
return self._observation
def get_good_view(
self,
sensor_id: str,
allow_translation: bool = True,
max_orientation_attempts: int = 1,
) -> bool:
"""Policy to get a good view of the object before an episode starts.
Used by the distant agent to find the initial view of an object at the
beginning of an episode with respect to a given sensor (the surface agent
makes use of the `touch_object` method instead). Also currently used
by the distant agent after a "jump" has been initialized by a model-based
policy.
First, the agent moves towards the object until it fills a minimum percentage
(given by `motor_system._policy.good_view_percentage`) of the sensor's field of
view, or until the closest point of the object is less than a given distance
(`motor_system._policy.desired_object_distance`) from the sensor. This makes
sure that big and small objects all fill a similar amount of space in the
sensor's field of view. Otherwise, small objects may be too small to perform
saccades on, or the sensor may end up inside large objects. This step is
performed by default but can be skipped by setting `allow_translation=False`.
Second, the agent will then be oriented towards the object so that the
sensor's central pixel is on-object. In the case of multi-object experiments,
(i.e., when `num_distractors > 0`), there is an additional orientation step
performed prior to the translational movement step.
Args:
sensor_id: The name of the sensor used to inform movements.
allow_translation: Whether to allow movement toward the object via
the motor system's `move_close_enough` method. If `False`, only
orienting movements are performed. Default is `True`.
max_orientation_attempts: The maximum number of orientation attempts
allowed before giving up and returning `False` indicating that the
sensor is not on the target object.
Returns:
Whether the sensor is on the target object.
TODO M : move most of this to the motor systems, shouldn't be in embodied_data
class
"""
if self._use_get_good_view_positioning_procedure:
_configured_policy = self.motor_system._policy
self.motor_system._policy = GetGoodView(
agent_id=self.motor_system._policy.agent_id,
desired_object_distance=_configured_policy.desired_object_distance,
good_view_percentage=_configured_policy.good_view_percentage,
multiple_objects_present=self.num_distractors > 0,
sensor_id=sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
allow_translation=allow_translation,
max_orientation_attempts=max_orientation_attempts,
# TODO: Remaining arguments are unused but required by BasePolicy.
# These will be removed when PositioningProcedure is split from
# BasePolicy
#
# Note that if we use rng=self.rng below, then the following test will
# fail:
# tests/unit/evidence_lm_test.py::EvidenceLMTest::test_two_lm_heterarchy_experiment # noqa: E501
# The test result seems to be coupled to the random seed and the
# specific sequence of rng calls (rng is called once on GetGoodView
# initialization).
rng=np.random.RandomState(),
action_sampler_args=dict(actions=[LookUp]),
action_sampler_class=UniformlyDistributedSampler,
switch_frequency=0.0,
)
result = self.motor_system._policy.positioning_call(
self._observation, self.motor_system._state
)
while not result.terminated and not result.truncated:
for action in result.actions:
self._observation, proprio_state = self.dataset[action]
self.motor_system._state = (
MotorSystemState(proprio_state) if proprio_state else None
)
result = self.motor_system._policy.positioning_call(
self._observation, self.motor_system._state
)
self.motor_system._policy = _configured_policy
return result.success
# TODO break up this method so that there is less code duplication
# Start by ensuring the center of the patch is covering the primary target
# object before we start moving forward; only done for multi-object experiments
multiple_objects_present = self.num_distractors > 0
if multiple_objects_present:
on_target_object = self.motor_system._policy.is_on_target_object(
self._observation,
sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
multiple_objects_present=multiple_objects_present,
)
if not on_target_object:
actions = self.motor_system._policy.orient_to_object(
self._observation,
sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
multiple_objects_present=multiple_objects_present,
state=self.motor_system._state,
)
for action in actions:
self._observation, proprio_state = self.dataset[action]
self.motor_system._state = (
MotorSystemState(proprio_state) if proprio_state else None
)
if allow_translation:
# Move closer to the object, if not already close enough
action, close_enough = self.motor_system._policy.move_close_enough(
self._observation,
sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
multiple_objects_present=multiple_objects_present,
)
# Continue moving to a close distance to the object
while not close_enough:
logging.debug("moving closer!")
self._observation, proprio_state = self.dataset[action]
self.motor_system._state = (
MotorSystemState(proprio_state) if proprio_state else None
)
action, close_enough = self.motor_system._policy.move_close_enough(
self._observation,
sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
multiple_objects_present=multiple_objects_present,
)
on_target_object = self.motor_system._policy.is_on_target_object(
self._observation,
sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
multiple_objects_present=multiple_objects_present,
)
num_attempts = 0
while not on_target_object and num_attempts < max_orientation_attempts:
actions = self.motor_system._policy.orient_to_object(
self._observation,
sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
multiple_objects_present=multiple_objects_present,
state=self.motor_system._state,
)
for action in actions:
self._observation, proprio_state = self.dataset[action]
self.motor_system._state = (
MotorSystemState(proprio_state) if proprio_state else None
)
on_target_object = self.motor_system._policy.is_on_target_object(
self._observation,
sensor_id,
target_semantic_id=self.primary_target["semantic_id"],
multiple_objects_present=multiple_objects_present,
)
num_attempts += 1
return on_target_object
def get_good_view_with_patch_refinement(self) -> bool:
"""Policy to get a good view of the object for the central patch.
Used by the distant agent to move and orient toward an object such that the
central patch is on-object. This is done by first moving and orienting the
agent toward the object using the view finder. Then orienting movements are
performed using the central patch (i.e., the sensor module with id
"patch" or "patch_0") to ensure that the patch's central pixel is on-object.
Up to 3 reorientation attempts are performed using the central patch.
Also currently used by the distant agent after a "jump" has been initialized
by a model-based policy.
Returns:
Whether the sensor is on the object.
"""
self.get_good_view("view_finder")
for patch_id in ("patch", "patch_0"):
if patch_id in self._observation["agent_id_0"].keys():
on_target_object = self.get_good_view(
patch_id,
allow_translation=False, # only orientation movements
max_orientation_attempts=3, # allow 3 reorientation attempts
)
break
return on_target_object
def execute_jump_attempt(self):
"""Attempt a hypothesis-testing "jump" onto a location of the object.
Delegates to motor policy directly to determine specific jump actions.
Returns:
The observation from the jump attempt.
"""
logging.debug(
"Attempting a 'jump' like movement to evaluate an object hypothesis"
)
# Store the current location and orientation of the agent
# If the hypothesis-guided jump is unsuccessful (e.g. to empty space
# or inside an object), we return here
pre_jump_state = self.motor_system._state[self.motor_system._policy.agent_id]
# Check that all sensors have identical rotations - this is because actions
# currently update them all together; if this changes, the code needs
# to be updated; TODO make this its own method
for ii, current_sensor in enumerate(pre_jump_state["sensors"].keys()):
if ii == 0:
first_sensor = current_sensor
assert np.all(
pre_jump_state["sensors"][current_sensor]["rotation"]
== pre_jump_state["sensors"][first_sensor]["rotation"]
), "Sensors are not identical in pose"
# TODO In general what would be best/cleanest way of routing information,
# e.g. perhaps the learning module should just pass a *displacement* (in
# internal coordinates, and a target point-normal)
# Could also consider making use of decide_location_for_movement (or
# decide_location_for_movement_matching)
(target_loc, target_np_quat) = (
self.motor_system._policy.derive_habitat_goal_state()
)
# Update observations and motor system-state based on new pose, accounting
# for resetting both the agent, as well as the poses of its coupled sensors;
# this is necessary for the distant agent, which pivots the camera around
# like a ball-and-socket joint; note the surface agent does not
# modify this from the unit quaternion and [0, 0, 0] position
# anyway; further note this is globally applied to all sensors.
set_agent_pose = SetAgentPose(
agent_id=self.motor_system._policy.agent_id,
location=target_loc,
rotation_quat=target_np_quat,
)
set_sensor_rotation = SetSensorRotation(
agent_id=self.motor_system._policy.agent_id,
rotation_quat=quaternion.one,
)
_, _ = self.dataset[set_agent_pose]
self._observation, proprioceptive_state = self.dataset[set_sensor_rotation]
self.motor_system._state = (
MotorSystemState(proprioceptive_state) if proprioceptive_state else None
)
# Check depth-at-center to see if the object is in front of us
# As for methods such as touch_object, we use the view-finder
depth_at_center = self.motor_system._policy.get_depth_at_center(
self._observation,
view_sensor_id="view_finder",
initial_pose=False,
)
# If depth_at_center < 1.0, there is a visible element within 1 meter of the
# view-finder's central pixel
if depth_at_center < 1.0:
self.handle_successful_jump()
else:
self.handle_failed_jump(pre_jump_state, first_sensor)
# Regardless of whether the movement was successful, it counts as a step,
# and we provide the observation to the next step of the motor policy
self._counter += 1
self.motor_system._state[self.motor_system._policy.agent_id][
"motor_only_step"
] = True
# TODO refactor so that the whole of the hypothesis driven jumps
# makes cleaner use of self.motor_system()
# Call post_action (normally taken care of by __call__ within
# self.motor_system._policy())
self.motor_system._policy.post_action(
self.motor_system._policy.action, self.motor_system._state
)
return self._observation
def handle_successful_jump(self):
"""Deal with the results of a successful hypothesis-testing jump.
A successful jump is "on-object", i.e. the object is perceived by the sensor.
"""
logging.debug(
"Object visible, maintaining new pose for hypothesis-testing action"
)
if isinstance(self.motor_system._policy, SurfacePolicy):
# For the surface-agent policy, update last action as if we have
# just moved tangentially
# Results in us seamlessly transitioning into the typical
# corrective movements (forward or orientation) of the surface-agent
# policy
self.motor_system._policy.action = MoveTangentially(
agent_id=self.motor_system._policy.agent_id,
distance=0.0,
direction=[0, 0, 0],
)
# TODO cleanup where this is performed, and make variable names more general
# TODO also only log this when we are doing detailed logging
# TODO M clean up these action details loggings; this may need to remain
# local to a "motor-system buffer" given that these are model-free
# actions that have nothing to do with the LMs
# Store logging information about jump success
self.motor_system._policy.action_details["pc_heading"].append("jump")
self.motor_system._policy.action_details["avoidance_heading"].append(False)
self.motor_system._policy.action_details["z_defined_pc"].append(None)
else:
self.get_good_view_with_patch_refinement()
def handle_failed_jump(self, pre_jump_state, first_sensor):
"""Deal with the results of a failed hypothesis-testing jump.
A failed jump is "off-object", i.e. the object is not perceived by the sensor.
"""
logging.debug("No object visible from hypothesis jump, or inside object!")
logging.debug("Returning to previous position")
set_agent_pose = SetAgentPose(
agent_id=self.motor_system._policy.agent_id,
location=pre_jump_state["position"],
rotation_quat=pre_jump_state["rotation"],
)
# All sensors are updated globally by actions, and are therefore
# identical
set_sensor_rotation = SetSensorRotation(
agent_id=self.motor_system._policy.agent_id,
rotation_quat=pre_jump_state["sensors"][first_sensor]["rotation"],
)
_, _ = self.dataset[set_agent_pose]
self._observation, proprioceptive_state = self.dataset[set_sensor_rotation]
assert np.all(
proprioceptive_state[self.motor_system._policy.agent_id]["position"]
== pre_jump_state["position"]
), "Failed to return agent to location"
assert np.all(
proprioceptive_state[self.motor_system._policy.agent_id]["rotation"]
== pre_jump_state["rotation"]
), "Failed to return agent to orientation"
for current_sensor in proprioceptive_state[self.motor_system._policy.agent_id][
"sensors"
].keys():
assert np.all(
proprioceptive_state[self.motor_system._policy.agent_id]["sensors"][
current_sensor
]["rotation"]
== pre_jump_state["sensors"][current_sensor]["rotation"]
), "Failed to return sensor to orientation"
self.motor_system._state = MotorSystemState(proprioceptive_state)
# TODO explore reverting to an attempt with touch_object here,
# only moving back to our starting location if this is unsuccessful
# after e.g. 16 glances around where we arrived; NB however that
# if we're inside the object, then we don't want to do this
class OmniglotDataLoader(EnvironmentDataLoaderPerObject):
"""Dataloader for Omniglot dataset."""
def __init__(
self,
alphabets,
characters,
versions,
dataset,
motor_system: MotorSystem,
*args,
**kwargs,
):
"""Initialize dataloader.
Args:
alphabets (List[str]): List of alphabets.
characters (List[str]): List of characters.
versions: List of versions.
dataset (EnvironmentDataset): The environment dataset.
motor_system (MotorSystem): The motor system.
*args: Additional arguments
**kwargs: Additional keyword arguments
Raises:
TypeError: If `motor_system` is not an instance of `MotorSystem`.
"""
assert isinstance(dataset, EnvironmentDataset)
if not isinstance(motor_system, MotorSystem):
raise TypeError(
f"motor_system must be an instance of MotorSystem, got {motor_system}"
)
self.dataset = dataset
self.motor_system = motor_system
self._observation, proprioceptive_state = self.dataset.reset()
self.motor_system._state = (
MotorSystemState(proprioceptive_state) if proprioceptive_state else None
)
self._action = None
self._amount = None
self._counter = 0
self.alphabets = alphabets
self.characters = characters
self.versions = versions
self.current_object = 0
self.n_objects = len(characters)
self.episodes = 0
self.epochs = 0
self.primary_target = None
self.object_names = [
str(self.dataset.env.alphabet_names[alphabets[i]])
+ "_"
+ str(self.characters[i])
for i in range(self.n_objects)
]
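# Illustrative example (hypothetical values): alphabets, characters, and
# versions are parallel lists, so
#     alphabets = [0, 0, 1], characters = [1, 2, 1], versions = [0, 0, 0]
# defines three characters to cycle through, and object_names becomes
#     ["<alphabet_0>_1", "<alphabet_0>_2", "<alphabet_1>_1"]
# where <alphabet_i> is self.dataset.env.alphabet_names[i].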
def post_episode(self):
self.motor_system.post_episode()
self.cycle_object()
self.episodes += 1
def cycle_object(self):
"""Switch to the next character image."""
next_object = (self.current_object + 1) % self.n_objects
logging.info(
f"\n\nGoing from {self.current_object} to {next_object} of {self.n_objects}"
)
self.change_object_by_idx(next_object)
def change_object_by_idx(self, idx):
"""Update the object in the scene given the idx of it in the object params.
Args:
idx: Index of the new object and ints parameters in object params
"""
assert idx <= self.n_objects, "idx must be <= self.n_objects"
self.dataset.env.switch_to_object(
self.alphabets[idx], self.characters[idx], self.versions[idx]
)
self.current_object = idx
self.primary_target = {
"object": self.object_names[idx],
"rotation": np.quaternion(0, 0, 0, 1),
"euler_rotation": np.array([0, 0, 0]),
"quat_rotation": [0, 0, 0, 1],
"position": np.array([0, 0, 0]),
"scale": [1.0, 1.0, 1.0],
}
class SaccadeOnImageDataLoader(EnvironmentDataLoaderPerObject):
"""Dataloader for moving over a 2D image with depth channel."""
def __init__(
self,
scenes,
versions,
dataset: EnvironmentDataset,
motor_system: MotorSystem,
*args,
**kwargs,
):
"""Initialize dataloader.
Args:
scenes: List of scenes
versions: List of versions
dataset (EnvironmentDataset): The environment dataset.
motor_system (MotorSystem): The motor system.
*args: Additional arguments
**kwargs: Additional keyword arguments
Raises:
TypeError: If `motor_system` is not an instance of `MotorSystem`.
"""
assert isinstance(dataset, EnvironmentDataset)
if not isinstance(motor_system, MotorSystem):
raise TypeError(
f"motor_system must be an instance of MotorSystem, got {motor_system}"
)
self.dataset = dataset
self.motor_system = motor_system
self._observation, proprioceptive_state = self.dataset.reset()
self.motor_system._state = (
MotorSystemState(proprioceptive_state) if proprioceptive_state else None
)
self._action = None
self._amount = None
self._counter = 0
self.scenes = scenes
self.versions = versions
self.object_names = self.dataset.env.scene_names
self.current_scene_version = 0
self.n_versions = len(versions)
self.episodes = 0
self.epochs = 0
self.primary_target = None
def post_episode(self):
self.motor_system.post_episode()
self.cycle_object()
self.episodes += 1
def cycle_object(self):
"""Switch to the next scene image."""
next_scene = (self.current_scene_version + 1) % self.n_versions
logging.info(
f"\n\nGoing from {self.current_scene_version} to {next_scene} of "
f"{self.n_versions}"
)
self.change_object_by_idx(next_scene)
def change_object_by_idx(self, idx):
"""Update the object in the scene given the idx of it in the object params.
Args:
idx: Index of the new object and ints parameters in object params
"""
assert idx <= self.n_versions, "idx must be <= self.n_versions"
logging.info(
f"changing to obj {idx} -> scene {self.scenes[idx]}, version "
f"{self.versions[idx]}"
)
self.dataset.env.switch_to_object(self.scenes[idx], self.versions[idx])
self.current_scene_version = idx
# TODO: Currently not differentiating between different poses/views
target_object = self.object_names[self.scenes[idx]]
# remove scene index from name
target_object_formatted = "_".join(target_object.split("_")[1:])
self.primary_target = {
"object": target_object_formatted,
"rotation": np.quaternion(0, 0, 0, 1),
"euler_rotation": np.array([0, 0, 0]),
"quat_rotation": [0, 0, 0, 1],
"position": np.array([0, 0, 0]),
"scale": [1.0, 1.0, 1.0],
}
def __iter__(self):
# Overwrite original because we don't want to reset agent at this stage
# (already done in pre-episode)
return self
class SaccadeOnImageFromStreamDataLoader(SaccadeOnImageDataLoader):
"""Dataloader for moving over a 2D image with depth channel."""
def __init__(
self,
dataset: EnvironmentDataset,
motor_system: MotorSystem,
*args,
**kwargs,
):
"""Initialize dataloader.
Args:
dataset (EnvironmentDataset): The environment dataset.
motor_system (MotorSystem): The motor system.
*args: Additional arguments
**kwargs: Additional keyword arguments
Raises:
TypeError: If `motor_system` is not an instance of `MotorSystem`.
"""
assert isinstance(dataset, EnvironmentDataset)
if not isinstance(motor_system, MotorSystem):
raise TypeError(
f"motor_system must be an instance of MotorSystem, got {motor_system}"
)
# TODO: call super init instead of duplicating code & generally clean up more
self.dataset = dataset
self.motor_system = motor_system
self._observation, proprioceptive_state = self.dataset.reset()
self.motor_system._state = (
MotorSystemState(proprioceptive_state) if proprioceptive_state else None
)
self._action = None
self._amount = None
self._counter = 0
self.current_scene = 0
self.episodes = 0
self.epochs = 0
self.primary_target = None
def pre_epoch(self):
# TODO: Could give a start index as parameter
self.change_scene_by_idx(0)
def post_episode(self):
self.motor_system.post_episode()
self.cycle_scene()
self.episodes += 1
def cycle_scene(self):
"""Switch to the next scene image."""
next_scene = self.current_scene + 1
logging.info(f"\n\nGoing from {self.current_scene} to {next_scene}")
# TODO: Do we need a separate method for this?
self.change_scene_by_idx(next_scene)
def change_scene_by_idx(self, idx):
"""Update the object in the scene given the idx of it in the object params.
Args:
idx: Index of the new object and ints parameters in object params
"""
logging.info(f"changing to scene {idx}")
self.dataset.env.switch_to_scene(idx)
self.current_scene = idx
# TODO: Currently not differentiating between different poses/views
# TODO: Are the targets important here? How can we provide the proper
# targets corresponding to the current scene?
self.primary_target = {
"object": "no_label",
"rotation": np.quaternion(0, 0, 0, 1),
"euler_rotation": np.array([0, 0, 0]),
"quat_rotation": [0, 0, 0, 1],
"position": np.array([0, 0, 0]),
"scale": [1.0, 1.0, 1.0],
}