Source code for pyRTC.ScienceCamera

"""Science-camera abstractions and common image-quality telemetry.

This module defines the base class used by pyRTC science-camera adapters. It
handles the shared-memory products that downstream tools expect, including short
and long exposure PSFs, Strehl ratio estimates, and tip-tilt telemetry, while
leaving camera-specific acquisition details to hardware subclasses.
"""

import matplotlib.pyplot as plt
import numpy as np

from pyRTC.logging_utils import ensure_logging_configured, get_logger
from pyRTC.Pipeline import ImageSHM
from pyRTC.Pipeline import launchComponent
from pyRTC.pyRTCComponent import pyRTCComponent
from pyRTC.utils import centroid, clean_image_for_strehl, setFromConfig


logger = get_logger(__name__)

[docs] class ScienceCamera(pyRTCComponent): """ Base class for cameras that produce science images and image-quality metrics. ``ScienceCamera`` centralizes the parts of imaging that are shared across real and synthetic science-camera backends: SHM publication, dark/model PSF handling, long-exposure accumulation, and simple Strehl/tip-tilt telemetry. Subclasses are expected to implement the device-facing acquisition logic and then call the parent methods so the standard pyRTC products stay updated. Config ------ name : str Name of the camera. width : int Width of the image. Required. height : int Height of the image. Required. darkCount : int Number of dark frames to average. Required. integration : int Integration length. Required. darkFile : str, optional File to save the dark frames. Default is "". modelFile : str, optional File to save the model PSF. Default is "". Attributes ---------- name : str Name of the camera. imageShape : tuple Shape of the image. imageRawDType : type Data type of the raw image. imageDType : type Data type of the image. psfLongDtype : type Data type of the long exposure PSF. psfShort : ImageSHM Shared memory object for the short exposure PSF. psfLong : ImageSHM Shared memory object for the long exposure PSF. strehlShm : ImageSHM Shared memory object for the Strehl ratio. tipTiltShm : ImageSHM Shared memory object for the tip-tilt. data : numpy.ndarray Data array for the image. dark : numpy.ndarray Dark frame. darkCount : int Number of dark frames to average. darkFile : str File to save the dark frames. model : numpy.ndarray Model PSF. modelFile : str File to save the model PSF. strehl_ratio : float Strehl ratio. peak_dist : float Peak distance. integrationLength : int Integration length. roiWidth : int Width of the region of interest. roiHeight : int Height of the region of interest. roiLeft : int Left coordinate of the region of interest. roiTop : int Top coordinate of the region of interest. exposure : int Exposure time. binning : int Binning factor. gain : int Gain setting. bitDepth : int Bit depth setting. """ def __init__(self, conf) -> None: try: ensure_logging_configured(app_name="pyrtc", component_name=self.__class__.__name__) self.logger = get_logger(f"{self.__class__.__module__}.{self.__class__.__name__}") self.name = conf["name"] self.imageShape = (conf["width"], conf["height"]) self.imageRawDType = np.uint16 self.imageDType = np.int32 self.psfLongDtype = np.float64 self.psfShort = ImageSHM("psfShort", self.imageShape, self.imageDType) self.psfLong = ImageSHM("psfLong", self.imageShape, self.psfLongDtype) self.strehlShm = ImageSHM("strehl", (1,), float) self.tipTiltShm = ImageSHM("tiptilt", (1,), float) self.data = np.zeros(self.imageShape, dtype=self.imageRawDType) self.dark = np.zeros(self.imageShape, dtype=self.imageDType) self.darkCount = conf["darkCount"] self.darkFile = setFromConfig(conf, "darkFile", "") self.model = np.zeros(self.imageShape, dtype=self.psfLongDtype) self.modelFile = setFromConfig(conf, "modelFile", "") self.strehl_ratio = 0 self.peak_dist = 0 self.loadDark() self.loadModelPSF() self.integrationLength = conf["integration"] super().__init__(conf) self.logger.info( "Initialized science camera name=%s image_shape=%s integration=%s", self.name, self.imageShape, self.integrationLength, ) except Exception: logger.exception("Failed to initialize science camera") raise
[docs] def setRoi(self, roi): """ Set the region of interest (ROI). Parameters ---------- roi : tuple Tuple containing (width, height, left, top) of the ROI. """ try: self.roiWidth = roi[0] self.roiHeight = roi[1] self.roiLeft = roi[2] self.roiTop = roi[3] self.logger.info("Set ROI width=%s height=%s left=%s top=%s", *roi) except Exception: logger.exception("Failed to set ROI from %s", roi) raise return
[docs] def setExposure(self, exposure): """ Set the exposure time. Parameters ---------- exposure : int Exposure time to set. """ try: self.exposure = exposure self.logger.info("Set exposure to %s", exposure) except Exception: logger.exception("Failed to set exposure to %s", exposure) raise return
[docs] def setBinning(self, binning): """ Set the binning factor. Parameters ---------- binning : int Binning factor to set. """ try: self.binning = binning self.logger.info("Set binning to %s", binning) except Exception: logger.exception("Failed to set binning to %s", binning) raise return
[docs] def setGain(self, gain): """ Set the gain. Parameters ---------- gain : int Gain to set. """ try: self.gain = gain self.logger.info("Set gain to %s", gain) except Exception: logger.exception("Failed to set gain to %s", gain) raise return
[docs] def setGamma(self, gamma): """ Set the gamma. Parameters ---------- gamma : float Gamma to set. """ try: self.gamma = gamma self.logger.info("Set gamma to %s", gamma) except Exception: logger.exception("Failed to set gamma to %s", gamma) raise return
[docs] def setBitDepth(self, bitDepth): """ Set the bit depth. Parameters ---------- bitDepth : int Bit depth to set. """ try: self.bitDepth = bitDepth self.logger.info("Set bit depth to %s", bitDepth) except Exception: logger.exception("Failed to set bit depth to %s", bitDepth) raise return
[docs] def setIntegrationLength(self, integrationLength): """ Set the integration length. Parameters ---------- integrationLength : int Integration length to set. """ try: self.integrationLength = integrationLength self.logger.info("Set integration length to %s", integrationLength) except Exception: logger.exception("Failed to set integration length to %s", integrationLength) raise return
[docs] def expose(self): """ Perform a single exposure. """ self.psfShort.write(self.data.astype(self.imageDType) - self.dark) return
[docs] def integrate(self): """ Perform multiple exposures and integrate the results. Number of frames set by integrationLength. """ x = np.zeros(self.data.shape) for i in range(self.integrationLength): x += self.read().astype(x.dtype) self.psfLong.write(x/self.integrationLength) return
[docs] def read(self, block = True): """ Read the current short exposure PSF. Returns ------- numpy.ndarray Current short exposure PSF. """ if block: return self.psfShort.read(RELEASE_GIL = True) return self.psfShort.read_noblock()
[docs] def readLong(self): """ Read the current long exposure PSF. Returns ------- numpy.ndarray Current long exposure PSF. """ return self.psfLong.read(RELEASE_GIL = True)
[docs] def takeDark(self): """ Take dark frames and average them to create a dark frame. Number of exposures to average set by darkCount parameter. """ try: if self.darkCount < 1: raise ValueError("darkCount must be at least 1 to acquire a dark frame") self.logger.info("Taking science camera dark frame using %s exposures", self.darkCount) self.setDark(np.zeros_like(self.dark)) dark = np.zeros(self.imageShape, dtype=np.float64) for _ in range(self.darkCount): dark += self.read().astype(np.float64) dark /= self.darkCount self.setDark(dark) self.logger.info("Completed science camera dark frame acquisition") except Exception: logger.exception("Failed to acquire science camera dark frame") raise return
[docs] def setDark(self, dark): """ Set the dark frame. Parameters ---------- dark : numpy.ndarray Dark frame to set. """ try: self.dark = dark.astype(self.imageDType) self.logger.info("Updated science camera dark frame") except Exception: logger.exception("Failed to update science camera dark frame") raise return
[docs] def saveDark(self,filename=''): """ Save the dark frame to a file. Parameters ---------- filename : str, optional File to save the dark frame to. If not specified, uses the configured darkFile. """ try: if filename == '': filename = self.darkFile if filename == '': raise ValueError("No dark frame filename provided") np.save(filename, self.dark) self.logger.info("Saved science camera dark frame to %s", filename) except Exception: logger.exception("Failed to save science camera dark frame to %s", filename or self.darkFile) raise return
[docs] def loadDark(self,filename=''): """ Load the dark frame from a file. Parameters ---------- filename : str, optional File to load the dark frame from. If not specified, uses the configured darkFile. """ #If no file given, first try dark file try: if filename == '': filename = self.darkFile if filename == '': self.dark = np.zeros_like(self.dark) logger.info("No science camera dark frame file configured; using zeros") else: self.dark = np.load(filename) self.logger.info("Loaded science camera dark frame from %s", filename) except Exception: logger.exception("Failed to load science camera dark frame from %s", filename or self.darkFile) raise return
[docs] def takeModelPSF(self): """ Capture the current long exposure PSF as the model PSF. """ try: self.model = self.readLong() self.logger.info("Captured model PSF from current long-exposure image") except Exception: logger.exception("Failed to capture model PSF") raise return
[docs] def setModelPSF(self, model): """ Set the model PSF. Parameters ---------- model : numpy.ndarray Model PSF to set. """ try: self.model = model.astype(self.psfLongDtype) self.logger.info("Updated model PSF") except Exception: logger.exception("Failed to update model PSF") raise return
[docs] def saveModelPSF(self,filename=''): """ Save the model PSF to a file. Parameters ---------- filename : str, optional File to save the model PSF to. If not specified, uses the configured modelFile. """ try: if filename == '': filename = self.modelFile if filename == '': raise ValueError("No model PSF filename provided") np.save(filename, self.model) self.logger.info("Saved model PSF to %s", filename) except Exception: logger.exception("Failed to save model PSF to %s", filename or self.modelFile) raise return
[docs] def loadModelPSF(self,filename=''): """ Load the model PSF from a file. Parameters ---------- filename : str, optional File to load the model PSF from. If not specified, uses the configured modelFile. """ #If no file given, first try dark file try: if filename == '': filename = self.modelFile if filename == '': self.model = np.zeros_like(self.model) logger.info("No model PSF file configured; using zeros") else: self.model = np.load(filename) self.logger.info("Loaded model PSF from %s", filename) except Exception: logger.exception("Failed to load model PSF from %s", filename or self.modelFile) raise return
[docs] def computeStrehl(self, median_filter_size = 1, gaussian_sigma = 0): """ Compute the rough Strehl ratio and tip tilt offset. These values are reference to the modelPSF. If your model PSF is taken empirically, then the Strehl ratio is not absolute, and should only be used as a relative measurement for focal plane feedback. Parameters ---------- median_filter_size : int, optional Size of the median filter to apply. Default is 1. gaussian_sigma : float, optional Sigma for the Gaussian filter. Default is 0. Returns ------- float Strehl ratio. """ model = clean_image_for_strehl(self.model, median_filter_size = median_filter_size, gaussian_sigma = gaussian_sigma) current = clean_image_for_strehl(self.readLong(), median_filter_size = median_filter_size, gaussian_sigma = gaussian_sigma) self.strehl_ratio = np.max(current) / np.max(model) self.peak_dist = np.linalg.norm(centroid(current) - centroid(self.model)) self.strehlShm.write(np.array([self.strehl_ratio], dtype=float)) self.tipTiltShm.write(np.array([self.peak_dist], dtype=float)) return self.strehl_ratio
[docs] def plot(self): """ Plot the current short exposure PSF. """ try: arr = self.read() plt.imshow(arr, cmap = 'inferno', origin='lower') plt.colorbar() plt.show() self.logger.info("Plotted science camera image") except Exception: logger.exception("Failed to plot science camera image") raise return
if __name__ == "__main__": launchComponent(ScienceCamera, "psf", start = True)