Source code for tbp.monty.frameworks.utils.follow_up_configs

# Copyright 2025 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

from __future__ import annotations

import copy
import json
from pathlib import Path

import torch
from omegaconf import DictConfig, OmegaConf, open_dict

from tbp.monty.frameworks.environments.object_init_samplers import (
    Predefined,
)
from tbp.monty.frameworks.loggers.monty_handlers import DetailedJSONHandler
from tbp.monty.frameworks.loggers.wandb_handlers import DetailedWandbMarkedObsHandler
from tbp.monty.frameworks.utils.dataclass_utils import config_to_dict


def recover_output_dir(config, config_name):
    """Update output directory based on how run.py modifies it on the fly.

    Returns:
        output_dir: Path to output directory.
    """
    output_dir = Path(config["logging"]["output_dir"])
    if not config["logging"]["run_name"]:
        output_dir = output_dir / config_name
    return output_dir

def recover_run_name(config, config_name):
    """Recover the run name based on how run.py modifies it on the fly."""
    if not config["logging"]["run_name"]:
        return config_name
    return config["logging"]["run_name"]

def recover_wandb_id(output_dir):
    """Recover the wandb id saved in a previous run's epoch config."""
    # "0" just picks any epoch subdirectory; the wandb id should be the same for all
    config_file_name = Path(output_dir) / "0" / "config.pt"
    cfg = torch.load(config_file_name)
    config = config_to_dict(cfg)
    return config["logging"]["wandb_id"]

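# Illustrative sketch (not part of the original module): how the recover_* helpers
# above might be combined to locate a previous run's artifacts. The config values
# and the "my_experiment" name are hypothetical.
def _example_recover_previous_run_info(previous_config: dict):
    # previous_config is the dict-style config of an already-finished experiment;
    # an empty run_name means run.py appended the config name to output_dir.
    output_dir = recover_output_dir(previous_config, "my_experiment")
    run_name = recover_run_name(previous_config, "my_experiment")
    # Requires the parent run to have saved <output_dir>/0/config.pt.
    wandb_id = recover_wandb_id(output_dir)
    return output_dir, run_name, wandb_id
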
def create_eval_episode_hydra_cfg(
    parent_config: DictConfig, episode: int
) -> DictConfig:
    """Returns a new Hydra config that runs only that episode with detailed logging.

    This doesn't support the `update_run_dir` option that
    `create_eval_episode_config` (below) does.
    """
    new_cfg = copy.deepcopy(parent_config)

    # Make an alias for the actual config to shorten how far we need to dig into
    # the object tree
    exp_cfg = new_cfg.test.config
    output_dir = exp_cfg.logging.output_dir

    # 1) Use a policy that uses exact actions from previous episode
    motor_system_cfg = exp_cfg.monty_config.motor_system_config
    motor_system_cfg.motor_system_args.policy_args.file_name = str(
        Path(output_dir)
        / "reproduce_episode_data"
        / f"eval_episode_{episode}_actions.jsonl"
    )

    # 2) Load object params from this episode into environment interface config
    object_params_file = (
        Path(output_dir)
        / "reproduce_episode_data"
        / f"eval_episode_{episode}_target.txt"
    )
    with object_params_file.open() as f:
        target_data = json.load(f)

    exp_cfg.eval_env_interface_args.object_names = [
        target_data["primary_target_object"]
    ]

    # Need to create the config that Hydra expects using fully qualified
    # class names instead of sticking the class instance on the config.
    def class_path(klass: type) -> str:
        """Returns the fully qualified class path for a class."""
        return f"{klass.__module__}.{klass.__name__}"

    def monty_class(klass: type) -> str:
        """Returns the ${monty.class:...} resolver expression for a class."""
        return f"${{monty.class:{class_path(klass)}}}"

    exp_cfg.eval_env_interface_args.object_init_sampler = {
        "_target_": class_path(Predefined),
        "positions": [target_data["primary_target_position"]],
        "rotations": [target_data["primary_target_rotation_euler"]],
        # FIXME: target_scale is a float, need an array of floats for this
    }

    # 3) Update logging config
    # Convert the handlers from the config into plain Python values that we can
    # compare against the handler classes.
    # TODO: update handlers used here as more sophisticated ones get made
    monty_handlers = OmegaConf.to_object(exp_cfg.logging.monty_handlers)
    if DetailedJSONHandler not in monty_handlers:
        exp_cfg.logging.monty_handlers.append(monty_class(DetailedJSONHandler))
    wandb_handlers = OmegaConf.to_object(exp_cfg.logging.wandb_handlers)
    if DetailedWandbMarkedObsHandler not in wandb_handlers:
        exp_cfg.logging.wandb_handlers.append(
            monty_class(DetailedWandbMarkedObsHandler)
        )

    # Second, update the output directory, run_name, set resume to True
    new_output_dir = Path(output_dir) / f"eval_episode_{episode}_rerun"
    new_output_dir.mkdir(exist_ok=True)
    exp_cfg.logging.output_dir = str(new_output_dir)
    exp_cfg.logging.resume_wandb_run = True
    exp_cfg.logging.wandb_id = None
    exp_cfg.logging.monty_log_level = "DETAILED"

    # 4) Make sure this config is set to only run for one episode
    exp_cfg.n_eval_epochs = 1
    exp_cfg.do_train = False

    # We've been using exp_cfg for convenience to shorten how far we
    # need to dig into the object tree, but we want to return the top
    # of the new config.
    return new_cfg

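# Illustrative sketch (not part of the original module): calling
# create_eval_episode_hydra_cfg above. The function expects the experiment config
# under parent_cfg.test.config; how that DictConfig is composed (e.g. loaded or
# built by Hydra) is an assumption here and not prescribed by this module.
def _example_rerun_single_hydra_episode(parent_cfg: DictConfig) -> DictConfig:
    # parent_cfg.test.config.logging.output_dir must point at the finished run's
    # results folder containing a reproduce_episode_data/ subdirectory.
    rerun_cfg = create_eval_episode_hydra_cfg(parent_cfg, episode=3)
    # rerun_cfg can then be handed to the same entry point that ran the parent
    # experiment (hypothetical; depends on how run.py consumes configs).
    return rerun_cfg
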
def create_eval_episode_config(
    parent_config, parent_config_name, episode, update_run_dir=True
):
    """Generate a new config that runs only that episode with detailed logging.

    Inputs are the config for a previous experiment, and the episode we want to
    re-run.

    Args:
        parent_config: actual config used to run previous experiment
        parent_config_name: str key into configs with name of experiment to reproduce
        episode: int index of episode we want to reproduce
        update_run_dir: bool, whether to update the run directory based on how
            run.py modified it

    Note:
        For now, assume we just care about re-running an eval episode.

    Note:
        For now, assume env_interface_class is a subclass of
        `EnvironmentInterfacePerObject`.

    Returns:
        Config for re-running the specified episode.
    """
    # Create config that loads latest checkpoint and reproduces an eval episode
    # 0) Get path to prev experiment data
    # 1) Setup policy
    # 2) Setup object config
    # 3) Update logging config
    # 4) Update run args; only do one epoch, one episode

    new_config = copy.deepcopy(config_to_dict(parent_config))

    # Determine parent output_dir, run_name, based on how run.py modifies on the fly
    output_dir = Path(new_config["logging"]["output_dir"])
    run_name = new_config["logging"]["run_name"]
    wandb_id = None
    if update_run_dir:
        output_dir = recover_output_dir(new_config, parent_config_name)
        run_name = recover_run_name(new_config, parent_config_name)
        wandb_id = recover_wandb_id(output_dir)

    # 1) Use a policy that uses exact actions from previous episode
    motor_file = (
        output_dir
        / "reproduce_episode_data"
        / f"eval_episode_{episode}_actions.jsonl"
    )
    new_config["monty_config"]["motor_system_config"]["motor_system_args"][
        "policy_args"
    ]["file_name"] = motor_file

    # 2) Load object params from this episode into environment interface config
    object_params_file = (
        output_dir / "reproduce_episode_data" / f"eval_episode_{episode}_target.txt"
    )
    with object_params_file.open() as f:
        target_data = json.load(f)

    new_config["eval_env_interface_args"]["object_names"] = [
        target_data["primary_target_object"]
    ]
    new_config["eval_env_interface_args"]["object_init_sampler"] = Predefined(
        positions=[target_data["primary_target_position"]],
        rotations=[target_data["primary_target_rotation_euler"]],
        # FIXME: target_scale is a float, need an array of floats for this
    )

    # 3) Update logging config
    # First, make sure the detailed handlers are in there
    # TODO: update handlers used here as more sophisticated ones get made
    if DetailedJSONHandler not in new_config["logging"]["monty_handlers"]:
        new_config["logging"]["monty_handlers"].append(DetailedJSONHandler)
    if DetailedWandbMarkedObsHandler not in new_config["logging"]["wandb_handlers"]:
        new_config["logging"]["wandb_handlers"].append(DetailedWandbMarkedObsHandler)

    # Second, update the output directory, run_name, set resume to True
    new_output_dir = output_dir / f"eval_episode_{episode}_rerun"
    new_output_dir.mkdir(exist_ok=True, parents=True)
    new_config["logging"]["output_dir"] = new_output_dir
    new_config["logging"]["run_name"] = run_name
    new_config["logging"]["resume_wandb_run"] = True
    new_config["logging"]["wandb_id"] = wandb_id
    new_config["logging"]["monty_log_level"] = "DETAILED"

    # 4) Make sure this config is set to only run for one episode
    new_config["experiment_args"]["n_eval_epochs"] = 1
    new_config["experiment_args"]["do_train"] = False

    return new_config

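# Illustrative sketch (not part of the original module): regenerating a dict-style
# config that re-runs a single eval episode. "my_experiment" is a hypothetical
# config name; the parent run must already have written
# reproduce_episode_data/eval_episode_<n>_actions.jsonl and
# eval_episode_<n>_target.txt under its output directory.
def _example_rerun_single_episode(parent_config: dict) -> dict:
    rerun_config = create_eval_episode_config(
        parent_config=parent_config,
        parent_config_name="my_experiment",  # hypothetical
        episode=3,
        update_run_dir=True,
    )
    # The returned config is limited to one eval epoch with detailed logging and
    # can be launched the same way as the parent experiment.
    return rerun_config
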
def create_eval_multiple_episodes_hydra_cfg(
    parent_config: DictConfig, episodes: list[int]
) -> DictConfig:
    """Returns a new Hydra config that runs only the episodes specified.

    This doesn't support the `update_run_dir` option that
    `create_eval_config_multiple_episodes` (below) does.
    """
    new_cfg = copy.deepcopy(parent_config)

    # Make an alias for the actual config to shorten how far we need to dig into
    # the object tree
    exp_cfg = new_cfg.test.config
    output_dir = exp_cfg.logging.output_dir

    # Add notes to indicate which episodes of the parent experiment were rerun.
    # This field might not exist, so open the DictConfig up to allow adding keys
    with open_dict(new_cfg):
        exp_cfg.notes = {"rerun_episodes": episodes}

    # Update the output directory to be a "rerun" subdir
    new_output_dir = Path(output_dir) / "eval_rerun_episodes"
    new_output_dir.mkdir(exist_ok=True)
    exp_cfg.logging.output_dir = new_output_dir
    exp_cfg.logging.resume_wandb_run = False
    exp_cfg.logging.wandb_id = None
    exp_cfg.logging.monty_log_level = "DETAILED"

    # Turn training off; this feature is only intended for eval episodes
    exp_cfg.do_train = False
    exp_cfg.n_eval_epochs = 1

    # Add detailed handlers
    # Need to create the config that Hydra expects using fully qualified
    # class names instead of sticking the class instance on the config.
    def class_path(klass: type) -> str:
        """Returns the fully qualified class path for a class."""
        return f"{klass.__module__}.{klass.__name__}"

    def monty_class(klass: type) -> str:
        """Returns the ${monty.class:...} resolver expression for a class."""
        return f"${{monty.class:{class_path(klass)}}}"

    monty_handlers = OmegaConf.to_object(exp_cfg.logging.monty_handlers)
    if DetailedJSONHandler not in monty_handlers:
        exp_cfg.logging.monty_handlers.append(monty_class(DetailedJSONHandler))

    file_names_per_episode = {}
    target_objects = []
    target_positions = []
    target_rotations = []
    for episode_counter, episode in enumerate(episodes):
        # Get actions from this episode
        motor_file = (
            Path(output_dir)
            / "reproduce_episode_data"
            / f"eval_episode_{episode}_actions.jsonl"
        )

        # Get object params from this episode
        object_params_file = (
            Path(output_dir)
            / "reproduce_episode_data"
            / f"eval_episode_{episode}_target.txt"
        )
        with object_params_file.open() as f:
            target_data = json.load(f)

        # Update accumulators
        file_names_per_episode[episode_counter] = str(motor_file)
        target_objects.append(target_data["primary_target_object"])
        target_positions.append(target_data["primary_target_position"])
        target_rotations.append(target_data["primary_target_rotation_euler"])

    # Update config with episode-specific data
    exp_cfg.eval_env_interface_args.object_names = target_objects
    exp_cfg.eval_env_interface_args.object_init_sampler = {
        "_target_": class_path(Predefined),
        "positions": target_positions,
        "rotations": target_rotations,
        "change_every_episode": True,
    }
    motor_system_cfg = exp_cfg.monty_config.motor_system_config
    # This field might not exist, so open the DictConfig up to allow adding keys
    with open_dict(new_cfg):
        motor_system_cfg.motor_system_args.policy_args.file_names_per_episode = (
            file_names_per_episode
        )

    return new_cfg

def create_eval_config_multiple_episodes(
    parent_config, parent_config_name, episodes, update_run_dir=True
):
    """Generate a config that re-runs the given eval episodes with detailed logging.

    See `create_eval_episode_config` for details; this variant replays several
    episodes in a single eval epoch.
    """
    # NOTE: update_run_dir is assumed to be True, only added as an argument for
    # easier testing.
    new_config = copy.deepcopy(config_to_dict(parent_config))

    ###
    # General steps to update config that do not depend on specific episodes
    ###

    # Recover output dir based on how run.py modifies on the fly
    output_dir = Path(new_config["logging"]["output_dir"])
    run_name = new_config["logging"]["run_name"]
    wandb_id = None
    if update_run_dir:
        output_dir = recover_output_dir(new_config, parent_config_name)
        run_name = recover_run_name(new_config, parent_config_name)
        wandb_id = recover_wandb_id(output_dir)

    # Add notes to indicate which episodes of the parent experiment were rerun
    notes = dict(rerun_episodes=episodes)
    if "notes" not in new_config:
        new_config["notes"] = notes
    else:
        new_config["notes"].update(notes)

    # Update the output directory to be a "rerun" subdir
    new_output_dir = output_dir / "eval_rerun_episodes"
    new_output_dir.mkdir(exist_ok=True, parents=True)
    new_config["logging"]["output_dir"] = new_output_dir
    new_config["logging"]["run_name"] = run_name
    new_config["logging"]["resume_wandb_run"] = True
    new_config["logging"]["wandb_id"] = wandb_id
    new_config["logging"]["monty_log_level"] = "DETAILED"

    # Turn training off; this feature is only intended for eval episodes
    new_config["experiment_args"]["do_train"] = False
    new_config["experiment_args"]["n_eval_epochs"] = 1

    # Add detailed handlers
    if DetailedJSONHandler not in new_config["logging"]["monty_handlers"]:
        new_config["logging"]["monty_handlers"].append(DetailedJSONHandler)

    ###
    # Accumulate episode-specific data: actions and object params
    ###

    file_names_per_episode = {}
    target_objects = []
    target_positions = []
    target_rotations = []
    for episode_counter, episode in enumerate(episodes):
        # Get actions from this episode
        motor_file = (
            output_dir
            / "reproduce_episode_data"
            / f"eval_episode_{episode}_actions.jsonl"
        )

        # Get object params from this episode
        object_params_file = (
            output_dir
            / "reproduce_episode_data"
            / f"eval_episode_{episode}_target.txt"
        )
        with object_params_file.open() as f:
            target_data = json.load(f)

        # Update accumulators
        file_names_per_episode[episode_counter] = motor_file
        target_objects.append(target_data["primary_target_object"])
        target_positions.append(target_data["primary_target_position"])
        target_rotations.append(target_data["primary_target_rotation_euler"])

    # Update config with episode-specific data
    new_config["eval_env_interface_args"]["object_names"] = target_objects
    new_config["eval_env_interface_args"]["object_init_sampler"] = Predefined(
        positions=target_positions,
        rotations=target_rotations,
        change_every_episode=True,
    )
    new_config["monty_config"]["motor_system_config"]["motor_system_args"][
        "policy_args"
    ]["file_names_per_episode"] = file_names_per_episode

    return new_config

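# Illustrative sketch (not part of the original module): re-running several eval
# episodes of a previous experiment at once. The episode indices and the
# "my_experiment" name are hypothetical.
def _example_rerun_multiple_episodes(parent_config: dict) -> dict:
    rerun_config = create_eval_config_multiple_episodes(
        parent_config=parent_config,
        parent_config_name="my_experiment",  # hypothetical
        episodes=[0, 4, 7],
    )
    # Actions and target object parameters for each listed episode are replayed in
    # a single eval epoch under <output_dir>/eval_rerun_episodes.
    return rerun_config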