"""Shared-memory transport and hard-RTC process helpers for pyRTC.
This module contains the infrastructure that lets pyRTC components exchange
frames and command vectors through named shared-memory blocks, optionally mirror
those blocks onto CUDA tensors for compatible deployments, and launch hardware-
facing child processes that communicate with the main RTC over a small
localhost-based JSON protocol.
"""
import argparse
import json
import logging
import os
import socket
import struct
import sys
import time
from multiprocessing import shared_memory, resource_tracker
from subprocess import PIPE, Popen
import numpy as np
from pyRTC.logging_utils import add_logging_cli_args, configure_logging_from_args, ensure_logging_configured, get_logger
from pyRTC.utils import (
bind_socket,
dtype_to_float,
float_to_dtype,
precise_delay,
read_yaml_file,
setFromConfig,
set_affinity_and_priority,
)
logger = get_logger(__name__)

# Optional PyTorch support. GPU mirroring of shared-memory streams is only
# possible when torch imports cleanly; any failure falls back to CPU mode.
TORCH_AVAILABLE = False
torch = None
# Maps NumPy scalar types *and* NumPy dtype instances to torch dtypes. Both key
# forms are stored because ``np.float32`` and ``np.dtype("float32")`` compare
# equal but hash differently.
dtype_mapping = {}
try:
    import torch
except Exception:  # ImportError, or e.g. OSError from broken CUDA libraries
    torch = None
    TORCH_AVAILABLE = False
else:
    TORCH_AVAILABLE = True
    for _dtypeName in ("float32", "float64", "int32", "int64", "uint8", "uint16"):
        # Some dtypes (e.g. torch.uint16) only exist in newer torch releases.
        # Skip the missing ones instead of disabling GPU support altogether.
        _torchDtype = getattr(torch, _dtypeName, None)
        if _torchDtype is not None:
            dtype_mapping[getattr(np, _dtypeName)] = _torchDtype
            dtype_mapping[np.dtype(_dtypeName)] = _torchDtype
    del _dtypeName, _torchDtype
[docs]
def gpu_torch_available() -> bool:
    """Report whether PyTorch was importable when this module was loaded.

    GPU-mirrored shared memory can only be used when this returns ``True``.
    """
    return bool(TORCH_AVAILABLE)
[docs]
def normalize_gpu_device(gpuDevice, context: str = ""):
    """Return ``gpuDevice`` if a GPU mirror can be used, otherwise ``None``.

    Args:
        gpuDevice: Requested torch device, passed through unchanged when
            PyTorch is available; ``None`` requests CPU-only operation.
        context: Optional label (e.g. a stream name) that prefixes the warning
            emitted when the request cannot be honoured.

    Returns:
        ``gpuDevice`` when PyTorch is available, otherwise ``None``.
    """
    if gpuDevice is None:
        return None
    if not TORCH_AVAILABLE:
        prefix = f"{context}: " if context else ""
        # Use the module logger (not the root logger) so per-module logging
        # configuration applies, consistent with the rest of this file.
        logger.warning(
            "%sgpuDevice was requested but PyTorch is not installed; defaulting to CPU mode.",
            prefix,
        )
        return None
    return gpuDevice
def work(obj, functionName, affinity):
    """Repeatedly run ``obj.<functionName>()`` on the calling thread.

    Intended as the worker-thread body for any pipeline object. After calling
    ``set_affinity_and_priority(functionName, [affinity])`` it loops while
    ``obj.alive`` is true. It invokes the method whenever ``obj.running`` is
    true and otherwise idles in 1 ms sleeps.

    Args:
        obj: Object exposing ``alive`` and ``running`` flags and the method.
        functionName: Name of the zero-argument method to call each iteration.
        affinity: Value passed (wrapped in a list) to
            ``set_affinity_and_priority``.

    Raises:
        AttributeError: if ``obj`` has no attribute ``functionName``.
        TypeError: if that attribute is not callable.
    """
    # Resolve the target before entering the loop so a bad name fails
    # immediately with a clear error, instead of an opaque
    # "'NoneType' object is not callable" on the first running iteration.
    workFunction = getattr(obj, functionName)
    if not callable(workFunction):
        raise TypeError(f"{type(obj).__name__}.{functionName} is not callable")

    set_affinity_and_priority(functionName, [affinity])

    while obj.alive:
        if obj.running:
            workFunction()
        else:
            time.sleep(1e-3)
    return
