# Source code for pyRTC.Pipeline

"""Shared-memory transport and hard-RTC process helpers for pyRTC.

This module contains the infrastructure that lets pyRTC components exchange
frames and command vectors through named shared-memory blocks, optionally mirror
those blocks onto CUDA tensors for compatible deployments, and launch hardware-
facing child processes that communicate with the main RTC over a small
localhost-based JSON protocol.
"""
import argparse
import json 
import logging
import os
import socket
import struct
import sys
import time

from multiprocessing import shared_memory, resource_tracker
from subprocess import PIPE, Popen

import numpy as np

from pyRTC.logging_utils import add_logging_cli_args, configure_logging_from_args, ensure_logging_configured, get_logger
from pyRTC.utils import (
    bind_socket,
    dtype_to_float,
    float_to_dtype,
    precise_delay,
    read_yaml_file,
    setFromConfig,
    set_affinity_and_priority,
)


# Module-level logger obtained through pyRTC's logging helpers; used for all
# diagnostics emitted by this module.
logger = get_logger(__name__)

# Optional PyTorch support.  ``TORCH_AVAILABLE`` gates every GPU code path and
# ``dtype_mapping`` translates NumPy dtypes -- both scalar types such as
# ``np.float32`` and ``np.dtype`` instances -- into the matching torch dtype.
TORCH_AVAILABLE = False
torch = None
dtype_mapping = {}

try:
    import torch
except Exception:  # optional dependency: any import-time failure means "CPU only"
    torch = None
    TORCH_AVAILABLE = False
else:
    TORCH_AVAILABLE = True
    # Build the mapping defensively: some older torch releases have no
    # ``torch.uint16``.  Referencing it directly used to raise inside the
    # import guard and silently disable GPU support altogether; now an
    # unavailable dtype is simply left out of the mapping.
    for _dtype_name in ("float32", "float64", "int32", "int64", "uint8", "uint16"):
        _torch_dtype = getattr(torch, _dtype_name, None)
        if _torch_dtype is None:
            continue
        dtype_mapping[getattr(np, _dtype_name)] = _torch_dtype
        dtype_mapping[np.dtype(_dtype_name)] = _torch_dtype
    del _dtype_name, _torch_dtype


def gpu_torch_available() -> bool:
    """Return ``True`` when PyTorch was importable when this module loaded.

    GPU-backed shared-memory mirrors need PyTorch, so callers can use this to
    decide whether requesting a ``gpuDevice`` makes sense.
    """
    torch_ok = TORCH_AVAILABLE
    return torch_ok
def normalize_gpu_device(gpuDevice, context: str = ""):
    """Validate a requested GPU device against PyTorch availability.

    Parameters
    ----------
    gpuDevice:
        The requested device, or ``None`` for CPU-only operation.
    context:
        Optional label (for example a stream name) used to prefix the warning.

    Returns
    -------
    ``gpuDevice`` unchanged when it is ``None`` or PyTorch is available;
    otherwise ``None``, after logging a warning that CPU mode is used instead.
    """
    # Nothing requested, or the request can be honoured: hand it straight back.
    if gpuDevice is None or TORCH_AVAILABLE:
        return gpuDevice
    label = "" if not context else f"{context}: "
    logging.log(
        level=logging.WARNING,
        msg=f"{label}gpuDevice was requested but PyTorch is not installed; defaulting to CPU mode.",
    )
    return None
def work(obj, functionName, affinity):
    """Thread target that repeatedly drives one method of a Pipeline object.

    The calling thread is first configured with
    ``set_affinity_and_priority(functionName, [affinity])``.  Then, for as long
    as ``obj.alive`` is truthy, the method named ``functionName`` is called
    back-to-back whenever ``obj.running`` is truthy; while paused, the loop
    sleeps 1 ms between checks.
    """
    set_affinity_and_priority(functionName, [affinity])
    task = getattr(obj, functionName, None)
    while obj.alive:
        if not obj.running:
            # Paused: yield the CPU briefly instead of spinning.
            time.sleep(1e-3)
            continue
        task()
    return
