# Source code for tbp.monty.frameworks.loggers.monty_handlers

# Copyright 2025 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

import abc
import copy
import json
import logging
import os
from pprint import pformat

from tbp.monty.frameworks.actions.actions import ActionJSONEncoder
from tbp.monty.frameworks.models.buffer import BufferEncoder
from tbp.monty.frameworks.utils.logging_utils import (
    lm_stats_to_dataframe,
    maybe_rename_existing_file,
)

###
# Template for MontyHandler
###


class MontyHandler(metaclass=abc.ABCMeta):
    """Abstract base class for Monty logging handlers.

    Concrete handlers receive per-episode log data and persist it in some
    format (JSON, CSV, ...). Each handler declares, via `log_level`, which
    level of logged data it consumes.
    """

    @abc.abstractmethod
    def report_episode(self, **kwargs):
        """Report the data collected for a single episode."""
        pass

    @abc.abstractmethod
    def close(self):
        """Release any resources held by the handler."""
        pass

    # NOTE: `abc.abstractclassmethod` is deprecated since Python 3.3; the
    # supported spelling is `@classmethod` stacked on `@abc.abstractmethod`.
    @classmethod
    @abc.abstractmethod
    def log_level(cls):
        """Handlers filter information from the data they receive.

        This class method specifies the level they filter at.
        """
        pass
###
# Handler classes
###
class DetailedJSONHandler(MontyHandler):
    """Grab any logs at the DETAILED level and append to a json file."""

    def __init__(self):
        # Number of calls to report_episode so far; forwarded to
        # maybe_rename_existing_file so a pre-existing stats file is only
        # renamed on the first write.
        self.report_count = 0

    @classmethod
    def log_level(cls):
        return "DETAILED"

    def report_episode(self, data, output_dir, episode, mode="train", **kwargs):
        """Report episode data.

        Changed name to report episode since we are currently running with
        reporting and flushing exactly once per episode.

        Args:
            data: Dict of logs keyed by level; "BASIC" and "DETAILED" are read.
            output_dir: Directory where detailed_run_stats.json is appended.
            episode: Episode index within the current mode.
            mode: Either "train" or "eval".
            **kwargs: Must contain "<mode>_episodes_to_total", mapping the
                per-mode episode index to the global episode count.

        Raises:
            ValueError: If mode is neither "train" nor "eval".
        """
        if mode == "train":
            total = kwargs["train_episodes_to_total"][episode]
            stats = data["BASIC"]["train_stats"][episode]
        elif mode == "eval":
            total = kwargs["eval_episodes_to_total"][episode]
            stats = data["BASIC"]["eval_stats"][episode]
        else:
            # Previously an unknown mode fell through and crashed with an
            # opaque UnboundLocalError on `total`; fail fast instead.
            raise ValueError(f"mode must be 'train' or 'eval', got {mode!r}")

        # Merge the BASIC episode stats with the DETAILED stats for the same
        # global episode index. Deep-copy so the caller's log dict is not
        # mutated by the update below.
        output_data = dict()
        output_data[total] = copy.deepcopy(stats)
        output_data[total].update(data["DETAILED"][total])

        save_stats_path = os.path.join(output_dir, "detailed_run_stats.json")
        maybe_rename_existing_file(save_stats_path, ".json", self.report_count)
        # Append one JSON object per episode, one per line.
        with open(save_stats_path, "a") as f:
            json.dump({total: output_data[total]}, f, cls=BufferEncoder)
            f.write(os.linesep)
        print("Stats appended to " + save_stats_path)
        self.report_count += 1

    def close(self):
        pass
class BasicCSVStatsHandler(MontyHandler):
    """Grab any logs at the BASIC level and append to train or eval CSV files."""

    @classmethod
    def log_level(cls):
        return "BASIC"

    def __init__(self):
        """Initialize with empty dictionary to keep track of writes per file.

        We only want to include the header the first time we write to a file.
        This keeps track of writes per file so we can format the file properly.
        """
        self.reports_per_file = dict()

    def report_episode(self, data, output_dir, episode, mode="train", **kwargs):
        # Look for train_stats or eval_stats under BASIC logs.
        stats = data["BASIC"].get(f"{mode}_stats", dict())
        logging.debug(pformat(stats))

        output_file = os.path.join(output_dir, f"{mode}_stats.csv")
        # Remove file if it existed before to avoid appending to previous
        # results file.
        if output_file in self.reports_per_file:
            self.reports_per_file[output_file] += 1
        else:
            self.reports_per_file[output_file] = 0
            maybe_rename_existing_file(output_file, ".csv", 0)

        # Format stats for a single episode as a dataframe.
        dataframe = lm_stats_to_dataframe(stats)

        # Move most relevant columns to front. The longer list applies when a
        # most-likely object was logged for this episode.
        if "most_likely_object" in dataframe:
            top_columns = [
                "primary_performance",
                "stepwise_performance",
                "num_steps",
                "rotation_error",
                "result",
                "most_likely_object",
                "primary_target_object",
                "stepwise_target_object",
                "highest_evidence",
                "time",
                "symmetry_evidence",
                "monty_steps",
                "monty_matching_steps",
                "individual_ts_performance",
                "individual_ts_reached_at_step",
                "primary_target_position",
                "primary_target_rotation_euler",
                "most_likely_rotation",
            ]
        else:
            top_columns = [
                "primary_performance",
                "stepwise_performance",
                "num_steps",
                "rotation_error",
                "result",
                "primary_target_object",
                "stepwise_target_object",
                "time",
                "symmetry_evidence",
                "monty_steps",
                "monty_matching_steps",
                "primary_target_position",
                "primary_target_rotation_euler",
            ]
        dataframe = self.move_columns_to_front(dataframe, top_columns)

        # Only include the header the first time we write to this file.
        write_header = self.reports_per_file[output_file] < 1
        dataframe.to_csv(output_file, mode="a", header=write_header)

    def move_columns_to_front(self, df, columns):
        """Reorder df in place so that `columns` come first, in given order."""
        for c_key in reversed(columns):
            df.insert(0, c_key, df.pop(c_key))
        return df

    def close(self):
        pass
class ReproduceEpisodeHandler(MontyHandler):
    """Persist per-episode actions and targets so an episode can be replayed."""

    @classmethod
    def log_level(cls):
        return "BASIC"

    def report_episode(self, data, output_dir, episode, mode="train", **kwargs):
        # Lazily create the directory holding reproducibility info for each
        # episode the first time we are called.
        if not hasattr(self, "data_dir"):
            self.data_dir = os.path.join(output_dir, "reproduce_episode_data")
            os.makedirs(self.data_dir, exist_ok=True)

        # TODO: store a pointer to the training model
        # something like if train_epochs == 0:
        #     use model_name_or_path
        # else:
        #     get checkpoint of most up to date model

        # Write data to action file: one JSON-encoded action per line.
        actions_path = os.path.join(
            self.data_dir, f"{mode}_episode_{episode}_actions.jsonl"
        )
        episode_actions = data["BASIC"][f"{mode}_actions"][episode]
        with open(actions_path, "w") as f:
            for action in episode_actions:
                f.write(f"{json.dumps(action[0], cls=ActionJSONEncoder)}\n")

        # Write data to object params / targets file.
        target_path = os.path.join(
            self.data_dir, f"{mode}_episode_{episode}_target.txt"
        )
        episode_target = data["BASIC"][f"{mode}_targets"][episode]
        with open(target_path, "w") as f:
            json.dump(episode_target, f, cls=BufferEncoder)

    def close(self):
        pass