Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions GEECS-Scanner-GUI/geecs_scanner/data_acquisition/scan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
import logging
import time

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

import numpy as np

from geecs_python_api.controls.devices.geecs_device import GeecsDevice
Expand Down Expand Up @@ -336,7 +339,8 @@ def execute_step(self, step: Dict[str, Any], index: int) -> None:
self.prepare_for_step()

logging.info(f"Moving devices for step: {step['variables']}")
self.move_devices(step["variables"], step["is_composite"])
# self.move_devices(step["variables"], step["is_composite"])
self.move_devices_parallel_by_device(step["variables"], step["is_composite"])

logging.info(f"Waiting for acquisition: {step}")
self.wait_for_acquisition(step["wait_time"])
Expand Down Expand Up @@ -471,6 +475,105 @@ def move_devices(
f"Device {device_name} not found in device manager."
)

def move_devices_parallel_by_device(
    self,
    component_vars: Dict[str, Any],
    is_composite: bool,
    max_retries: int = 3,
    retry_delay: float = 0.5,
) -> None:
    """
    Set device variables in parallel, grouped by device, with retry and tolerance checks.

    One worker thread is used per device; variables belonging to the same
    device are set sequentially within that thread, preserving device-level
    ordering and avoiding conflicts.  The method blocks until every device
    thread has finished (each future's ``result()`` is awaited), so on return
    all set attempts have completed and any worker exception has propagated.

    Parameters
    ----------
    component_vars : dict of str to Any
        Dictionary mapping device variables to target values.
        Keys are formatted as "device_name:variable_name"; a key without a
        colon is treated as a composite variable.
    is_composite : bool
        Flag indicating whether the variables are part of a composite device
        configuration.  NOTE(review): currently unused here — the per-device
        ``device.is_composite`` attribute decides the tolerance instead.
    max_retries : int, optional
        Maximum number of attempts to set each device variable. Defaults to 3.
    retry_delay : float, optional
        Time (in seconds) to wait between retry attempts. Defaults to 0.5 seconds.

    Notes
    -----
    - Device setting is skipped if `component_vars` corresponds to a statistical
      no-scan configuration.
    - Composite devices use a very large tolerance (effectively no check);
      standard variables use the tolerance from ``GeecsDevice.exp_info``.
    - Logs are generated for each attempt, including success, warnings, and failures.
    """
    if self.device_manager.is_statistic_noscan(component_vars):
        return

    if not component_vars:
        logging.info("No variables to move for this scan step.")
        return

    # Step 1: Group variables by device so each device gets exactly one thread.
    vars_by_device = defaultdict(list)
    for device_var, set_val in component_vars.items():
        # Keys without a ":" are composite variables; pad with a sentinel name.
        device_name, var_name = (device_var.split(":") + ["composite_var"])[:2]
        vars_by_device[device_name].append((var_name, set_val))

    # Step 2: Define per-device setting function
    def set_device_variables(device_name, var_list):
        """Helper function to set one device's variables sequentially in a thread."""
        device = self.device_manager.devices.get(device_name)
        if not device:
            logging.warning(f"Device {device_name} not found.")
            return

        logging.info(f"[{device_name}] Preparing to set vars: {var_list}")
        for var_name, set_val in var_list:
            if device.is_composite:
                # Composite variables have no tolerance entry in exp_info;
                # use a huge value so the check always passes.
                tol = 10000
            else:
                tol = float(
                    GeecsDevice.exp_info["devices"][device_name][var_name][
                        "tolerance"
                    ]
                )

            for attempt in range(max_retries):
                ret_val = device.set(var_name, set_val)
                logging.info(
                    f"[{device_name}] Attempt {attempt + 1}: Set {var_name}={set_val}, got {ret_val}"
                )
                # Equivalent to |ret_val - set_val| <= tol.
                if ret_val - tol <= set_val <= ret_val + tol:
                    logging.info(
                        f"[{device_name}] Success: {var_name}={ret_val} within tolerance {tol}"
                    )
                    break
                logging.warning(
                    f"[{device_name}] {var_name}={ret_val} not within tolerance of {set_val}"
                )
                # Only sleep if another attempt follows; sleeping after the
                # final attempt would just delay the scan for no benefit.
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
            else:
                # for/else: reached only when no attempt broke out (all failed).
                logging.error(
                    f"[{device_name}] Failed to set {var_name} after {max_retries} attempts."
                )

    # Step 3: Run each device in parallel, then wait for all of them so the
    # devices are in place before acquisition starts.
    with ThreadPoolExecutor(max_workers=len(vars_by_device)) as executor:
        futures = [
            executor.submit(set_device_variables, device_name, var_list)
            for device_name, var_list in vars_by_device.items()
        ]
        for f in futures:
            f.result()  # propagate exceptions, if any