class ImageSHM:
    """Named shared-memory array with metadata and optional GPU mirror state.

    ``ImageSHM`` is the transport primitive used throughout pyRTC. Each stream
    has a CPU shared-memory block, a small metadata block containing shape,
    dtype, and timing information, and optionally a GPU-backed tensor mirror
    in hard-RTC deployments where CUDA sharing is supported.

    Producers create the stream and update it with NumPy arrays. Consumers
    reconstruct the stream by name and read either safe copies or direct
    views, depending on their performance and synchronization needs.

    Metadata layout (``METADATA_SIZE`` float64 slots written by
    :meth:`updateMetadata`): ``[0]`` write count, ``[1]`` last write time
    (``time.time()``), ``[2]`` data size in bytes, ``[3]`` dtype code from
    ``dtype_to_float``, ``[4:]`` array shape, terminated by the first zero.
    """

    METADATA_SIZE = 10

    # Header of the "<name>_gpu_handle" block: nine native uint32 fields
    # (device index, handle length, storage size, storage offset, the integer
    # "unknown" field returned by ``_share_cuda_()``, path length, additional
    # length, is_host_device flag, producer PID), followed by the handle, path
    # and additional byte strings.
    # NOTE(review): per torch.multiprocessing.reductions these look like
    # ref_counter_handle / ref_counter_offset / event_handle /
    # event_sync_required -- confirm.  uint32 also caps the storage at < 4 GiB.
    _GPU_HANDLE_HEADER = struct.Struct("I I I I I I I I I")

    def __init__(self, name, shape, dtype, gpuDevice=None, consumer=True) -> None:
        """Create or attach to stream ``name`` holding an array of ``shape``/``dtype``.

        ``consumer`` only matters in GPU mode: consumers map an existing GPU
        mirror, producers (``consumer=False``) allocate and publish one.
        """
        self.name = name
        self.dtype = dtype
        self.shape = shape
        # Scratch array used to size the block; replaced below by a view onto
        # the shared buffer.
        self.arr = np.empty(shape, dtype=self.dtype)
        self.size = self.arr.nbytes
        self.metadata = np.zeros(self.METADATA_SIZE, dtype=np.float64)
        self.count = 0
        self.lastWriteTime = 0
        self.lastReadTime = 0
        # Names containing "meta" are metadata blocks themselves and get no
        # companion "<name>_meta" block.
        self.areData = "meta" not in name
        self.gpuDevice = normalize_gpu_device(gpuDevice, name)

        self.shm = self._openBlock(name, self.arr.nbytes)
        self.arr = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)

        if self.areData:
            self.metadataShm = self._openBlock(name + "_meta", self.metadata.nbytes)
            self.metadata = np.ndarray(self.metadata.shape, dtype=self.metadata.dtype, buffer=self.metadataShm.buf)
            self.updateMetadata(FULL_UPDATE=True)

        if self.gpuDevice is not None:
            self.torchDtype = dtype_mapping.get(self.dtype)
            if self.torchDtype is None:
                # Accept any dtype spelling NumPy understands (e.g. "float32").
                self.torchDtype = dtype_mapping.get(np.dtype(self.dtype))
            if self.torchDtype is None:
                self.gpuDevice = None
                logger.warning("%s: dtype %s not supported for GPU SHM; defaulting to CPU mode.", self.name, self.dtype)
                return
            if consumer:
                # The producer is expected to have published the mirror already.
                self.initGPUMemFromSHM()
            else:
                self.createGPUMemSHM()
        return

    @staticmethod
    def _openBlock(blockName, size):
        """Create the named shared-memory block, or attach to it if creation fails."""
        try:
            block = shared_memory.SharedMemory(name=blockName, create=True, size=size)
            logger.debug("Creating shared memory object %s", blockName)
        except Exception:
            # Normally FileExistsError: another stream/process created it first.
            block = shared_memory.SharedMemory(name=blockName)
            logger.debug("Opening existing shared memory object %s", blockName)
        # Keep the resource tracker from unlinking the block when this process
        # exits; blocks are removed explicitly (see clear_shms).
        # NOTE(review): reconstructed from a flattened listing -- confirm this
        # was meant for both the create and the attach path, not attach only.
        # Unregistering does not work on Windows.
        if sys.platform != 'win32':
            resource_tracker.unregister(block._name, 'shared_memory')
        return block

    def __del__(self):
        self.close()

    def createGPUMemSHM(self):
        """Allocate the GPU mirror and publish its CUDA sharing handle.

        The handle is written to a new ``<name>_gpu_handle`` block so another
        process can map the same device memory via :meth:`initGPUMemFromSHM`.

        Returns the GPU tensor, or ``None`` when GPU mode is disabled.

        Raises ``FileExistsError`` when ``<name>_gpu_handle`` already exists
        (e.g. left over from a previous run; ``clear_shms`` removes it).
        """
        if self.gpuDevice is None or not TORCH_AVAILABLE:
            return None
        self.shmGPU = torch.empty(self.shape, dtype=self.torchDtype, device=self.gpuDevice)
        (
            device_index,
            handle_bytes,
            storage_size_bytes,
            storage_offset_bytes,
            path_bytes,
            unknown,
            additional_bytes,
            is_host_device,
        ) = self.shmGPU.untyped_storage()._share_cuda_()

        header = self._GPU_HANDLE_HEADER.pack(
            device_index,
            len(handle_bytes),
            storage_size_bytes,
            storage_offset_bytes,
            unknown,
            len(path_bytes),
            len(additional_bytes),
            int(is_host_device),
            os.getpid(),  # lets a consumer detect it is running in the producer process
        )
        pieces = (header, handle_bytes, path_bytes, additional_bytes)
        total_size = sum(len(piece) for piece in pieces)

        gpuHandleShm = shared_memory.SharedMemory(name=self.name + "_gpu_handle", create=True, size=total_size)
        logger.debug("Creating shared memory object %s_gpu_handle", self.name)
        buf = gpuHandleShm.buf
        offset = 0
        for piece in pieces:
            buf[offset:offset + len(piece)] = piece
            offset += len(piece)
        # Only the local handle object is dropped on return; the named block
        # persists until it is unlinked.
        return self.shmGPU

    def initGPUMemFromSHM(self):
        """Map the producer's GPU mirror using the ``<name>_gpu_handle`` block.

        Falls back to CPU mode (``gpuDevice = None``) with a warning when the
        handle block does not exist.

        Raises
        ------
        Exception
            If the handle was published by this same process (GPU SHMs only
            work in hard real-time mode).
        """
        if self.gpuDevice is None or not TORCH_AVAILABLE:
            return
        try:
            gpuHandleShm = shared_memory.SharedMemory(name=self.name + "_gpu_handle")
            logger.debug("Opened shared memory object %s_gpu_handle", self.name)
        except Exception:
            self.gpuDevice = None
            logger.warning("%s: Trying to initialize GPU memory which does not exist. Defaulting to CPU", self.name)
            return
        try:
            raw = bytes(gpuHandleShm.buf)
        finally:
            # Everything needed is copied out; release the mapping right away.
            gpuHandleShm.close()

        (
            device_index,
            handle_length,
            storage_size_bytes,
            storage_offset_bytes,
            unknown,
            path_length,
            additional_length,
            is_host_device,
            pid,
        ) = self._GPU_HANDLE_HEADER.unpack_from(raw, 0)
        if pid == os.getpid():
            raise Exception(f"{self.name}:GPU SHMs only work in hard real-time mode, set gpuDevice to None or remove from config")

        offset = self._GPU_HANDLE_HEADER.size
        handle_bytes = raw[offset:offset + handle_length]
        offset += handle_length
        path_bytes = raw[offset:offset + path_length]
        offset += path_length
        additional_bytes = raw[offset:offset + additional_length]

        storage = torch.UntypedStorage._new_shared_cuda(
            device_index,
            handle_bytes,
            storage_size_bytes,
            storage_offset_bytes,
            path_bytes,
            unknown,
            additional_bytes,
            bool(is_host_device),
        )
        self.shmGPU = torch.tensor([], dtype=self.torchDtype, device=device_index).set_(storage).reshape(self.shape)
        return

    def close(self):
        """Release this process's mappings of the data and metadata blocks.

        The named blocks are not unlinked, so other processes keep access.
        Safe on partially constructed instances; calling it again is harmless.
        """
        shm = getattr(self, "shm", None)
        if shm is None:
            # __init__ failed before the data block was opened.
            return
        logger.debug("Closing shared memory object %s", self.name)
        shm.close()
        metadataShm = getattr(self, "metadataShm", None)
        if metadataShm is not None:
            metadataShm.close()
        return

    def write(self, arr):
        """Publish ``arr`` to the stream.

        ``arr`` must be a NumPy array (in GPU mode, also a torch tensor) of
        exactly the stream's shape; values are copied with ``np.copyto`` /
        ``Tensor.copy_``.  On success the write counter and timestamp are
        bumped and, for data streams, the metadata block is updated.

        Returns 1 on success, -1 when the type or shape is rejected.
        """
        gpuMode = self.gpuDevice is not None
        accepted = (np.ndarray, torch.Tensor) if gpuMode else (np.ndarray,)
        if not isinstance(arr, accepted):
            return -1
        if arr.shape != self.arr.shape:
            logger.error(
                "%s: Writing Wrong size array to SHM. Expecting %s, Got %s",
                self.name,
                self.arr.shape,
                arr.shape,
            )
            return -1

        if gpuMode and isinstance(arr, torch.Tensor):
            self.shmGPU.copy_(arr)
            # detach() so tensors that require grad can still be exported.
            np.copyto(self.arr, arr.detach().cpu().numpy())
        else:
            np.copyto(self.arr, arr)
            if gpuMode:
                # Upload from the CPU block just written: it is contiguous and
                # already in the stream dtype (torch.from_numpy rejects e.g.
                # negatively strided caller arrays).
                self.shmGPU.copy_(torch.from_numpy(self.arr))

        self.count += 1
        self.lastWriteTime = time.time()
        if self.areData:
            self.updateMetadata()
        return 1

    def hold(self, timeout=None, RELEASE_GIL=True):
        """Wait until :meth:`checkNew` reports new data.

        ``timeout`` is in seconds (any Python or NumPy real number); ``None``
        waits indefinitely and any other value returns immediately.  With
        ``RELEASE_GIL`` each poll sleeps 10 us; otherwise ``precise_delay(5)``
        is used between polls.
        """
        if timeout is None:
            deadline = None
        elif isinstance(timeout, (int, float, np.integer, np.floating)):
            # Monotonic clock: immune to wall-clock adjustments.
            deadline = time.monotonic() + timeout
        else:
            return
        while not self.checkNew():
            if deadline is not None and time.monotonic() >= deadline:
                return
            if RELEASE_GIL:
                time.sleep(1e-5)
            else:
                precise_delay(5)
        return

    def read(self, SAFE=True, GPU=False, RELEASE_GIL=True):
        """Block until new data arrive, then return them (see :meth:`read_noblock`)."""
        self.hold(RELEASE_GIL=RELEASE_GIL)
        return self.read_noblock(SAFE=SAFE, GPU=GPU)

    def read_timeout(self, timeout, SAFE=True, GPU=False, RELEASE_GIL=True):
        """Wait up to ``timeout`` seconds for new data, then return the current contents."""
        self.hold(timeout=timeout, RELEASE_GIL=RELEASE_GIL)
        return self.read_noblock(SAFE=SAFE, GPU=GPU)

    def read_noblock(self, SAFE=True, GPU=False):
        """Return the current contents without waiting and mark them as seen.

        ``SAFE`` returns a copy (``np.copy`` / ``Tensor.clone``); otherwise the
        live shared buffer itself is returned (experimental).  ``GPU`` selects
        the GPU mirror when one is active.
        """
        self.markSeen()
        useGPU = GPU and self.gpuDevice is not None
        if SAFE:
            return self.shmGPU.clone() if useGPU else np.copy(self.arr)
        # EXPERIMENTAL: zero-copy access to the shared memory.
        return self.shmGPU if useGPU else self.arr

    def checkNew(self):
        """Return True if a write newer than the last one seen is available.

        Marks the data as seen when returning True.  Metadata-only streams
        (``areData`` False) always report True.
        """
        if not self.areData:
            return True
        if self.metadata[1] != self.lastReadTime:
            self.markSeen()
            return True
        return False

    def markSeen(self):
        """Remember the current write timestamp as the last one consumed."""
        self.lastReadTime = self.metadata[1]
        return

    def updateMetadata(self, FULL_UPDATE=False):
        """Write the counter and timestamp into the metadata block.

        With ``FULL_UPDATE`` also record byte size, dtype code and shape.  At
        most ``METADATA_SIZE - 4`` dimensions fit; further ones are dropped.
        """
        self.metadata[0] = self.count
        self.metadata[1] = self.lastWriteTime
        if FULL_UPDATE:
            self.metadata[2] = self.size
            self.metadata[3] = dtype_to_float(self.arr.dtype)
            # Zero every shape slot first: a re-attached metadata block can
            # still hold a longer shape from an earlier stream of the same
            # name, and readers stop at the first zero.
            self.metadata[4:] = 0
            for slot, dim in enumerate(self.arr.shape[: self.METADATA_SIZE - 4], start=4):
                self.metadata[slot] = dim
        return
