"""Telemetry capture utilities for pyRTC shared-memory streams.
The telemetry component records finite stretches of data from existing pyRTC
streams and stores them as raw binary files on disk. It is intended for
operator-facing and maintainer-facing tasks such as debugging, calibration, and
offline analysis rather than long-term archival storage.
"""
import numpy as np
from pyRTC.logging_utils import get_logger
from pyRTC.Pipeline import initExistingShm
from pyRTC.pyRTCComponent import pyRTCComponent
from pyRTC.utils import append_to_file, generate_filepath, setFromConfig
logger = get_logger(__name__)
[docs]
class Telemetry(pyRTCComponent):
"""Persist a bounded capture from an existing pyRTC stream.
``Telemetry`` attaches to named shared-memory streams that are already being
produced elsewhere in the running RTC, writes a configurable number of
frames to disk, and keeps enough bookkeeping to reconstruct known captures
with the correct dtype and dimensions later.
Typical uses include saving wavefront sensor frames, residual signals, or
command vectors during tuning and diagnostics.
"""
def __init__(self, conf) -> None:
try:
super().__init__(conf)
self.dataDir = setFromConfig(conf, "dataDir", "./data/")
self.mostRecentFile = ''
self.allFiles = []
self.dTypes = []
self.dims = []
self.logger.info("Initialized telemetry dataDir=%s", self.dataDir)
except Exception:
logger.exception("Failed to initialize telemetry")
raise
return
[docs]
def save(self, shmName, numFrames, uniqueStr = ''):
component_logger = getattr(self, "logger", logger)
try:
shm, shmDims, shmDtype = initExistingShm(shmName)
self.mostRecentFile = generate_filepath(base_dir=self.dataDir,
prefix=f"{shmName}_{uniqueStr}")
self.allFiles.append(self.mostRecentFile)
self.dTypes.append(shmDtype)
self.dims.append(shmDims)
for _ in range(numFrames):
append_to_file(self.mostRecentFile, shm.read(), dtype=shmDtype)
component_logger.info("Saved %s frames from %s to %s", numFrames, shmName, self.mostRecentFile)
except Exception:
component_logger.exception("Failed to save telemetry stream %s", shmName)
raise
return
[docs]
def read(self, filename="", dtype = None):
component_logger = getattr(self, "logger", logger)
try:
if filename == "":
filename = self.mostRecentFile
if filename in self.allFiles:
arr = np.fromfile(filename, dtype=self.dTypes[self.allFiles.index(filename)])
arr = arr.reshape(-1, *self.dims[self.allFiles.index(filename)])
component_logger.info("Read telemetry capture from %s", filename)
return arr
if dtype is not None:
arr = np.fromfile(filename, dtype=dtype)
component_logger.info("Read raw telemetry file %s with dtype=%s", filename, dtype)
return arr
raise ValueError("File not part of current capture, please provide a dtype")
except Exception:
component_logger.exception("Failed to read telemetry file %s", filename or getattr(self, "mostRecentFile", ""))
raise