Source code for tbp.monty.frameworks.environments.real_robots
# Copyright 2025 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import gym
from tbp.monty.frameworks.environments.embodied_environment import (
ActionSpace,
EmbodiedEnvironment,
)
# Names exported by `from ... import *`: only RealRobotsEnvironment is listed;
# the GymActionSpace helper defined below is not part of this list.
__all__ = ["RealRobotsEnvironment"]
class GymActionSpace(ActionSpace):
    """Adapter exposing a ``gym.Space`` through Monty's ``ActionSpace`` interface.

    Membership tests and sampling are both delegated to the wrapped gym space.
    """

    def __init__(self, action_space):
        """Keep a reference to the gym space that requests are delegated to.

        Args:
            action_space: The ``gym.Space`` instance to wrap.

        Raises:
            AssertionError: If ``action_space`` is not a ``gym.Space`` (the
                check is an ``assert``, so it only runs when assertions are
                enabled).
        """
        assert isinstance(action_space, gym.Space)
        self.action_space = action_space

    def __contains__(self, item):
        """Return the wrapped space's ``contains(item)`` result."""
        wrapped_space = self.action_space
        return wrapped_space.contains(item)

    def sample(self):
        """Return whatever the wrapped space's ``sample()`` produces."""
        wrapped_space = self.action_space
        return wrapped_space.sample()
class RealRobotsEnvironment(EmbodiedEnvironment):
    """Monty-compatible wrapper around a Real Robots gym environment.

    Note:
        `real_robots` dependencies are not installed by default.
        Install `real_robot` extra dependencies if you want to use the
        `real_robot` environment.
    """

    def __init__(self, id):  # noqa: A002
        """Build the underlying gym environment and reset it once.

        Args:
            id: Environment id, forwarded to ``gym.make`` as ``id``.
        """
        super().__init__()
        self._env = gym.make(id=id)
        self._env.reset()

    @property
    def action_space(self):
        """A new ``GymActionSpace`` wrapping the gym environment's action space."""
        gym_space = self._env.action_space
        return GymActionSpace(gym_space)

    def add_object(self, *args, **kwargs):
        """Unsupported operation for this environment.

        Raises:
            NotImplementedError: Always.
        """
        # TODO The NotImplementedError highlights an issue with the
        # EmbodiedEnvironment interface and how the class hierarchy is defined
        # and used.
        message = "RealRobotsEnvironment does not support adding objects"
        raise NotImplementedError(message)

    def step(self, action):
        """Apply ``action`` to the gym environment and flatten the result.

        Args:
            action: Action passed through unchanged to the gym ``step`` call.

        Returns:
            A dict holding every entry of the observation (which is expanded
            with ``**`` and therefore must be a mapping with string keys) plus
            the ``reward``, ``done`` and ``info`` values returned by gym.
        """
        step_result = self._env.step(action)
        # The gym step result is unpacked as a 4-tuple.
        observation, reward, done, info = step_result
        extras = {"reward": reward, "done": done, "info": info}
        return dict(**observation, **extras)

    def remove_all_objects(self):
        """Unsupported operation for this environment.

        Raises:
            NotImplementedError: Always.
        """
        # TODO The NotImplementedError highlights an issue with the
        # EmbodiedEnvironment interface and how the class hierarchy is defined
        # and used.
        message = "RealRobotsEnvironment does not support removing all objects"
        raise NotImplementedError(message)

    def reset(self):
        """Reset the gym environment and return the initial observation.

        Returns:
            A dict holding every entry of the observation returned by the gym
            ``reset`` call, plus ``reward=0``, ``done=False`` and ``info=None``
            so the result has the same extra keys as the dict built by
            :meth:`step`.
        """
        observation = self._env.reset()
        extras = {"reward": 0, "done": False, "info": None}
        return dict(**observation, **extras)

    def close(self):
        """Close the gym environment; does nothing if it was already closed."""
        env = self._env
        if env is None:
            return
        env.close()
        # Drop the reference so later close() calls become no-ops.
        self._env = None