Source code for tbp.monty.frameworks.models.evidence_matching.model
# Copyright 2025-2026 Thousand Brains Project
# Copyright 2022-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from __future__ import annotations
import copy
import logging
import numpy as np
from tbp.monty.frameworks.models.graph_matching import (
MontyForGraphMatching,
)
from tbp.monty.frameworks.utils.spatial_arithmetics import (
align_orthonormal_vectors,
)
logger = logging.getLogger(__name__)
[docs]class MontyForEvidenceGraphMatching(MontyForGraphMatching):
"""Monty model for evidence-based graphs.
Customize voting and union of possible matches.
"""
[docs] def __init__(self, *args, **kwargs):
"""Initialize and reset LM."""
super().__init__(*args, **kwargs)
def _pass_infos_to_motor_system(self):
"""Pass processed observations and goal states to the motor system.
Currently there is no complex connectivity or hierarchy, and all generated
goal states are considered bound for the motor system. TODO M change this.
"""
super()._pass_infos_to_motor_system()
# Check that the motor system can receive goal states
if self.motor_system._policy.use_goal_state_driven_actions:
best_goal_state = None
best_goal_confidence = -np.inf
for current_goal_state in self.gsg_outputs:
if (
current_goal_state is not None
and current_goal_state.confidence > best_goal_confidence
):
best_goal_state = current_goal_state
best_goal_confidence = current_goal_state.confidence
self.motor_system._policy.set_driving_goal_state(best_goal_state)
def _combine_votes(self, votes_per_lm):
"""Combine evidence from different LMs.
Returns:
The combined votes.
"""
combined_votes = []
for i in range(len(self.learning_modules)):
lm_state_votes = {}
if votes_per_lm[i] is not None:
receiving_lm_pose = votes_per_lm[i]["sensed_pose_rel_body"]
for j in self.lm_to_lm_vote_matrix[i]:
if votes_per_lm[j] is not None:
sending_lm_pose = votes_per_lm[j]["sensed_pose_rel_body"]
sensor_disp = np.array(receiving_lm_pose[0]) - np.array(
sending_lm_pose[0]
)
sensor_rotation_disp, _ = align_orthonormal_vectors(
sending_lm_pose[1:],
receiving_lm_pose[1:],
as_scipy=False,
)
logger.debug(
f"LM {j} to {i} - displacement: {sensor_disp}, "
f"rotation: "
f"{sensor_rotation_disp}"
)
for obj in votes_per_lm[j]["possible_states"]:
# Get the displacement between the sending and receiving
# sensor and take this into account when transmitting
# possible locations on the object.
# "If I am here, you should be there."
lm_states_for_object = votes_per_lm[j]["possible_states"][
obj
]
# Take the location votes and transform them so they would
# apply to the receiving LM's sensor. Basically, if my
# sensor is here in this pose, then your sensor should be
# there in that pose.
# NOTE: rotation votes are not being used right now.
transformed_lm_states_for_object = []
for s in lm_states_for_object:
# need to make a copy because the same vote state may be
# transformed in different ways depending on the
# receiving LMs' poses
new_s = copy.deepcopy(s)
rotated_displacement = new_s.get_pose_vectors().dot(
sensor_disp
)
new_s.transform_morphological_features(
translation=rotated_displacement,
rotation=sensor_rotation_disp,
)
transformed_lm_states_for_object.append(new_s)
if obj in lm_state_votes:
lm_state_votes[obj].extend(
transformed_lm_states_for_object
)
else:
lm_state_votes[obj] = transformed_lm_states_for_object
logger.debug(f"VOTE from LMs {self.lm_to_lm_vote_matrix[i]} to LM {i}")
vote = lm_state_votes
combined_votes.append(vote)
return combined_votes
[docs] def switch_to_exploratory_step(self):
"""Switch to an exploratory step.
Also set MLH evidence high enough to generate output during exploration.
"""
super().switch_to_exploratory_step()
# Make sure the new object ID is communicated to higher-level LMs during
# exploration.
for lm in self.learning_modules:
lm.current_mlh["evidence"] = lm.object_evidence_threshold + 1