def wait_for_acquisition(self, wait_time: float) -> None:
"""
Manage the acquisition phase of a scan step with comprehensive event handling.
Expand Down Expand Up @@ -756,7 +859,7 @@ def generate_next_step(self, next_index: int) -> None:
evaluate_acquired_data : Data evaluation method preceding step generation
"""
# Define number of initialization steps
num_initialization_steps = 3
num_initialization_steps = 1

try:
# Determine generation strategy based on step index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from geecs_scanner.data_acquisition.scan_data_manager import ScanDataManager
from geecs_scanner.data_acquisition.data_logger import DataLogger

import numpy as np

from geecs_scanner.optimization.base_evaluator import BaseEvaluator

Expand Down Expand Up @@ -144,6 +145,16 @@ def __init__(
"PlaceHolder" # string to append to logged objective value
)

@property
def counts_key(self) -> str:
    """Key under which this device reports its total-counts scalar."""
    return ":".join((self.dev_name, "total_counts"))

@property
def emittance_key(self) -> str:
    """Key under which this device reports its emittance-proxy scalar."""
    return ":".join((self.dev_name, "emittance_proxy"))

def evaluate_all_shots(self, shot_entries: list[dict]) -> float:
"""
Evaluate beam quality objective function for all shots in current bin.
Expand Down Expand Up @@ -185,6 +196,8 @@ def evaluate_all_shots(self, shot_entries: list[dict]) -> float:
>>> objective_value = evaluator.evaluate_all_shots(shot_entries)
>>> print(f"Beam quality metric: {objective_value:.3f}")
"""
self._last_metrics = {} # reset each call

# set the 'aux' data manually to isolate the current bin to get analyzed by the ScanAnalyzer
self.scan_analyzer.auxiliary_data = self.current_data_bin
self.scan_analyzer.run_analysis(scan_tag=self.scan_tag)
Expand All @@ -202,13 +215,18 @@ def evaluate_all_shots(self, shot_entries: list[dict]) -> float:
# extract the scalar results returned by the image analyzer
scalar_results = result["analyzer_return_dictionary"]

# define keys to extract values to use for the objective function
x_key = f"{self.dev_name}:total_counts"
y_key = f"{self.dev_name}:emittance_proxy"
# ---- NEW: extract + cache the metrics used in constraints
# pull metrics; if missing/bad, use NaN and let Xopt handle it
counts = float(scalar_results.get(self.counts_key, np.nan))
emit = float(scalar_results.get(self.emittance_key, np.nan))

objective_value = self.objective_fn(
x=scalar_results[x_key], y=scalar_results[y_key]
)
self._last_metrics = { # stash for _get_value
self.counts_key: counts,
self.emittance_key: emit,
}
# ----

objective_value = self.objective_fn(x=counts, y=emit)

for shot_entry in shot_entries:
self.log_objective_result(
Expand Down Expand Up @@ -254,9 +272,7 @@ def objective_fn(x, y):
and lower emittance proxy (better beam quality) both contribute to
a more negative (better) objective value.
"""

return -x/y/20000000

return -x / y / 20000000

def _get_value(self, input_data: Dict) -> Dict:
"""
Expand Down Expand Up @@ -304,6 +320,11 @@ def _get_value(self, input_data: Dict) -> Dict:
non_scalar_variables=["UC_ALineEBeam3"],
)

result = self.evaluate_all_shots(shot_entries)
f_val = self.evaluate_all_shots(shot_entries)

