"""Slope-processing kernels and the pyRTC slope extraction component.
This module turns wavefront-sensor camera frames into the residual slope or
signal vectors consumed by the AO loop. It includes optimized CPU and GPU
helpers for pyramid and Shack-Hartmann processing plus the ``SlopesProcess``
component that manages calibration data and SHM publication.
"""
import os
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ['NUMBA_NUM_THREADS'] = '1'
import matplotlib.pyplot as plt
import numpy as np
from typing import Any
from numba import jit
from pyRTC.logging_utils import get_logger
from pyRTC.Pipeline import ImageSHM, gpu_torch_available, initExistingShm, launchComponent
from pyRTC.pyRTCComponent import pyRTCComponent
from pyRTC.utils import (
compute_fwhm_dark_subtracted_image,
generate_circular_aperture_mask,
setFromConfig,
)
logger = get_logger(__name__)
def computeSlopesPYWFSTorch(image: Any,
p1Mask: Any,
p2Mask: Any,
p3Mask: Any,
p4Mask: Any,
numPixelsInPupils: int,
slopes: Any,
refSlopes: Any):
"""Compute normalized pyramid-WFS slopes on a torch device.
The function extracts the four pupil images selected by the provided masks,
forms differential x/y slope channels, normalizes by the mean total pupil
flux, and subtracts the stored reference slopes.
"""
if not gpu_torch_available():
raise ImportError("computeSlopesPYWFSTorch requires PyTorch. Install with 'pip install pyRTC[gpu]' or 'pip install torch'.")
import torch
# Ensure the image is in float format
image = image.to(torch.float32)
# Mask pupils out of the image
p1 = image[p1Mask]
p2 = image[p2Mask]
p3 = image[p3Mask]
p4 = image[p4Mask]
# Sum pupils, saving partial sums to avoid recomputing later
tmp1 = p1 + p2
tmp2 = p3 + p4
# Compute X slopes
slopes[:numPixelsInPupils] = tmp1 - tmp2
# Compute Y slopes
slopes[numPixelsInPupils:] = (p1 + p3) - (p2 + p4)
# Normalize slopes
slopes = slopes / torch.mean(tmp1 + tmp2)
# Subtract reference slopes
return slopes - refSlopes
"""
Optimized for best performance with numpy only
All memory is preallocated.
"""
def computeSlopesPYWFSOptimNumpy(image:np.ndarray,
p1Mask:np.ndarray,
p2Mask:np.ndarray,
p3Mask:np.ndarray,
p4Mask:np.ndarray,
p1:np.ndarray,
p2:np.ndarray,
p3:np.ndarray,
p4:np.ndarray,
tmp1:np.ndarray,
tmp2:np.ndarray,
numPixelsInPupils:int,
slopes:np.ndarray,
refSlopes:np.ndarray,
):
# Mask Pupils out of image and convert to floats
p1 = image[p1Mask].astype(np.float32)
p2 = image[p2Mask].astype(np.float32)
p3 = image[p3Mask].astype(np.float32)
p4 = image[p4Mask].astype(np.float32)
# Sum Pupils, Saving partial sums to avoid recomputing later
tmp1 = np.add(p1,p2)
tmp2 = np.add(p3,p4)
# Compute X slopes
slopes[:numPixelsInPupils] = np.subtract(tmp1,tmp2)
# Compute Y slopes
slopes[numPixelsInPupils:] = np.subtract(np.add(p1,p3),np.add(p2,p4))
# Normalize slopes
slopes = np.divide(slopes, np.mean(np.add(tmp1,tmp2)))
# Subtract reference slopes
return slopes - refSlopes
"""
Optimized for best performance.
Works very well with numba JIT compilation.
Performed better compared to a numpy only implementation
"""
@jit(nopython=True, nogil=True, cache=True, fastmath=True)
def computeSlopesPYWFSOptimNumba(image:np.ndarray,
p1Mask:np.ndarray,
p2Mask:np.ndarray,
p3Mask:np.ndarray,
p4Mask:np.ndarray,
p1:np.ndarray,
p2:np.ndarray,
p3:np.ndarray,
p4:np.ndarray,
tmp1:np.ndarray,
tmp2:np.ndarray,
numPixelsInPupils:int,
slopes:np.ndarray,
refSlopes:np.ndarray,
):
"""Compute pyramid-WFS slopes using a Numba-optimized CPU kernel."""
# Mask Pupils out of image and convert to floats
p1_count, p2_count, p3_count, p4_count = 0, 0, 0, 0
for i in range(len(image)):
if p1Mask[i]:
p1[p1_count] = np.float32(image[i])
p1_count += 1
if p2Mask[i]:
p2[p2_count] = np.float32(image[i])
p2_count += 1
if p3Mask[i]:
p3[p3_count] = np.float32(image[i])
p3_count += 1
if p4Mask[i]:
p4[p4_count] = np.float32(image[i])
p4_count += 1
# Sum Pupils, Saving partial sums to avoid recomputing later
total_sum = 0.0
for i in range(numPixelsInPupils): # Assuming all counts are equal
tmp1[i] = p1[i] + p2[i]
tmp2[i] = p3[i] + p4[i]
total_sum += tmp1[i] + tmp2[i]
mean_value = total_sum / p1_count
for i in range(numPixelsInPupils):
# Compute Y slopes
slopes[i] = (tmp1[i] - tmp2[i])/mean_value - refSlopes[i]
# Compute X slopes
slopes[numPixelsInPupils + i] = ((p1[i] + p3[i]) - (p2[i] + p4[i]))/mean_value \
- refSlopes[numPixelsInPupils + i]
return slopes
"""
Optimized for best performance.
Works very well with numba JIT compilation.
Performed better compared to a numpy only implementation, while also
allowing for non-integer spacing.
"""
@jit(nopython=True, nogil=True, cache=True)
def computeSlopesSHWFSOptimNumba(image:np.ndarray,
slopes:np.ndarray,
unaberratedSlopes:np.ndarray,
threshold:np.float32,
spacing:np.float32,
xvals:np.ndarray,
offsetX:int,
offsetY:int,
intN:int,
):
"""Compute Shack-Hartmann centroid slopes with a Numba kernel.
The image is traversed lenslet by lenslet, thresholded locally, and reduced
into x/y centroid offsets relative to the unaberrated reference slopes.
"""
# Convert image to the same dtype as unaberratedSlopes
image = image.astype(np.float32)
# Compute the number of sub-apertures
numRegions = unaberratedSlopes.shape[1]
# Loop over all regions
for i in range(numRegions):
for j in range(numRegions):
# Compute where to start
start_i = int(round(spacing * i)) + offsetY
start_j = int(round(spacing * j)) + offsetX
# Ensure we stay within the bounds of the image
if start_j + intN <= image.shape[1] and start_i + intN <= image.shape[0]:
#Create a local subimage around the lenslet spot
sub_im = image[start_i:start_i + intN, start_j:start_j + intN]
#loop through the sub image
norm = np.float32(0)
weightX = np.float32(0)
weightY = np.float32(0)
for m in range(intN):
for n in range(intN):
#If we are counting the pixel
if sub_im[m,n] > threshold:
#Add it to the normalization
norm += sub_im[m,n]
#Compute the X and Y centroids (before normalization)
weightX += xvals[m,n] * sub_im[m,n]
weightY += xvals[n,m] * sub_im[m,n]
#If we have flux in the sub aperture
if norm > 0:
#Normalize the centroids and remove the reference slope
slopes[i, j] = weightX/norm - unaberratedSlopes[i, j]
slopes[i + numRegions, j] = weightY/norm - unaberratedSlopes[i + numRegions, j]
#If we have no flux slopes should be zero
return slopes
"""
Optimized for best performance with numpy only.
Does not allow for non-integer spacing.
"""
def computeSlopesSHWFSOptimNumpy(image:np.ndarray,
slopes:np.ndarray,
unaberratedSlopes:np.ndarray,
threshold:float,
spacing:int,
xvals:np.array):
#Only works for integer spacings
spacing = int(spacing)
# Convert the image to floats and threshold in one operation
image = np.where(image > threshold, image.astype(np.float32), 0.0)
# Reshape the image into blocks of size spacing X spacing
reshaped_image = image.reshape(image.shape[0] // spacing, spacing, image.shape[1] // spacing, spacing)
# Compute the sum of pixel values in each MxM region
region_sums = np.sum(reshaped_image, axis=(1, 3))
# Precompute the dot products instead of tensordot (which is more general but slower)
weighted_sum_x = np.einsum('ijkl,jl->ik', reshaped_image, xvals)
weighted_sum_y = np.einsum('ijkl,jl->ik', reshaped_image, xvals.T)
# Get mask for non-zero value sums
mask = region_sums > 0.0
# Compute the centroids directly on the valid regions
valid_region_sums = region_sums[mask]
slopes[:slopes.shape[1]][mask] = weighted_sum_x[mask] / valid_region_sums - unaberratedSlopes[:slopes.shape[1]][mask]
slopes[slopes.shape[1]:][mask] = weighted_sum_y[mask] / valid_region_sums - unaberratedSlopes[slopes.shape[1]:][mask]
# Return the difference with reference slopes
return slopes
[docs]
class SlopesProcess(pyRTCComponent):
"""
A class to handle real-time slope computation for wavefront sensors.
Config
------
type : str
Type of the WFS ("PYWFS" or "SHWFS").
signalType : str
Type of signal ("slopes").
imageNoise : float, optional
Image noise. Default is 0.0.
centralObscurationRatio : float, optional
Central obscuration ratio. Default is 0.0.
flatNorm : float, optional
Normalization factor for the flat. Required for "PYWFS" with "slopes" signalType.
pupils : list of str, optional
List of pupil locations in "x,y" format. Required for "PYWFS".
pupilsRadius : int, optional
Radius of the pupils. Required for "PYWFS".
contrast : float, optional
Contrast for "SHWFS". Default is 0.
subApSpacing : float, optional
Sub-aperture spacing for "SHWFS".
subApOffsetX : float, optional
Sub-aperture offset in X direction for "SHWFS".
subApOffsetY : float, optional
Sub-aperture offset in Y direction for "SHWFS".
refSlopeCount : int, optional
Number of reference slopes for averaging. Default is 1000.
validSubApsFile : str, optional
File containing valid sub-aperture mask. Default is "".
refSlopesFile : str, optional
File containing reference slopes. Default is "".
Attributes
----------
confWFS : dict
Wavefront sensor configuration.
name : str
Name of the process.
imageShape : tuple
Shape of the WFS image.
conf : dict
Slopes configuration.
wfsMeta : numpy.ndarray
Metadata of the WFS image.
imageDType : type
Data type of the WFS image.
wfsShm : ImageSHM
Shared memory object for the WFS image.
signalDType : type
Data type of the signal.
imageNoise : float
Image noise.
centralObscurationRatio : float
Central obscuration ratio.
wfsType : str
Type of the WFS.
signalType : str
Type of signal.
validSubAps : numpy.ndarray or None
Valid sub-aperture mask.
shwfsContrast : float
Contrast for "SHWFS".
subApSpacing : float
Sub-aperture spacing for "SHWFS".
numRegions : int
Number of regions for "SHWFS".
offsetX : float
Sub-aperture offset in X direction for "SHWFS".
offsetY : float
Sub-aperture offset in Y direction for "SHWFS".
refSlopeCount : int
Number of reference slopes for averaging.
signal2DSize : int
Size of the 2D signal.
signal2DShape : tuple
Shape of the 2D signal.
validSubApsFile : str
File containing valid sub-aperture mask.
signalSize : int
Size of the signal.
signalShape : tuple
Shape of the signal.
signal : ImageSHM
Shared memory object for the signal.
signal2D : ImageSHM
Shared memory object for the 2D signal.
refSlopesFile : str
File containing reference slopes.
refSlopes : numpy.ndarray
Reference slopes.
gpuDevice : str
Default device if using GPU
flatNorm : float
Normalization factor for the flat.
pupilLocs : list of tuple
List of pupil locations.
pupilRadius : int
Radius of the pupils.
pupilMask : numpy.ndarray
Mask of the pupils.
p1mask : numpy.ndarray
Mask for pupil 1.
p2mask : numpy.ndarray
Mask for pupil 2.
p3mask : numpy.ndarray
Mask for pupil 3.
p4mask : numpy.ndarray
Mask for pupil 4.
"""
def __init__(self, conf) -> None:
try:
super().__init__(conf)
self.conf = conf
self.name = "Slopes"
self.wfsShm, self.imageShape, self.imageDType = initExistingShm("wfs", gpuDevice=self.gpuDevice)
self.signalDType = np.float32
self.imageNoise = setFromConfig(self.conf, "imageNoise", 0.0)
self.centralObscurationRatio = setFromConfig(self.conf, "centralObscurationRatio", 0.0)
self.wfsType = self.conf["type"].lower()
self.signalType = self.conf["signalType"]
self.validSubAps = None
self.validSubApsFile = setFromConfig(self.conf, "validSubApsFile", "")
self.refSlopesFile = setFromConfig(self.conf, "refSlopesFile", "")
self.refSlopeCount = setFromConfig(self.conf, "refSlopeCount", 1000)
if self.wfsType == "pywfs":
if "pupils" in self.conf.keys():
pupilLocs = [(int(x.split(',')[1]), int(x.split(',')[0])) for x in self.conf["pupils"]]
self.setPupils(pupilLocs, self.conf["pupilsRadius"])
else:
a, b = int(0.25 * self.imageShape[0]), int(0.75 * self.imageShape[0])
c, d = int(0.25 * self.imageShape[1]), int(0.75 * self.imageShape[1])
r = min(self.imageShape[0] - b, self.imageShape[1] - d)
self.setPupils([(a, c), (a, d), (b, c), (b, d)], r)
if self.signalType == 'slopes':
self.flatNorm = setFromConfig(self.conf, "flatNorm", True)
self.refSlopes = np.zeros(self.signal2DShape, dtype=self.signalDType)
self.refSlopes1D = np.zeros_like(self.signal.read_noblock())
self.slopesArr1D = np.zeros_like(self.refSlopes1D)
self.numPixelsInPupils = np.count_nonzero(self.p1mask)
self.p1 = np.empty(self.numPixelsInPupils, dtype=self.signalDType)
self.p2 = np.empty_like(self.p1)
self.p3 = np.empty_like(self.p1)
self.p4 = np.empty_like(self.p1)
self.tmp1, self.tmp2 = np.empty_like(self.p1), np.empty_like(self.p1)
elif self.wfsType == "shwfs":
self.shwfsContrast = setFromConfig(self.conf, "contrast", 0)
self.subApSpacing = self.conf["subApSpacing"]
self.regionSize = int(np.round(self.subApSpacing, 0))
self.numRegions = self.imageShape[0] // self.regionSize
self.offsetX = self.conf["subApOffsetX"]
self.offsetY = self.conf["subApOffsetY"]
xvals = np.arange(self.regionSize).astype(int) - self.regionSize // 2
self.xvals = np.meshgrid(xvals, xvals)[0].astype(self.signalDType)
self.signal2DSize = int(2 * self.numRegions**2)
self.signal2DShape = (2 * self.numRegions, self.numRegions)
self.validSubAps = np.ones(self.signal2DShape, dtype=bool)
self.loadValidSubAps()
self.signalSize = np.sum(self.validSubAps)
self.signalShape = (self.signalSize,)
logger.info(
"SHWFS slopes configured subApSpacing=%s numRegions=%s offsetX=%s offsetY=%s signalSize=%s signalShape=%s signalDType=%s",
self.subApSpacing,
self.numRegions,
self.offsetX,
self.offsetY,
self.signalSize,
self.signalShape,
self.signalDType,
)
self.signal = ImageSHM("signal", self.signalShape, self.signalDType, gpuDevice=self.gpuDevice, consumer=False)
self.signal2D = ImageSHM("signal2D", self.signal2DShape, self.signalDType, gpuDevice=self.gpuDevice, consumer=False)
self.refSlopes = np.zeros(self.signal2DShape, dtype=self.signalDType)
self.loadRefSlopes()
self.logger.info("Initialized slopes process wfsType=%s signalType=%s imageShape=%s", self.wfsType, self.signalType, self.imageShape)
except Exception:
logger.exception("Failed to initialize slopes process")
raise
[docs]
def read(self, block = True, SAFE=True, GPU=False):
"""
Read the current signal.
Returns
-------
numpy.ndarray
Current signal.
"""
if block:
return self.signal.read(SAFE=SAFE, GPU=GPU, RELEASE_GIL = self.RELEASE_GIL)
return self.signal.read_noblock(SAFE=SAFE, GPU=GPU)
[docs]
def readImage(self, SAFE=True, GPU=False, block=True):
"""
Read the current WFS image.
Returns
-------
numpy.ndarray
Current WFS image.
"""
if block:
return self.wfsShm.read(SAFE=SAFE, GPU=GPU, RELEASE_GIL = self.RELEASE_GIL)
return self.wfsShm.read_noblock(SAFE=SAFE, GPU=GPU)
[docs]
def setValidSubAps(self, validSubAps):
"""
Set the valid sub-aperture mask. Converts to boolean if not already
Parameters
----------
validSubAps : numpy.ndarray
Valid sub-aperture mask.
"""
component_logger = getattr(self, "logger", logger)
try:
self.validSubAps = validSubAps.astype(bool)
self.curSignal2D = np.zeros(validSubAps.shape)
component_logger.info("Set valid sub-aperture mask shape=%s", validSubAps.shape)
except Exception:
component_logger.exception("Failed to set valid sub-aperture mask")
raise
return
[docs]
def saveValidSubAps(self,filename=''):
"""
Save the valid sub-aperture mask to a file.
Parameters
----------
filename : str, optional
File to save the valid sub-aperture mask to. If not specified, uses the configured validSubApsFile.
"""
component_logger = getattr(self, "logger", logger)
try:
if filename == '':
filename = self.validSubApsFile
if filename == '':
raise ValueError("No validSubAps filename provided")
np.save(filename, self.validSubAps)
component_logger.info("Saved valid sub-aperture mask to %s", filename)
except Exception:
component_logger.exception("Failed to save valid sub-aperture mask to %s", filename or getattr(self, "validSubApsFile", ""))
raise
return
[docs]
def loadValidSubAps(self,filename=''):
"""
Load the valid sub-aperture mask from a file.
Parameters
----------
filename : str, optional
File to load the valid sub-aperture mask from. If not specified, uses the configured validSubApsFile.
"""
#If no file given, first try reference slopes file
component_logger = getattr(self, "logger", logger)
try:
if filename == '':
filename = self.validSubApsFile
if filename == '':
validSubAps = np.ones_like(self.validSubAps)
component_logger.info("No validSubAps file configured; using all-true mask")
else:
validSubAps = np.load(filename)
component_logger.info("Loaded valid sub-aperture mask from %s", filename)
self.setValidSubAps(validSubAps)
except Exception:
component_logger.exception("Failed to load valid sub-aperture mask from %s", filename or getattr(self, "validSubApsFile", ""))
raise
return
[docs]
def takeRefSlopes(self):
"""
Take reference slopes by averaging multiple slope measurements. Number of measurements
set by refSlopeCount variable.
"""
component_logger = getattr(self, "logger", logger)
try:
if self.refSlopeCount < 1:
raise ValueError("refSlopeCount must be at least 1")
component_logger.info("Taking reference slopes using %s frames", self.refSlopeCount)
self.setRefSlopes(np.zeros_like(self.refSlopes))
refSlopes = np.zeros_like(self.refSlopes)
for _ in range(self.refSlopeCount):
cur_slopes = self.read().astype(refSlopes.dtype)
refSlopes += self.computeSignal2D(cur_slopes)
refSlopes /= self.refSlopeCount
self.setRefSlopes(refSlopes)
component_logger.info("Completed reference slope acquisition")
except Exception:
component_logger.exception("Failed to take reference slopes")
raise
return
[docs]
def setRefSlopes(self, refSlopes):
"""
Set the reference slopes.
Parameters
----------
refSlopes : numpy.ndarray
Reference slopes.
"""
component_logger = getattr(self, "logger", logger)
try:
self.refSlopes = refSlopes.astype(self.signalDType)
if self.wfsType == 'pywfs':
slopemask = self.validSubAps[:, :self.validSubAps.shape[1] // 2]
self.refSlopes1D = np.zeros_like(self.signal.read_noblock())
self.refSlopes1D[:self.refSlopes1D.size // 2] = self.refSlopes[:, :self.refSlopes.shape[1] // 2][slopemask]
self.refSlopes1D[self.refSlopes1D.size // 2:] = self.refSlopes[:, self.refSlopes.shape[1] // 2:][slopemask]
component_logger.info("Updated reference slopes")
except Exception:
component_logger.exception("Failed to update reference slopes")
raise
return
[docs]
def saveRefSlopes(self,filename=''):
"""
Save the reference slopes to a file.
Parameters
----------
filename : str, optional
File to save the reference slopes to. If not specified, uses the configured refSlopesFile.
"""
component_logger = getattr(self, "logger", logger)
try:
if filename == '':
filename = self.refSlopesFile
if filename == '':
raise ValueError("No reference slopes filename provided")
np.save(filename, self.refSlopes)
component_logger.info("Saved reference slopes to %s", filename)
except Exception:
component_logger.exception("Failed to save reference slopes to %s", filename or getattr(self, "refSlopesFile", ""))
raise
return
[docs]
def loadRefSlopes(self,filename=''):
"""
Load the reference slopes from a file.
Parameters
----------
filename : str, optional
File to load the reference slopes from. If not specified, uses the configured refSlopesFile.
"""
#If no file given, first try reference slopes file
component_logger = getattr(self, "logger", logger)
try:
if filename == '':
filename = self.refSlopesFile
if filename == '':
refSlopes = np.zeros_like(self.refSlopes)
component_logger.info("No reference slopes file configured; using zeros")
else:
refSlopes = np.load(filename)
component_logger.info("Loaded reference slopes from %s", filename)
self.setRefSlopes(refSlopes)
except Exception:
component_logger.exception("Failed to load reference slopes from %s", filename or getattr(self, "refSlopesFile", ""))
raise
return
[docs]
def computeSignal(self):
"""
Compute the signal from the WFS image.
"""
image = self.readImage(SAFE=False, GPU = False)
if self.signalType == "slopes":
if self.wfsType == "pywfs":
if self.gpuDevice is not None and gpu_torch_available():
import torch
slope_signal = computeSlopesPYWFSTorch(image.ravel(),
p1Mask=torch.from_numpy(self.p1mask.ravel()).to(self.gpuDevice),
p2Mask=torch.from_numpy(self.p2mask.ravel()).to(self.gpuDevice),
p3Mask=torch.from_numpy(self.p3mask.ravel()).to(self.gpuDevice),
p4Mask=torch.from_numpy(self.p4mask.ravel()).to(self.gpuDevice),
numPixelsInPupils = self.numPixelsInPupils,
slopes = torch.from_numpy(self.slopesArr1D).to(self.gpuDevice),
refSlopes=torch.from_numpy(self.refSlopes1D).to(self.gpuDevice)).cpu().numpy()
else:
slope_signal = computeSlopesPYWFSOptimNumba(image=image.ravel(),
p1Mask=self.p1mask.ravel(),
p2Mask=self.p2mask.ravel(),
p3Mask=self.p3mask.ravel(),
p4Mask=self.p4mask.ravel(),
p1=self.p1,
p2=self.p2,
p3=self.p3,
p4=self.p4,
tmp1=self.tmp1,
tmp2=self.tmp2,
numPixelsInPupils = self.numPixelsInPupils,
slopes = self.slopesArr1D,
refSlopes=self.refSlopes1D)
elif self.wfsType == "shwfs":
slopes = computeSlopesSHWFSOptimNumba(image = image,
slopes = np.zeros_like(self.refSlopes),
unaberratedSlopes = self.refSlopes,
threshold = self.imageNoise*self.shwfsContrast,
spacing = self.subApSpacing,
xvals = self.xvals,
offsetX = self.offsetX,
offsetY = self.offsetY,
intN = self.regionSize)
slope_signal = slopes[self.validSubAps]
# self.signal.write(slopes[self.validSubAps])
# self.signal2D.write(slopes*self.validSubAps)
# slopes = np.zeros_like(self.refSlopes)
# self.signal.write(self.refSlopes.flatten()[:np.prod(self.signalShape)].reshape(self.signalShape))
self.signal.write(slope_signal)
self.signal2D.write(self.computeSignal2D(slope_signal))
return
[docs]
def computeImageNoise(self):
"""
Compute the image noise. Useful to set a good SNR cutoff for SHWFS
"""
component_logger = getattr(self, "logger", logger)
try:
img = self.readImage()
if img[img < 0].size > 0:
self.imageNoise = compute_fwhm_dark_subtracted_image(img) / 2
component_logger.info("Computed image noise=%s", self.imageNoise)
else:
logger.warning("Image is not dark subtracted")
except Exception:
component_logger.exception("Failed to compute image noise")
raise
return
[docs]
def setPupils(self, pupilLocs, pupilRadius):
"""
Set the pupils' locations and radius. First computes a Pupil Mask, then generates slope mask
and sets up SHMS of the correct sizes.
Parameters
----------
pupilLocs : list of tuple
List of pupil locations.
pupilRadius : int
Radius of the pupils.
"""
component_logger = getattr(self, "logger", logger)
try:
self.pupilLocs = pupilLocs
self.pupilRadius = pupilRadius
self.computePupilsMask()
if self.signalType == "slopes":
self.signalSize = np.count_nonzero(self.pupilMask) // 2
slopemask = self.pupilMask[
self.pupilLocs[0][1]-self.pupilRadius:self.pupilLocs[0][1]+self.pupilRadius,
self.pupilLocs[0][0]-self.pupilRadius:self.pupilLocs[0][0]+self.pupilRadius,
] > 0
self.setValidSubAps(np.concatenate([slopemask, slopemask], axis=1))
if self.validSubApsFile != "":
self.saveValidSubAps()
self.signal2DShape = (self.validSubAps.shape[0], self.validSubAps.shape[1])
self.signal = ImageSHM("signal", (self.signalSize,), self.signalDType, gpuDevice=self.gpuDevice, consumer=False)
self.signal2D = ImageSHM("signal2D", self.signal2DShape, self.signalDType, gpuDevice=self.gpuDevice, consumer=False)
component_logger.info("Configured pupils locs=%s radius=%s", pupilLocs, pupilRadius)
except Exception:
component_logger.exception("Failed to set pupils locs=%s radius=%s", pupilLocs, pupilRadius)
raise
return
[docs]
def computePupilsMask(self):
"""
Compute the mask for the pupils. Assumes circular aperture with obstruction ratio
set by the centralObscurationRatio parameter.
"""
component_logger = getattr(self, "logger", logger)
try:
self.pupilMask = np.zeros(self.imageShape)
pupilTemplate = generate_circular_aperture_mask(int(np.ceil(2*self.pupilRadius)),
self.pupilRadius,
self.centralObscurationRatio)
N = self.pupilMask.shape[0]
n = pupilTemplate.shape[0]
half_n = n // 2
for i, pupil_loc in enumerate(self.pupilLocs):
px, py = pupil_loc
x_start = px - half_n
x_end = px + half_n + (n % 2)
y_start = py - half_n
y_end = py + half_n + (n % 2)
if x_start < 0 or y_start < 0 or x_end > N or y_end > N:
raise ValueError("The subimage exceeds the bounds of the larger array.")
self.pupilMask[y_start:y_end, x_start:x_end] += pupilTemplate*(i+1)
self.p1mask = self.pupilMask == 1
self.p2mask = self.pupilMask == 2
self.p3mask = self.pupilMask == 3
self.p4mask = self.pupilMask == 4
component_logger.info("Computed pupil masks for %s pupils", len(self.pupilLocs))
except Exception:
component_logger.exception("Failed to compute pupil mask")
raise
return
[docs]
def plotPupils(self):
"""
Plot the pupil mask to see if its right.
"""
# plt.figure(figsize=(10,8))
plt.imshow(self.pupilMask, cmap = 'inferno',origin='lower',aspect ='auto')
plt.colorbar()
plt.title("Pupil Mask (Value is Pupil Number)")
plt.show()
plt.imshow(self.pupilMask*self.readImage(), cmap = 'inferno',origin='lower',aspect ='auto')
colors = ['g','b','orange', 'r']
for i in range(len(self.pupilLocs)):
px, py = self.pupilLocs[i]
plt.axvline(x = px, color = colors[i], alpha = 0.6)
plt.axhline(y = py, color = colors[i], alpha = 0.6)
plt.colorbar()
plt.title("Pupil Mask * Image ")
plt.show()
return
[docs]
def computeSignal2D(self, signal, validSubAps=None):
"""
Compute the 2D signal from the valid sub-aperture mask.
Parameters
----------
signal : numpy.ndarray
Signal to process.
validSubAps : numpy.ndarray, optional
Valid sub-aperture mask. If not provided, uses the current valid sub-aperture mask.
Returns
-------
numpy.ndarray
2D signal.
"""
if validSubAps is None and isinstance(self.validSubAps, np.ndarray):
validSubAps = self.validSubAps
else:
return -1
if self.wfsType == "pywfs":
slopemask = validSubAps[:,:validSubAps.shape[1]//2]
self.curSignal2D[:,:validSubAps.shape[1]//2][slopemask] = signal[:signal.size//2]
self.curSignal2D[:,validSubAps.shape[1]//2:][slopemask] = signal[signal.size//2:]
else:
self.curSignal2D[self.validSubAps] = signal
return self.curSignal2D
if __name__ == "__main__":
launchComponent(SlopesProcess, "slopes", start = True)