[docs]
class ImageSHM:
    """Named shared-memory array with metadata and optional GPU mirror state.

    ``ImageSHM`` is the transport primitive used throughout pyRTC. Each stream
    has a CPU shared-memory block, a small metadata block containing shape,
    dtype, and timing information, and optionally a GPU-backed tensor mirror in
    hard-RTC deployments where CUDA sharing is supported.

    Producers create the stream and update it with NumPy arrays. Consumers
    reconstruct the stream by name and read either safe copies or direct views,
    depending on their performance and synchronization needs.

    Shared-memory blocks used for a stream ``<name>``:

    * ``<name>``            -- the array data.
    * ``<name>_meta``       -- ``METADATA_SIZE`` float64 slots. Slot 1 is the
      change marker compared by :meth:`checkNew`; the remaining slots are
      written by ``updateMetadata``.
    * ``<name>_gpu_handle`` -- serialized CUDA IPC handle (GPU mode only).

    On non-Windows platforms the data and metadata blocks are unregistered
    from the multiprocessing resource tracker. They therefore persist after
    the creating process exits until they are unlinked (e.g. by
    ``clear_shms``).
    """

    METADATA_SIZE = 10

    # Header of the "<name>_gpu_handle" block: nine native uint32 fields,
    # namely device index, handle length, storage size, storage offset,
    # ref-counter offset, ref-counter-handle length, event-handle length,
    # event-sync flag, and writer PID. The three variable-length byte strings
    # (handle, ref-counter handle, event handle) follow the header.
    _GPU_HANDLE_HEADER = struct.Struct('I I I I I I I I I')

    def __init__(self, name, shape, dtype, gpuDevice=None, consumer=True) -> None:
        """Create, or attach to, the stream ``name``.

        Args:
            name: Shared-memory name of the data block. Names containing
                ``"meta"`` are treated as metadata blocks and get no companion
                ``_meta`` block of their own.
            shape: Array shape of the stream.
            dtype: NumPy dtype of the stream.
            gpuDevice: Optional torch device for a GPU mirror. It is dropped
                (with a warning) when PyTorch or the dtype is unsupported.
            consumer: GPU mode only. ``True`` attaches to an existing
                ``<name>_gpu_handle`` block; ``False`` allocates the tensor
                and publishes its handle.
        """
        self.name = name
        self.dtype = dtype
        self.shape = shape
        # Temporary local array, used only to validate shape/dtype and size the block.
        self.arr = np.empty(shape, dtype=self.dtype)
        self.size = self.arr.nbytes
        self.metadata = np.zeros(self.METADATA_SIZE, dtype=np.float64)
        self.count = 0
        self.lastWriteTime = 0
        self.lastReadTime = 0
        self.areData = "meta" not in name
        self.metadataShm = None
        self._gpuHandleShm = None
        self.gpuDevice = normalize_gpu_device(gpuDevice, name)

        self.shm = self._open_or_create(name, self.size)
        self.arr = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)

        # Metadata blocks do not get their own "_meta" companion.
        if self.areData:
            self.metadataShm = self._open_or_create(name + "_meta", self.metadata.nbytes)
            self.metadata = np.ndarray(self.metadata.shape, dtype=self.metadata.dtype, buffer=self.metadataShm.buf)
            # NOTE(review): updateMetadata() is not defined in this class as
            # shown here; confirm where it is provided.
            self.updateMetadata(FULL_UPDATE=True)

        if self.gpuDevice is None:
            return
        # Normalize first so string dtypes such as "float32" are also recognised.
        self.torchDtype = dtype_mapping.get(np.dtype(self.dtype), None)
        if self.torchDtype is None:
            self.gpuDevice = None
            logger.warning("%s: dtype %s not supported for GPU SHM; defaulting to CPU mode.", self.name, self.dtype)
            return
        if consumer:
            # The producer is expected to have published the handle already.
            self.initGPUMemFromSHM()
        else:
            self.createGPUMemSHM()
        return

    @staticmethod
    def _open_or_create(name, size):
        """Create the named block of ``size`` bytes, or attach to it if it exists."""
        try:
            block = shared_memory.SharedMemory(name=name, create=True, size=size)
            logger.debug("Creating shared memory object %s", name)
        except FileExistsError:
            block = shared_memory.SharedMemory(name=name)
            logger.debug("Opening existing shared memory object %s", name)
        # Lifetime is managed explicitly, so stop this process's resource
        # tracker from unlinking the block at exit. (Not available on Windows.)
        if sys.platform != 'win32':
            resource_tracker.unregister(block._name, 'shared_memory')
        return block

    @staticmethod
    def _close_block(block):
        """Close one SharedMemory handle, tolerating still-exported views."""
        try:
            block.close()
        except BufferError:
            # Someone still holds a raw view (e.g. from read_noblock(SAFE=False));
            # the mapping stays alive until those views are released.
            logger.debug("Shared memory object %s still has live views; not unmapped", block.name)

    def __del__(self):
        # Best effort only: __init__ may have failed part-way, and module
        # globals may already be torn down during interpreter shutdown.
        try:
            self.close()
        except Exception:
            pass

    def createGPUMemSHM(self):
        """Allocate the GPU mirror tensor and publish its CUDA IPC handle.

        The handle returned by ``storage._share_cuda_()`` is serialized into a
        new ``<name>_gpu_handle`` block. Another process can then reopen the
        same device memory via :meth:`initGPUMemFromSHM`.

        Returns:
            The newly allocated tensor, or ``None`` when GPU mode is disabled.

        Raises:
            FileExistsError: if a ``<name>_gpu_handle`` block already exists.
        """
        if self.gpuDevice is None or not TORCH_AVAILABLE:
            return None
        self.shmGPU = torch.empty(self.shape, dtype=self.torchDtype, device=self.gpuDevice)
        storage = self.shmGPU.untyped_storage()
        # Field names follow torch.multiprocessing.reductions, which unpacks
        # the same tuple before calling UntypedStorage._new_shared_cuda.
        (
            device_index,
            handle_bytes,
            storage_size_bytes,
            storage_offset_bytes,
            ref_counter_handle,
            ref_counter_offset,
            event_handle,
            event_sync_required,
        ) = storage._share_cuda_()

        header = self._GPU_HANDLE_HEADER.pack(
            device_index,
            len(handle_bytes),
            storage_size_bytes,
            storage_offset_bytes,
            ref_counter_offset,
            len(ref_counter_handle),
            len(event_handle),
            int(event_sync_required),
            os.getpid(),  # lets readers reject same-process (non hard-RTC) use
        )
        payload = header + handle_bytes + ref_counter_handle + event_handle

        gpuHandleShm = shared_memory.SharedMemory(
            name=self.name + "_gpu_handle", create=True, size=len(payload))
        logger.debug("Creating shared memory object %s_gpu_handle", self.name)
        gpuHandleShm.buf[:len(payload)] = payload
        # Keep the handle block open for this object's lifetime; close() releases it.
        self._gpuHandleShm = gpuHandleShm
        return self.shmGPU

    def initGPUMemFromSHM(self):
        """Attach ``shmGPU`` to device memory published by another process.

        Reads the ``<name>_gpu_handle`` block written by
        :meth:`createGPUMemSHM` and rebuilds the CUDA storage from it. If that
        block cannot be opened, it falls back to CPU mode
        (``gpuDevice = None``) with a warning.

        Raises:
            Exception: if the handle was published by this same process, i.e.
                the stream is not being used in hard real-time mode.
        """
        if self.gpuDevice is None or not TORCH_AVAILABLE:
            return
        try:
            gpuHandleShm = shared_memory.SharedMemory(name=self.name + "_gpu_handle")
            logger.debug("Opened shared memory object %s_gpu_handle", self.name)
        except Exception:
            self.gpuDevice = None
            logger.warning("%s: Trying to initialize GPU memory which does not exist. Defaulting to CPU", self.name)
            return
        # Keep the handle block open for this object's lifetime; close() releases it.
        self._gpuHandleShm = gpuHandleShm

        headerStruct = self._GPU_HANDLE_HEADER
        buf = gpuHandleShm.buf
        (
            device_index,
            handle_length,
            storage_size_bytes,
            storage_offset_bytes,
            ref_counter_offset,
            ref_counter_length,
            event_handle_length,
            event_sync_required,
            pid,
        ) = headerStruct.unpack_from(buf, 0)
        if pid == os.getpid():
            raise Exception(f"{self.name}:GPU SHMs only work in hard real-time mode, set gpuDevice to None or remove from config")

        # The variable-length fields follow the header in the order they were written.
        offset = headerStruct.size
        handle_bytes = bytes(buf[offset:offset + handle_length])
        offset += handle_length
        ref_counter_handle = bytes(buf[offset:offset + ref_counter_length])
        offset += ref_counter_length
        event_handle = bytes(buf[offset:offset + event_handle_length])

        storage = torch.UntypedStorage._new_shared_cuda(
            device_index,
            handle_bytes,
            storage_size_bytes,
            storage_offset_bytes,
            ref_counter_handle,
            ref_counter_offset,
            event_handle,
            bool(event_sync_required),
        )
        self.shmGPU = torch.tensor([], dtype=self.torchDtype, device=device_index).set_(storage).reshape(self.shape)
        return

    def close(self):
        """Unmap this process's view of the stream's blocks (does not unlink).

        The NumPy arrays backed by shared memory are first replaced with
        private copies, because ``SharedMemory.close`` raises ``BufferError``
        while any view of its buffer is alive.
        """
        logger.debug("Closing shared memory object %s", self.name)
        if getattr(self, "shm", None) is not None:
            self.arr = np.array(self.arr)
            self._close_block(self.shm)
        if getattr(self, "metadataShm", None) is not None:
            self.metadata = np.array(self.metadata)
            self._close_block(self.metadataShm)
        if getattr(self, "_gpuHandleShm", None) is not None:
            self._close_block(self._gpuHandleShm)
        return

    def write(self, arr):
        """Publish ``arr`` to the stream and bump the write counter/time.

        In GPU mode both NumPy arrays and torch tensors are accepted and are
        mirrored to the CPU and GPU copies. In CPU mode only NumPy arrays are
        accepted.

        Returns:
            1 on success, -1 if ``arr`` has an unsupported type or wrong shape.
        """
        onGPU = self.gpuDevice is not None
        accepted = (np.ndarray, torch.Tensor) if onGPU else (np.ndarray,)
        if not isinstance(arr, accepted):
            return -1
        if arr.shape != self.arr.shape:
            logger.error("%s: Writing Wrong size array to SHM. Expecting %s, Got %s", self.name, self.arr.shape, arr.shape)
            return -1

        if onGPU and isinstance(arr, torch.Tensor):
            self.shmGPU.copy_(arr)
            # detach() so tensors that require grad can be converted to NumPy.
            np.copyto(self.arr, arr.detach().cpu().numpy())
        else:
            np.copyto(self.arr, arr)
            if onGPU:
                # Upload from the CPU block: it already has the stream dtype and
                # a layout torch.from_numpy accepts (no negative strides).
                self.shmGPU.copy_(torch.from_numpy(self.arr))

        self.count += 1
        self.lastWriteTime = time.time()
        if self.areData:
            self.updateMetadata()
        return 1

    def hold(self, timeout=None, RELEASE_GIL = True):
        """Wait until :meth:`checkNew` reports fresh data.

        Args:
            timeout: ``None`` waits indefinitely. A number gives the maximum
                number of seconds to wait. Any other value returns immediately.
            RELEASE_GIL: Poll with ``time.sleep`` (releases the GIL) when
                true; otherwise poll with ``precise_delay``.
        """
        if timeout is not None and not isinstance(timeout, (int, float)):
            return
        # Monotonic clock so wall-clock adjustments cannot stretch or cut the wait.
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self.checkNew():
            if deadline is not None and time.monotonic() >= deadline:
                return
            if RELEASE_GIL:
                time.sleep(1e-5)
            else:
                precise_delay(5)
        return

    def read(self, SAFE=True, GPU = False, RELEASE_GIL = True):
        """Block until new data arrives, then return it (see :meth:`read_noblock`)."""
        self.hold(RELEASE_GIL = RELEASE_GIL)
        return self.read_noblock(SAFE=SAFE, GPU=GPU)

    def read_timeout(self, timeout, SAFE = True, GPU = False, RELEASE_GIL = True):
        """Wait up to ``timeout`` seconds for new data, then return the current contents."""
        self.hold(timeout=timeout, RELEASE_GIL = RELEASE_GIL)
        return self.read_noblock(SAFE=SAFE, GPU=GPU)

    def read_noblock(self, SAFE=True, GPU=False):
        """Return the current contents without waiting and mark them as seen.

        ``SAFE=True`` returns a copy. ``SAFE=False`` (experimental) returns
        the shared buffer itself. ``GPU=True`` selects the GPU mirror when one
        exists.
        """
        self.markSeen()
        useGPU = GPU and self.gpuDevice is not None
        if SAFE:
            return self.shmGPU.clone() if useGPU else np.copy(self.arr)
        return self.shmGPU if useGPU else self.arr

    def checkNew(self):
        """Return True if metadata slot 1 changed since it was last seen.

        A change is marked as seen when detected. Metadata-only streams always
        report True.
        """
        if not self.areData:
            return True
        if self.metadata[1] != self.lastReadTime:
            self.markSeen()
            return True
        return False

    def markSeen(self):
        """Record the current value of metadata slot 1 as already consumed."""
        self.lastReadTime = self.metadata[1]
        return