return {self.output_key: result}
# ---- NEW: return constraint outcomes alongside 'f'
out = {self.output_key: f_val}
if hasattr(self, "_last_metrics"):
out.update(self._last_metrics)
return out
# ----
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,32 @@
"""

# optimization/generator_factory.py

from typing import Any, Dict, Callable

from xopt.vocs import VOCS
from xopt.generators.random import RandomGenerator
from xopt.generators.bayesian import ExpectedImprovementGenerator
from xopt.generators.bayesian.models.standard import StandardModelConstructor

from typing import Any, Dict
from xopt.generators.bayesian.turbo import OptimizeTurboController

# Explicitly defined generators dictionary
# Explicitly defined generators dictionary.
# Maps a generator name (as referenced from optimization config files) to a
# factory taking the problem VOCS and returning a configured xopt generator.
PREDEFINED_GENERATORS: dict[str, Callable[[VOCS], Any]] = {
    "random": lambda vocs: RandomGenerator(vocs=vocs),
    "bayes_default": lambda vocs: ExpectedImprovementGenerator(
        vocs=vocs, gp_constructor=StandardModelConstructor(use_low_noise_prior=False)
    ),
    "bayes_cheetah": lambda vocs: _load_cheetah_generator(vocs),
    "bayes_turbo_standard": lambda vocs: _make_bayes_turbo(vocs),
    # TuRBO tuned for HTU e-beam brightness: tighter initial trust region
    # (0.25) but allowed to grow to 2.0.
    "bayes_turbo_HTU_e_beam_brightness": lambda vocs: _make_bayes_turbo(
        vocs,
        success_tolerance=2,
        failure_tolerance=2,
        length=0.25,
        length_max=2.0,
        length_min=0.0078125,
        scale_factor=2.0,
    ),
    # Add more explicit named generators here if needed
}

Expand Down Expand Up @@ -126,6 +138,64 @@ def build_generator_from_config(config: Dict[str, Any], vocs: VOCS):
raise ValueError(f"Unsupported or undefined generator name: '{generator_name}'")


def _make_bayes_turbo(
    vocs: VOCS,
    length: float = 0.30,
    length_min: float = 0.01,
    length_max: float = 1.00,
    success_tolerance: int = 2,
    failure_tolerance: int = 2,
    scale_factor: float = 2.0,
    restrict_model_data: bool = True,
    batch_size: int = 1,
    n_monte_carlo_samples: int = 128,
    use_low_noise_prior: bool = False,
) -> ExpectedImprovementGenerator:
    """
    Build an ExpectedImprovementGenerator with a customized TuRBO trust region.

    Parameters
    ----------
    vocs : VOCS
        VOCS specification for the optimization problem.
    length, length_min, length_max : float
        Initial, minimum, and maximum trust-region side lengths.
    success_tolerance, failure_tolerance : int
        Number of consecutive successes/failures before the trust region
        is expanded/shrunk.
    scale_factor : float
        Factor by which the trust region expands or contracts.
    restrict_model_data : bool
        Whether to fit the GP only to points inside the trust region.
    batch_size : int
        Number of candidates proposed per iteration.
    n_monte_carlo_samples : int
        Number of Monte-Carlo samples used for qEI.
    use_low_noise_prior : bool
        Whether the GP model uses a low-noise prior.

    Returns
    -------
    ExpectedImprovementGenerator
        Generator whose trust region is managed by an OptimizeTurboController.
    """
    # Collect the trust-region settings in one place, then unpack them.
    trust_region_opts = dict(
        batch_size=batch_size,
        length=length,
        length_min=length_min,
        length_max=length_max,
        success_tolerance=success_tolerance,
        failure_tolerance=failure_tolerance,
        scale_factor=scale_factor,
        restrict_model_data=restrict_model_data,
        name="OptimizeTurboController",
    )
    controller = OptimizeTurboController(vocs=vocs, **trust_region_opts)
    model_builder = StandardModelConstructor(use_low_noise_prior=use_low_noise_prior)

    return ExpectedImprovementGenerator(
        vocs=vocs,
        gp_constructor=model_builder,
        n_monte_carlo_samples=n_monte_carlo_samples,
        turbo_controller=controller,
    )


def _load_cheetah_generator(vocs):
"""
Load Cheetah-based Bayesian optimization generator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ vocs:

objectives:
f: MINIMIZE
constraints:
  UC_HiResMagCam:total_counts: ["GREATER_THAN", 100000.]
  UC_HiResMagCam:emittance_proxy: ["LESS_THAN", 10.]

evaluator:
module: geecs_scanner.optimization.evaluators.HiResMagCam
class: HiResMagCam

generator:
  name: bayes_turbo_HTU_e_beam_brightness

device_requirements:
Devices:
Expand Down