Source code for tbp.monty.frameworks.environment_utils.server

# Copyright 2025 Thousand Brains Project
# Copyright 2023-2024 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

import http.server
import os
import re

# This class is used for the monty meets world demo to live stream data from the iPad
# camera to a server that Monty can then read from.


[docs]class MontyRequestHandler(http.server.SimpleHTTPRequestHandler):
[docs] def do_PUT(self): # noqa: N802 # Check type of incoming data: depth or rgb inc_filename = os.path.basename(self.path) data_type = "depth" if inc_filename == "depth.data" else "rgb" # check existing filenames in the directory data_path = os.path.expanduser("~/tbp/data/worldimages/world_data_stream") file_list = [f for f in os.listdir(data_path) if not f.startswith(".")] if data_type == "depth": match = [re.search("depth_(.*).data", file) for file in file_list] match_no_none = [m for m in match if m is not None] number_list = [int(m.group(1)) for m in match_no_none] next_number = max(number_list) + 1 if number_list else 0 new_filename = "depth_" + str(next_number) + ".data" else: match = [re.search("rgb_(.*).png", file) for file in file_list] match_no_none = [m for m in match if m is not None] number_list = [int(m.group(1)) for m in match_no_none] next_number = max(number_list) + 1 if number_list else 0 new_filename = "rgb_" + str(next_number) + ".png" # Write data to file file_length = int(self.headers["Content-Length"]) with open(data_path + "/" + new_filename, "wb") as output_file: output_file.write(self.rfile.read(file_length)) output_file.close() self.send_response(201, "Created") self.end_headers() # Return object id reply_body = 'Saved "%s"\n' % new_filename self.wfile.write(reply_body.encode("utf-8"))
if __name__ == "__main__": server = http.server.HTTPServer(("0.0.0.0", 8080), MontyRequestHandler) server.serve_forever()