def clear_shms(names):
    """Unlink the shared-memory blocks backing each named stream.

    For every ``name`` this removes ``<name>``, ``<name>_meta`` and
    ``<name>_gpu_handle`` if they exist; missing blocks are skipped.

    The blocks are opened directly rather than through :class:`ImageSHM`.
    Constructing an ``ImageSHM`` creates placeholder blocks for names that do
    not exist. For ``<name>_gpu_handle`` it would also create an extra
    ``<name>_gpu_handle_meta`` block that nothing ever unlinks.

    Args:
        names: Iterable of stream base names.
    """
    for name in names:
        for suffix in ("", "_meta", "_gpu_handle"):
            try:
                block = shared_memory.SharedMemory(name=name + suffix)
            except FileNotFoundError:
                continue
            try:
                block.unlink()
            finally:
                block.close()
[docs]
class hardwareLauncher:
    """Launch and supervise a hardware-side child process.

    The launcher is the client-side helper for pyRTC's hard-RTC deployment
    model. It starts a Python subprocess, waits for the child to expose a socket
    listener, and then sends simple JSON messages to get or set properties,
    invoke helper methods, or request shutdown.

    Logging-related environment variables are propagated so parent and child
    processes share the same operator-facing logging policy.
    """

    # Seconds to wait between connection attempts in launch().
    RETRY_INTERVAL = 2

    def __init__(self, hardwareFile, configFile, port, timeout=None) -> None:
        """
        Args:
            hardwareFile: Python script to run as the child process.
            configFile: Config path passed to the child as ``-c``.
            port: Localhost port the child listens on (passed as ``-p``).
            timeout: Optional socket timeout in seconds for replies; a reply
                that does not arrive in time is reported as -1.
        """
        self.hardwareFile = hardwareFile
        self.command = [sys.executable, hardwareFile, "-c", f"{configFile}", "-p", f"{port}"]
        self.running = False
        self.host = '127.0.0.1'  # localhost
        self.port = port
        self.timeout = timeout
        return

    def launch(self):
        """Start the child process (once) and connect to its listener socket.

        Connection attempts are retried every ``RETRY_INTERVAL`` seconds until
        the child accepts.

        Raises:
            RuntimeError: if the child exits before accepting the connection.
        """
        ensure_logging_configured(app_name="pyrtc-hardware-launcher", component_name=self.hardwareFile)
        if self.running:
            return
        logger.info("Launching process %s", self.hardwareFile)
        # NOTE(review): stdout is piped but never read in this class; a chatty
        # child can block once the pipe buffer fills. Confirm something drains it.
        self.process = Popen(self.command, stdin=PIPE, stdout=PIPE, text=True, bufsize=1, env=os.environ.copy())
        self.running = True

        logger.info("Waiting for process at %s:%s", self.host, self.port)
        restTime = self.RETRY_INTERVAL
        while True:
            time.sleep(restTime)
            returnCode = self.process.poll()
            if returnCode is not None:
                self.running = False
                raise RuntimeError(
                    f"{self.hardwareFile}: child process exited with code {returnCode} before accepting a connection"
                )
            # A socket whose connect() failed is not portably reusable, so use
            # a fresh one for every attempt.
            candidate = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                candidate.connect((self.host, self.port))
            except OSError as e:
                candidate.close()
                logger.warning("Connection failed: %s", e)
                logger.info("Retrying in %s seconds", restTime)
                continue
            break

        self.processSocket = candidate
        if isinstance(self.timeout, (int, float)):
            self.processSocket.settimeout(self.timeout)
        logger.info("Connected to child process socket")
        return

    def shutdown(self):
        """Ask the child to shut down its hardware; returns 1 on success, -1 otherwise."""
        return self.writeAndRead({"type": "shutdown"})

    def getProperty(self, property):
        """Return the child's attribute ``property``, or -1 on failure."""
        return self.writeAndRead({"type": "get", "property": property})

    def setProperty(self, property, value):
        """Set the child's attribute ``property``; returns 1 on success, -1 otherwise."""
        return self.writeAndRead({"type": "set", "property": property, "value": value})

    def run(self, function, *args, timeout = None):
        """Call ``function(*args)`` on the child's hardware object.

        ``timeout`` is accepted for API compatibility but is currently unused;
        the socket timeout given to the constructor applies instead.
        Returns 1 on success and -1 otherwise.
        """
        message = {"type": "run", "function": function}
        message.update({f"arg_{i}": arg for i, arg in enumerate(args, start=1)})
        return self.writeAndRead(message)

    def writeAndRead(self, message):
        """Send one request and interpret the reply.

        Returns:
            The reply's ``"property"`` value if present, 1 for a plain
            ``"OK"``, and -1 when not running or on any error reply, bad
            reply format, timeout or disconnect.
        """
        if not self.running:
            return -1
        self.write(message)
        reply = self.read()
        if not isinstance(reply, dict):
            return -1
        if reply.get("status") != 'OK':
            return -1
        return reply["property"] if "property" in reply else 1

    def write(self, message):
        """Serialize ``message`` as JSON and send all of it to the child."""
        # sendall: a single send() may transmit only part of the payload.
        self.processSocket.sendall(json.dumps(message).encode())
        return

    def read(self):
        """Receive one JSON reply; returns -1 on timeout or if the child disconnected."""
        try:
            reply = self._recv_json(self.processSocket)
        except socket.timeout:
            return -1
        return -1 if reply is None else reply

    @staticmethod
    def _recv_json(sock):
        """Receive bytes from ``sock`` until they form one complete JSON document.

        The protocol has no explicit framing, so a reply larger than one
        ``recv`` (or one split in transit) is accumulated until it parses.
        Returns ``None`` if the peer closes the connection first.
        """
        buffer = bytearray()
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                return None
            buffer += chunk
            try:
                return json.loads(buffer.decode())
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Incomplete message so far; wait for more bytes.
                continue
