"""Base optimization component for slow control-plane tuning tasks.

The optimizer layer in pyRTC is used for tasks such as loop gain tuning or
hardware-assisted aberration optimization. These workflows are intentionally
outside the steady-state real-time pipeline, which makes them a good fit for an
Optuna-based trial/study abstraction.
"""
import argparse
import os
import sys
import time
import optuna
from pyRTC.logging_utils import get_logger
from pyRTC.Pipeline import Listener
from pyRTC.pyRTCComponent import pyRTCComponent
from pyRTC.utils import decrease_nice, read_yaml_file, set_affinity, setFromConfig
logger = get_logger(__name__)
class Optimizer(pyRTCComponent):
    """
    Abstract Optuna-backed optimization driver.

    ``Optimizer`` is meant to be subclassed by hardware- or algorithm-specific
    optimizers in :mod:`pyRTC.hardware`. The base class owns the Optuna study,
    default CMA-ES sampler choice, and the helper methods used to run a full
    study or advance one trial at a time.

    Subclasses are responsible for defining the objective function and for
    applying candidate parameters to the system under test.

    Attributes
    ----------
    name : str
        Name of the optimizer component.
    study : optuna.Study
        The Optuna study object, initialized with a CMA-ES sampler and the
        ``'maximize'`` direction.
    numSteps : int
        Number of steps/trials to perform during optimization.

    Methods
    -------
    objective(trial=None):
        Defines the objective function for the optimization.
    optimize():
        Performs the optimization process.
    applyOptimum():
        Applies the optimum values obtained from the optimization process.
    applyTrial(trial):
        Applies a given trial.
    applyNext():
        Requests and applies the next trial from the study.
    resetStudy():
        Replaces the current study with a fresh one.
    """

    def __init__(self, conf) -> None:
        """
        Initialize the optimizer study and runtime configuration.

        Parameters
        ----------
        conf : dict
            Optimizer configuration. The base class uses ``numSteps`` while
            subclasses may require additional problem-specific keys.

        Raises
        ------
        Exception
            Any error raised during setup is logged and re-raised unchanged.
        """
        try:
            self.name = "Optimizer"
            self.study = self._create_study()
            self.numSteps = setFromConfig(conf, "numSteps", 100)
            super().__init__(conf)
            self.logger.info("Initialized optimizer numSteps=%s", self.numSteps)
        except Exception:
            # ``self.logger`` may not exist yet if the failure happened before
            # or inside the parent initializer, so use the module logger here.
            logger.exception("Failed to initialize optimizer")
            raise

    @staticmethod
    def _create_study():
        """Build a new maximizing Optuna study that uses the CMA-ES sampler."""
        return optuna.create_study(direction='maximize',
                                   sampler=optuna.samplers.CmaEsSampler())

    def objective(self, trial=None):
        """
        Defines the objective function for the optimization.

        This method should be overridden by subclasses to provide the
        specific objective function for the optimization task.

        Parameters
        ----------
        trial : optuna.Trial, optional
            Trial handed over by ``study.optimize``. It is optional so that
            the method can still be called without arguments.

        Returns
        -------
        None
            The base implementation computes nothing. Subclasses return the
            objective value to be optimized.
        """
        # NOTE: ``optimize`` passes this bound method to ``study.optimize``,
        # which invokes it with the trial as its single argument; without the
        # ``trial`` parameter that call would raise ``TypeError``.
        return None

    def optimize(self):
        """
        Performs the optimization process.

        Runs ``numSteps`` trials of :meth:`objective` on the study and then
        calls :meth:`applyOptimum`.

        Raises
        ------
        Exception
            Any error raised while optimizing is logged and re-raised.
        """
        component_logger = getattr(self, "logger", logger)
        try:
            component_logger.info("Starting optimization for %s trials", self.numSteps)
            self.study.optimize(self.objective, n_trials=self.numSteps)
            self.applyOptimum()
            component_logger.info("Completed optimization")
        except Exception:
            component_logger.exception("Failed during optimization")
            raise

    def applyOptimum(self):
        """
        Applies the optimum values obtained from the optimization process.

        This method should be implemented by subclasses to apply the optimal
        parameters found during the optimization to the system or component.
        The base implementation does nothing.
        """
        return None

    def applyTrial(self, trial):
        """
        Applies a given trial.

        The base implementation does nothing; subclasses override it.

        Parameters
        ----------
        trial : optuna.Trial
            The trial object containing the parameters to be applied.
        """
        return None

    def applyNext(self):
        """
        Requests and applies the next trial from the study.

        Obtains the next trial with ``study.ask()`` and hands it to
        :meth:`applyTrial`. This method does not report a result back to the
        study.

        Raises
        ------
        Exception
            Any error raised while asking for or applying the trial is logged
            and re-raised.
        """
        component_logger = getattr(self, "logger", logger)
        try:
            trial = self.study.ask()
            component_logger.info("Applying next trial %s", trial)
            self.applyTrial(trial)
        except Exception:
            component_logger.exception("Failed to apply next optimization trial")
            raise

    def resetStudy(self):
        """
        Replaces the current study with a fresh one.

        The new study uses the same direction and sampler type as the one
        built in ``__init__``. The previous study object is no longer
        referenced by this component afterwards.

        Raises
        ------
        Exception
            Any error raised while creating the study is logged and re-raised.
        """
        component_logger = getattr(self, "logger", logger)
        try:
            self.study = self._create_study()
            component_logger.info("Reset optimizer study")
        except Exception:
            component_logger.exception("Failed to reset optimizer study")
            raise
if __name__ == "__main__":
    # Mute stdout during start-up so stray prints cannot interfere with the
    # stdout-based communication with the main program.
    original_stdout = sys.stdout
    # The handle is intentionally left open for the life of the process:
    # objects created during start-up may have captured it as their stream.
    devnull = open(os.devnull, 'w')
    sys.stdout = devnull
    try:
        # Create argument parser
        parser = argparse.ArgumentParser(description="Read a config file from the command line.")
        # Add command-line argument for the config file
        parser.add_argument("-c", "--config", required=True, help="Path to the config file")
        parser.add_argument("-p", "--port", required=True, help="Port for communication")
        # Parse command-line arguments
        args = parser.parse_args()

        conf = read_yaml_file(args.config)

        pid = os.getpid()
        # os.cpu_count() may return None when the count cannot be determined;
        # fall back to 1 so the modulo does not raise TypeError.
        # NOTE(review): the affinity is read from the "loop" section of the
        # config — confirm the optimizer is meant to share that setting.
        set_affinity((conf["loop"]["affinity"]) % (os.cpu_count() or 1))
        decrease_nice(pid)

        component = Optimizer(conf=conf)
        component.start()
    finally:
        # Go back to communicating with the main program through stdout,
        # even if start-up failed part-way through.
        sys.stdout = original_stdout

    listener = Listener(component, port=int(args.port))
    # Poll for commands until the listener is told to stop.
    while listener.running:
        listener.listen()
        time.sleep(1e-3)