def clear_shms(names):
    """Unlink the shared-memory blocks backing each named stream.

    For every name, the data block ``<name>``, its metadata block
    ``<name>_meta`` and the GPU-handle block ``<name>_gpu_handle`` are unlinked
    if they exist; missing blocks are skipped.

    The blocks are attached directly rather than through ``ImageSHM``.
    ``ImageSHM`` would create any block that does not exist yet, and for
    ``<name>_gpu_handle`` it also created a ``<name>_gpu_handle_meta`` block
    that was never removed.
    """
    for name in names:
        for suffix in ("", "_meta", "_gpu_handle"):
            try:
                block = shared_memory.SharedMemory(name=name + suffix)
            except FileNotFoundError:
                continue
            try:
                block.unlink()
            finally:
                block.close()
class hardwareLauncher:
    """Launch and supervise a hardware-side child process.

    The launcher is the client-side helper for pyRTC's hard-RTC deployment
    model. It starts a Python subprocess, waits for the child to expose a
    socket listener, and then sends simple JSON messages to get or set
    properties, invoke helper methods, or request shutdown. Logging-related
    environment variables are propagated so parent and child processes share
    the same operator-facing logging policy.

    Request/reply helpers return the requested property, ``1`` for a plain
    success, or ``-1`` on any failure (not launched, ``BAD`` status,
    malformed reply, timeout, or closed connection).
    """

    def __init__(self, hardwareFile, configFile, port, timeout=None) -> None:
        self.hardwareFile = hardwareFile
        self.command = [sys.executable, hardwareFile, "-c", f"{configFile}", "-p", f"{port}"]
        self.running = False
        # Client configuration
        self.host = '127.0.0.1'  # localhost
        self.port = port
        # Socket timeout (seconds) applied once connected; None blocks forever.
        self.timeout = timeout
        return

    def launch(self):
        """Start the child process (if not already running) and connect to it.

        Retries the connection every 2 seconds until the child's listener is
        up.

        Raises
        ------
        RuntimeError
            If the child process exits before accepting the connection.
        """
        ensure_logging_configured(app_name="pyrtc-hardware-launcher", component_name=self.hardwareFile)
        if self.running:
            return
        logger.info("Launching process %s", self.hardwareFile)
        # NOTE(review): stdout is piped but never read in this class; a child
        # that prints a lot could block once the pipe buffer fills -- confirm
        # something drains self.process.stdout.
        self.process = Popen(self.command, stdin=PIPE, stdout=PIPE, text=True, bufsize=1, env=os.environ.copy())
        self.running = True
        logger.info("Waiting for process at %s:%s", self.host, self.port)

        restTime = 2
        while True:
            time.sleep(restTime)
            # Use a fresh socket for every attempt: the state of a socket after
            # a failed connect() is unspecified, and some platforms refuse to
            # reuse it.
            candidate = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                candidate.connect((self.host, self.port))
            except OSError as e:
                candidate.close()
                exitCode = self.process.poll()
                if exitCode is not None:
                    # The child died; retrying would loop forever.
                    self.running = False
                    raise RuntimeError(
                        f"Hardware process {self.hardwareFile} exited with code {exitCode} before accepting a connection"
                    ) from e
                logger.warning("Connection failed: %s", e)
                logger.info("Retrying in %s seconds", restTime)
                continue
            self.processSocket = candidate
            break

        if isinstance(self.timeout, (int, float)):
            self.processSocket.settimeout(self.timeout)
        logger.info("Connected to child process socket")
        return

    def shutdown(self):
        """Ask the child to shut down; on success release the connection.

        Returns 1 on success, -1 otherwise.
        """
        result = self.writeAndRead({"type": "shutdown"})
        if result == 1:
            # The child acknowledged and leaves its listen loop: drop our end
            # so later requests fail fast and launch() can start a new child.
            self.processSocket.close()
            self.running = False
        return result

    def getProperty(self, property):
        """Return attribute ``property`` of the remote hardware object, or -1."""
        message = {"type": "get", "property": property}
        return self.writeAndRead(message)

    def setProperty(self, property, value):
        """Set attribute ``property`` of the remote hardware object; 1 or -1."""
        message = {"type": "set", "property": property, "value": value}
        return self.writeAndRead(message)

    def run(self, function, *args, timeout=None):
        """Call method ``function(*args)`` on the remote hardware object; 1 or -1.

        ``args`` must be JSON-serialisable.  ``timeout`` is accepted for API
        compatibility but currently ignored; the socket timeout configured at
        construction applies.
        """
        message = {"type": "run", "function": function}
        for index, arg in enumerate(args, start=1):
            message[f"arg_{index}"] = arg
        return self.writeAndRead(message)

    def writeAndRead(self, message):
        """Send one request and interpret the reply (see class docstring)."""
        if not self.running:
            return -1
        self.write(message)
        reply = self.read()
        # Malformed replies (or -1 from read()) count as failures.
        if not isinstance(reply, dict) or "status" not in reply:
            return -1
        if reply["status"] == 'OK':
            return reply.get("property", 1)
        return -1

    def write(self, message):
        """Serialise ``message`` as JSON and send all of it to the child."""
        self.processSocket.sendall(json.dumps(message).encode())
        return

    def read(self):
        """Receive one complete JSON reply from the child.

        Keeps receiving until the buffered bytes form a complete JSON value,
        so replies larger than one 4096-byte ``recv`` (or split by TCP) are
        handled.  Returns -1 on socket timeout or when the child closed the
        connection.  Bytes following the first complete value are discarded;
        the protocol is strictly one reply per request.
        """
        decoder = json.JSONDecoder()
        received = b""
        try:
            while True:
                chunk = self.processSocket.recv(4096)
                if not chunk:
                    return -1
                received += chunk
                try:
                    reply, _ = decoder.raw_decode(received.decode())
                except ValueError:
                    # Incomplete JSON or a split UTF-8 sequence: read more.
                    continue
                return reply
        except socket.timeout:
            return -1