[docs]
class Listener:
    """Server-side control socket for a launched hardware object.

    ``Listener`` is the child-process counterpart to :class:`hardwareLauncher`.
    It binds a localhost socket, accepts the RTC-side connection, and services a
    narrow JSON RPC surface for property access, method calls, and clean
    shutdown.

    Requests are JSON objects whose ``"type"`` is one of:

    * ``"shutdown"``
    * ``"get"`` (with ``"property"``)
    * ``"set"`` (with ``"property"`` and ``"value"``)
    * ``"run"`` (with ``"function"`` plus positional ``"arg_1"``,
      ``"arg_2"``, ...)

    Each request is answered with ``{"status": "OK"}`` (``"get"`` adds
    ``"property"``) or ``{"status": "BAD"}``.
    """

    def __init__(self, hardware, port) -> None:
        """Bind ``127.0.0.1:port`` and block until the RTC process connects."""
        self.hardware = hardware
        self.running = True
        self.keyCharacter = '$'
        self.host = '127.0.0.1'  # localhost
        self.port = port
        self.OKMessage = {"status": "OK"}
        self.BadMessage = {"status": "BAD"}
        server_socket = bind_socket(self.host, self.port)
        try:
            server_socket.listen()
            logger.info("%s: awaiting RTC connection", hardware.name)
            # Connect to the RTC process that spawned us.
            self.RTCsocket, self.RTCaddress = server_socket.accept()
        finally:
            # Only a single RTC connection is served; release the listening port.
            server_socket.close()
        return

    def listen(self):
        """Read one request, dispatch it, and send the reply.

        Sets ``running`` to ``False`` after a shutdown request, or when the
        RTC side closes the connection (there is nobody left to serve).
        """
        try:
            request = self.read()
        except Exception:
            logger.exception("Failed to read listener request")
            self.write(self.BadMessage)
            return
        if request is None:
            logger.warning("Listener connection closed by RTC; stopping")
            self.running = False
            return
        if not isinstance(request, dict) or "type" not in request:
            self.write(self.BadMessage)
            logger.error("Listener request missing type field: %s", request)
            return

        requestType = request["type"]
        handlers = {
            "shutdown": self._handleShutdown,
            "get": self._handleGet,
            "set": self._handleSet,
            "run": self._handleRun,
        }
        # Guard against unhashable "type" values before the dict lookup.
        handler = handlers.get(requestType) if isinstance(requestType, str) else None
        if handler is None:
            logger.error("Unknown listener request type: %s", requestType)
            self.write(self.BadMessage)
            return
        handler(request)

    def _handleShutdown(self, request):
        """Tear down the hardware object and stop the listen loop."""
        try:
            self.hardware.__del__()
            self.running = False
            self.write(self.OKMessage)
        except Exception:
            logger.exception("Listener shutdown request failed")
            self.write(self.BadMessage)

    def _handleGet(self, request):
        """Reply with the requested hardware attribute (must be JSON-serializable)."""
        try:
            value = getattr(self.hardware, request["property"])
            reply = self.OKMessage.copy()
            reply["property"] = value
            self.write(reply)
        except Exception:
            logger.exception("Listener get request failed for %s", request.get("property"))
            self.write(self.BadMessage)

    def _handleSet(self, request):
        """Assign a hardware attribute, coerced to the attribute's current type."""
        try:
            propertyName = request["property"]
            propertyValue = request["value"]
            current = getattr(self.hardware, propertyName)
            setattr(self.hardware, propertyName, type(current)(propertyValue))
            self.write(self.OKMessage)
        except Exception:
            logger.exception("Listener set request failed for %s", request.get("property"))
            self.write(self.BadMessage)

    def _handleRun(self, request):
        """Call a hardware method with the consecutive ``arg_1``, ``arg_2``, ... values."""
        try:
            functionName = request["function"]
            args = []
            index = 1
            while f"arg_{index}" in request:
                args.append(request[f"arg_{index}"])
                index += 1
            function = getattr(self.hardware, functionName)
            function(*args)
            self.write(self.OKMessage)
        except Exception:
            logger.exception("Listener run request failed for %s", request.get("function"))
            self.write(self.BadMessage)

    def write(self, message):
        """Serialize ``message`` as JSON and send all of it to the RTC."""
        # sendall: a single send() may transmit only part of the payload.
        self.RTCsocket.sendall(json.dumps(message).encode())
        return

    def read(self):
        """Receive one JSON request; returns ``None`` if the RTC disconnected."""
        return self._recv_json(self.RTCsocket)

    @staticmethod
    def _recv_json(sock):
        """Receive bytes from ``sock`` until they form one complete JSON document.

        The protocol has no explicit framing, so a request larger than one
        ``recv`` (or one split in transit) is accumulated until it parses.
        Returns ``None`` if the peer closes the connection first.
        """
        buffer = bytearray()
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                return None
            buffer += chunk
            try:
                return json.loads(buffer.decode())
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Incomplete message so far; wait for more bytes.
                continue
