# Source code for tbp.monty.frameworks.models.evidence_matching.model
# Copyright 2025-2026 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from __future__ import annotations
import copy
import logging
import numpy as np
from tbp.monty.frameworks.models.graph_matching import (
MontyForGraphMatching,
)
from tbp.monty.frameworks.utils.spatial_arithmetics import (
align_orthonormal_vectors,
)
logger = logging.getLogger(__name__)
class MontyForEvidenceGraphMatching(MontyForGraphMatching):
    """Monty model for evidence-based graphs.

    Customize voting and union of possible matches.
    """

    def __init__(self, *args, **kwargs):
        """Initialize and reset LM."""
        super().__init__(*args, **kwargs)

    def _combine_votes(self, votes_per_lm):
        """Combine evidence votes from different learning modules.

        For each receiving LM ``i``, gather the votes of every sending LM
        ``j`` it is connected to (per ``self.lm_to_lm_vote_matrix``) and
        transform the sent object states into the receiving LM's sensor
        reference frame ("If I am here, you should be there.").

        Args:
            votes_per_lm: Per-LM vote dicts (``None`` when an LM cast no
                vote). Each dict carries ``"sensed_pose_rel_body"`` (location
                followed by pose vectors) and ``"possible_states"`` (object id
                -> list of state votes).

        Returns:
            The combined votes: one dict per LM mapping object id to the list
            of transformed states received for that object (empty dict when
            the LM had no vote).
        """
        combined_votes = []
        for i in range(len(self.learning_modules)):
            lm_state_votes = {}
            if votes_per_lm[i] is not None:
                receiving_lm_pose = votes_per_lm[i]["sensed_pose_rel_body"]
                for j in self.lm_to_lm_vote_matrix[i]:
                    if votes_per_lm[j] is None:
                        continue
                    sending_lm_pose = votes_per_lm[j]["sensed_pose_rel_body"]
                    # Displacement between the sending and receiving sensors;
                    # taken into account when transmitting possible locations
                    # on the object.
                    sensor_disp = np.array(receiving_lm_pose[0]) - np.array(
                        sending_lm_pose[0]
                    )
                    sensor_rotation_disp, _ = align_orthonormal_vectors(
                        sending_lm_pose[1:],
                        receiving_lm_pose[1:],
                        as_scipy=False,
                    )
                    # Lazy %-args: skip formatting numpy arrays entirely when
                    # debug logging is disabled.
                    logger.debug(
                        "LM %s to %s - displacement: %s, rotation: %s",
                        j,
                        i,
                        sensor_disp,
                        sensor_rotation_disp,
                    )
                    possible_states = votes_per_lm[j]["possible_states"]
                    for obj, lm_states_for_object in possible_states.items():
                        # Transform the location votes so they apply to the
                        # receiving LM's sensor: if my sensor is here in this
                        # pose, then your sensor should be there in that pose.
                        # NOTE: rotation votes are not being used right now.
                        transformed_states = []
                        for s in lm_states_for_object:
                            # Need a copy because the same vote may be
                            # transformed in different ways depending on the
                            # receiving LMs' poses.
                            new_s = copy.deepcopy(s)
                            rotated_displacement = new_s.get_pose_vectors().dot(
                                sensor_disp
                            )
                            new_s.transform_morphological_features(
                                translation=rotated_displacement,
                                rotation=sensor_rotation_disp,
                            )
                            transformed_states.append(new_s)
                        lm_state_votes.setdefault(obj, []).extend(
                            transformed_states
                        )
                logger.debug(
                    "VOTE from LMs %s to LM %s",
                    self.lm_to_lm_vote_matrix[i],
                    i,
                )
            combined_votes.append(lm_state_votes)
        return combined_votes

    def switch_to_exploratory_step(self):
        """Switch to an exploratory step.

        Also set MLH evidence high enough to generate output during
        exploration.
        """
        super().switch_to_exploratory_step()
        # Make sure the new object ID is communicated to higher-level LMs
        # during exploration.
        for lm in self.learning_modules:
            lm.current_mlh["evidence"] = lm.object_evidence_threshold + 1