class Listener:
    """Server-side control socket for a launched hardware object.

    ``Listener`` is the child-process counterpart to :class:`hardwareLauncher`.
    It binds a localhost socket, accepts the RTC-side connection, and services
    a narrow JSON RPC surface for property access, method calls, and clean
    shutdown.
    """

    def __init__(self, hardware, port) -> None:
        self.hardware = hardware
        self.running = True
        self.keyCharacter = '$'
        self.host = '127.0.0.1'  # localhost
        self.port = port
        server_socket = bind_socket(self.host, self.port)
        server_socket.listen()
        logger.info("%s: awaiting RTC connection", hardware.name)
        try:
            # Connect to the RTC process that spawned us.
            self.RTCsocket, self.RTCaddress = server_socket.accept()
        finally:
            # Exactly one RTC connection is served; free the listening port.
            server_socket.close()
        self.OKMessage = {"status": "OK"}
        self.BadMessage = {"status": "BAD"}
        return

    def listen(self):
        """Serve one request from the RTC and send exactly one reply.

        Request types: ``shutdown`` (calls ``hardware.__del__()`` and clears
        ``running``), ``get``, ``set`` (the value is converted to the type of
        the current attribute), and ``run`` (positional arguments in
        ``arg_1``, ``arg_2``, ...).  Replies are ``{"status": "OK"}`` (plus
        ``"property"`` for ``get``) or ``{"status": "BAD"}``.  If the RTC
        closed the connection, ``running`` is cleared and no reply is sent.
        """
        try:
            request = self.read()
        except ConnectionError:
            logger.info("%s: RTC connection closed; stopping listener", getattr(self.hardware, "name", "hardware"))
            self.running = False
            return
        except Exception:
            logger.exception("Failed to read listener request")
            self.write(self.BadMessage)
            return
        if not isinstance(request, dict) or "type" not in request:
            self.write(self.BadMessage)
            logger.error("Listener request missing type field: %s", request)
            return

        requestType = request["type"]
        if requestType == "shutdown":
            try:
                self.hardware.__del__()
                self.running = False
                self.write(self.OKMessage)
            except Exception:
                logger.exception("Listener shutdown request failed")
                self.write(self.BadMessage)
        elif requestType == "get":
            try:
                value = getattr(self.hardware, request["property"])
                message = self.OKMessage.copy()
                message["property"] = value
                self.write(message)
            except Exception:
                logger.exception("Listener get request failed for %s", request.get("property"))
                self.write(self.BadMessage)
        elif requestType == "set":
            try:
                propertyName = request["property"]
                current = getattr(self.hardware, propertyName)
                setattr(self.hardware, propertyName, self._coerce(current, request["value"]))
                self.write(self.OKMessage)
            except Exception:
                logger.exception("Listener set request failed for %s", request.get("property"))
                self.write(self.BadMessage)
        elif requestType == "run":
            try:
                function = getattr(self.hardware, request["function"])
                args = []
                while f"arg_{len(args) + 1}" in request:
                    args.append(request[f"arg_{len(args) + 1}"])
                function(*args)
                self.write(self.OKMessage)
            except Exception:
                logger.exception("Listener run request failed for %s", request.get("function"))
                self.write(self.BadMessage)
        else:
            logger.error("Unknown listener request type: %s", requestType)
            self.write(self.BadMessage)

    @staticmethod
    def _coerce(current, value):
        """Convert a decoded JSON ``value`` to the type of ``current``."""
        if isinstance(current, np.ndarray):
            # np.ndarray(value) would interpret a list as a *shape*.
            return np.asarray(value, dtype=current.dtype)
        return type(current)(value)

    @staticmethod
    def _jsonDefault(obj):
        """JSON fallback: NumPy arrays and scalars become Python lists/scalars."""
        if isinstance(obj, (np.ndarray, np.generic)):
            return obj.tolist()
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    def write(self, message):
        """Serialise ``message`` as JSON (NumPy-aware) and send all of it."""
        payload = json.dumps(message, default=self._jsonDefault)
        self.RTCsocket.sendall(payload.encode())
        return

    def read(self):
        """Receive one complete JSON request from the RTC.

        Keeps receiving until the buffered bytes form a complete JSON value,
        so requests larger than one 4096-byte ``recv`` are handled.  Bytes
        following the first complete value are discarded; the protocol is
        strictly one request per reply.

        Raises
        ------
        ConnectionError
            If the RTC closed the connection.
        """
        decoder = json.JSONDecoder()
        received = b""
        while True:
            chunk = self.RTCsocket.recv(4096)
            if not chunk:
                raise ConnectionError("RTC closed the control connection")
            received += chunk
            try:
                request, _ = decoder.raw_decode(received.decode())
            except ValueError:
                # Incomplete JSON or a split UTF-8 sequence: read more.
                continue
            return request
