diff --git a/docs/sphinx/examples/solvers/python/gqe_h2.py b/docs/sphinx/examples/solvers/python/gqe_h2.py index 7bf2da20..92ba9c60 100644 --- a/docs/sphinx/examples/solvers/python/gqe_h2.py +++ b/docs/sphinx/examples/solvers/python/gqe_h2.py @@ -44,7 +44,7 @@ import cudaq_solvers as solvers from cudaq import spin -from lightning.fabric.loggers import CSVLogger +from lightning.pytorch.loggers import CSVLogger from cudaq_solvers.gqe_algorithm.gqe import get_default_config # Set deterministic seed and environment variables for deterministic behavior @@ -171,18 +171,22 @@ def cost(sampled_ops: list[cudaq.SpinOperator], **kwargs): # Configure GQE cfg = get_default_config() -cfg.use_fabric_logging = False -logger = CSVLogger("gqe_h2_logs/gqe.csv") -cfg.fabric_logger = logger +cfg.use_lightning_logging = True +logger = CSVLogger(save_dir="gqe_h2_logs", name="gqe") +cfg.max_iters = 50 +cfg.ngates = 10 +cfg.lightning_logger = logger cfg.save_trajectory = False cfg.verbose = True +cfg.enable_checkpointing = True # Run GQE -minE, best_ops = solvers.gqe(cost, op_pool, max_iters=25, ngates=10, config=cfg) +minE, best_ops = solvers.gqe(cost, op_pool, config=cfg) # Only print results from rank 0 when using MPI if not args.mpi or cudaq.mpi.rank() == 0: - print(f'Ground Energy = {minE}') + print(f'Ground Energy = {minE} (Ha)') + print(f'Error = {minE - molecule.energies["fci_energy"]} (Ha)') print('Ansatz Ops') for idx in best_ops: # Get the first (and only) term since these are simple operators diff --git a/docs/sphinx/examples/solvers/python/gqe_n2.py b/docs/sphinx/examples/solvers/python/gqe_n2.py new file mode 100644 index 00000000..aeb0ccd4 --- /dev/null +++ b/docs/sphinx/examples/solvers/python/gqe_n2.py @@ -0,0 +1,158 @@ +# ============================================================================ # +# Copyright (c) 2025 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # +# [Begin Documentation] + +# GQE is an optional component of the CUDA-QX Solvers Library. To install its +# dependencies, run: +# pip install cudaq-solvers[gqe] +# +# This example demonstrates GQE on the N2 molecule using the utility function +# get_gqe_pauli_pool() to generate an operator pool based on UCCSD Pauli terms. +# The pool is automatically generated from UCCSD operators and scaled by +# different parameter values, making it suitable for variational quantum algorithms. +# +# Run this script with +# python3 gqe_n2.py +# +# In order to leverage CUDA-Q MQPU and distribute the work across +# multiple QPUs (thereby observing a speed-up), run with: +# +# mpiexec -np N and vary N to see the speedup... +# e.g. PMIX_MCA_gds=hash mpiexec -np 2 python3 gqe_n2.py --mpi + +import argparse, cudaq + +parser = argparse.ArgumentParser() +parser.add_argument('--mpi', action='store_true') +args = parser.parse_args() + +if args.mpi: + try: + cudaq.set_target('nvidia', option='mqpu') + cudaq.mpi.initialize() + except RuntimeError: + print( + 'Warning: NVIDIA GPUs or MPI not available, unable to use CUDA-Q MQPU. Skipping...' + ) + exit(0) +else: + try: + cudaq.set_target('nvidia', option='fp64') + except RuntimeError: + cudaq.set_target('qpp-cpu') + +import cudaq_solvers as solvers + +from lightning.pytorch.loggers import CSVLogger +from cudaq_solvers.gqe_algorithm.gqe import get_default_config +from cudaq_solvers.gqe_algorithm.utils import get_gqe_pauli_pool + +# Set deterministic seed and environment variables for deterministic behavior +# Disable this section for non-deterministic behavior +import os, torch + +os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8' +torch.manual_seed(3047) +torch.use_deterministic_algorithms(True) +torch.backends.cudnn.deterministic = True +torch.backends.cudnn.benchmark = False + +# Create the molecular hamiltonian +geometry = [('N', (0., 0., 0.)), ('N', (0., 0., 1.1))] +molecule = solvers.create_molecule(geometry, 'sto-3g', 0, 0, nele_cas=6, norb_cas=6, casci=True) + +spin_ham = molecule.hamiltonian +n_qubits = molecule.n_orbitals * 2 +n_electrons = molecule.n_electrons + +# Generate the operator pool using utility function +params = [ + 0.003125, -0.003125, 0.00625, -0.00625, 0.0125, -0.0125, 0.025, -0.025, + 0.05, -0.05, 0.1, -0.1 +] + +op_pool = get_gqe_pauli_pool(n_qubits, n_electrons, params) + + +def term_coefficients(op: cudaq.SpinOperator) -> list[complex]: + return [term.evaluate_coefficient() for term in op] + + +def term_words(op: cudaq.SpinOperator) -> list[cudaq.pauli_word]: + return [term.get_pauli_word(n_qubits) for term in op] + + +# Kernel that applies the selected operators +@cudaq.kernel +def kernel(n_qubits: int, n_electrons: int, coeffs: list[float], + words: list[cudaq.pauli_word]): + q = cudaq.qvector(n_qubits) + + for i in range(n_electrons): + x(q[i]) + + for i in range(len(coeffs)): + exp_pauli(coeffs[i], q, words[i]) + + +def cost(sampled_ops: list[cudaq.SpinOperator], **kwargs): + + full_coeffs = [] + full_words = [] + + for op in sampled_ops: + full_coeffs += [c.real for c in term_coefficients(op)] + full_words += term_words(op) + + if args.mpi: + handle = cudaq.observe_async(kernel, + spin_ham, + n_qubits, + n_electrons, + full_coeffs, + full_words, + qpu_id=kwargs['qpu_id']) + return handle, lambda res: res.get().expectation() + else: + return cudaq.observe(kernel, spin_ham, n_qubits, n_electrons, + full_coeffs, full_words).expectation() + + +# Configure GQE +cfg = get_default_config() +cfg.use_lightning_logging = True +logger = CSVLogger(save_dir="gqe_n2_logs", name="gqe") +cfg.max_iters = 1500 +cfg.ngates = 60 +cfg.num_samples = 50 +cfg.buffer_size = 50 +cfg.warmup_size = 50 +cfg.batch_size = 50 + +cfg.scheduler = 'variance' +cfg.lightning_logger = logger +cfg.save_trajectory = False +cfg.verbose = True +cfg.benchmark_energy = molecule.energies + +# Run GQE +minE, best_ops = solvers.gqe(cost, op_pool, config=cfg) + +# Only print results from rank 0 when using MPI +if not args.mpi or cudaq.mpi.rank() == 0: + print(f'Ground Energy = {minE} (Ha)') + print(f'Error = {minE - molecule.energies["R-CASCI"]} (Ha)') + print('Ansatz Ops') + for idx in best_ops: + # Get the first (and only) term since these are simple operators + term = next(iter(op_pool[idx])) + print(term.evaluate_coefficient().real, term.get_pauli_word(n_qubits)) + +if args.mpi: + cudaq.mpi.finalize() + diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/callbacks.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/callbacks.py new file mode 100644 index 00000000..21fcb6b9 --- /dev/null +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/callbacks.py @@ -0,0 +1,144 @@ +# ============================================================================ # +# Copyright (c) 2025 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # + +import sys +import torch +from lightning.pytorch.callbacks import Callback + + +class MinEnergyCallback(Callback): + """Callback to track minimum energy found during training. + + Keeps track of the minimum energy value and corresponding operator indices + across all training epochs. + """ + + def __init__(self): + super().__init__() + self.min_energy = sys.maxsize + self.min_indices = None + self.min_energy_history = [] + + def on_train_epoch_end(self, trainer, pl_module): + """Update minimum energy after each epoch. + + Args: + trainer: Lightning trainer instance + pl_module: The Pipeline module being trained + """ + # Get energies from the buffer + if len(pl_module.buffer) > 0: + # Check recent energies added to buffer + for i in range(max(0, len(pl_module.buffer) - + pl_module.num_samples), len(pl_module.buffer)): + seq, energy = pl_module.buffer.buf[i] + if isinstance(energy, torch.Tensor): + energy = energy.item() + if energy < self.min_energy: + self.min_energy = energy + self.min_indices = seq + + self.min_energy_history.append(self.min_energy) + pl_module.log( + "best energy", + self.min_energy, + prog_bar=False, + on_epoch=True, + on_step=False) + for key, value in pl_module.benchmark_energy.items(): + pl_module.log( + f"best energy - {key}", + self.min_energy - value, + prog_bar=False, + on_epoch=True, + on_step=False) + + def get_results(self): + """Get the minimum energy and corresponding indices. + + Returns: + tuple: (min_energy, min_indices) + """ + return self.min_energy, self.min_indices + + +class TrajectoryCallback(Callback): + """Callback to save training trajectory data. + + Records loss, energies, and indices for each training step and saves + to a file at the end of training. + + Args: + trajectory_file_path: Path to save trajectory data + """ + + def __init__(self, trajectory_file_path): + super().__init__() + self.trajectory_file_path = trajectory_file_path + self.trajectory_data = [] + + def on_train_batch_end( + self, + trainer, + pl_module, + outputs, + batch, + batch_idx): + """Record trajectory data after each training batch. + + Args: + trainer: Lightning trainer instance + pl_module: The Pipeline module being trained + outputs: Training step outputs + batch: Current batch data + batch_idx: Index of current batch + """ + # Record the batch data + if outputs is not None and 'loss' in outputs: + loss = outputs['loss'] + if isinstance(loss, torch.Tensor): + loss = loss.item() + + # Get indices and energies from batch + indices = batch.get('idx', None) + energies = batch.get('energy', None) + + if indices is not None and energies is not None: + if isinstance(indices, torch.Tensor): + indices = indices.cpu().numpy().tolist() + if isinstance(energies, torch.Tensor): + energies = energies.cpu().numpy().tolist() + + self.trajectory_data.append({ + 'epoch': trainer.current_epoch, + 'batch_idx': batch_idx, + 'loss': loss, + 'indices': indices, + 'energies': energies + }) + + def on_train_end(self, trainer, pl_module): + """Save trajectory data to file at end of training. + + Args: + trainer: Lightning trainer instance + pl_module: The Pipeline module being trained + """ + import json + import os + + os.makedirs(os.path.dirname(self.trajectory_file_path), exist_ok=True) + if os.path.exists(self.trajectory_file_path): + print( + f"Warning: Overwriting existing trajectory file at {self.trajectory_file_path}") + + with open(self.trajectory_file_path, 'w') as f: + for data in self.trajectory_data: + f.write(json.dumps(data) + '\n') + + print(f"Trajectory data saved to {self.trajectory_file_path}") diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/data.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/data.py new file mode 100644 index 00000000..b4a43463 --- /dev/null +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/data.py @@ -0,0 +1,44 @@ +from collections import deque +from torch.utils.data import Dataset +import sys +import pickle + + +class ReplayBuffer: + def __init__(self, size=sys.maxsize, capacity=1000000): + self.size = size + self.buf = deque(maxlen=capacity) + + def push(self, seq, energy): + self.buf.append((seq, energy)) + if len(self.buf) > self.size: + self.buf.popleft() + + def save(self, path): + with open(path, "wb") as f: + pickle.dump(self.buf, f) + + def load(self, path): + with open(path, "rb") as f: + self.buf = pickle.load(f) + + def __getitem__(self, idx): + seq, energy = self.buf[idx] + return {"idx": seq, "energy": energy} + + def __len__(self): + return len(self.buf) + + +class BufferDataset(Dataset): + def __init__(self, buffer: ReplayBuffer, repetition): + self.buffer = buffer + self.repetition = repetition + + def __getitem__(self, idx): + idx = idx % len(self.buffer) + item = self.buffer[idx] + return {"idx": item["idx"], "energy": item["energy"]} + + def __len__(self): + return len(self.buffer) * self.repetition diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/factory.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/factory.py new file mode 100644 index 00000000..83379234 --- /dev/null +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/factory.py @@ -0,0 +1,38 @@ +from .loss import ExpLogitMatching, GFlowLogitMatching, GRPOLoss +from .scheduler import DefaultScheduler, CosineScheduler, VarBasedScheduler + +class Factory: + def create_loss_fn(self, cfg): + if cfg.loss == "exp": + return ExpLogitMatching(cfg.energy_offset) + elif cfg.loss == "grpo": + clip_ratio = getattr(cfg, 'clip_ratio', 0.2) + return GRPOLoss(clip_ratio=clip_ratio) + elif cfg.loss == "gflow": + return GFlowLogitMatching(cfg.energy_offset) + else: + raise ValueError(f"Invalid loss function: {cfg.loss}") + + def create_temperature_scheduler(self, cfg): + """Create temperature scheduler based on configuration. + + Args: + cfg: Configuration object with temperature parameters + + Returns: + TemperatureScheduler: Scheduler instance + """ + scheduler_type = getattr(cfg, 'scheduler', 'default') + + if scheduler_type == 'cosine': + minimum = getattr(cfg, 'temperature_min', cfg.temperature) + maximum = getattr(cfg, 'temperature_max', cfg.temperature + 1.0) + frequency = getattr(cfg, 'scheduler_frequency', 100) + return CosineScheduler(minimum, maximum, frequency) + elif scheduler_type == 'variance': + target_var = getattr(cfg, 'target_variance', 1e-5) + return VarBasedScheduler(cfg.temperature, cfg.del_temperature, target_var) + elif scheduler_type == 'default': + return DefaultScheduler(cfg.temperature, cfg.del_temperature) + else: + raise ValueError(f"Invalid scheduler type: {scheduler_type}") \ No newline at end of file diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/gqe.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/gqe.py index 5dc5402f..e2b680c1 100644 --- a/libs/solvers/python/cudaq_solvers/gqe_algorithm/gqe.py +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/gqe.py @@ -6,91 +6,19 @@ # the terms of the Apache License 2.0 which accompanies this distribution. # # ============================================================================ # -from .transformer import Transformer +from .pipeline import Pipeline +from .model import GPT2 +from .factory import Factory import torch import lightning as L -from abc import ABC, abstractmethod -import math, json, sys, time, os +import json +import os from ml_collections import ConfigDict import cudaq torch.set_float32_matmul_precision('high') -class TemperatureScheduler(ABC): - """Abstract base class for temperature scheduling in GQE. - - Temperature scheduling controls how the temperature parameter changes during training, - which affects the exploration vs exploitation trade-off in operator selection. - """ - - @abstractmethod - def get(self, iter): - """Get temperature value for the given iteration. - - Args: - iter: Current iteration number - - Returns: - float: Temperature value for this iteration - """ - pass - - -class DefaultScheduler(TemperatureScheduler): - """Linear temperature scheduler that increases temperature by a fixed delta each iteration. - - Args: - start: Initial temperature value - delta: Amount to increase temperature each iteration - """ - - def __init__(self, start, delta) -> None: - self.start = start - self.delta = delta - - def get(self, iter): - """Get linearly increasing temperature value. - - Args: - iter: Current iteration number - - Returns: - float: start + iter * delta - """ - return self.start + iter * self.delta - - -class CosineScheduler(TemperatureScheduler): - """Cosine-based temperature scheduler that oscillates between min and max values. - - Useful for periodic exploration and exploitation phases during training. - - Args: - minimum: Minimum temperature value - maximum: Maximum temperature value - frequency: Number of iterations for one complete cycle - """ - - def __init__(self, minimum, maximum, frequency) -> None: - self.minimum = minimum - self.maximum = maximum - self.frequency = frequency - - def get(self, iter): - """Get temperature value following a cosine curve. - - Args: - iter: Current iteration number - - Returns: - float: Temperature value between minimum and maximum following cosine curve - """ - return (self.maximum + self.minimum) / 2 - ( - self.maximum - self.minimum) / 2 * math.cos( - 2 * math.pi * iter / self.frequency) - - class TrajectoryData: """Container for training trajectory data at a single iteration. @@ -163,8 +91,8 @@ def save(self, path): if os.path.exists(path): print(f"Warning: Overwriting existing trajectory file at {path}") with open(path, 'w') as f: - for l in self.lines: - f.write(f"{l}\n") + for line in self.lines: + f.write(f"{line}\n") def validate_config(cfg: ConfigDict): @@ -211,127 +139,155 @@ def get_default_config(): """Create a default configuration for GQE. Args: - num_samples (int): Number of circuits to generate during each epoch/batch. Default=5 + num_samples (int): Number of circuits to generate during each epoch/batch. Default=20 max_iters (int): Number of epochs to run. Default=100 ngates (int): Number of gates that make up each generated circuit. Default=20 seed (int): Random seed. Default=3047 lr (float): Learning rate used by the optimizer. Default=5e-7 - energy_offset (float): Offset added to expectation value of the circuit (Energy) for numerical + energy_offset (float): Offset added to expectation value of the circuit (Energy) for numerical stability, see `K. Nakaji et al. (2024) `_ Sec. 3. Default=0.0 grad_norm_clip (float): max_norm for clipping gradients, see `Lightning docs `_. Default=1.0 - temperature (float): Starting inverse temperature β as described in `K. Nakaji et al. (2024) `_ - Sec. 2.2. Default=5.0 - del_temperature (float): Temperature increase after each epoch. Default=0.05 - resid_pdrop (float): The dropout probability for all fully connected layers in the embeddings, + temperature (float): Starting inverse temperature β as described in `K. Nakaji et al. (2024) `_ + Sec. 2.2. Default=0.5 + del_temperature (float): Temperature increase after each epoch. Default=0.02 + resid_pdrop (float): The dropout probability for all fully connected layers in the embeddings, encoder, and pooler, see `GPT2Config `_. Default=0.0 embd_pdrop (float): The dropout ratio for the embeddings, see `GPT2Config `_. Default=0.0 attn_pdrop (float): The dropout ratio for the attention, see `GPT2Config `_. Default=0.0 - small (bool): Uses a small transformer (6 hidden layers and 6 attention heads as opposed to + small (bool): Uses a small transformer (6 hidden layers and 6 attention heads as opposed to the default transformer of 12 of each). Default=False - use_fabric_logging (bool): Whether to enable fabric logging. Default=False - fabric_logger (object): Fabric logger to use for logging. If None, no logging will be done. Default=None + use_lightning_logging (bool): Whether to enable lightning logging. Default=False + lightning_logger (object): Lightning logger to use for logging. If None, no logging will be done. Default=None save_trajectory (bool): Whether to save the trajectory data to a file. Default=False trajectory_file_path (str): Path to save the trajectory data file. Default="gqe_logs/gqe_trajectory.json" - verbose (bool): Enable verbose output to the console. Output includes the epoch, loss, + verbose (bool): Enable verbose output to the console. Output includes the epoch, loss, model.train_step time, and minimum energy. Default=False + loss (str): Loss function to use. Default="grpo" + scheduler (str): Learning rate scheduler to use. Default="default" + benchmark_energy (dict): Reference energy of the molecule for comparison. Default={} + enable_checkpointing (bool): Whether to enable model checkpointing. Default=False + buffer_size (int): Size of replay buffer for storing trajectories. Default=20 + warmup_size (int): Initial buffer warmup size before training starts. Default=20 + step_per_epoch (int): Number of training steps per epoch. Default=10 + batch_size (int): Batch size for training. Default=20 + trainer_kwargs (dict): Additional keyword arguments to pass to Lightning Trainer. Default={} + callbacks (list): Additional callbacks to pass to Lightning Trainer. Note that MinEnergyCallback + is always included automatically. Default=[] Returns: ConfigDict: Default configuration for GQE """ cfg = ConfigDict() - cfg.num_samples = 5 # akin to batch size + cfg.num_samples = 20 # akin to batch size cfg.max_iters = 100 cfg.ngates = 20 cfg.seed = 3047 cfg.lr = 5e-7 cfg.energy_offset = 0.0 cfg.grad_norm_clip = 1.0 - cfg.temperature = 5.0 - cfg.del_temperature = 0.05 + cfg.temperature = 0.5 + cfg.del_temperature = 0.02 cfg.resid_pdrop = 0.0 cfg.embd_pdrop = 0.0 cfg.attn_pdrop = 0.0 cfg.small = False - cfg.use_fabric_logging = False # Whether to enable fabric logging - cfg.fabric_logger = None # Fabric logger + cfg.use_lightning_logging = False # Whether to enable lightning logging + cfg.lightning_logger = None # Lightning logger cfg.save_trajectory = False # Whether to save trajectory data cfg.trajectory_file_path = "gqe_logs/gqe_trajectory.json" # Path to save trajectory data cfg.verbose = False + cfg.loss = "grpo" + cfg.scheduler = "default" + cfg.benchmark_energy = {} # Reference energy of the molecule + # Replay buffer parameters + cfg.buffer_size = 20 # Size of replay buffer + cfg.warmup_size = 20 # Initial buffer warmup size + cfg.step_per_epoch = 10 # Steps per training epoch + cfg.batch_size = 20 # Batch size for training + cfg.trainer_kwargs = {} # Additional kwargs to pass to Lightning Trainer + cfg.callbacks = [] # Additional callbacks to pass to Lightning Trainer return cfg -def __internal_run_gqe(temperature_scheduler: TemperatureScheduler, - cfg: ConfigDict, model, pool, optimizer): - """Internal implementation of the GQE training loop. +def __internal_run_gqe(cfg: ConfigDict, pipeline, pool): + """Internal implementation of the GQE training loop using Lightning Trainer. Args: - temperature_scheduler: Optional scheduler for temperature parameter cfg: Configuration object - model: The transformer model to train + pipeline: The Pipeline module containing model and training logic pool: Pool of quantum operators to select from - optimizer: Optimizer for model parameters Returns: tuple: (minimum energy found, corresponding operator indices) """ - # Configure Fabric with optional logging - fabric_kwargs = {"accelerator": "auto", "devices": 1} - if cfg.use_fabric_logging: - if cfg.fabric_logger is None: + from .callbacks import MinEnergyCallback, TrajectoryCallback + + # Configure trainer kwargs + trainer_kwargs = { + "accelerator": "auto", + "devices": 1, + "max_epochs": cfg.max_iters, + "gradient_clip_val": cfg.grad_norm_clip, + "enable_progress_bar": cfg.verbose, + "enable_model_summary": cfg.verbose, + "enable_checkpointing": False, + "reload_dataloaders_every_n_epochs": 1, + "log_every_n_steps": 1, + "num_sanity_val_steps": 0, # Disable validation sanity checks + } + + # Merge user-provided trainer kwargs + if hasattr(cfg, 'trainer_kwargs') and cfg.trainer_kwargs: + trainer_kwargs.update(cfg.trainer_kwargs) + + # Set up logging + if cfg.use_lightning_logging: + if cfg.lightning_logger is None: raise ValueError( - "Fabric Logger is not set. Please set it in the config by providing a logger to `cfg.fabric_logger`." + "Lightning Logger is not set. Please set it in the config by providing a logger to `cfg.lightning_logger`." ) - fabric_kwargs["loggers"] = [cfg.fabric_logger] - else: - fabric_kwargs["loggers"] = False - - fabric = L.Fabric(**fabric_kwargs) - fabric.seed_everything(cfg.seed) - fabric.launch() - if cfg.save_trajectory: - monitor = FileMonitor() + trainer_kwargs["logger"] = cfg.lightning_logger else: - monitor = None - model, optimizer = fabric.setup(model, optimizer) - model.mark_forward_method('train_step') - pytorch_total_params = sum( - p.numel() for p in model.parameters() if p.requires_grad) - if cfg.verbose: - print(f"total trainable params: {pytorch_total_params / 1e6:.2f}M") - min_energy = sys.maxsize - min_indices = None - for epoch in range(cfg.max_iters): - optimizer.zero_grad() - start = time.time() - loss, energies, indices, log_values = model.train_step(pool) - if cfg.verbose: - print('epoch', epoch, 'loss', loss, 'model.train_step time:', - time.time() - start, torch.min(energies)) - if monitor is not None: - monitor.record(epoch, loss, energies, indices) - for e, indices in zip(energies, indices): - energy = e.item() - if energy < min_energy: - min_energy = e.item() - min_indices = indices - if cfg.use_fabric_logging: - log_values[f"min_energy at"] = min_energy - log_values[f"temperature at"] = model.temperature - log_values[f"loss at"] = loss - fabric.log_dict(log_values, step=epoch) - fabric.backward(loss) - fabric.clip_gradients(model, optimizer, max_norm=cfg.grad_norm_clip) - optimizer.step() - if temperature_scheduler is not None: - model.temperature = temperature_scheduler.get(epoch) - else: - model.temperature += cfg.del_temperature - model.set_cost(None) - min_indices = min_indices.cpu().numpy().tolist() - if cfg.use_fabric_logging: - fabric.log('circuit', json.dumps(min_indices)) + trainer_kwargs["logger"] = False + + # Set up callbacks + callbacks = [] + + # MinEnergyCallback is required for returning results + min_energy_callback = MinEnergyCallback() + callbacks.append(min_energy_callback) + + # Add trajectory callback if requested if cfg.save_trajectory: - monitor.save(cfg.trajectory_file_path) + trajectory_callback = TrajectoryCallback(cfg.trajectory_file_path) + callbacks.append(trajectory_callback) + + # Add user-provided callbacks + if hasattr(cfg, 'callbacks') and cfg.callbacks: + callbacks.extend(cfg.callbacks) + + trainer_kwargs["callbacks"] = callbacks + + # Create trainer + trainer = L.Trainer(**trainer_kwargs) + + # Train the model + trainer.fit(pipeline) + + # Get results from callback + min_energy, min_indices = min_energy_callback.get_results() + + # Convert indices to list if needed + if min_indices is not None and isinstance(min_indices, torch.Tensor): + min_indices = min_indices.cpu().numpy().tolist() + + # Log final circuit if logging is enabled + if cfg.use_lightning_logging and min_indices is not None: + trainer.logger.log_metrics({'circuit': json.dumps(min_indices)}) + + # Clean up + pipeline.set_cost(None) + return min_energy, min_indices @@ -351,7 +307,6 @@ def gqe(cost, pool, config=None, **kwargs): special arguments are supported: - model: Can pass in an already constructed transformer - - optimizer: Can pass in an already constructed optimizer Additionally, any default config parameter can be overridden via kwargs if no config object is provided, for example: @@ -364,7 +319,7 @@ def gqe(cost, pool, config=None, **kwargs): """ cfg = get_default_config() - if config == None: + if config is None: [ setattr(cfg, a, kwargs[a]) for a in dir(cfg) @@ -379,10 +334,7 @@ def gqe(cost, pool, config=None, **kwargs): cfg.vocab_size = len(pool) cudaqTarget = cudaq.get_target() numQPUs = cudaqTarget.num_qpus() - model = Transformer( - cfg, cost, loss='exp', - numQPUs=numQPUs) if 'model' not in kwargs else kwargs['model'] - optimizer = torch.optim.AdamW( - model.parameters(), - lr=cfg.lr) if 'optimizer' not in kwargs else kwargs['optimizer'] - return __internal_run_gqe(None, cfg, model, pool, optimizer) + factory = Factory() + model = GPT2(cfg.small, cfg.vocab_size) if 'model' not in kwargs else kwargs['model'] + pipeline = Pipeline(cfg, cost, pool, model, factory, numQPUs=numQPUs) + return __internal_run_gqe(cfg, pipeline, pool) diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/loss.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/loss.py index 798f3b37..4fe1d4fa 100644 --- a/libs/solvers/python/cudaq_solvers/gqe_algorithm/loss.py +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/loss.py @@ -8,45 +8,59 @@ from abc import abstractmethod, ABC import torch +import torch.nn.functional as F -class LogitMatchingLoss(ABC): +class Loss(ABC, torch.nn.Module): """Abstract base class for logit-energy matching loss functions. - + Loss functions in GQE compare the model's logits (predictions) with computed energies to guide the model toward selecting operator sequences that minimize energy. """ @abstractmethod - def compute(self, energies, logits_tensor, log_values): + def compute( + self, + energies, + gate_logits, + gate_indices, + log_values, + **kwargs): pass -class ExpLogitMatching(LogitMatchingLoss): +class ExpLogitMatching(Loss): """Simple exponential matching between logits and energies. - + Computes loss by comparing exponential of negative logits with exponential of negative energies from circuit evaluation. The energies represent the expectation values of the Hamiltonian problem operator obtained from quantum circuit execution during GPT training. - + Args: energy_offset: Offset added to expectation values of the circuit (Energy) - for numerical stability during training. + for numerical stability during training. label: Label for logging purposes """ - def __init__(self, energy_offset, label) -> None: - self._label = label + def __init__(self, energy_offset) -> None: + super().__init__() self.energy_offset = energy_offset self.loss_fn = torch.nn.MSELoss() - def compute(self, energies, logits_tensor, log_values): + def compute( + self, + energies, + gate_logits, + gate_indices, + log_values, + **kwargs): + logits_tensor = torch.gather( + gate_logits, 2, gate_indices.unsqueeze(-1)).squeeze(-1) mean_logits = torch.mean(logits_tensor, 1) - log_values[f"mean_logits at {self._label}"] = torch.mean( + log_values["mean_logits"] = torch.mean( mean_logits - self.energy_offset) - log_values[f"mean energy at {self._label}"] = torch.mean(energies) mean_logits = torch.mean(logits_tensor, 1) device = mean_logits.device return self.loss_fn( @@ -54,13 +68,13 @@ def compute(self, energies, logits_tensor, log_values): torch.exp(-energies.to(device) - self.energy_offset)) -class GFlowLogitMatching(LogitMatchingLoss): +class GFlowLogitMatching(Loss): """Advanced logit-energy matching with learnable offset. - + Similar to ExpLogitMatching but learns an additional energy offset parameter during training, allowing for better adaptation to the energy scale. - + Args: energy_offset: Initial energy offset device: Device to place tensors on @@ -68,24 +82,88 @@ class GFlowLogitMatching(LogitMatchingLoss): nn: Neural network module to register the offset parameter with """ - def __init__(self, energy_offset, device, label, nn: torch.nn) -> None: - self._label = label + def __init__(self, energy_offset) -> None: + super().__init__() self.loss_fn = torch.nn.MSELoss() self.energy_offset = energy_offset self.normalization = 10**-5 - self.param = torch.nn.Parameter(torch.tensor([0.0]).to(device)) - nn.register_parameter(name="energy_offset", param=self.param) + self.param = torch.nn.Parameter(torch.tensor([0.0])) - def compute(self, energies, logits_tensor, log_values): + def compute( + self, + energies, + gate_logits, + gate_indices, + log_values, + **kwargs): + logits_tensor = torch.gather( + gate_logits, 2, gate_indices.unsqueeze(-1)).squeeze(-1) mean_logits = torch.mean(logits_tensor, 1) energy_offset = self.energy_offset + self.param / self.normalization - log_values[f"energy_offset at {self._label}"] = energy_offset - log_values[f"mean_logits at {self._label}"] = torch.mean(mean_logits - - energy_offset) - log_values[f"mean energy at {self._label}"] = torch.mean(energies) + log_values["energy_offset"] = energy_offset + log_values["mean_logits"] = torch.mean(mean_logits - energy_offset) mean_logits = torch.mean(logits_tensor, 1) device = mean_logits.device loss = self.loss_fn( torch.exp(-mean_logits), torch.exp(-(energies.to(device) + energy_offset.to(device)))) - return loss \ No newline at end of file + return loss + + +class GRPOLoss(Loss): + """Generalized-RPO / clipped-PPO variant used in the original code.""" + + def __init__(self, clip_ratio: float = 0.2): + super().__init__() + self.clip_ratio = clip_ratio + self.old_log_probs = None + self.advantages = None + + def compute( + self, + energies, + gate_logits, + gate_indices, + log_values=None, + **kwargs): + current_log_probs = self.log_prob( + gate_indices, gate_logits, kwargs["inverse_temperature"] + ) + + # nagative log likelihood loss + win_id = torch.argmin(energies) + log_prob_sum_win = torch.mean(current_log_probs[win_id]) + loss = -log_prob_sum_win + + # If all the generated circuits are identical, we use the inverse log + # probability as the loss. + if torch.std(energies) == 0: + return loss + + # use the log probability from the first epoch as the reference. + if kwargs["current_step"] == 0: + self.old_log_probs = current_log_probs.detach() + self.advantages = self.calc_advantage(energies) + clipped_ratio = 1 + else: + ratio = torch.exp(current_log_probs - self.old_log_probs) + clipped_ratio = torch.clamp( + ratio, 1. - self.clip_ratio, 1. + self.clip_ratio) + + loss -= (clipped_ratio * self.advantages.unsqueeze(1)).mean() + return loss + + def calc_advantage(self, energies): + return (energies.mean() - energies) / (energies.std() + 1e-8) + + def log_prob( + self, + gate_seqs, + gate_logits, + inverse_temperature): + log_probs = torch.gather( + F.log_softmax(-inverse_temperature * gate_logits, dim=-1), + 2, + gate_seqs.unsqueeze(-1) + ).squeeze(-1) + return log_probs diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/model.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/model.py new file mode 100644 index 00000000..be13d331 --- /dev/null +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/model.py @@ -0,0 +1,13 @@ +from transformers import GPT2LMHeadModel, GPT2Config + +class SmallConfig(GPT2Config): + def __init__(self, **kwargs): + super().__init__(n_layer=6, n_head=6, **kwargs) + + +class GPT2(GPT2LMHeadModel): + def __init__(self, small, vocab_size): + gpt2cfg = GPT2Config(vocab_size=vocab_size) + if small: + gpt2cfg = SmallConfig(vocab_size=vocab_size) + super().__init__(gpt2cfg) \ No newline at end of file diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/pipeline.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/pipeline.py new file mode 100644 index 00000000..da38efb0 --- /dev/null +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/pipeline.py @@ -0,0 +1,232 @@ +# ============================================================================ # +# Copyright (c) 2025 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # + +import torch +import cudaq +import lightning as L +from mpi4py import MPI +from torch.nn import functional as F +from lightning import LightningModule +from .data import ReplayBuffer, BufferDataset +from torch.utils.data import DataLoader +from torch.distributions import Categorical + + +class Pipeline(LightningModule): + """GPT2-based transformer model for quantum operator selection. + + This model learns to select quantum operators from a pool to minimize + a given cost function. It can be configured to use either a full-size + or reduced-size architecture. + + Args: + cfg: Configuration object containing model parameters + cost: Cost function to evaluate operator sequences + loss: Loss function type ('exp' or 'gflow') + numQPUs: Number of QPUs available for cost evaluation + """ + + def __init__(self, cfg, cost, pool, model, factory, numQPUs=1): + super().__init__() + + # Set seed for reproducibility + L.seed_everything(cfg.seed) + + self.numQPUs = numQPUs + self.cfg = cfg + self.model = model.to(self.device) + self.factory = factory + self.pool = pool + self.benchmark_energy = cfg.benchmark_energy + self._cost = cost + self.loss = self.factory.create_loss_fn(cfg).to(self.device) + self.scheduler = self.factory.create_temperature_scheduler(self.cfg) + self.ngates = cfg.ngates + self.num_samples = cfg.num_samples + self.buffer = ReplayBuffer(size=cfg.buffer_size) + self.save_hyperparameters(ignore=['cost', 'pool', 'model', 'factory']) + self._starting_idx = torch.zeros(self.num_samples, + 1, + dtype=torch.long, + device=self.device) + + def configure_optimizers(self): + """Configure optimizer for training. + + Returns: + torch.optim.Optimizer: Configured optimizer + """ + return torch.optim.AdamW(self.model.parameters(), lr=self.cfg.lr) + + def on_fit_start(self): + # Recreate _starting_idx with correct device after model setup + self._starting_idx = torch.zeros(self.num_samples, + 1, + dtype=torch.long, + device=self.device) + while len(self.buffer) < self.cfg.warmup_size: + self.collect_rollout() + super().on_fit_start() + + def on_train_epoch_start(self): + self.collect_rollout() + + def collect_rollout(self): + idx_output = self.generate() + energies = self.computeCost(idx_output[:, 1:], self.pool) + for seq, energy in zip(idx_output, energies): + self.buffer.push(seq, energy) + self.scheduler.update(energies=energies) + + def set_cost(self, cost): + """Set the cost function used to evaluate operator sequences. + + Args: + cost: New cost function to use + """ + self._cost = cost + + @torch.no_grad() + def computeCost(self, idx_output, pool, **kwargs): + """Compute cost for given operator sequences. + + Supports distributed computation using MPI if available. + + Args: + idx_output: Indices of selected operators + pool: Pool of quantum operators + **kwargs: Additional arguments passed to cost function + + Returns: + torch.Tensor: Computed costs for each sequence + + Raises: + RuntimeError: If cost function returns invalid type + """ + res = [] + if cudaq.mpi.is_initialized(): + rank = cudaq.mpi.rank() + numRanks = cudaq.mpi.num_ranks() + total_elements = len(idx_output) + elements_per_rank = total_elements // numRanks + remainder = total_elements % numRanks + start = rank * elements_per_rank + min(rank, remainder) + end = start + elements_per_rank + (1 if rank < remainder else 0) + # This MPI rank owns rows[start:end] + res = [ + self._cost([pool[j] + for j in row], qpu_id=i % self.numQPUs) + for i, row in enumerate(idx_output[start:end]) + ] + else: + res = [ + self._cost([pool[j] + for j in row], qpu_id=i % self.numQPUs) + for i, row in enumerate(idx_output) + ] + + if isinstance(res[0], tuple) and len(res[0]) == 2: + res = [ + getScalarFromHandleFunctor(handle) + for (handle, getScalarFromHandleFunctor) in res + ] + + if not isinstance(res[0], float): + raise RuntimeError( + 'Invalid return type detected from user cost function.') + + # Need to perform MPI all gather here + if cudaq.mpi.is_initialized(): + res = MPI.COMM_WORLD.allgather(res) + res = [x for xs in res for x in xs] + + return torch.tensor(res, dtype=torch.float) + + def training_step(self, batch, batch_idx): + """Perform one training step. + + Lightning calls this method during training with batches from train_dataloader. + + Args: + batch: Dictionary containing 'idx' (sequences) and 'energy' (energy values) + batch_idx: Index of current batch + + Returns: + torch.Tensor: Loss value for this batch + """ + # Move batch data to device + idx = batch["idx"].to(self.device) + energies = batch["energy"].to(self.device) + + log_values = {} + logits = self.model(idx).logits + loss = self.loss.compute(energies, + logits, + idx[:, 1:], + log_values, + inverse_temperature=self.scheduler.get_inverse_temperature(), + current_step=batch_idx) + + # Log metrics + self.log_dict( + log_values, + prog_bar=False, + on_step=False, + on_epoch=True) + self.log("loss", loss, prog_bar=True, on_step=False, on_epoch=True) + self.log( + "energy_mean", + energies.mean(), + prog_bar=False, + on_epoch=True, on_step=False) + self.log( + "energy_min", + energies.min(), + prog_bar=False, + on_epoch=True, + on_step=False) + self.log( + "inverse_temperature", + self.scheduler.get_inverse_temperature(), + prog_bar=True, + on_epoch=True, on_step=False) + + return loss + + def train_dataloader(self): + return DataLoader( + BufferDataset(self.buffer, self.cfg.step_per_epoch), + batch_size=self.cfg.batch_size, + shuffle=False, + num_workers=0, # Avoid multiprocessing to prevent pickling issues with CUDA-Q objects + ) + + def generate(self, idx=None, ngates=None): + """ + Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete + the sequence max_new_tokens times, feeding the predictions back into the model each time. + """ + if idx is None: + idx = self._starting_idx.clone() + if ngates is None: + ngates = self.ngates + current_temp = self.scheduler.get_inverse_temperature() + for _ in range(ngates): + idx_cond = idx + logits_base = self.model(idx_cond) + logits = logits_base.logits[:, -1, :] + probs = Categorical(logits=-current_temp * logits) + idx_next = probs.sample() + idx = torch.cat((idx, idx_next.unsqueeze(1)), dim=1) + return idx + + def logits(self, idx): + logits_base = self.model(idx) + idx = idx[:, 1:] + return torch.gather(logits_base.logits, 2, + idx.unsqueeze(-1)).squeeze(-1) \ No newline at end of file diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/scheduler.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/scheduler.py new file mode 100644 index 00000000..60037c38 --- /dev/null +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/scheduler.py @@ -0,0 +1,154 @@ +# ============================================================================ # +# Copyright (c) 2025 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # + +from abc import ABC, abstractmethod +import math + + +class TemperatureScheduler(ABC): + """Abstract base class for temperature scheduling in GQE. + + Temperature scheduling controls how the temperature parameter changes during training, + which affects the exploration vs exploitation trade-off in operator selection. + """ + + @abstractmethod + def get_inverse_temperature(self): + """Get current inverse temperature value. + + Returns: + float: Current inverse temperature (beta) + """ + pass + + @abstractmethod + def update(self, **kwargs): + """Update scheduler state. + + This can be used to adjust temperature dynamically based on training progress. + + Args: + **kwargs: Optional keyword arguments (e.g., energies, loss, iteration) + """ + pass + + +class DefaultScheduler(TemperatureScheduler): + """Linear temperature scheduler that increases temperature by a fixed delta each iteration. + + Args: + start: Initial temperature value + delta: Amount to increase temperature each iteration + """ + + def __init__(self, start, delta) -> None: + self.start = start + self.delta = delta + self.current_temperature = start + + def get_inverse_temperature(self): + """Get current inverse temperature value. + + Returns: + float: Current inverse temperature (beta) + """ + return self.current_temperature + + def update(self, **kwargs): + """Update temperature by incrementing by delta. + + Args: + **kwargs: Unused, but accepts any keyword arguments for interface compatibility + """ + self.current_temperature += self.delta + + +class CosineScheduler(TemperatureScheduler): + """Cosine-based temperature scheduler that oscillates between min and max values. + + Useful for periodic exploration and exploitation phases during training. + + Args: + minimum: Minimum temperature value + maximum: Maximum temperature value + frequency: Number of iterations for one complete cycle + """ + + def __init__(self, minimum, maximum, frequency) -> None: + self.minimum = minimum + self.maximum = maximum + self.frequency = frequency + self.current_iter = 0 + self.current_temperature = (maximum + minimum) / 2 + + def get_inverse_temperature(self): + """Get current inverse temperature value. + + Returns: + float: Current inverse temperature (beta) + """ + return self.current_temperature + + def update(self, **kwargs): + """Update temperature following cosine schedule. + + Args: + **kwargs: Unused, but accepts any keyword arguments for interface compatibility + """ + self.current_iter += 1 + self.current_temperature = (self.maximum + self.minimum) / 2 - ( + self.maximum - self.minimum) / 2 * math.cos( + 2 * math.pi * self.current_iter / self.frequency) + + +class VarBasedScheduler(TemperatureScheduler): + """Variance-based adaptive temperature scheduler. + + Adjusts temperature based on the variance of energies in the current batch. + When variance is high (diverse energies), increases temperature to encourage exploration. + When variance is low (converged energies), decreases temperature for exploitation. + + Args: + initial: Initial temperature value + delta: Amount to adjust temperature each iteration + target_var: Target variance threshold for determining adjustment direction + """ + + def __init__(self, initial, delta, target_var) -> None: + self.delta = delta + self.current_temperature = initial + self.target_var = target_var + + def get_inverse_temperature(self): + """Get current inverse temperature value. + + Returns: + float: Current inverse temperature (beta) + """ + return self.current_temperature + + def update(self, **kwargs): + """Update temperature based on energy variance. + + If current variance exceeds target, increases temperature (more exploration). + Otherwise, decreases temperature (more exploitation). + + Args: + **kwargs: Must contain 'energies' key with a tensor of energy values + """ + energies = kwargs["energies"] + current_var = energies.var().item() + + # Adjust temperature based on variance + if current_var > self.target_var: + self.current_temperature += self.delta # Increase inverse temperature (decrease T) + else: + self.current_temperature -= self.delta # Decrease inverse temperature (increase T) + + # Keep temperature positive + self.current_temperature = max(self.current_temperature, 0.01) diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/transformer.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/transformer.py deleted file mode 100644 index 4367c3db..00000000 --- a/libs/solvers/python/cudaq_solvers/gqe_algorithm/transformer.py +++ /dev/null @@ -1,232 +0,0 @@ -# ============================================================================ # -# Copyright (c) 2025 NVIDIA Corporation & Affiliates. # -# All rights reserved. # -# # -# This source code and the accompanying materials are made available under # -# the terms of the Apache License 2.0 which accompanies this distribution. # -# ============================================================================ # - -import torch, cudaq -from mpi4py import MPI -from torch.nn import functional as F -from transformers import GPT2LMHeadModel, GPT2Config -from lightning import LightningModule -from .loss import ExpLogitMatching, GFlowLogitMatching - - -def get_device(): - """Determine the appropriate device for tensor operations. - - Returns: - str: 'cuda' if GPU available, 'mps' for Apple Silicon, 'cpu' otherwise - """ - if torch.cuda.is_available(): - return 'cuda' - elif torch.backends.mps.is_available(): - return 'mps' - return 'cpu' - - -class SmallConfig(GPT2Config): - """Reduced-size configuration for GPT2 model. - - Uses fewer layers (6) and attention heads (6) than the default GPT2 - configuration, resulting in a smaller model that trains faster. - - Args: - **kwargs: Additional GPT2 configuration parameters - """ - - def __init__(self, **kwargs): - super().__init__(n_layer=6, n_head=6, **kwargs) - - -class Transformer(LightningModule): - """GPT2-based transformer model for quantum operator selection. - - This model learns to select quantum operators from a pool to minimize - a given cost function. It can be configured to use either a full-size - or reduced-size architecture. - - Args: - cfg: Configuration object containing model parameters - cost: Cost function to evaluate operator sequences - loss: Loss function type ('exp' or 'gflow') - numQPUs: Number of QPUs available for cost evaluation - """ - - def __init__(self, cfg, cost, loss="exp", numQPUs=1): - super().__init__() - self._label = 'label_stand_in' - self.numQPUs = numQPUs - self.cfg = cfg - gpt2cfg = GPT2Config( - **{k: cfg[k] for k in GPT2Config().to_dict().keys() & cfg.keys()}) - if cfg.small: - gpt2cfg = SmallConfig( - ** - {k: cfg[k] for k in GPT2Config().to_dict().keys() & cfg.keys()}) - self.transformer = GPT2LMHeadModel(gpt2cfg).to(get_device()) - self.ngates = cfg.ngates - self.num_samples = cfg.num_samples - self.temperature = cfg.temperature - self.save_hyperparameters() - self._starting_idx = torch.zeros(self.num_samples, - 1, - dtype=torch.int, - device=get_device()) - if loss == "exp": - self.loss = ExpLogitMatching(cfg.energy_offset, self._label) - else: - self.loss = GFlowLogitMatching(cfg.energy_offset, get_device(), - self._label, self) - self._cost = cost - - def generate_logits(self, idx): - """Generate logits for the next token given input indices. - - Args: - idx: Input token indices - - Returns: - torch.Tensor: Logits for next token prediction - """ - logits = self.transformer(idx)[0] - return logits - - def set_cost(self, cost): - """Set the cost function used to evaluate operator sequences. - - Args: - cost: New cost function to use - """ - self._cost = cost - - def gather(self, idx, logits_base): - """Gather logits for specific indices from base logits. - - Args: - idx: Indices to gather logits for - logits_base: Base logits to gather from - - Returns: - torch.Tensor: Gathered logits - """ - b_size = idx.shape[0] - return torch.gather(logits_base, 2, idx.reshape(b_size, -1, - 1)).reshape(b_size, -1) - - @torch.no_grad() - def computeCost(self, idx_output, pool, **kwargs): - """Compute cost for given operator sequences. - - Supports distributed computation using MPI if available. - - Args: - idx_output: Indices of selected operators - pool: Pool of quantum operators - **kwargs: Additional arguments passed to cost function - - Returns: - torch.Tensor: Computed costs for each sequence - - Raises: - RuntimeError: If cost function returns invalid type - """ - res = [] - if cudaq.mpi.is_initialized(): - rank = cudaq.mpi.rank() - numRanks = cudaq.mpi.num_ranks() - total_elements = len(idx_output) - elements_per_rank = total_elements // numRanks - remainder = total_elements % numRanks - start = rank * elements_per_rank + min(rank, remainder) - end = start + elements_per_rank + (1 if rank < remainder else 0) - # This MPI rank owns rows[start:end] - res = [ - self._cost([pool[j] - for j in row], qpu_id=i % self.numQPUs) - for i, row in enumerate(idx_output[start:end]) - ] - else: - res = [ - self._cost([pool[j] - for j in row], qpu_id=i % self.numQPUs) - for i, row in enumerate(idx_output) - ] - - if isinstance(res[0], tuple) and len(res[0]) == 2: - res = [ - getScalarFromHandleFunctor(handle) - for (handle, getScalarFromHandleFunctor) in res - ] - - if not isinstance(res[0], float): - raise RuntimeError( - 'Invalid return type detected from user cost function.') - - # Need to perform MPI all gather here - if cudaq.mpi.is_initialized(): - res = MPI.COMM_WORLD.allgather(res) - res = [x for xs in res for x in xs] - - return torch.tensor(res, dtype=torch.float) - - def train_step(self, - pool, - indices=None, - energies=None, - numQPUs=None, - comm=None): - """Perform one training step. - - Either generates new sequences and computes their costs, - or uses provided sequences and energies for training. - - Args: - pool: Pool of quantum operators - indices: Optional pre-computed operator indices - energies: Optional pre-computed energies - numQPUs: Optional number of QPUs to use - comm: Optional MPI communicator - - Returns: - tuple: (loss, energies, indices, log_values) - """ - log_values = {} - if energies is not None: - assert indices is not None - idx_output = indices[:, 1:] - logits_base = self.generate_logits(idx_output) - else: - idx_output, logits_base = self.generate() - energies = self.computeCost(idx_output, - pool, - numQPUs=numQPUs, - comm=comm) - logits_tensor = self.gather(idx_output, logits_base) - allLogits = logits_tensor - - loss = self.loss.compute(energies, allLogits, log_values) - log_values[f"loss at {self._label}"] = loss - return loss, energies, idx_output, log_values - - def generate(self, idx=None, ngates=None): - """ - Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete - the sequence max_new_tokens times, feeding the predictions back into the model each time. - """ - if idx is None: - idx = self._starting_idx.clone() - condition_length = idx.size(dim=1) - if ngates is None: - ngates = self.ngates - for _ in range(ngates): - idx_cond = idx - logits_base = self.generate_logits(idx_cond) - logits = logits_base[:, -1, :] - probs = F.softmax(-self.temperature * logits, dim=-1) - idx_next = torch.multinomial(probs, num_samples=1) - idx = torch.cat((idx, idx_next), dim=1) - idx = idx[:, condition_length:] - return idx, logits_base diff --git a/libs/solvers/python/cudaq_solvers/gqe_algorithm/utils.py b/libs/solvers/python/cudaq_solvers/gqe_algorithm/utils.py new file mode 100644 index 00000000..8a3dbe70 --- /dev/null +++ b/libs/solvers/python/cudaq_solvers/gqe_algorithm/utils.py @@ -0,0 +1,90 @@ +import cudaq +import cudaq_solvers as solvers +from cudaq import spin +from typing import List + + +def get_identity(n_qubits: int) -> cudaq.SpinOperator: + """ + Generate identity operator for n qubits. + + Args: + n_qubits: Number of qubits. + + Returns: + Identity SpinOperator (I ⊗ I ⊗ ... ⊗ I). + """ + In = cudaq.spin.i(0) + for q in range(1, n_qubits): + In = In * cudaq.spin.i(q) + return 1.0 * cudaq.SpinOperator(In) + + +def get_gqe_pauli_pool(num_qubits: int, + num_electrons: int, + params: List[float]) -> List[cudaq.SpinOperator]: + """ + Generate a GQE operator pool based on individual UCCSD Pauli terms with parameter scaling. + + This function creates a pool by: + 1. Getting UCCSD operators + 2. Extracting Pauli string patterns from each term (ignoring original coefficients) + 3. Creating a separate SpinOperator for each Pauli string pattern + 4. Scaling each pattern by different parameter values + + Args: + num_qubits: Total number of qubits in the system. + num_electrons: Number of electrons in the system. + params: List of parameter coefficients for scaling operators. + + Returns: + List of cudaq.SpinOperator objects, each representing a single + parameterized Pauli string. + + Example: + >>> params = [0.01, -0.01, 0.05, -0.05, 0.1, -0.1] + >>> pool = get_gqe_pauli_pool(num_qubits=4, num_electrons=2, params=params) + """ + # Get base UCCSD operators + uccsd_operators = solvers.get_operator_pool( + "uccsd", num_qubits=num_qubits, num_electrons=num_electrons) + + # Start with identity operator + pool = [] + pool.append(get_identity(num_qubits)) + + # Extract individual Pauli string patterns (ignoring coefficients) + individual_terms = [] + for op in uccsd_operators: + for term in op: # Iterate over terms in the SpinOperator + # Get the Pauli word pattern and reconstruct as SpinOperator + pauli_word = term.get_pauli_word(num_qubits) + + # Build spin operator from pauli_word string + pauli_op = None + for qubit_idx, pauli_char in enumerate(pauli_word): + if pauli_char == 'I': + gate = spin.i(qubit_idx) + elif pauli_char == 'X': + gate = spin.x(qubit_idx) + elif pauli_char == 'Y': + gate = spin.y(qubit_idx) + elif pauli_char == 'Z': + gate = spin.z(qubit_idx) + else: + continue + + if pauli_op is None: + pauli_op = gate + else: + pauli_op = pauli_op * gate + + if pauli_op is not None: + individual_terms.append(cudaq.SpinOperator(pauli_op)) + + # Add parameterized individual terms to pool + for term_op in individual_terms: + for param in params: + pool.append(param * term_op) + + return pool diff --git a/libs/solvers/python/tests/test_gqe.py b/libs/solvers/python/tests/test_gqe.py index 95d40f5f..d2a3a7fa 100644 --- a/libs/solvers/python/tests/test_gqe.py +++ b/libs/solvers/python/tests/test_gqe.py @@ -10,7 +10,9 @@ import pytest from cudaq import spin import cudaq -from cudaq_solvers.gqe_algorithm.gqe import DefaultScheduler, CosineScheduler, get_default_config +from cudaq_solvers.gqe_algorithm.gqe import get_default_config +from cudaq_solvers.gqe_algorithm.scheduler import DefaultScheduler, CosineScheduler, VarBasedScheduler +from cudaq_solvers.gqe_algorithm.utils import get_gqe_pauli_pool import cudaq_solvers as solvers qubit_count = 2 @@ -67,21 +69,61 @@ def cost(sampled_ops: list[cudaq.SpinOperator], **kwargs): def test_default_scheduler(): """Test the DefaultScheduler temperature scheduling""" scheduler = DefaultScheduler(start=1.0, delta=0.1) - assert scheduler.get(0) == 1.0 - assert scheduler.get(1) == 1.1 - assert scheduler.get(10) == 2.0 + assert scheduler.get_inverse_temperature() == 1.0 + scheduler.update() + assert np.isclose(scheduler.get_inverse_temperature(), 1.1, atol=1e-6) + for _ in range(9): + scheduler.update() + assert np.isclose(scheduler.get_inverse_temperature(), 2.0, atol=1e-6) def test_cosine_scheduler(): """Test the CosineScheduler temperature scheduling""" scheduler = CosineScheduler(minimum=1.0, maximum=5.0, frequency=10) - # Test at key points in the cosine cycle - assert np.isclose(scheduler.get(0), 1.0, - atol=1e-6) # min at start (cos(0)=1) - assert np.isclose(scheduler.get(5), 5.0, - atol=1e-6) # max at half cycle (cos(π)=-1) - assert np.isclose(scheduler.get(10), 1.0, - atol=1e-6) # min at full cycle (cos(2π)=1) + # Initial temperature should be at midpoint + assert np.isclose(scheduler.get_inverse_temperature(), 3.0, atol=1e-6) + + # After 5 updates, should be at maximum (cos(π)=-1) + for _ in range(5): + scheduler.update() + assert np.isclose(scheduler.get_inverse_temperature(), 5.0, atol=1e-6) + + # After 10 updates total, should be back near starting point (cos(2π)=1) + for _ in range(5): + scheduler.update() + assert np.isclose(scheduler.get_inverse_temperature(), 1.0, atol=1e-6) + + +def test_variance_scheduler(): + """Test the VarBasedScheduler temperature scheduling""" + import torch + scheduler = VarBasedScheduler(initial=2.0, delta=0.1, target_var=0.1) + + # Test initial temperature + assert scheduler.get_inverse_temperature() == 2.0 + + # Simulate high variance scenario (should increase temperature) + high_var_energies = torch.tensor([1.0, 5.0, 2.0, 6.0, 3.0]) # var ≈ 3.5 + initial_temp = scheduler.current_temperature + scheduler.update(energies=high_var_energies) + temp_after_high_var = scheduler.current_temperature + assert temp_after_high_var > initial_temp # Temperature should increase + + # Simulate low variance scenario (should decrease temperature) + scheduler2 = VarBasedScheduler(initial=2.0, delta=0.1, target_var=0.5) + low_var_energies = torch.tensor( + [1.0, 1.1, 1.05, 0.95, 1.02]) # var ≈ 0.003 + initial_temp2 = scheduler2.current_temperature + scheduler2.update(energies=low_var_energies) + temp_after_low_var = scheduler2.current_temperature + assert temp_after_low_var < initial_temp2 # Temperature should decrease + + # Test minimum temperature bound + scheduler3 = VarBasedScheduler(initial=2.0, delta=0.1, target_var=0.1) + for _ in range(100): # Many decreases + scheduler3.update(energies=low_var_energies) + final_temp = scheduler3.current_temperature + assert final_temp >= 0.01 # Should not go below min_temp (0.01) def test_solvers_gqe_basic(): @@ -151,8 +193,87 @@ def test_solvers_gqe_with_gflow_loss(): cfg.small = False cfg.cache = False cfg.save_dir = "/dev/null" + cfg.loss = "gflow" - energy, indices = solvers.gqe(cost, pool, config=cfg, loss="gflow") + energy, indices = solvers.gqe(cost, pool, config=cfg) + assert energy < 0.0 + assert energy > -2.0 + + +def test_solvers_gqe_with_exp_loss(): + """Test GQE with Exponential loss function""" + cfg = get_default_config() + cfg.num_samples = 5 + cfg.max_iters = 50 + cfg.ngates = 10 + cfg.seed = 3047 + cfg.lr = 1e-6 + cfg.energy_offset = 0.0 + cfg.grad_norm_clip = 1.0 + cfg.temperature = 2.0 + cfg.del_temperature = 0.1 + cfg.resid_pdrop = 0.0 + cfg.embd_pdrop = 0.0 + cfg.attn_pdrop = 0.0 + cfg.small = False + cfg.cache = False + cfg.save_dir = "/dev/null" + cfg.loss = "exp" + + energy, indices = solvers.gqe(cost, pool, config=cfg) + assert energy < 0.0 + assert energy > -2.0 + + +def test_solvers_gqe_with_variance_scheduler(): + """Test GQE with variance-based temperature scheduler""" + cfg = get_default_config() + cfg.num_samples = 5 + cfg.max_iters = 50 + cfg.ngates = 10 + cfg.seed = 3047 + cfg.lr = 1e-6 + cfg.energy_offset = 0.0 + cfg.grad_norm_clip = 1.0 + cfg.temperature = 2.0 + cfg.del_temperature = 0.1 + cfg.scheduler = 'variance' + cfg.target_variance = 0.1 + cfg.resid_pdrop = 0.0 + cfg.embd_pdrop = 0.0 + cfg.attn_pdrop = 0.0 + cfg.small = False + cfg.cache = False + cfg.save_dir = "/dev/null" + + energy, indices = solvers.gqe(cost, pool, config=cfg) + assert energy < 0.0 + assert energy > -2.0 + + +def test_solvers_gqe_with_cosine_scheduler(): + """Test GQE with cosine temperature scheduler""" + cfg = get_default_config() + cfg.num_samples = 5 + cfg.max_iters = 50 + cfg.ngates = 10 + cfg.seed = 3047 + cfg.lr = 1e-6 + cfg.energy_offset = 0.0 + cfg.grad_norm_clip = 1.0 + cfg.temperature = 2.0 + cfg.scheduler = 'cosine' + cfg.temperature_min = 1.5 + cfg.temperature_max = 3.0 + cfg.scheduler_frequency = 20 + cfg.resid_pdrop = 0.0 + cfg.embd_pdrop = 0.0 + cfg.attn_pdrop = 0.0 + cfg.small = False + cfg.cache = False + cfg.save_dir = "/dev/null" + + energy, indices = solvers.gqe(cost, pool, config=cfg) assert energy < 0.0 assert energy > -2.0 @@ -201,3 +322,53 @@ def test_invalid_inputs(): cfg.temperature = -1.0 with pytest.raises(ValueError): solvers.gqe(cost, pool, config=cfg) + + +def test_get_gqe_pauli_pool(): + """Test GQE Pauli operator pool generation""" + num_qubits = 4 + num_electrons = 2 + params = [0.01, -0.01, 0.05, -0.05] + + # Generate pool + pool = get_gqe_pauli_pool(num_qubits, num_electrons, params) + + # Pool should be a list + assert isinstance(pool, list) + + # Pool should not be empty + assert len(pool) > 0 + + # First operator should be identity + identity_terms = list(pool[0]) + assert len(identity_terms) == 1 + pauli_word = identity_terms[0].get_pauli_word(num_qubits) + assert pauli_word == "IIII" + + # All operators should be SpinOperators + for op in pool: + assert isinstance(op, cudaq.SpinOperator) + + # Check that pool contains parameterized operators + # Pool size should be: 1 (identity) + (num_uccsd_terms * num_params) + # At minimum, we expect more operators than just identity + assert len(pool) > len(params) + + # Verify some operators have the expected parameter scaling + non_identity_ops = pool[1:] # Skip identity + found_scaled_ops = False + for op in non_identity_ops: + terms = list(op) + for term in terms: + coeff = term.evaluate_coefficient() + # Check if coefficient matches one of our params + for param in params: + if np.isclose(abs(coeff.real), abs(param), atol=1e-10): + found_scaled_ops = True + break + if found_scaled_ops: + break + if found_scaled_ops: + break + + assert found_scaled_ops, "Pool should contain operators scaled by the provided parameters"