Source code for tbp.monty.frameworks.models.evidence_matching.model

# Copyright 2025-2026 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

from __future__ import annotations

import copy
import logging

import numpy as np

from tbp.monty.frameworks.models.graph_matching import (
    MontyForGraphMatching,
)
from tbp.monty.frameworks.utils.spatial_arithmetics import (
    align_orthonormal_vectors,
)

logger = logging.getLogger(__name__)


class MontyForEvidenceGraphMatching(MontyForGraphMatching):
    """Monty model for evidence-based graphs.

    Customizes voting between learning modules (LMs): the possible states that
    several LMs send to the same receiving LM are pooled per object.
    """

    def __init__(self, *args, **kwargs):
        """Initialize by forwarding all arguments to the parent class."""
        super().__init__(*args, **kwargs)

    def _combine_votes(self, votes_per_lm):
        """Combine evidence from different LMs.

        For every receiving LM ``i``, the possible states sent by each LM ``j``
        listed in ``self.lm_to_lm_vote_matrix[i]`` are copied, transformed by the
        displacement and relative rotation between the two sensed poses, and
        grouped by object.

        Args:
            votes_per_lm: Sequence indexed by LM. Each entry is either ``None``
                or a mapping with the keys ``"sensed_pose_rel_body"`` (element
                0 is used as a location, elements 1 onward as pose vectors) and
                ``"possible_states"`` (object -> iterable of states).

        Returns:
            The combined votes: one dict per LM that maps each object to the
            list of transformed states voted for it. The dict is empty when the
            LM's own entry in ``votes_per_lm`` is ``None`` or when none of its
            senders provided a vote.
        """
        combined_votes = []
        for i in range(len(self.learning_modules)):
            lm_state_votes = {}
            receiver_vote = votes_per_lm[i]
            if receiver_vote is not None:
                receiver_pose = receiver_vote["sensed_pose_rel_body"]
                for j in self.lm_to_lm_vote_matrix[i]:
                    sender_vote = votes_per_lm[j]
                    if sender_vote is None:
                        # Nothing was sent by this LM.
                        continue

                    # Displacement and relative rotation between the sending
                    # and the receiving sensor. "If I am here, you should be
                    # there."
                    sender_pose = sender_vote["sensed_pose_rel_body"]
                    receiver_location = np.array(receiver_pose[0])
                    sender_location = np.array(sender_pose[0])
                    sensor_disp = receiver_location - sender_location
                    sensor_rotation_disp, _ = align_orthonormal_vectors(
                        sender_pose[1:],
                        receiver_pose[1:],
                        as_scipy=False,
                    )
                    logger.debug(
                        f"LM {j} to {i} - displacement: {sensor_disp}, "
                        f"rotation: "
                        f"{sensor_rotation_disp}"
                    )

                    possible_states = sender_vote["possible_states"]
                    for obj, sent_states in possible_states.items():
                        # Votes for the same object coming from several
                        # sending LMs are pooled into a single list.
                        pooled_states = lm_state_votes.setdefault(obj, [])
                        # Transform the location votes so that they apply to
                        # the receiving LM's sensor: if my sensor is here in
                        # this pose, then your sensor should be there in that
                        # pose.
                        # NOTE: rotation votes are not being used right now.
                        for sent_state in sent_states:
                            # Work on a copy because the same vote may be
                            # transformed in different ways depending on the
                            # receiving LMs' poses.
                            state = copy.deepcopy(sent_state)
                            pose_vectors = state.get_pose_vectors()
                            state.transform_morphological_features(
                                translation=pose_vectors.dot(sensor_disp),
                                rotation=sensor_rotation_disp,
                            )
                            pooled_states.append(state)

            logger.debug(f"VOTE from LMs {self.lm_to_lm_vote_matrix[i]} to LM {i}")
            combined_votes.append(lm_state_votes)
        return combined_votes

    def switch_to_exploratory_step(self):
        """Switch to an exploratory step.

        Also set MLH evidence high enough to generate output during
        exploration: after delegating to the parent class, each LM's
        ``current_mlh["evidence"]`` is set to one above that LM's
        ``object_evidence_threshold``.
        """
        super().switch_to_exploratory_step()
        # Make sure the new object ID is communicated to higher-level LMs
        # during exploration.
        for lm in self.learning_modules:
            evidence_above_threshold = lm.object_evidence_threshold + 1
            lm.current_mlh["evidence"] = evidence_above_threshold