Source code for tbp.monty.frameworks.environments.two_d_data

# Copyright 2025 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

import logging
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import PIL
import quaternion as qt
from scipy.ndimage import gaussian_filter

from tbp.monty.frameworks.actions.actions import Action
from tbp.monty.frameworks.environment_utils.transforms import DepthTo3DLocations
from tbp.monty.frameworks.environments.embodied_environment import EmbodiedEnvironment
from tbp.monty.frameworks.environments.habitat import HabitatActionSpace

# Names exported by ``from ... import *``: the three 2D environment classes
# defined in this module.
__all__ = [
    "OmniglotEnvironment",
    "SaccadeOnImageEnvironment",
    "SaccadeOnImageFromStreamEnvironment",
]

# Numenta objects are listed here because they were used in the iPad demo, which
# runs on the SaccadeOnImageEnvironment (or SaccadeOnImageFromStreamEnvironment).
# 3D meshes of these objects were also created, so they can be tested in
# simulation in habitat as well.
# TODO: upload mesh dataset & link here.
NUMENTA_OBJECTS = [
    "numenta_mug",
    "terracotta_mug",
    "montys_brain",
    "montys_heart",
    "ramen_pack",
    "kashmiri_chilli",
    "chip_pack",
    "harissa_oil",
    "cocktail_bitters",
    "cocktail_bible",
    "thousand_brains_jp",
    "hot_sauce",
]