[docs]
def initExistingShm(shmName, gpuDevice=None):
    """Attach to an existing stream using the layout stored in its metadata.

    Reads ``<shmName>_meta`` to recover the dtype (slot 3, decoded with
    ``float_to_dtype``) and the shape (slots 4 onward, up to the first
    non-positive entry). It then opens the data block as a consumer.

    Args:
        shmName: Base name of the stream.
        gpuDevice: Optional torch device for the GPU mirror.

    Returns:
        ``(shm, shmDims, shmDType)``, i.e. the opened ``ImageSHM``, its shape
        as a list of ints, and its dtype.
    """
    # NOTE(review): if "<shmName>_meta" does not exist, ImageSHM creates a
    # zero-filled block here instead of failing; confirm callers only use
    # this after the producer has started.
    shmMeta = ImageSHM(shmName + "_meta", (ImageSHM.METADATA_SIZE,), np.float64).read_noblock()
    shmDType = float_to_dtype(shmMeta[3])
    shmDims = []
    # Bounded by the metadata length: a stream that uses every shape slot has
    # no zero terminator, and unbounded indexing would raise IndexError.
    for slot in shmMeta[4:]:
        dim = int(slot)
        if dim <= 0:
            break
        shmDims.append(dim)
    shm = ImageSHM(shmName, shmDims, shmDType, gpuDevice=gpuDevice, consumer=True)
    return shm, shmDims, shmDType