def initExistingShm(shmName, gpuDevice=None):
    """Attach to an existing stream, discovering its shape and dtype from metadata.

    Reads ``<shmName>_meta``: slot 3 holds the dtype code (decoded with
    ``float_to_dtype``) and slots 4 onward hold the shape, ending at the first
    non-positive value.

    Returns
    -------
    tuple
        ``(shm, shmDims, shmDType)`` where ``shm`` is the consumer-side
        ``ImageSHM`` for the stream.
    """
    shmMeta = ImageSHM(shmName + "_meta", (ImageSHM.METADATA_SIZE,), np.float64).read_noblock()
    shmDType = float_to_dtype(shmMeta[3])
    shmDims = []
    # Stop at the first non-positive slot or at the end of the block: a
    # stream may use every shape slot, and indexing past the end used to
    # raise IndexError.
    for rawDim in shmMeta[4:]:
        dim = int(rawDim)
        if dim <= 0:
            break
        shmDims.append(dim)
    shm = ImageSHM(shmName, shmDims, shmDType, gpuDevice=gpuDevice, consumer=True)
    return shm, shmDims, shmDType
def launchComponent(component, confKey, start=True):
    """Entry point for a hardware child process started by ``hardwareLauncher``.

    Parses ``-c/--config`` and ``-p/--port`` (plus the shared logging flags),
    configures logging, and applies ``set_affinity_and_priority`` with the
    ``affinity`` entry of the ``confKey`` config section (default 0).  It then
    builds ``component(conf=...)``, sets its ``RELEASE_GIL`` to False,
    optionally starts it, and serves ``Listener`` requests on the given port
    until a shutdown clears the listener's ``running`` flag.  Any exception
    raised while building or serving the component is logged and re-raised.
    """
    cli = argparse.ArgumentParser(description="Read a config file from the command line.")
    cli.add_argument("-c", "--config", required=True, help="Path to the config file")
    cli.add_argument("-p", "--port", required=True, help="Port for communication")
    add_logging_cli_args(cli)
    parsed = cli.parse_args()

    configure_logging_from_args(parsed, app_name=f"pyrtc-{confKey}", component_name=confKey)
    componentConf = read_yaml_file(parsed.config)[confKey]
    set_affinity_and_priority("", setFromConfig(componentConf, "affinity", 0))

    try:
        instance = component(conf=componentConf)
        # Presumably makes the component's waits use precise_delay rather than
        # sleeping (cf. ImageSHM.hold) -- confirm against the component classes.
        instance.RELEASE_GIL = False
        if start:
            instance.start()
        server = Listener(instance, port=int(parsed.port))
        while server.running:
            server.listen()
            time.sleep(1e-3)
    except Exception:
        logger.exception("Failed to launch component %s", confKey)
        raise