class OmniglotEnvironment(EmbodiedEnvironment):
    """Environment for Omniglot dataset.

    Every observation is a square patch cut out of the current character image.
    The patch centre moves along the stroke path that is stored with each
    character (see `step`).
    """

    def __init__(self, patch_size=10, data_path=None):
        """Initialize environment.

        Args:
            patch_size: height and width of patch in pixels, defaults to 10
            data_path: path to the omniglot dataset. If None its set to
                ~/tbp/data/omniglot/python/
        """
        self.patch_size = patch_size
        # Letters are always presented upright
        self.rotation = qt.from_rotation_vector([np.pi / 2, 0.0, 0.0])
        self.step_num = 0
        self.state = 0
        self.data_path = data_path
        if self.data_path is None:
            self.data_path = os.path.expanduser("~/tbp/data/omniglot/python/")
        # os.path.join (instead of string concatenation) so that a data_path
        # without a trailing separator works too.
        alphabet_dir = os.path.join(self.data_path, "images_background")
        self.alphabet_names = [
            a for a in os.listdir(alphabet_dir) if not a.startswith(".")
        ]
        self.current_alphabet = self.alphabet_names[0]
        self.character_id = 1
        self.character_version = 1
        self.current_image, self.locations = self.load_new_character_data()
        # Just for compatibility. TODO: find cleaner way to do this.
        self._agents = [type("FakeAgent", (object,), {"action_space_type": "2d"})()]

    @property
    def action_space(self):
        """Always None; `step` ignores its action and follows the stroke path."""
        return None

    def step(self, _action, amount):
        """Retrieve the next observation.

        Since the omniglot dataset includes stroke information (the order in which
        the character was drawn as a list of x,y coordinates) we use that for
        movement. This means we start at the first x,y coordinate saved in the
        move path and then move in increments specified by amount through this
        list. Overall there are usually several hundred points (~200-400) but it
        varies between characters and versions. If we reach the end of a move
        path and the episode is not finished, we start from the beginning again.
        If len(move_path) % amount != 0 we will sample different points on the
        second pass.

        Args:
            _action: Not used at the moment since we just follow the draw path.
            amount: Amount of elements in move path to move at once. Values
                below 1 are treated as 1.

        Returns:
            observation (dict).
        """
        if amount < 1:
            amount = 1
        self.step_num += int(amount)
        patch = self.get_image_patch(
            self.current_image, self._current_location(), self.patch_size
        )
        return self._build_observation(patch)

    def get_state(self):
        """Get agent state.

        Returns:
            Dict with the (fixed) rotation and the current stroke location as
            the sensor position. The agent position is always the origin.
        """
        loc = self._current_location()
        sensor_position = np.array([loc[0], loc[1], 0])
        state = {
            "agent_id_0": {
                "sensors": {
                    "patch" + ".depth": {
                        "rotation": self.rotation,
                        "position": sensor_position,
                    },
                    "patch" + ".rgba": {
                        "rotation": self.rotation,
                        "position": sensor_position,
                    },
                },
                "rotation": self.rotation,
                "position": np.array([0, 0, 0]),
            }
        }
        return state

    def switch_to_object(self, alphabet_id, character_id, version_id):
        """Load a different character as the current environment.

        Args:
            alphabet_id: index into ``self.alphabet_names``.
            character_id: character number inside the alphabet.
            version_id: version number of that character.
        """
        self.current_alphabet = self.alphabet_names[alphabet_id]
        self.character_id = character_id
        self.character_version = version_id
        self.current_image, self.locations = self.load_new_character_data()

    def reset(self):
        """Restart the stroke path and return the observation at its start.

        Returns:
            observation (dict).
        """
        self.step_num = 0
        patch = self.get_image_patch(
            self.current_image, self.locations[self.step_num], self.patch_size
        )
        return self._build_observation(patch)

    def load_new_character_data(self):
        """Load image and stroke path of the currently selected character.

        Also sets ``self.max_steps`` to ``len(locations) - 1``.

        Returns:
            current_image: The character image (as returned by load_img).
            locations: Stroke locations at which a full patch fits the image.

        Raises:
            FileNotFoundError: If the character directory contains no
                (non-hidden) image file.
        """
        char_name = "character" + str(self.character_id).zfill(2)
        img_char_dir = os.path.join(
            self.data_path, "images_background", self.current_alphabet, char_name
        )
        stroke_char_dir = os.path.join(
            self.data_path, "strokes_background", self.current_alphabet, char_name
        )
        # Skip hidden entries (e.g. .DS_Store) so they are never mistaken for
        # an image file when deriving the file-name prefix.
        image_files = [f for f in os.listdir(img_char_dir) if not f.startswith(".")]
        if not image_files:
            raise FileNotFoundError(f"No image files found in {img_char_dir}")
        char_prefix = image_files[0].split("_")[0]
        file_stem = char_prefix + "_" + str(self.character_version).zfill(2)
        current_image = load_img(os.path.join(img_char_dir, file_stem + ".png"))
        move_path = load_motor(os.path.join(stroke_char_dir, file_stem + ".txt"))
        logging.info(
            f"Finished loading new image from {os.path.join(img_char_dir, file_stem)}"
        )
        locations = self.motor_to_locations(move_path)
        # NOTE: the upper bound is derived from shape[0] only and applied to
        # both coordinates.
        maxloc = current_image.shape[0] - self.patch_size
        # Don't use locations at the border where patch doesn't fit anymore
        in_range = (
            (locations[:, 0] > self.patch_size)
            & (locations[:, 1] > self.patch_size)
            & (locations[:, 0] < maxloc)
            & (locations[:, 1] < maxloc)
        )
        locations = locations[in_range]
        self.max_steps = len(locations) - 1
        return current_image, locations

    def get_image_patch(self, img, loc, patch_size):
        """Cut a patch around a location out of an image.

        Args:
            img: 2D image array.
            loc: location; ``loc[1]`` indexes the first image axis and
                ``loc[0]`` the second one.
            patch_size: side length of the patch in pixels. The slice spans
                ``2 * (patch_size // 2)`` pixels per axis.

        Returns:
            The image patch.
        """
        loc = np.array(loc, dtype=int)
        half = patch_size // 2
        row_start, row_stop = loc[1] - half, loc[1] + half
        col_start, col_stop = loc[0] - half, loc[0] + half
        return img[row_start:row_stop, col_start:col_stop]

    def motor_to_locations(self, motor):
        """Turn a list of strokes into one array of image-space locations.

        Args:
            motor: list of 2D arrays (one per stroke); only the first two
                columns of each stroke are used.

        Returns:
            Float array of shape (n_points, 2) with all strokes stacked in order.
        """
        strokes = [space_motor_to_img(d[:, 0:2]) for d in motor]
        if not strokes:
            return np.zeros((0, 2))
        # Stack once instead of growing the array stroke by stroke.
        return np.vstack(strokes).astype(float, copy=False)

    def close(self):
        """Clear the cached state reference."""
        self._current_state = None

    def _current_location(self):
        """Return the stroke location that belongs to the current step counter.

        The counter wraps around ``max_steps``. The divisor is floored at 1 so
        that a character with a single usable location does not raise a
        ZeroDivisionError.
        """
        return self.locations[self.step_num % max(self.max_steps, 1)]

    def _build_observation(self, patch):
        """Assemble the observation dict for a binary image patch."""
        depth = 1.2 - gaussian_filter(np.array(~patch, dtype=float), sigma=0.5)
        obs = {
            "agent_id_0": {
                "patch": {
                    "depth": depth,
                    "semantic": np.array(~patch, dtype=int),
                    "rgba": np.stack(
                        [depth, depth, depth], axis=2
                    ),  # TODO: placeholder
                },
                "view_finder": {
                    "depth": self.current_image,
                    "semantic": np.array(~patch, dtype=int),
                },
            }
        }
        return obs