[docs]
def launchComponent(component, confKey, start = True):
    """Entry point for a hard-RTC child process hosting a single component.

    Parses ``-c/--config`` and ``-p/--port`` (plus the shared logging flags),
    configures logging, then builds ``component(conf=...)`` from the
    ``confKey`` section of the YAML config. The component is started when
    ``start`` is true. Finally it serves :class:`Listener` requests on the
    given port until the listener stops running. Any exception raised while
    building or serving the component is logged and re-raised.
    """
    cli = argparse.ArgumentParser(description="Read a config file from the command line.")
    cli.add_argument("-c", "--config", required=True, help="Path to the config file")
    cli.add_argument("-p", "--port", required=True, help="Port for communication")
    add_logging_cli_args(cli)
    options = cli.parse_args()

    configure_logging_from_args(options, app_name=f"pyrtc-{confKey}", component_name=confKey)
    componentConf = read_yaml_file(options.config)[confKey]
    set_affinity_and_priority("", setFromConfig(componentConf, "affinity", 0))

    try:
        instance = component(conf=componentConf)
        instance.RELEASE_GIL = False
        if start:
            instance.start()
        server = Listener(instance, port=int(options.port))
        while server.running:
            server.listen()
            time.sleep(1e-3)
    except Exception:
        logger.exception("Failed to launch component %s", confKey)
        raise