Source code for pyRTC.pyRTCComponent

"""Base class for threaded pyRTC runtime components.

Most pyRTC subsystems share the same lifecycle model: validate configuration,
normalize optional GPU settings, spawn one or more worker threads from YAML,
and expose lightweight start/stop controls. This module provides that shared
behavior.
"""
import os
import threading

from pyRTC.logging_utils import ensure_logging_configured, get_logger
from pyRTC.Pipeline import launchComponent, normalize_gpu_device, work
from pyRTC.utils import setFromConfig, validate_component_config


logger = get_logger(__name__)


[docs] class pyRTCComponent: """ Common threaded component base used throughout pyRTC. The base class standardizes the repeated mechanics shared by the wavefront sensor, slopes processor, loop controller, telemetry recorder, and many hardware-facing helpers. Components list runtime methods under the configuration key ``functions`` and the base class starts one worker thread per listed method. Those worker functions are assumed to matter for their side effects rather than their return values. They usually produce, consume, or transform shared- memory streams inside the running RTC. For examples: psf: functions: - expose - integrate Config Parameters ----------------- affinity : int Base CPU affinity for the component. Additional worker functions are assigned subsequent cores when possible. functions : list Bound method names to run in worker threads. gpuDevice : str, optional Requested GPU device identifier. When PyTorch is unavailable this is normalized back to CPU mode. Attributes ---------- alive : bool Indicates whether the component is alive. running : bool Indicates whether the component is currently running. The class intentionally does not define component-specific data flow. It is only responsible for the shared runtime lifecycle. """ def __init__(self, conf) -> None: """ Constructs all the necessary attributes for the real-time control component object. Parameters ---------- conf : dict Configuration dictionary for the component. The following keys are used: - affinity (int, optional): The CPU affinity for the component. Default 0. - functions (list, optional): A list of functions to run in separate threads. Default is an empty list. """ ensure_logging_configured(app_name="pyrtc", component_name=self.__class__.__name__) self.logger = get_logger(f"{self.__class__.__module__}.{self.__class__.__name__}") try: validate_component_config(conf, [cls.__name__ for cls in self.__class__.mro()]) self.alive = True self.running = False self.affinity = setFromConfig(conf, "affinity", 0) requested_gpu_device = setFromConfig(conf, "gpuDevice", None) self.gpuDevice = normalize_gpu_device(requested_gpu_device, self.__class__.__name__) functionsToRun = setFromConfig(conf, "functions", []) self.workThreads = [] self.RELEASE_GIL = True if isinstance(functionsToRun, list) and len(functionsToRun) > 0: for i, functionName in enumerate(functionsToRun): threadAffinity = (self.affinity + i) % os.cpu_count() workThread = threading.Thread( target=work, args=(self, functionName, threadAffinity), daemon=True, ) workThread.start() self.workThreads.append(workThread) self.logger.info( "Initialized component affinity=%s gpuDevice=%s functions=%s", self.affinity, self.gpuDevice, functionsToRun, ) except Exception: self.logger.exception("Failed to initialize component") raise return def __del__(self): """ Destructor to clean up the component. """ component_logger = getattr(self, "logger", logger) try: self.stop() except Exception: component_logger.exception("Failed while stopping component during destruction") finally: self.alive = False return
[docs] def start(self): """ Start the registered real-time functions. """ component_logger = getattr(self, "logger", logger) try: self.running = True component_logger.info("Started component") except Exception: component_logger.exception("Failed to start component") raise return
[docs] def stop(self): """ Stops the registered real-time functions. """ component_logger = getattr(self, "logger", logger) try: self.running = False component_logger.info("Stopped component") except Exception: component_logger.exception("Failed to stop component") raise return
if __name__ == "__main__": launchComponent(pyRTCComponent, "component", start = True)