class SaccadeOnImageEnvironment(EmbodiedEnvironment):
    """Environment for moving over a 2D image with depth channel.

    Images should be stored in .png format for rgb and .data format for depth.
    """

    def __init__(self, patch_size=64, data_path=None):
        """Initialize environment.

        Args:
            patch_size: height and width of patch in pixels, defaults to 64
            data_path: path to the image dataset. If None its set to
                ~/tbp/data/worldimages/labeled_scenes/
        """
        self.patch_size = patch_size
        # Images are always presented upright so patch and agent rotation is always
        # the same. Since we don't use this, value doesn't matter much.
        self.rotation = qt.from_rotation_vector([np.pi / 2, 0.0, 0.0])
        self.state = 0
        self.data_path = data_path
        if self.data_path is None:
            self.data_path = os.path.expanduser(
                "~/tbp/data/worldimages/labeled_scenes/"
            )
        self.scene_names = [
            a for a in os.listdir(self.data_path) if not a.startswith(".")
        ]
        self.current_scene = self.scene_names[0]
        self.scene_version = 0
        (
            self.current_depth_image,
            self.current_rgb_image,
            self.current_loc,
        ) = self.load_new_scene_data()
        self.move_area = self.get_move_area()
        # Get 3D scene point cloud array from depth image
        (
            self.current_scene_point_cloud,
            self.current_sf_scene_point_cloud,
        ) = self.get_3d_scene_point_cloud()
        # Just for compatibility. TODO: find cleaner way to do this.
        self._agents = [
            type(
                "FakeAgent",
                (object,),
                {"action_space_type": "distant_agent_no_translation"},
            )()
        ]
        # Instantiate once and reuse when checking action name in step()
        # TODO Use 2D-specific actions instead of overloading? Habitat actions
        self._valid_actions = ["look_up", "look_down", "turn_left", "turn_right"]

    @property
    def action_space(self):
        """Action space holding the four supported movement actions."""
        # TODO: move this to other action space definitions and clean up.
        return HabitatActionSpace(
            [
                "look_up",
                "look_down",
                "turn_left",
                "turn_right",
            ]
        )

    def step(self, action: Action):
        """Retrieve the next observation.

        For the four supported actions the number of pixels to move is read
        from ``action.rotation_degrees``. Magnitudes below 1 are replaced by 1
        and the value is truncated to an int.

        Args:
            action: moving up, down, left or right from current location.

        Returns:
            observation (dict).
        """
        if action.name in self._valid_actions:
            amount = action.rotation_degrees
        else:
            amount = 0
        if np.abs(amount) < 1:
            amount = 1
        # Make sure amount is int since we are moving using pixel indices
        amount = int(amount)
        query_loc = self.get_next_loc(action.name, amount)
        obs = self._build_observation(query_loc)
        # Only commit the new location once the patch was extracted successfully.
        self.current_loc = query_loc
        return obs

    def get_state(self):
        """Get agent state.

        Returns:
            The agent state.
        """
        loc = self.current_loc
        # Provide LM w/ sensor position in 3D, body-centric coordinates
        # instead of pixel indices
        sensor_position = self.get_3d_coordinates_from_pixel_indices(loc[:2])
        # NOTE: This is super hacky and only works for 1 agent with 1 sensor
        state = {
            "agent_id_0": {
                "sensors": {
                    "patch" + ".depth": {
                        "rotation": self.rotation,
                        "position": sensor_position,
                    },
                    "patch" + ".rgba": {
                        "rotation": self.rotation,
                        "position": sensor_position,
                    },
                },
                "rotation": self.rotation,
                "position": np.array([0, 0, 0]),
            }
        }
        return state

    def switch_to_object(self, scene_id, scene_version_id):
        """Load new image to be used as environment.

        Args:
            scene_id: index into ``self.scene_names``.
            scene_version_id: version number used in the depth/rgb file names.
        """
        self.current_scene = self.scene_names[scene_id]
        self.scene_version = scene_version_id
        (
            self.current_depth_image,
            self.current_rgb_image,
            self.current_loc,
        ) = self.load_new_scene_data()
        # The new image may differ in size, so the area in which the patch may
        # move has to be recomputed as well.
        self.move_area = self.get_move_area()
        # Get 3D scene point cloud array from depth image
        (
            self.current_scene_point_cloud,
            self.current_sf_scene_point_cloud,
        ) = self.get_3d_scene_point_cloud()

    def reset(self):
        """Reset environment and extract image patch.

        TODO: clean up. Do we need this? No reset required in this dataloader,
        maybe indicate this better here.

        Returns:
            The observation from the image patch.
        """
        return self._build_observation(self.current_loc)

    def load_new_scene_data(self):
        """Load depth and rgb data for next scene environment.

        Returns:
            current_depth_image: The depth image.
            current_rgb_image: The rgb image.
            start_location: The start location.
        """
        # Set data paths (joined so data_path works without trailing separator)
        scene_dir = os.path.join(self.data_path, f"{self.current_scene}")
        current_depth_path = os.path.join(
            scene_dir, f"depth_{self.scene_version}.data"
        )
        current_rgb_path = os.path.join(scene_dir, f"rgb_{self.scene_version}.png")
        # Load & process data
        current_rgb_image = self.load_rgb_data(current_rgb_path)
        height, width, _ = current_rgb_image.shape
        current_depth_image = self.load_depth_data(current_depth_path, height, width)
        current_depth_image = self.process_depth_data(current_depth_image)
        # set start location to center of image
        # TODO: find object if not in center
        obs_shape = current_depth_image.shape
        start_location = [obs_shape[0] // 2, obs_shape[1] // 2]
        return current_depth_image, current_rgb_image, start_location

    def load_depth_data(self, depth_path, height, width):
        """Load depth image from .data file.

        The file is read as a flat sequence of float32 values and reshaped to
        (height, width).

        Returns:
            The depth image.
        """
        return np.fromfile(depth_path, np.float32).reshape(height, width)

    def process_depth_data(self, depth):
        """Process depth data by replacing nan values and clipping.

        The input array is left untouched; a processed copy is returned.

        Returns:
            The processed depth image.
        """
        depth_clipped = depth.copy()
        # Set nan values to 10m
        depth_clipped[np.isnan(depth_clipped)] = 10
        # Anything thats further away than 40cm is clipped
        # TODO: make this a hyperparameter?
        depth_clipped[depth_clipped > 0.4] = 10
        # flipping image makes visualization more intuitive. If we want to have this
        # in here we also have to comment in the flipping in the rgb image and probably
        # flip left-right. It may be better to flip the image in the app, depending on
        # sensor orientation (TODO).
        current_depth_image = depth_clipped  # np.flipud(depth_clipped)
        return current_depth_image

    def load_rgb_data(self, rgb_path):
        """Load RGB image and put into np array.

        Returns:
            The rgb image.
        """
        # Import the submodule explicitly: a bare ``import PIL`` does not load
        # ``PIL.Image`` unless some other module happened to import it before.
        from PIL import Image

        current_rgb_image = np.array(
            Image.open(rgb_path)  # .transpose(Image.FLIP_TOP_BOTTOM)
        )
        return current_rgb_image

    def get_3d_scene_point_cloud(self):
        """Turn 2D depth image into 3D pointcloud using DepthTo3DLocations.

        This point cloud is used to estimate the sensor displacement in 3D space
        between two subsequent steps. Without this we get displacements in pixel
        space which does not work with our 3D models.

        As a side effect ``self.world_camera`` is set from the transform output.

        Returns:
            current_scene_point_cloud: The 3D scene point cloud.
            current_sf_scene_point_cloud: The 3D scene point cloud in sensor frame.
        """
        agent_id = "agent_01"
        sensor_id = "patch_01"
        obs = {agent_id: {sensor_id: {"depth": self.current_depth_image}}}
        rotation = qt.from_rotation_vector([np.pi / 2, 0.0, 0.0])
        state = {
            agent_id: {
                "sensors": {
                    sensor_id + ".depth": {
                        "rotation": rotation,
                        "position": np.array([0, 0, 0]),
                    }
                },
                "rotation": rotation,
                "position": np.array([0, 0, 0]),
            }
        }
        # Apply gaussian smoothing transform to depth image
        # Uncomment line below and add import, if needed
        # transform = GaussianSmoothing(agent_id=agent_id, sigma=2, kernel_width=3)
        # obs = transform(obs, state=state)
        transform = DepthTo3DLocations(
            agent_id=agent_id,
            sensor_ids=[sensor_id],
            resolutions=[self.current_depth_image.shape],
            world_coord=True,
            zooms=1,
            # hfov of iPad front camera from
            # https://developer.apple.com/library/archive/documentation/DeviceInformation/Reference/iOSDeviceCompatibility/Cameras/Cameras.html
            # TODO: determine dynamically from which device is sending data
            hfov=54.201,
            get_all_points=True,
            use_semantic_sensor=False,
            depth_clip_sensors=(0,),
            clip_value=1.1,
        )
        obs_3d = transform(obs, state=state)
        sensor_obs = obs_3d[agent_id][sensor_id]
        image_shape = self.current_depth_image.shape
        # Bring both point clouds back into image layout: one 4-vector per pixel.
        cloud_shape = (image_shape[0], image_shape[1], 4)
        current_scene_point_cloud = sensor_obs["semantic_3d"].reshape(cloud_shape)
        current_sf_scene_point_cloud = sensor_obs["sensor_frame_data"].reshape(
            cloud_shape
        )
        self.world_camera = sensor_obs["world_camera"]
        return current_scene_point_cloud, current_sf_scene_point_cloud

    def get_3d_coordinates_from_pixel_indices(self, pixel_idx):
        """Retrieve 3D coordinates of a pixel.

        Args:
            pixel_idx: pair of indices into the first two axes of the scene
                point cloud.

        Returns:
            The 3D coordinates of the pixel.
        """
        i, j = pixel_idx
        return np.array(self.current_scene_point_cloud[i, j, :3])

    def get_move_area(self):
        """Calculate area in which patch can move on the image.

        Returns:
            The move area as ``[[min_0, max_0], [min_1, max_1]]`` (one row per
            image axis).
        """
        obs_shape = self.current_depth_image.shape
        half_patch_size = self.patch_size // 2 + 1
        move_area = np.array(
            [
                [half_patch_size, obs_shape[0] - half_patch_size],
                [half_patch_size, obs_shape[1] - half_patch_size],
            ]
        )
        return move_area

    def get_next_loc(self, action_name, amount):
        """Calculate next location in pixel space given the current action.

        Args:
            action_name: one of look_up, look_down, turn_left, turn_right. Any
                other name is logged as an error and causes no movement.
            amount: number of pixels to move.

        Returns:
            The next location in pixel space.
        """
        # action name -> (image axis, direction)
        moves = {
            "look_up": (0, -1),
            "look_down": (0, 1),
            "turn_left": (1, -1),
            "turn_right": (1, 1),
        }
        new_loc = np.array(self.current_loc)
        if action_name in moves:
            axis, direction = moves[action_name]
            new_loc[axis] += direction * amount
        else:
            logging.error(f"{action_name} is not a valid action, not moving.")
        # Make sure location stays within move area
        for axis in (0, 1):
            lower, upper = self.move_area[axis]
            if new_loc[axis] < lower:
                new_loc[axis] = lower
            elif new_loc[axis] > upper:
                new_loc[axis] = upper
        return new_loc

    def get_image_patch(self, loc):
        """Extract 2D image patch from a location in pixel space.

        Returns:
            depth_patch: The depth patch.
            rgb_patch: The rgb patch.
            depth3d_patch: The depth3d patch.
            sensor_frame_patch: The sensor frame patch.
        """
        loc = np.array(loc, dtype=int)
        half = self.patch_size // 2
        rows = slice(loc[0] - half, loc[0] + half)
        cols = slice(loc[1] - half, loc[1] + half)
        depth_patch = self.current_depth_image[rows, cols]
        rgb_patch = self.current_rgb_image[rows, cols]
        depth3d_patch = self.current_scene_point_cloud[rows, cols]
        # Flatten both point cloud patches to one row per pixel.
        depth_shape = depth3d_patch.shape
        flat_shape = (depth_shape[0] * depth_shape[1], depth_shape[2])
        depth3d_patch = depth3d_patch.reshape(flat_shape)
        sensor_frame_patch = self.current_sf_scene_point_cloud[rows, cols]
        sensor_frame_patch = sensor_frame_patch.reshape(flat_shape)
        assert (
            depth_patch.shape[0] * depth_patch.shape[1]
            == self.patch_size * self.patch_size
        ), f"Didn't extract a patch of size {self.patch_size}"
        return depth_patch, rgb_patch, depth3d_patch, sensor_frame_patch

    def close(self):
        """Clear the cached state reference."""
        self._current_state = None

    def _build_observation(self, loc):
        """Extract the patch at ``loc`` and assemble the observation dict."""
        (
            depth_patch,
            rgb_patch,
            depth3d_patch,
            sensor_frame_patch,
        ) = self.get_image_patch(loc)
        obs = {
            "agent_id_0": {
                "patch": {
                    "depth": depth_patch,
                    "rgba": rgb_patch,
                    "semantic_3d": depth3d_patch,
                    "sensor_frame_data": sensor_frame_patch,
                    "world_camera": self.world_camera,
                    "pixel_loc": loc,  # Save pixel loc for plotting
                },
                "view_finder": {
                    "depth": self.current_depth_image,
                    "rgba": self.current_rgb_image,
                },
            }
        }
        return obs
class SaccadeOnImageFromStreamEnvironment(SaccadeOnImageEnvironment):
    """Environment for moving over a 2D streamed image with depth channel."""

    def __init__(self, patch_size=64, data_path=None):
        """Initialize environment.

        Args:
            patch_size: height and width of patch in pixels, defaults to 64
            data_path: path to the image dataset. If None its set to
                ~/tbp/data/worldimages/world_data_stream/
        """
        # TODO: use super() to avoid repeating lines of code
        self.patch_size = patch_size
        # Images are always presented upright
        self.rotation = qt.from_rotation_vector([np.pi / 2, 0.0, 0.0])
        self.state = 0
        self.data_path = data_path
        if self.data_path is None:
            self.data_path = os.path.expanduser(
                "~/tbp/data/worldimages/world_data_stream/"
            )
        self.scene_names = [
            a for a in os.listdir(self.data_path) if not a.startswith(".")
        ]
        # Scenes are addressed by number here (see load_new_scene_data), not by
        # an entry of scene_names.
        self.current_scene = 0
        (
            self.current_depth_image,
            self.current_rgb_image,
            self.current_loc,
        ) = self.load_new_scene_data()
        self.move_area = self.get_move_area()
        # Get 3D scene point cloud array from depth image (in world rf and sensor rf)
        (
            self.current_scene_point_cloud,
            self.current_sf_scene_point_cloud,
        ) = self.get_3d_scene_point_cloud()
        # Just for compatibility. TODO: find cleaner way to do this.
        self._agents = [
            type(
                "FakeAgent",
                (object,),
                {"action_space_type": "distant_agent_no_translation"},
            )()
        ]
        # Instantiate once and reuse when checking action name in step()
        # TODO Use 2D-specific actions instead of overloading? Habitat actions
        # TODO Fix how inheritance is used here. We duplicate the below code because we
        # don't call super().__init__ while inheriting
        self._valid_actions = ["look_up", "look_down", "turn_left", "turn_right"]

    def switch_to_scene(self, scene_id):
        """Load the streamed image with the given id as new environment.

        Args:
            scene_id: identifier used in the ``depth_<id>.data`` and
                ``rgb_<id>.png`` file names.
        """
        self.current_scene = scene_id
        (
            self.current_depth_image,
            self.current_rgb_image,
            self.current_loc,
        ) = self.load_new_scene_data()
        # The new image may differ in size, so the area in which the patch may
        # move has to be recomputed as well.
        self.move_area = self.get_move_area()
        # Get 3D scene point cloud array from depth image
        (
            self.current_scene_point_cloud,
            self.current_sf_scene_point_cloud,
        ) = self.get_3d_scene_point_cloud()

    def load_new_scene_data(self):
        """Wait for the files of the current scene and load them.

        Blocks (polling once per second) until both files exist and can be
        read completely. There is no timeout.

        Returns:
            current_depth_image: The depth image.
            current_rgb_image: The rgb image.
            start_location: The start location (center of the image).
        """
        # Joined so that data_path works without a trailing separator.
        current_depth_path = os.path.join(
            self.data_path, f"depth_{self.current_scene}.data"
        )
        current_rgb_path = os.path.join(
            self.data_path, f"rgb_{self.current_scene}.png"
        )
        # Load rgb image
        wait_count = 0
        while not os.path.exists(current_rgb_path):
            if wait_count % 10 == 0:
                # Print every 10 seconds
                print("Waiting for new rgb data...")
            time.sleep(1)
            wait_count += 1
        while True:
            try:
                current_rgb_image = self.load_rgb_data(current_rgb_path)
            except PIL.UnidentifiedImageError:
                # File exists but is not a readable image yet; retry.
                print("waiting for rgb file to finish streaming")
                time.sleep(1)
            else:
                break
        height, width, _ = current_rgb_image.shape
        # Load depth image
        while not os.path.exists(current_depth_path):
            print(f"Waiting for new depth data. Looking for {current_depth_path}")
            time.sleep(1)
        while True:
            try:
                current_depth_image = self.load_depth_data(
                    current_depth_path, height, width
                )
            except ValueError:
                # Loading raised ValueError (e.g. not enough values for the
                # requested shape yet); retry.
                print("waiting for depth file to finish streaming")
                time.sleep(1)
            else:
                break
        current_depth_image = self.process_depth_data(current_depth_image)
        # set start location to center of image
        # TODO: find object if not in center
        start_location = [height // 2, width // 2]
        return current_depth_image, current_rgb_image, start_location
# Functions from omniglot/python.demo.py
# TODO: integrate better and maybe rewrite
def load_img(fn):
    """Read an image file and convert it to a boolean array.

    Args:
        fn: path of the image file.

    Returns:
        The image as np array with dtype bool.
    """
    return np.array(plt.imread(fn), dtype=bool)


def load_motor(fn):
    """Read the strokes of a character from a text file.

    The file is processed line by line: ``START`` begins a fresh stroke,
    ``BREAK`` closes the current stroke, and every other non-empty line is
    parsed as comma separated floats and added to the current stroke. Points
    that are not followed by a ``BREAK`` are not returned.

    Args:
        fn: path of the stroke file.

    Returns:
        List with one array per stroke (one row per parsed line).
    """
    motor = []
    # Start with an empty stroke so data that precedes a START marker does not
    # raise a NameError.
    stk = []
    with open(fn, "r") as fid:
        for raw_line in fid:
            myline = raw_line.strip()
            if not myline:
                # Blank lines carry no point and would make the stroke ragged.
                continue
            if myline == "START":  # beginning of character
                stk = []
            elif myline == "BREAK":  # break between strokes
                motor.append(np.array(stk))  # add to list of strokes
                stk = []
            else:
                stk.append(np.fromstring(myline, dtype=float, sep=","))
    return motor


def space_motor_to_img(pt):
    """Negate the second column of a point array.

    The array is modified in place and returned.
    Presumably this maps stroke coordinates to image coordinates - TODO confirm.

    Args:
        pt: 2D array with at least two columns.

    Returns:
        The same array object with its second column negated.
    """
    pt[:, 1] = -pt[:, 1]
    return pt