diff --git a/examples/symbolic_regression/eval.py b/examples/symbolic_regression/eval.py index fa68caa80..42ac935a5 100755 --- a/examples/symbolic_regression/eval.py +++ b/examples/symbolic_regression/eval.py @@ -1,456 +1,456 @@ -from typing import Dict, Any # List removed as it's not used -import json -import os -from pathlib import Path -import numpy as np - -# import time # Not used -from scipy.stats import kendalltau -from sklearn.metrics import mean_absolute_percentage_error -from scipy.optimize import minimize -import importlib.util -import sys - -# import traceback # Not used -# import json # Not used -# Example custom JSON encoder if you need to save results with numpy types -import json - - -class NumpyFloatJSONEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, np.integer): - return int(obj) - elif isinstance(obj, np.floating): - return float(obj) - elif isinstance(obj, np.ndarray): - return obj.tolist() - return super(NumpyFloatJSONEncoder, self).default(obj) - - -def compute_output_base_metrics(y_pred: np.ndarray, y: np.ndarray) -> Dict[str, Any]: - """ - Computes base metrics after filtering NaNs from predictions. - Ensures inputs y_pred and y are treated as 1D arrays. - """ - # Ensure y_pred and y are 1D arrays. - y_pred_1d = np.asarray(y_pred).squeeze() - y_1d = np.asarray(y).squeeze() - - # If squeeze results in 0-D (scalar), reshape to 1-D - if y_pred_1d.ndim == 0: - y_pred_1d = y_pred_1d.reshape(1) - if y_1d.ndim == 0: - y_1d = y_1d.reshape(1) - - base_metrics_nan = { - "mse": float("nan"), - "nmse": float("nan"), - "r2": float("nan"), - "kdt": float("nan"), - "mape": float("nan"), - "num_valid_points": 0, - } - - if y_pred_1d.shape != y_1d.shape and not (y_pred_1d.size == 0 and y_1d.size == 0): - return { - **base_metrics_nan, - "error": "y_pred and y have incompatible shapes after ensuring 1D.", - } - - nonnan_mask = ~np.isnan(y_pred_1d) - y_pred_filtered = y_pred_1d[nonnan_mask] - y_filtered = y_1d[nonnan_mask] - - if y_pred_filtered.size == 0: # All predictions were NaN or inputs were empty - return { - **base_metrics_nan, - "error": "All predictions are NaN or no data to compare after filtering.", - } - - mse = np.mean((y_filtered - y_pred_filtered) ** 2) - var_y = np.var(y_filtered) - - if var_y == 0: - nmse = 0.0 if mse == 0 else float("inf") # Consistent if true values are constant - else: - nmse = mse / var_y - - sum_sq_res = np.sum((y_filtered - y_pred_filtered) ** 2) - sum_sq_total = np.sum((y_filtered - np.mean(y_filtered)) ** 2) # Use mean of filtered y - - if sum_sq_total == 0: # True values (after filtering) are constant - r2 = ( - 1.0 if sum_sq_res == 0 else -float("inf") - ) # Or 0.0 if mse is also 0, definition varies. Sklearn uses 1.0. - else: - r2 = 1 - (sum_sq_res / sum_sq_total) - - kdt = float("nan") - try: - if y_filtered.size >= 2: # Kendall's tau requires at least 2 points - kdt_val, _ = kendalltau(y_filtered, y_pred_filtered) - kdt = float(kdt_val) # Ensure it's a basic float (handles np.nan) - # If size < 2, kdt remains float('nan') - except ValueError: # Should be less common with size check, but as a fallback - kdt = float("nan") # Explicitly set, though already NaN. 
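# Illustrative sketch: the NaN-masking and zero-variance guards used by
# compute_output_base_metrics above, reduced to NMSE and R2 on synthetic data.
# The helper name and inputs below are hypothetical; the real function also
# reports Kendall's tau, MAPE, valid-point counts and error diagnostics.
import numpy as np

def masked_nmse_r2(y_pred, y_true):
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    y_true = np.asarray(y_true, dtype=float).ravel()
    mask = ~np.isnan(y_pred)                 # score only non-NaN predictions
    yp, yt = y_pred[mask], y_true[mask]
    if yp.size == 0:
        return float("nan"), float("nan")
    mse = float(np.mean((yt - yp) ** 2))
    var = float(np.var(yt))
    nmse = mse / var if var > 0 else (0.0 if mse == 0 else float("inf"))
    ss_res = float(np.sum((yt - yp) ** 2))
    ss_tot = float(np.sum((yt - yt.mean()) ** 2))
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else (1.0 if ss_res == 0 else -float("inf"))
    return nmse, r2

print(masked_nmse_r2([1.0, float("nan"), 3.1], [1.0, 2.0, 3.0]))  # the NaN entry is ignored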
- - mape = float("nan") - try: - valid_mape_indices = y_filtered != 0 - if np.sum(valid_mape_indices) > 0: - mape = mean_absolute_percentage_error( - y_filtered[valid_mape_indices], y_pred_filtered[valid_mape_indices] - ) - elif y_filtered.size > 0: # All true values are zero - mape = 0.0 if np.all(y_pred_filtered == 0) else float("inf") - # If y_filtered.size is 0, mape remains float('nan') - except ValueError: # Fallback for any other MAPE calculation issues - mape = float("nan") - - return { - "mse": float(mse), - "nmse": float(nmse), - "r2": float(r2), - "kdt": kdt, # Already a float - "mape": float(mape) - if mape is not float("inf") - else float("inf"), # Ensure float, preserve inf - "num_valid_points": int(y_pred_filtered.size), - } - - -def objective_function( - params: np.ndarray, model_func: callable, X_matrix: np.ndarray, y_true_vector: np.ndarray -) -> float: - """ - Objective function for scipy.optimize.minimize. - Calculates MSE of the model_func with given params on X_matrix, y_true_vector. - """ - # model_func callable status is checked before calling minimize in the evaluation function. - try: - predictions = model_func(X_matrix, params) - if not isinstance(predictions, np.ndarray) or predictions.shape != y_true_vector.shape: - # print(f"Debug: Objective func - Bad prediction shape/type. Got {type(predictions)}, shape {getattr(predictions, 'shape', 'N/A')}. Expected {y_true_vector.shape}") - return float("inf") - if np.any(np.isnan(predictions)) or np.any(np.isinf(predictions)): - # print("Debug: Objective func - Predictions contain NaN/Inf.") - return float("inf") - except Exception: # Catch any error during model prediction - # print(f"Debug: Objective func - Exception during model_func call: {e_obj}") - return float("inf") - - mse = np.mean((predictions - y_true_vector) ** 2) - return mse - - -def evaluation( - program_path: str, - data_path: str, -) -> Dict[str, Dict[str, Any]]: - """ - Evaluates a model by loading it, optimizing its parameters, and testing it. - The model function from program_path is expected to be named 'func'. - """ - base_error_metrics = { - "mse": float("nan"), - "nmse": float("nan"), - "r2": float("nan"), - "kdt": float("nan"), - "mape": float("nan"), - "num_valid_points": 0, - } - - def _create_error_return(error_message: str) -> Dict[str, Dict[str, Any]]: - print(f"Error: {error_message}") - return { - "train_metrics": {**base_error_metrics, "error": error_message}, - "test_metrics": {**base_error_metrics, "error": error_message}, - "ood_metrics": {**base_error_metrics, "error": error_message}, - } - - # 1. Load data - try: - p_data_path = Path(data_path) - train_x = np.load(p_data_path / "X_train_for_eval.npy") - train_y = np.load(p_data_path / "y_train_for_eval.npy").squeeze() # Ensure 1D - test_x = np.load(p_data_path / "X_test_for_eval.npy") - test_y = np.load(p_data_path / "y_test_for_eval.npy").squeeze() # Ensure 1D - test_x_ood = np.load(p_data_path / "X_ood_test_for_eval.npy") - test_y_ood = np.load(p_data_path / "y_ood_test_for_eval.npy").squeeze() # Ensure 1D - except FileNotFoundError as e: - return _create_error_return(f"Data file not found: {e.filename}") - except Exception as e: - return _create_error_return(f"Error loading or processing data: {str(e)}") - - # 2. 
Load program (model function) - model_func = None - try: - p_program_path = Path(program_path) - if not p_program_path.is_file(): - raise FileNotFoundError(f"Program file not found: {program_path}") - - spec = importlib.util.spec_from_file_location("custom_model_module", str(p_program_path)) - if spec is None or spec.loader is None: - raise ImportError(f"Could not create module spec from {program_path}") - - custom_model_module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(custom_model_module) - - model_func = getattr(custom_model_module, "func", None) - if not callable(model_func): - raise AttributeError(f"'func' function not found or not callable in {program_path}") - except Exception as e: - return _create_error_return( - f"Failed to load model function 'func' from '{program_path}': {str(e)}" - ) - - # 3. Optimize parameters on training data - optimized_params = None - num_attempts = 10 # Default number of attempts - best_func_value = float("inf") - optimization_critical_error_msg = None - - # Try to get num_params from the model if it provides it, otherwise default - num_params_to_optimize = getattr(model_func, "num_params", 10) # Default to 10 if not specified - - print( - f"Starting optimization for {program_path} with {num_attempts} attempts (num_params: {num_params_to_optimize})..." - ) - for i in range(num_attempts): - print(f"Attempt {i+1}/{num_attempts}") - initial_params = np.random.rand(num_params_to_optimize) - try: - optimization_result = minimize( - objective_function, - initial_params, - args=(model_func, train_x, train_y), - method="BFGS", - # options={'maxiter': 1000, 'disp': False} # Example options - ) - if optimization_result.success: - print(f"Attempt {i+1} successful. Func value: {optimization_result.fun}") - if optimization_result.fun < best_func_value: - best_func_value = optimization_result.fun - optimized_params = optimization_result.x - print(f"New best result found in attempt {i+1}. Func value: {best_func_value}") - else: - print( - f"Warning: Optimization attempt {i+1} did not converge. Message: {optimization_result.message}. Func value: {optimization_result.fun}" - ) - if ( - optimization_result.fun < best_func_value - ): # Still consider if it's the best so far - print( - f"Non-converged result from attempt {i+1} is an improvement. Func value: {optimization_result.fun}" - ) - best_func_value = optimization_result.fun - optimized_params = optimization_result.x - - except Exception as e: - optimization_critical_error_msg = ( - f"Critical error during optimization attempt {i+1} for {program_path}: {str(e)}" - ) - print(f"Error: {optimization_critical_error_msg}") - break - - if optimization_critical_error_msg: - return _create_error_return(optimization_critical_error_msg) - - def _get_metrics_for_set( - X_data: np.ndarray, y_data: np.ndarray, set_name: str - ) -> Dict[str, Any]: - if optimized_params is None: - msg = f"Optimization failed to find parameters for {program_path}, cannot evaluate {set_name}." - return {**base_error_metrics, "error": msg} - try: - pred_y = model_func(X_data, optimized_params) - if not isinstance(pred_y, np.ndarray): - raise ValueError(f"{set_name} predictions are not numpy arrays. 
Got {type(pred_y)}") - - metrics = compute_output_base_metrics(pred_y, y_data) - if "error" in metrics and metrics["num_valid_points"] == 0: - print(f"Warning for {set_name} ({program_path}): {metrics['error']}") - return metrics - except Exception as e: - error_msg = f"{set_name} evaluation failed for '{program_path}': {str(e)}" - print(f"Error: {error_msg}") - return {**base_error_metrics, "error": error_msg} - - train_metrics = _get_metrics_for_set(train_x, train_y, "Train set") - test_metrics = _get_metrics_for_set(test_x, test_y, "Test set") - ood_metrics = _get_metrics_for_set(test_x_ood, test_y_ood, "OOD test set") - - return { - "train_metrics": train_metrics, - "test_metrics": test_metrics, - "ood_metrics": ood_metrics, - } - - -if __name__ == "__main__": - if len(sys.argv) < 2: - print("Usage: python your_script_name.py ") - sys.exit(1) - - root_path_arg = sys.argv[1] - path_obj = Path(root_path_arg) - problem_dirs = [] - - # Check if the path is a single problem directory - # A problem directory is expected to contain data files directly and an openevolve_output subdir - program_file_check = path_obj / "openevolve_output" / "best" / "best_program.py" - data_file_check = path_obj / "X_train_for_eval.npy" - - if data_file_check.exists() and program_file_check.exists(): - problem_dirs.append(path_obj) - print(f"Identified as single problem directory: {path_obj}") - else: - # Assume path is a parent directory containing multiple problem subdirectories - print( - f"Identified as parent directory: {path_obj}. Searching for problem subdirectories..." - ) - try: - if not path_obj.is_dir(): - print(f"Error: Root path {root_path_arg} is not a directory.") - sys.exit(1) - for d in path_obj.iterdir(): - if d.is_dir(): - # Check if this subdirectory looks like a problem directory - if (d / "X_train_for_eval.npy").exists() and ( - d / "openevolve_output" / "best" / "best_program.py" - ).exists(): - problem_dirs.append(d) - print(f" Found problem subdirectory: {d.name}") - else: - print(f" Skipping subdirectory (missing data or program): {d.name}") - except FileNotFoundError: - print(f"Error: Root directory not found: {root_path_arg}") - sys.exit(1) - - if not problem_dirs: - print( - f"No valid problem subdirectories found in '{root_path_arg}' or '{root_path_arg}' itself is not a valid problem directory." 
- ) - sys.exit(1) - - all_results = {} - for subdir_path in problem_dirs: - problem_name = subdir_path.name - # if "21" not in problem_name: continue - print(f"\nProcessing problem: {problem_name}") - program_file_path = subdir_path / "openevolve_output" / "best" / "best_program.py" - data_dir_path = subdir_path - - if ( - not program_file_path.exists() - ): # Should have been caught by subdir check, but as a safeguard - print(f"Skipping {problem_name}: best_program.py not found at {program_file_path}") - all_results[problem_name] = { - "train_metrics": {"error": "best_program.py not found"}, - "test_metrics": {"error": "best_program.py not found"}, - "ood_metrics": {"error": "best_program.py not found"}, - } - continue - - print(f" Program path: {program_file_path}") - print(f" Data path: {data_dir_path}") - - metrics_output = evaluation( # Renamed from 'metrics' to avoid conflict - program_path=str(program_file_path), - data_path=str(data_dir_path), - ) - print(f" Metrics for {problem_name}: {metrics_output}") - all_results[problem_name] = metrics_output - - print("\n--- All Evaluation Results ---") - for problem, result in all_results.items(): - print(f"\nProblem: {problem}") - print(f" Train Metrics: {result.get('train_metrics')}") - print(f" Test Metrics: {result.get('test_metrics')}") - print(f" OOD Metrics: {result.get('ood_metrics')}") - - # --- Overall Performance Calculation --- - overall_performance = {} - # Metrics to aggregate: mse, nmse, r2, kdt, mape - metric_keys = ["mse", "nmse", "r2", "kdt", "mape"] - dataset_types = ["train_metrics", "test_metrics", "ood_metrics"] - - for d_type in dataset_types: - overall_performance[d_type] = {} - for m_key in metric_keys: - all_scores = [] - for problem_name, results_data in all_results.items(): - # Ensure the dataset type (e.g., train_metrics) exists and doesn't have a top-level error - if d_type in results_data and "error" not in results_data[d_type]: - score = results_data[d_type].get(m_key) - # Only include if score is a number (not nan, not None, not inf for some metrics initially) - # np.nanmean and np.nanmedian will handle internal NaNs gracefully. - # We explicitly exclude inf from aggregation here, as it can skew means badly. - # For R2, -inf is possible and should be handled by nanmedian/nanmean or filtered if desired. - if isinstance(score, (int, float)) and not np.isinf( - score - ): # np.isnan(score) is fine for nan* functions - all_scores.append(score) - elif ( - score == -float("inf") and m_key == "r2" - ): # Special case for R2, allow -inf - all_scores.append(score) - - if all_scores: - # Replace -inf with NaN for R2 mean calculation if desired, or handle as is. - # For simplicity, we'll let nanmean/nanmedian handle it. - # Extreme values can still affect the mean significantly. - - # Filter out inf values for mean calculation as they make it non-informative - # but keep them for median if appropriate (or filter there too). - # For simplicity here, we are filtering inf before both. - # A more nuanced approach might replace inf with a very large/small number or handle per metric. 
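# Illustrative sketch: the NaN-aware aggregation with infinity filtering described in the
# comments above, applied to a hypothetical list of per-problem R2 scores. np.nanmean and
# np.nanmedian skip NaNs; +/-inf is dropped before the mean but kept for the median.
import numpy as np

scores = [0.82, float("nan"), 0.65, -float("inf"), 0.91]
finite_scores = [s for s in scores if not np.isinf(s)]            # drop +/-inf, keep NaN
mean_score = np.nanmean(finite_scores) if finite_scores else float("nan")
median_score = np.nanmedian(scores) if scores else float("nan")   # median tolerates -inf better
print(mean_score, median_score)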
- - scores_for_mean = [s for s in all_scores if s != -float("inf")] # R2 can be -inf - - overall_performance[d_type][f"mean_{m_key}"] = ( - np.nanmean(scores_for_mean) if scores_for_mean else float("nan") - ) - overall_performance[d_type][f"median_{m_key}"] = ( - np.nanmedian(all_scores) if all_scores else float("nan") - ) - overall_performance[d_type][f"num_problems_for_{m_key}"] = len(all_scores) - else: - overall_performance[d_type][f"mean_{m_key}"] = float("nan") - overall_performance[d_type][f"median_{m_key}"] = float("nan") - overall_performance[d_type][f"num_problems_for_{m_key}"] = 0 - - print("\n--- Overall Performance Summary ---") - for d_type, metrics_summary in overall_performance.items(): - print(f"\n{d_type.replace('_', ' ').title()}:") - if not metrics_summary: - print(" No data for overall summary.") - continue - for stat_name, value in metrics_summary.items(): - if "num_problems_for_" in stat_name: # Print count separately or alongside - m_key = stat_name.replace("num_problems_for_", "") - print(f" Number of problems for {m_key.upper()} stats: {value}") - elif "mean_" in stat_name or "median_" in stat_name: - print( - f" {stat_name.replace('_', ' ').title()}: {value:.4f}" - if isinstance(value, float) and not np.isnan(value) - else f" {stat_name.replace('_', ' ').title()}: {value}" - ) - - # Add overall performance to the results to be saved - all_results["overall_performance_summary"] = overall_performance - - # Optional: Save all_results to a JSON file - # Determine the output file path. If root_path_arg is a file, save alongside it. If a dir, save inside it. - if path_obj.is_file(): # Should not happen with current logic, but as a fallback - output_results_file = path_obj.parent / "all_evaluation_results.json" - else: # path_obj is a directory - output_results_file = path_obj / "all_evaluation_results.json" - - try: - with open(output_results_file, "w") as f: - json.dump(all_results, f, indent=4, cls=NumpyFloatJSONEncoder) - print(f"\nAll results, including overall performance, saved to {output_results_file}") - except Exception as e: - print(f"\nError saving results to JSON: {e}") +from typing import Dict, Any # List removed as it's not used +import json +import os +from pathlib import Path +import numpy as np + +# import time # Not used +from scipy.stats import kendalltau +from sklearn.metrics import mean_absolute_percentage_error +from scipy.optimize import minimize +import importlib.util +import sys + +# import traceback # Not used +# import json # Not used +# Example custom JSON encoder if you need to save results with numpy types +import json + + +class NumpyFloatJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + return super(NumpyFloatJSONEncoder, self).default(obj) + + +def compute_output_base_metrics(y_pred: np.ndarray, y: np.ndarray) -> Dict[str, Any]: + """ + Computes base metrics after filtering NaNs from predictions. + Ensures inputs y_pred and y are treated as 1D arrays. + """ + # Ensure y_pred and y are 1D arrays. 
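# Illustrative sketch: the squeeze-then-reshape normalization applied just below, which
# turns (n, 1) column vectors and 0-D scalars into plain 1-D arrays before comparison.
# The sample inputs are hypothetical.
import numpy as np

for raw in (np.zeros((4, 1)), np.float64(3.0), np.arange(4)):
    arr = np.asarray(raw).squeeze()
    if arr.ndim == 0:          # squeezing a scalar yields a 0-D array
        arr = arr.reshape(1)
    print(arr.shape)           # -> (4,), (1,), (4,)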
+ y_pred_1d = np.asarray(y_pred).squeeze() + y_1d = np.asarray(y).squeeze() + + # If squeeze results in 0-D (scalar), reshape to 1-D + if y_pred_1d.ndim == 0: + y_pred_1d = y_pred_1d.reshape(1) + if y_1d.ndim == 0: + y_1d = y_1d.reshape(1) + + base_metrics_nan = { + "mse": float("nan"), + "nmse": float("nan"), + "r2": float("nan"), + "kdt": float("nan"), + "mape": float("nan"), + "num_valid_points": 0, + } + + if y_pred_1d.shape != y_1d.shape and not (y_pred_1d.size == 0 and y_1d.size == 0): + return { + **base_metrics_nan, + "error": "y_pred and y have incompatible shapes after ensuring 1D.", + } + + nonnan_mask = ~np.isnan(y_pred_1d) + y_pred_filtered = y_pred_1d[nonnan_mask] + y_filtered = y_1d[nonnan_mask] + + if y_pred_filtered.size == 0: # All predictions were NaN or inputs were empty + return { + **base_metrics_nan, + "error": "All predictions are NaN or no data to compare after filtering.", + } + + mse = np.mean((y_filtered - y_pred_filtered) ** 2) + var_y = np.var(y_filtered) + + if var_y == 0: + nmse = 0.0 if mse == 0 else float("inf") # Consistent if true values are constant + else: + nmse = mse / var_y + + sum_sq_res = np.sum((y_filtered - y_pred_filtered) ** 2) + sum_sq_total = np.sum((y_filtered - np.mean(y_filtered)) ** 2) # Use mean of filtered y + + if sum_sq_total == 0: # True values (after filtering) are constant + r2 = ( + 1.0 if sum_sq_res == 0 else -float("inf") + ) # Or 0.0 if mse is also 0, definition varies. Sklearn uses 1.0. + else: + r2 = 1 - (sum_sq_res / sum_sq_total) + + kdt = float("nan") + try: + if y_filtered.size >= 2: # Kendall's tau requires at least 2 points + kdt_val, _ = kendalltau(y_filtered, y_pred_filtered) + kdt = float(kdt_val) # Ensure it's a basic float (handles np.nan) + # If size < 2, kdt remains float('nan') + except ValueError: # Should be less common with size check, but as a fallback + kdt = float("nan") # Explicitly set, though already NaN. + + mape = float("nan") + try: + valid_mape_indices = y_filtered != 0 + if np.sum(valid_mape_indices) > 0: + mape = mean_absolute_percentage_error( + y_filtered[valid_mape_indices], y_pred_filtered[valid_mape_indices] + ) + elif y_filtered.size > 0: # All true values are zero + mape = 0.0 if np.all(y_pred_filtered == 0) else float("inf") + # If y_filtered.size is 0, mape remains float('nan') + except ValueError: # Fallback for any other MAPE calculation issues + mape = float("nan") + + return { + "mse": float(mse), + "nmse": float(nmse), + "r2": float(r2), + "kdt": kdt, # Already a float + "mape": ( + float(mape) if mape is not float("inf") else float("inf") + ), # Ensure float, preserve inf + "num_valid_points": int(y_pred_filtered.size), + } + + +def objective_function( + params: np.ndarray, model_func: callable, X_matrix: np.ndarray, y_true_vector: np.ndarray +) -> float: + """ + Objective function for scipy.optimize.minimize. + Calculates MSE of the model_func with given params on X_matrix, y_true_vector. + """ + # model_func callable status is checked before calling minimize in the evaluation function. + try: + predictions = model_func(X_matrix, params) + if not isinstance(predictions, np.ndarray) or predictions.shape != y_true_vector.shape: + # print(f"Debug: Objective func - Bad prediction shape/type. Got {type(predictions)}, shape {getattr(predictions, 'shape', 'N/A')}. 
Expected {y_true_vector.shape}") + return float("inf") + if np.any(np.isnan(predictions)) or np.any(np.isinf(predictions)): + # print("Debug: Objective func - Predictions contain NaN/Inf.") + return float("inf") + except Exception: # Catch any error during model prediction + # print(f"Debug: Objective func - Exception during model_func call: {e_obj}") + return float("inf") + + mse = np.mean((predictions - y_true_vector) ** 2) + return mse + + +def evaluation( + program_path: str, + data_path: str, +) -> Dict[str, Dict[str, Any]]: + """ + Evaluates a model by loading it, optimizing its parameters, and testing it. + The model function from program_path is expected to be named 'func'. + """ + base_error_metrics = { + "mse": float("nan"), + "nmse": float("nan"), + "r2": float("nan"), + "kdt": float("nan"), + "mape": float("nan"), + "num_valid_points": 0, + } + + def _create_error_return(error_message: str) -> Dict[str, Dict[str, Any]]: + print(f"Error: {error_message}") + return { + "train_metrics": {**base_error_metrics, "error": error_message}, + "test_metrics": {**base_error_metrics, "error": error_message}, + "ood_metrics": {**base_error_metrics, "error": error_message}, + } + + # 1. Load data + try: + p_data_path = Path(data_path) + train_x = np.load(p_data_path / "X_train_for_eval.npy") + train_y = np.load(p_data_path / "y_train_for_eval.npy").squeeze() # Ensure 1D + test_x = np.load(p_data_path / "X_test_for_eval.npy") + test_y = np.load(p_data_path / "y_test_for_eval.npy").squeeze() # Ensure 1D + test_x_ood = np.load(p_data_path / "X_ood_test_for_eval.npy") + test_y_ood = np.load(p_data_path / "y_ood_test_for_eval.npy").squeeze() # Ensure 1D + except FileNotFoundError as e: + return _create_error_return(f"Data file not found: {e.filename}") + except Exception as e: + return _create_error_return(f"Error loading or processing data: {str(e)}") + + # 2. Load program (model function) + model_func = None + try: + p_program_path = Path(program_path) + if not p_program_path.is_file(): + raise FileNotFoundError(f"Program file not found: {program_path}") + + spec = importlib.util.spec_from_file_location("custom_model_module", str(p_program_path)) + if spec is None or spec.loader is None: + raise ImportError(f"Could not create module spec from {program_path}") + + custom_model_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(custom_model_module) + + model_func = getattr(custom_model_module, "func", None) + if not callable(model_func): + raise AttributeError(f"'func' function not found or not callable in {program_path}") + except Exception as e: + return _create_error_return( + f"Failed to load model function 'func' from '{program_path}': {str(e)}" + ) + + # 3. Optimize parameters on training data + optimized_params = None + num_attempts = 10 # Default number of attempts + best_func_value = float("inf") + optimization_critical_error_msg = None + + # Try to get num_params from the model if it provides it, otherwise default + num_params_to_optimize = getattr(model_func, "num_params", 10) # Default to 10 if not specified + + print( + f"Starting optimization for {program_path} with {num_attempts} attempts (num_params: {num_params_to_optimize})..." 
+ ) + for i in range(num_attempts): + print(f"Attempt {i+1}/{num_attempts}") + initial_params = np.random.rand(num_params_to_optimize) + try: + optimization_result = minimize( + objective_function, + initial_params, + args=(model_func, train_x, train_y), + method="BFGS", + # options={'maxiter': 1000, 'disp': False} # Example options + ) + if optimization_result.success: + print(f"Attempt {i+1} successful. Func value: {optimization_result.fun}") + if optimization_result.fun < best_func_value: + best_func_value = optimization_result.fun + optimized_params = optimization_result.x + print(f"New best result found in attempt {i+1}. Func value: {best_func_value}") + else: + print( + f"Warning: Optimization attempt {i+1} did not converge. Message: {optimization_result.message}. Func value: {optimization_result.fun}" + ) + if ( + optimization_result.fun < best_func_value + ): # Still consider if it's the best so far + print( + f"Non-converged result from attempt {i+1} is an improvement. Func value: {optimization_result.fun}" + ) + best_func_value = optimization_result.fun + optimized_params = optimization_result.x + + except Exception as e: + optimization_critical_error_msg = ( + f"Critical error during optimization attempt {i+1} for {program_path}: {str(e)}" + ) + print(f"Error: {optimization_critical_error_msg}") + break + + if optimization_critical_error_msg: + return _create_error_return(optimization_critical_error_msg) + + def _get_metrics_for_set( + X_data: np.ndarray, y_data: np.ndarray, set_name: str + ) -> Dict[str, Any]: + if optimized_params is None: + msg = f"Optimization failed to find parameters for {program_path}, cannot evaluate {set_name}." + return {**base_error_metrics, "error": msg} + try: + pred_y = model_func(X_data, optimized_params) + if not isinstance(pred_y, np.ndarray): + raise ValueError(f"{set_name} predictions are not numpy arrays. Got {type(pred_y)}") + + metrics = compute_output_base_metrics(pred_y, y_data) + if "error" in metrics and metrics["num_valid_points"] == 0: + print(f"Warning for {set_name} ({program_path}): {metrics['error']}") + return metrics + except Exception as e: + error_msg = f"{set_name} evaluation failed for '{program_path}': {str(e)}" + print(f"Error: {error_msg}") + return {**base_error_metrics, "error": error_msg} + + train_metrics = _get_metrics_for_set(train_x, train_y, "Train set") + test_metrics = _get_metrics_for_set(test_x, test_y, "Test set") + ood_metrics = _get_metrics_for_set(test_x_ood, test_y_ood, "OOD test set") + + return { + "train_metrics": train_metrics, + "test_metrics": test_metrics, + "ood_metrics": ood_metrics, + } + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python your_script_name.py ") + sys.exit(1) + + root_path_arg = sys.argv[1] + path_obj = Path(root_path_arg) + problem_dirs = [] + + # Check if the path is a single problem directory + # A problem directory is expected to contain data files directly and an openevolve_output subdir + program_file_check = path_obj / "openevolve_output" / "best" / "best_program.py" + data_file_check = path_obj / "X_train_for_eval.npy" + + if data_file_check.exists() and program_file_check.exists(): + problem_dirs.append(path_obj) + print(f"Identified as single problem directory: {path_obj}") + else: + # Assume path is a parent directory containing multiple problem subdirectories + print( + f"Identified as parent directory: {path_obj}. Searching for problem subdirectories..." 
+ ) + try: + if not path_obj.is_dir(): + print(f"Error: Root path {root_path_arg} is not a directory.") + sys.exit(1) + for d in path_obj.iterdir(): + if d.is_dir(): + # Check if this subdirectory looks like a problem directory + if (d / "X_train_for_eval.npy").exists() and ( + d / "openevolve_output" / "best" / "best_program.py" + ).exists(): + problem_dirs.append(d) + print(f" Found problem subdirectory: {d.name}") + else: + print(f" Skipping subdirectory (missing data or program): {d.name}") + except FileNotFoundError: + print(f"Error: Root directory not found: {root_path_arg}") + sys.exit(1) + + if not problem_dirs: + print( + f"No valid problem subdirectories found in '{root_path_arg}' or '{root_path_arg}' itself is not a valid problem directory." + ) + sys.exit(1) + + all_results = {} + for subdir_path in problem_dirs: + problem_name = subdir_path.name + # if "21" not in problem_name: continue + print(f"\nProcessing problem: {problem_name}") + program_file_path = subdir_path / "openevolve_output" / "best" / "best_program.py" + data_dir_path = subdir_path + + if ( + not program_file_path.exists() + ): # Should have been caught by subdir check, but as a safeguard + print(f"Skipping {problem_name}: best_program.py not found at {program_file_path}") + all_results[problem_name] = { + "train_metrics": {"error": "best_program.py not found"}, + "test_metrics": {"error": "best_program.py not found"}, + "ood_metrics": {"error": "best_program.py not found"}, + } + continue + + print(f" Program path: {program_file_path}") + print(f" Data path: {data_dir_path}") + + metrics_output = evaluation( # Renamed from 'metrics' to avoid conflict + program_path=str(program_file_path), + data_path=str(data_dir_path), + ) + print(f" Metrics for {problem_name}: {metrics_output}") + all_results[problem_name] = metrics_output + + print("\n--- All Evaluation Results ---") + for problem, result in all_results.items(): + print(f"\nProblem: {problem}") + print(f" Train Metrics: {result.get('train_metrics')}") + print(f" Test Metrics: {result.get('test_metrics')}") + print(f" OOD Metrics: {result.get('ood_metrics')}") + + # --- Overall Performance Calculation --- + overall_performance = {} + # Metrics to aggregate: mse, nmse, r2, kdt, mape + metric_keys = ["mse", "nmse", "r2", "kdt", "mape"] + dataset_types = ["train_metrics", "test_metrics", "ood_metrics"] + + for d_type in dataset_types: + overall_performance[d_type] = {} + for m_key in metric_keys: + all_scores = [] + for problem_name, results_data in all_results.items(): + # Ensure the dataset type (e.g., train_metrics) exists and doesn't have a top-level error + if d_type in results_data and "error" not in results_data[d_type]: + score = results_data[d_type].get(m_key) + # Only include if score is a number (not nan, not None, not inf for some metrics initially) + # np.nanmean and np.nanmedian will handle internal NaNs gracefully. + # We explicitly exclude inf from aggregation here, as it can skew means badly. + # For R2, -inf is possible and should be handled by nanmedian/nanmean or filtered if desired. + if isinstance(score, (int, float)) and not np.isinf( + score + ): # np.isnan(score) is fine for nan* functions + all_scores.append(score) + elif ( + score == -float("inf") and m_key == "r2" + ): # Special case for R2, allow -inf + all_scores.append(score) + + if all_scores: + # Replace -inf with NaN for R2 mean calculation if desired, or handle as is. + # For simplicity, we'll let nanmean/nanmedian handle it. 
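# Illustrative sketch: the multi-start BFGS fit used by evaluation() earlier in this file,
# reduced to a hypothetical toy linear model. toy_func, the synthetic X/y and the restart
# count are stand-ins; only the "random restarts, keep the best result" pattern is the point.
import numpy as np
from scipy.optimize import minimize

def toy_func(X, params):                        # stand-in for the evolved func(X, params)
    return X @ params[:-1] + params[-1]

rng = np.random.default_rng(0)
X = rng.normal(size=(50, 2))
y = toy_func(X, np.array([1.5, -2.0, 0.3]))     # ground-truth weights and bias

def objective(params):
    pred = toy_func(X, params)
    return float(np.mean((pred - y) ** 2))

best_val, best_params = float("inf"), None
for _ in range(5):                              # random restarts; keep the lowest objective
    result = minimize(objective, rng.random(3), method="BFGS")
    if result.fun < best_val:
        best_val, best_params = result.fun, result.x
print(best_val, best_params)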
+ # Extreme values can still affect the mean significantly. + + # Filter out inf values for mean calculation as they make it non-informative + # but keep them for median if appropriate (or filter there too). + # For simplicity here, we are filtering inf before both. + # A more nuanced approach might replace inf with a very large/small number or handle per metric. + + scores_for_mean = [s for s in all_scores if s != -float("inf")] # R2 can be -inf + + overall_performance[d_type][f"mean_{m_key}"] = ( + np.nanmean(scores_for_mean) if scores_for_mean else float("nan") + ) + overall_performance[d_type][f"median_{m_key}"] = ( + np.nanmedian(all_scores) if all_scores else float("nan") + ) + overall_performance[d_type][f"num_problems_for_{m_key}"] = len(all_scores) + else: + overall_performance[d_type][f"mean_{m_key}"] = float("nan") + overall_performance[d_type][f"median_{m_key}"] = float("nan") + overall_performance[d_type][f"num_problems_for_{m_key}"] = 0 + + print("\n--- Overall Performance Summary ---") + for d_type, metrics_summary in overall_performance.items(): + print(f"\n{d_type.replace('_', ' ').title()}:") + if not metrics_summary: + print(" No data for overall summary.") + continue + for stat_name, value in metrics_summary.items(): + if "num_problems_for_" in stat_name: # Print count separately or alongside + m_key = stat_name.replace("num_problems_for_", "") + print(f" Number of problems for {m_key.upper()} stats: {value}") + elif "mean_" in stat_name or "median_" in stat_name: + print( + f" {stat_name.replace('_', ' ').title()}: {value:.4f}" + if isinstance(value, float) and not np.isnan(value) + else f" {stat_name.replace('_', ' ').title()}: {value}" + ) + + # Add overall performance to the results to be saved + all_results["overall_performance_summary"] = overall_performance + + # Optional: Save all_results to a JSON file + # Determine the output file path. If root_path_arg is a file, save alongside it. If a dir, save inside it. 
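# Illustrative sketch: how the NumpyFloatJSONEncoder defined earlier in this file lets
# json.dump/json.dumps serialize numpy scalars and arrays. The payload is a hypothetical
# example, and the encoder class is assumed to be in scope (it is defined above).
import json
import numpy as np

payload = {"mse": np.float64(0.0123), "num_valid_points": np.int64(7), "pred": np.array([1.0, 2.5])}
print(json.dumps(payload, cls=NumpyFloatJSONEncoder, indent=2))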
+ if path_obj.is_file(): # Should not happen with current logic, but as a fallback + output_results_file = path_obj.parent / "all_evaluation_results.json" + else: # path_obj is a directory + output_results_file = path_obj / "all_evaluation_results.json" + + try: + with open(output_results_file, "w") as f: + json.dump(all_results, f, indent=4, cls=NumpyFloatJSONEncoder) + print(f"\nAll results, including overall performance, saved to {output_results_file}") + except Exception as e: + print(f"\nError saving results to JSON: {e}") diff --git a/openevolve/controller.py b/openevolve/controller.py index 6642d48df..d1907a0f3 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -1,448 +1,453 @@ -""" -Main controller for OpenEvolve -""" - -import asyncio -import logging -import os -import re -import time -import uuid -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union - -from openevolve.config import Config, load_config -from openevolve.database import Program, ProgramDatabase -from openevolve.evaluator import Evaluator -from openevolve.llm.ensemble import LLMEnsemble -from openevolve.prompt.sampler import PromptSampler -from openevolve.utils.code_utils import ( - apply_diff, - extract_code_language, - extract_diffs, - format_diff_summary, - parse_evolve_blocks, - parse_full_rewrite, -) - -logger = logging.getLogger(__name__) - - -class OpenEvolve: - """ - Main controller for OpenEvolve - - Orchestrates the evolution process, coordinating between the prompt sampler, - LLM ensemble, evaluator, and program database. - - Features: - - Tracks the absolute best program across evolution steps - - Ensures the best solution is not lost during the MAP-Elites process - - Always includes the best program in the selection process for inspiration - - Maintains detailed logs and metadata about improvements - """ - - def __init__( - self, - initial_program_path: str, - evaluation_file: str, - config_path: Optional[str] = None, - config: Optional[Config] = None, - output_dir: Optional[str] = None, - ): - # Load configuration - if config is not None: - # Use provided Config object directly - self.config = config - else: - # Load from file or use defaults - self.config = load_config(config_path) - - # Set up output directory - self.output_dir = output_dir or os.path.join( - os.path.dirname(initial_program_path), "openevolve_output" - ) - os.makedirs(self.output_dir, exist_ok=True) - - # Set up logging - self._setup_logging() - - # Load initial program - self.initial_program_path = initial_program_path - self.initial_program_code = self._load_initial_program() - self.language = extract_code_language(self.initial_program_code) - - # Extract file extension from initial program - self.file_extension = os.path.splitext(initial_program_path)[1] - if not self.file_extension: - # Default to .py if no extension found - self.file_extension = ".py" - else: - # Make sure it starts with a dot - if not self.file_extension.startswith("."): - self.file_extension = f".{self.file_extension}" - - # Initialize components - self.llm_ensemble = LLMEnsemble(self.config.llm) - self.prompt_sampler = PromptSampler(self.config.prompt) - self.database = ProgramDatabase(self.config.database) - self.evaluator = Evaluator(self.config.evaluator, evaluation_file, self.llm_ensemble) - - logger.info(f"Initialized OpenEvolve with {initial_program_path} " f"and {evaluation_file}") - - def _setup_logging(self) -> None: - """Set up logging""" - log_dir = self.config.log_dir or 
os.path.join(self.output_dir, "logs") - os.makedirs(log_dir, exist_ok=True) - - # Set up root logger - root_logger = logging.getLogger() - root_logger.setLevel(getattr(logging, self.config.log_level)) - - # Add file handler - log_file = os.path.join(log_dir, f"openevolve_{time.strftime('%Y%m%d_%H%M%S')}.log") - file_handler = logging.FileHandler(log_file) - file_handler.setFormatter( - logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - ) - root_logger.addHandler(file_handler) - - # Add console handler - console_handler = logging.StreamHandler() - console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) - root_logger.addHandler(console_handler) - - logger.info(f"Logging to {log_file}") - - def _load_initial_program(self) -> str: - """Load the initial program from file""" - with open(self.initial_program_path, "r") as f: - return f.read() - - async def run( - self, - iterations: Optional[int] = None, - target_score: Optional[float] = None, - ) -> Program: - """ - Run the evolution process - - Args: - iterations: Maximum number of iterations (uses config if None) - target_score: Target score to reach (continues until reached if specified) - - Returns: - Best program found - """ - max_iterations = iterations or self.config.max_iterations - - # Initialize the database with the initial program - initial_program_id = str(uuid.uuid4()) - - # Evaluate the initial program - initial_metrics = await self.evaluator.evaluate_program( - self.initial_program_code, initial_program_id - ) - - initial_program = Program( - id=initial_program_id, - code=self.initial_program_code, - language=self.language, - metrics=initial_metrics, - ) - - self.database.add(initial_program) - - # Main evolution loop - start_iteration = self.database.last_iteration - total_iterations = start_iteration + max_iterations - - logger.info( - f"Starting evolution from iteration {start_iteration} for {max_iterations} iterations (total: {total_iterations})" - ) - - for i in range(start_iteration, total_iterations): - iteration_start = time.time() - - # Sample parent and inspirations - parent, inspirations = self.database.sample() - - # Build prompt - prompt = self.prompt_sampler.build_prompt( - current_program=parent.code, - parent_program=parent.code, # We don't have the parent's code, use the same - program_metrics=parent.metrics, - previous_programs=[p.to_dict() for p in self.database.get_top_programs(3)], - top_programs=[p.to_dict() for p in inspirations], - language=self.language, - evolution_round=i, - allow_full_rewrite=self.config.allow_full_rewrites, - ) - - # Generate code modification - try: - llm_response = await self.llm_ensemble.generate_with_context( - system_message=prompt["system"], - messages=[{"role": "user", "content": prompt["user"]}], - ) - - # Parse the response - if self.config.diff_based_evolution: - diff_blocks = extract_diffs(llm_response) - - if not diff_blocks: - logger.warning(f"Iteration {i+1}: No valid diffs found in response") - continue - - # Apply the diffs - child_code = apply_diff(parent.code, llm_response) - changes_summary = format_diff_summary(diff_blocks) - else: - # Parse full rewrite - new_code = parse_full_rewrite(llm_response, self.language) - - if not new_code: - logger.warning(f"Iteration {i+1}: No valid code found in response") - continue - - child_code = new_code - changes_summary = "Full rewrite" - - # Check code length - if len(child_code) > self.config.max_code_length: - logger.warning( - f"Iteration {i+1}: Generated code 
exceeds maximum length " - f"({len(child_code)} > {self.config.max_code_length})" - ) - continue - - # Evaluate the child program - child_id = str(uuid.uuid4()) - child_metrics = await self.evaluator.evaluate_program(child_code, child_id) - - # Create a child program - child_program = Program( - id=child_id, - code=child_code, - language=self.language, - parent_id=parent.id, - generation=parent.generation + 1, - metrics=child_metrics, - metadata={ - "changes": changes_summary, - "parent_metrics": parent.metrics, - }, - ) - - # Add to database - self.database.add(child_program) - - # Log progress - iteration_time = time.time() - iteration_start - self._log_iteration(i, parent, child_program, iteration_time) - - # Specifically check if this is the new best program - if self.database.best_program_id == child_program.id: - logger.info(f"🌟 New best solution found at iteration {i+1}: {child_program.id}") - logger.info( - f"Metrics: {', '.join(f'{name}={value:.4f}' for name, value in child_program.metrics.items())}" - ) - - # Save checkpoint - if (i + 1) % self.config.checkpoint_interval == 0: - self._save_checkpoint(i + 1) - - # Check if target score reached - if target_score is not None: - avg_score = sum(child_metrics.values()) / max(1, len(child_metrics)) - if avg_score >= target_score: - logger.info(f"Target score {target_score} reached after {i+1} iterations") - break - - except Exception as e: - logger.error(f"Error in iteration {i+1}: {str(e)}") - continue - - # Get the best program using our tracking mechanism - best_program = None - if self.database.best_program_id: - best_program = self.database.get(self.database.best_program_id) - logger.info(f"Using tracked best program: {self.database.best_program_id}") - - # Fallback to calculating best program if tracked program not found - if best_program is None: - best_program = self.database.get_best_program() - logger.info("Using calculated best program (tracked program not found)") - - # Check if there's a better program by combined_score that wasn't tracked - if "combined_score" in best_program.metrics: - best_by_combined = self.database.get_best_program(metric="combined_score") - if ( - best_by_combined - and best_by_combined.id != best_program.id - and "combined_score" in best_by_combined.metrics - ): - # If the combined_score of this program is significantly better, use it instead - if ( - best_by_combined.metrics["combined_score"] - > best_program.metrics["combined_score"] + 0.02 - ): - logger.warning( - f"Found program with better combined_score: {best_by_combined.id}" - ) - logger.warning( - f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}" - ) - best_program = best_by_combined - - if best_program: - logger.info( - f"Evolution complete. 
Best program has metrics: " - f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" - ) - - # Save the best program (using our tracked best program) - self._save_best_program() - - return best_program - else: - logger.warning("No valid programs found during evolution") - return initial_program - - def _log_iteration( - self, - iteration: int, - parent: Program, - child: Program, - elapsed_time: float, - ) -> None: - """ - Log iteration progress - - Args: - iteration: Iteration number - parent: Parent program - child: Child program - elapsed_time: Elapsed time in seconds - """ - # Calculate improvement - improvement = {} - for metric, value in child.metrics.items(): - if metric in parent.metrics: - diff = value - parent.metrics[metric] - improvement[metric] = diff - - improvement_str = ", ".join(f"{name}={diff:+.4f}" for name, diff in improvement.items()) - - logger.info( - f"Iteration {iteration+1}: Child {child.id} from parent {parent.id} " - f"in {elapsed_time:.2f}s. Metrics: " - f"{', '.join(f'{name}={value:.4f}' for name, value in child.metrics.items())} " - f"(Δ: {improvement_str})" - ) - - def _save_checkpoint(self, iteration: int) -> None: - """ - Save a checkpoint - - Args: - iteration: Current iteration number - """ - checkpoint_dir = os.path.join(self.output_dir, "checkpoints") - os.makedirs(checkpoint_dir, exist_ok=True) - - # Create specific checkpoint directory - checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_{iteration}") - os.makedirs(checkpoint_path, exist_ok=True) - - # Save the database - self.database.save(checkpoint_path, iteration) - - # Save the best program found so far - best_program = None - if self.database.best_program_id: - best_program = self.database.get(self.database.best_program_id) - else: - best_program = self.database.get_best_program() - - if best_program: - # Save the best program at this checkpoint - best_program_path = os.path.join(checkpoint_path, f"best_program{self.file_extension}") - with open(best_program_path, "w") as f: - f.write(best_program.code) - - # Save metrics - best_program_info_path = os.path.join(checkpoint_path, "best_program_info.json") - with open(best_program_info_path, "w") as f: - import json - - json.dump( - { - "id": best_program.id, - "generation": best_program.generation, - "iteration": iteration, - "metrics": best_program.metrics, - "language": best_program.language, - "timestamp": best_program.timestamp, - "saved_at": time.time(), - }, - f, - indent=2, - ) - - logger.info( - f"Saved best program at checkpoint {iteration} with metrics: " - f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" - ) - - logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}") - - def _save_best_program(self, program: Optional[Program] = None) -> None: - """ - Save the best program - - Args: - program: Best program (if None, uses the tracked best program) - """ - # If no program is provided, use the tracked best program from the database - if program is None: - if self.database.best_program_id: - program = self.database.get(self.database.best_program_id) - else: - # Fallback to calculating best program if no tracked best program - program = self.database.get_best_program() - - if not program: - logger.warning("No best program found to save") - return - - best_dir = os.path.join(self.output_dir, "best") - os.makedirs(best_dir, exist_ok=True) - - # Use the extension from the initial program file - filename = f"best_program{self.file_extension}" - 
code_path = os.path.join(best_dir, filename) - - with open(code_path, "w") as f: - f.write(program.code) - - # Save complete program info including metrics - info_path = os.path.join(best_dir, "best_program_info.json") - with open(info_path, "w") as f: - import json - - json.dump( - { - "id": program.id, - "generation": program.generation, - "timestamp": program.timestamp, - "parent_id": program.parent_id, - "metrics": program.metrics, - "language": program.language, - "saved_at": time.time(), - }, - f, - indent=2, - ) - - logger.info(f"Saved best program to {code_path} with program info to {info_path}") +""" +Main controller for OpenEvolve +""" + +import asyncio +import logging +import os +import re +import time +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +from openevolve.config import Config, load_config +from openevolve.database import Program, ProgramDatabase +from openevolve.evaluator import Evaluator +from openevolve.llm.ensemble import LLMEnsemble +from openevolve.prompt.sampler import PromptSampler +from openevolve.utils.code_utils import ( + apply_diff, + extract_code_language, + extract_diffs, + format_diff_summary, + parse_evolve_blocks, + parse_full_rewrite, +) + +logger = logging.getLogger(__name__) + + +class OpenEvolve: + """ + Main controller for OpenEvolve + + Orchestrates the evolution process, coordinating between the prompt sampler, + LLM ensemble, evaluator, and program database. + + Features: + - Tracks the absolute best program across evolution steps + - Ensures the best solution is not lost during the MAP-Elites process + - Always includes the best program in the selection process for inspiration + - Maintains detailed logs and metadata about improvements + """ + + def __init__( + self, + initial_program_path: str, + evaluation_file: str, + config_path: Optional[str] = None, + config: Optional[Config] = None, + output_dir: Optional[str] = None, + ): + # Load configuration + if config is not None: + # Use provided Config object directly + self.config = config + else: + # Load from file or use defaults + self.config = load_config(config_path) + + # Set up output directory + self.output_dir = output_dir or os.path.join( + os.path.dirname(initial_program_path), "openevolve_output" + ) + os.makedirs(self.output_dir, exist_ok=True) + + # Set up logging + self._setup_logging() + + # Load initial program + self.initial_program_path = initial_program_path + self.initial_program_code = self._load_initial_program() + self.language = extract_code_language(self.initial_program_code) + + # Extract file extension from initial program + self.file_extension = os.path.splitext(initial_program_path)[1] + if not self.file_extension: + # Default to .py if no extension found + self.file_extension = ".py" + else: + # Make sure it starts with a dot + if not self.file_extension.startswith("."): + self.file_extension = f".{self.file_extension}" + + # Initialize components + self.llm_ensemble = LLMEnsemble(self.config.llm) + self.prompt_sampler = PromptSampler(self.config.prompt) + self.database = ProgramDatabase(self.config.database) + self.evaluator = Evaluator(self.config.evaluator, evaluation_file, self.llm_ensemble) + + logger.info(f"Initialized OpenEvolve with {initial_program_path} " f"and {evaluation_file}") + + def _setup_logging(self) -> None: + """Set up logging""" + log_dir = self.config.log_dir or os.path.join(self.output_dir, "logs") + os.makedirs(log_dir, exist_ok=True) + + # Set up root logger + root_logger = 
logging.getLogger() + root_logger.setLevel(getattr(logging, self.config.log_level)) + + # Add file handler + log_file = os.path.join(log_dir, f"openevolve_{time.strftime('%Y%m%d_%H%M%S')}.log") + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + ) + root_logger.addHandler(file_handler) + + # Add console handler + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) + root_logger.addHandler(console_handler) + + logger.info(f"Logging to {log_file}") + + def _load_initial_program(self) -> str: + """Load the initial program from file""" + with open(self.initial_program_path, "r") as f: + return f.read() + + async def run( + self, + iterations: Optional[int] = None, + target_score: Optional[float] = None, + ) -> Program: + """ + Run the evolution process + + Args: + iterations: Maximum number of iterations (uses config if None) + target_score: Target score to reach (continues until reached if specified) + + Returns: + Best program found + """ + max_iterations = iterations or self.config.max_iterations + + # Initialize the database with the initial program + initial_program_id = str(uuid.uuid4()) + + # Evaluate the initial program + initial_metrics = await self.evaluator.evaluate_program( + self.initial_program_code, initial_program_id + ) + + initial_program = Program( + id=initial_program_id, + code=self.initial_program_code, + language=self.language, + metrics=initial_metrics, + iteration_found=start_iteration, + ) + + self.database.add(initial_program) + + # Main evolution loop + start_iteration = self.database.last_iteration + total_iterations = start_iteration + max_iterations + + logger.info( + f"Starting evolution from iteration {start_iteration} for {max_iterations} iterations (total: {total_iterations})" + ) + + for i in range(start_iteration, total_iterations): + iteration_start = time.time() + + # Sample parent and inspirations + parent, inspirations = self.database.sample() + + # Build prompt + prompt = self.prompt_sampler.build_prompt( + current_program=parent.code, + parent_program=parent.code, # We don't have the parent's code, use the same + program_metrics=parent.metrics, + previous_programs=[p.to_dict() for p in self.database.get_top_programs(3)], + top_programs=[p.to_dict() for p in inspirations], + language=self.language, + evolution_round=i, + allow_full_rewrite=self.config.allow_full_rewrites, + ) + + # Generate code modification + try: + llm_response = await self.llm_ensemble.generate_with_context( + system_message=prompt["system"], + messages=[{"role": "user", "content": prompt["user"]}], + ) + + # Parse the response + if self.config.diff_based_evolution: + diff_blocks = extract_diffs(llm_response) + + if not diff_blocks: + logger.warning(f"Iteration {i+1}: No valid diffs found in response") + continue + + # Apply the diffs + child_code = apply_diff(parent.code, llm_response) + changes_summary = format_diff_summary(diff_blocks) + else: + # Parse full rewrite + new_code = parse_full_rewrite(llm_response, self.language) + + if not new_code: + logger.warning(f"Iteration {i+1}: No valid code found in response") + continue + + child_code = new_code + changes_summary = "Full rewrite" + + # Check code length + if len(child_code) > self.config.max_code_length: + logger.warning( + f"Iteration {i+1}: Generated code exceeds maximum length " + f"({len(child_code)} > {self.config.max_code_length})" + 
) + continue + + # Evaluate the child program + child_id = str(uuid.uuid4()) + child_metrics = await self.evaluator.evaluate_program(child_code, child_id) + + # Create a child program + child_program = Program( + id=child_id, + code=child_code, + language=self.language, + parent_id=parent.id, + generation=parent.generation + 1, + metrics=child_metrics, + metadata={ + "changes": changes_summary, + "parent_metrics": parent.metrics, + }, + ) + + # Add to database + self.database.add(child_program, iteration=i + 1) + + # Log progress + iteration_time = time.time() - iteration_start + self._log_iteration(i, parent, child_program, iteration_time) + + # Specifically check if this is the new best program + if self.database.best_program_id == child_program.id: + logger.info( + f"🌟 New best solution found at iteration {i+1}: {child_program.id}" + ) + logger.info( + f"Metrics: {', '.join(f'{name}={value:.4f}' for name, value in child_program.metrics.items())}" + ) + + # Save checkpoint + if (i + 1) % self.config.checkpoint_interval == 0: + self._save_checkpoint(i + 1) + + # Check if target score reached + if target_score is not None: + avg_score = sum(child_metrics.values()) / max(1, len(child_metrics)) + if avg_score >= target_score: + logger.info(f"Target score {target_score} reached after {i+1} iterations") + break + + except Exception as e: + logger.error(f"Error in iteration {i+1}: {str(e)}") + continue + + # Get the best program using our tracking mechanism + best_program = None + if self.database.best_program_id: + best_program = self.database.get(self.database.best_program_id) + logger.info(f"Using tracked best program: {self.database.best_program_id}") + + # Fallback to calculating best program if tracked program not found + if best_program is None: + best_program = self.database.get_best_program() + logger.info("Using calculated best program (tracked program not found)") + + # Check if there's a better program by combined_score that wasn't tracked + if "combined_score" in best_program.metrics: + best_by_combined = self.database.get_best_program(metric="combined_score") + if ( + best_by_combined + and best_by_combined.id != best_program.id + and "combined_score" in best_by_combined.metrics + ): + # If the combined_score of this program is significantly better, use it instead + if ( + best_by_combined.metrics["combined_score"] + > best_program.metrics["combined_score"] + 0.02 + ): + logger.warning( + f"Found program with better combined_score: {best_by_combined.id}" + ) + logger.warning( + f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}" + ) + best_program = best_by_combined + + if best_program: + logger.info( + f"Evolution complete. 
Best program has metrics: " + f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" + ) + + # Save the best program (using our tracked best program) + self._save_best_program() + + return best_program + else: + logger.warning("No valid programs found during evolution") + return initial_program + + def _log_iteration( + self, + iteration: int, + parent: Program, + child: Program, + elapsed_time: float, + ) -> None: + """ + Log iteration progress + + Args: + iteration: Iteration number + parent: Parent program + child: Child program + elapsed_time: Elapsed time in seconds + """ + # Calculate improvement + improvement = {} + for metric, value in child.metrics.items(): + if metric in parent.metrics: + diff = value - parent.metrics[metric] + improvement[metric] = diff + + improvement_str = ", ".join(f"{name}={diff:+.4f}" for name, diff in improvement.items()) + + logger.info( + f"Iteration {iteration+1}: Child {child.id} from parent {parent.id} " + f"in {elapsed_time:.2f}s. Metrics: " + f"{', '.join(f'{name}={value:.4f}' for name, value in child.metrics.items())} " + f"(Δ: {improvement_str})" + ) + + def _save_checkpoint(self, iteration: int) -> None: + """ + Save a checkpoint + + Args: + iteration: Current iteration number + """ + checkpoint_dir = os.path.join(self.output_dir, "checkpoints") + os.makedirs(checkpoint_dir, exist_ok=True) + + # Create specific checkpoint directory + checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_{iteration}") + os.makedirs(checkpoint_path, exist_ok=True) + + # Save the database + self.database.save(checkpoint_path, iteration) + + # Save the best program found so far + best_program = None + if self.database.best_program_id: + best_program = self.database.get(self.database.best_program_id) + else: + best_program = self.database.get_best_program() + + if best_program: + # Save the best program at this checkpoint + best_program_path = os.path.join(checkpoint_path, f"best_program{self.file_extension}") + with open(best_program_path, "w") as f: + f.write(best_program.code) + + # Save metrics + best_program_info_path = os.path.join(checkpoint_path, "best_program_info.json") + with open(best_program_info_path, "w") as f: + import json + + json.dump( + { + "id": best_program.id, + "generation": best_program.generation, + "iteration": best_program.iteration_found, + "current_iteration": iteration, + "metrics": best_program.metrics, + "language": best_program.language, + "timestamp": best_program.timestamp, + "saved_at": time.time(), + }, + f, + indent=2, + ) + + logger.info( + f"Saved best program at checkpoint {iteration} with metrics: " + f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" + ) + + logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}") + + def _save_best_program(self, program: Optional[Program] = None) -> None: + """ + Save the best program + + Args: + program: Best program (if None, uses the tracked best program) + """ + # If no program is provided, use the tracked best program from the database + if program is None: + if self.database.best_program_id: + program = self.database.get(self.database.best_program_id) + else: + # Fallback to calculating best program if no tracked best program + program = self.database.get_best_program() + + if not program: + logger.warning("No best program found to save") + return + + best_dir = os.path.join(self.output_dir, "best") + os.makedirs(best_dir, exist_ok=True) + + # Use the extension from the initial program file + 
filename = f"best_program{self.file_extension}" + code_path = os.path.join(best_dir, filename) + + with open(code_path, "w") as f: + f.write(program.code) + + # Save complete program info including metrics + info_path = os.path.join(best_dir, "best_program_info.json") + with open(info_path, "w") as f: + import json + + json.dump( + { + "id": program.id, + "generation": program.generation, + "iteration": program.iteration_found, + "timestamp": program.timestamp, + "parent_id": program.parent_id, + "metrics": program.metrics, + "language": program.language, + "saved_at": time.time(), + }, + f, + indent=2, + ) + + logger.info(f"Saved best program to {code_path} with program info to {info_path}") diff --git a/openevolve/database.py b/openevolve/database.py index 7226fce9e..e215ecfbd 100644 --- a/openevolve/database.py +++ b/openevolve/database.py @@ -1,595 +1,603 @@ -""" -Program database for OpenEvolve -""" - -import json -import logging -import os -import random -import time -from dataclasses import asdict, dataclass, field -from pathlib import Path -from typing import Any, Dict, List, Optional, Set, Tuple, Union - -import numpy as np - -from openevolve.config import DatabaseConfig -from openevolve.utils.code_utils import calculate_edit_distance - -logger = logging.getLogger(__name__) - - -@dataclass -class Program: - """Represents a program in the database""" - - # Program identification - id: str - code: str - language: str = "python" - - # Evolution information - parent_id: Optional[str] = None - generation: int = 0 - timestamp: float = field(default_factory=time.time) - - # Performance metrics - metrics: Dict[str, float] = field(default_factory=dict) - - # Derived features - complexity: float = 0.0 - diversity: float = 0.0 - - # Metadata - metadata: Dict[str, Any] = field(default_factory=dict) - - def to_dict(self) -> Dict[str, Any]: - """Convert to dictionary representation""" - return asdict(self) - - @classmethod - def from_dict(cls, data: Dict[str, Any]) -> "Program": - """Create from dictionary representation""" - return cls(**data) - - -class ProgramDatabase: - """ - Database for storing and sampling programs during evolution - - The database implements a combination of MAP-Elites algorithm and - island-based population model to maintain diversity during evolution. - It also tracks the absolute best program separately to ensure it's never lost. 
- """ - - def __init__(self, config: DatabaseConfig): - self.config = config - - # In-memory program storage - self.programs: Dict[str, Program] = {} - - # Feature grid for MAP-Elites - self.feature_map: Dict[str, str] = {} - self.feature_bins = config.feature_bins - - # Island populations - self.islands: List[Set[str]] = [set() for _ in range(config.num_islands)] - - # Archive of elite programs - self.archive: Set[str] = set() - - # Track the absolute best program separately - self.best_program_id: Optional[str] = None - - # Track the last iteration number (for resuming) - self.last_iteration: int = 0 - - # Load database from disk if path is provided - if config.db_path and os.path.exists(config.db_path): - self.load(config.db_path) - - logger.info(f"Initialized program database with {len(self.programs)} programs") - - def add(self, program: Program) -> str: - """ - Add a program to the database - - Args: - program: Program to add - - Returns: - Program ID - """ - # Store the program - self.programs[program.id] = program - - # Calculate feature coordinates for MAP-Elites - feature_coords = self._calculate_feature_coords(program) - - # Add to feature map (replacing existing if better) - feature_key = self._feature_coords_to_key(feature_coords) - if feature_key not in self.feature_map or self._is_better( - program, self.programs[self.feature_map[feature_key]] - ): - self.feature_map[feature_key] = program.id - - # Add to an island (randomly) - island_idx = random.randint(0, len(self.islands) - 1) - self.islands[island_idx].add(program.id) - - # Update archive - self._update_archive(program) - - # Update the absolute best program tracking - self._update_best_program(program) - - # Save to disk if configured - if self.config.db_path: - self._save_program(program) - - logger.debug(f"Added program {program.id} to database") - return program.id - - def get(self, program_id: str) -> Optional[Program]: - """ - Get a program by ID - - Args: - program_id: Program ID - - Returns: - Program or None if not found - """ - return self.programs.get(program_id) - - def sample(self) -> Tuple[Program, List[Program]]: - """ - Sample a program and inspirations for the next evolution step - - Returns: - Tuple of (parent_program, inspiration_programs) - """ - # Select parent program - parent = self._sample_parent() - - # Select inspirations - inspirations = self._sample_inspirations(parent, n=5) - - logger.debug(f"Sampled parent {parent.id} and {len(inspirations)} inspirations") - return parent, inspirations - - def get_best_program(self, metric: Optional[str] = None) -> Optional[Program]: - """ - Get the best program based on a metric - - Args: - metric: Metric to use for ranking (uses combined_score or average if None) - - Returns: - Best program or None if database is empty - """ - if not self.programs: - return None - - # If no specific metric and we have a tracked best program, return it - if metric is None and self.best_program_id and self.best_program_id in self.programs: - logger.debug(f"Using tracked best program: {self.best_program_id}") - return self.programs[self.best_program_id] - - if metric: - # Sort by specific metric - sorted_programs = sorted( - [p for p in self.programs.values() if metric in p.metrics], - key=lambda p: p.metrics[metric], - reverse=True, - ) - if sorted_programs: - logger.debug(f"Found best program by metric '{metric}': {sorted_programs[0].id}") - elif self.programs and all("combined_score" in p.metrics for p in self.programs.values()): - # Sort by combined_score if it exists 
(preferred method) - sorted_programs = sorted( - self.programs.values(), key=lambda p: p.metrics["combined_score"], reverse=True - ) - if sorted_programs: - logger.debug(f"Found best program by combined_score: {sorted_programs[0].id}") - else: - # Sort by average of all metrics as fallback - sorted_programs = sorted( - self.programs.values(), - key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)), - reverse=True, - ) - if sorted_programs: - logger.debug(f"Found best program by average metrics: {sorted_programs[0].id}") - - # Update the best program tracking if we found a better program - if sorted_programs and ( - self.best_program_id is None or sorted_programs[0].id != self.best_program_id - ): - old_id = self.best_program_id - self.best_program_id = sorted_programs[0].id - logger.info(f"Updated best program tracking from {old_id} to {self.best_program_id}") - - # Also log the scores to help understand the update - if ( - old_id - and old_id in self.programs - and "combined_score" in self.programs[old_id].metrics - and "combined_score" in self.programs[self.best_program_id].metrics - ): - old_score = self.programs[old_id].metrics["combined_score"] - new_score = self.programs[self.best_program_id].metrics["combined_score"] - logger.info( - f"Score change: {old_score:.4f} → {new_score:.4f} ({new_score-old_score:+.4f})" - ) - - return sorted_programs[0] if sorted_programs else None - - def get_top_programs(self, n: int = 10, metric: Optional[str] = None) -> List[Program]: - """ - Get the top N programs based on a metric - - Args: - n: Number of programs to return - metric: Metric to use for ranking (uses average if None) - - Returns: - List of top programs - """ - if not self.programs: - return [] - - if metric: - # Sort by specific metric - sorted_programs = sorted( - [p for p in self.programs.values() if metric in p.metrics], - key=lambda p: p.metrics[metric], - reverse=True, - ) - else: - # Sort by average of all metrics - sorted_programs = sorted( - self.programs.values(), - key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)), - reverse=True, - ) - - return sorted_programs[:n] - - def save(self, path: Optional[str] = None, iteration: int = 0) -> None: - """ - Save the database to disk - - Args: - path: Path to save to (uses config.db_path if None) - iteration: Current iteration number - """ - save_path = path or self.config.db_path - if not save_path: - logger.warning("No database path specified, skipping save") - return - - # Create directory if it doesn't exist - os.makedirs(save_path, exist_ok=True) - - # Save each program - for program in self.programs.values(): - self._save_program(program, save_path) - - # Save metadata - metadata = { - "feature_map": self.feature_map, - "islands": [list(island) for island in self.islands], - "archive": list(self.archive), - "best_program_id": self.best_program_id, - "last_iteration": iteration or self.last_iteration, - } - - with open(os.path.join(save_path, "metadata.json"), "w") as f: - json.dump(metadata, f) - - logger.info(f"Saved database with {len(self.programs)} programs to {save_path}") - - def load(self, path: str) -> None: - """ - Load the database from disk - - Args: - path: Path to load from - """ - if not os.path.exists(path): - logger.warning(f"Database path {path} does not exist, skipping load") - return - - # Load metadata - metadata_path = os.path.join(path, "metadata.json") - if os.path.exists(metadata_path): - with open(metadata_path, "r") as f: - metadata = json.load(f) - - self.feature_map = 
metadata.get("feature_map", {}) - self.islands = [set(island) for island in metadata.get("islands", [])] - self.archive = set(metadata.get("archive", [])) - self.best_program_id = metadata.get("best_program_id") - self.last_iteration = metadata.get("last_iteration", 0) - - logger.info(f"Loaded database metadata with last_iteration={self.last_iteration}") - - # Load programs - programs_dir = os.path.join(path, "programs") - if os.path.exists(programs_dir): - for program_file in os.listdir(programs_dir): - if program_file.endswith(".json"): - program_path = os.path.join(programs_dir, program_file) - try: - with open(program_path, "r") as f: - program_data = json.load(f) - - program = Program.from_dict(program_data) - self.programs[program.id] = program - except Exception as e: - logger.warning(f"Error loading program {program_file}: {str(e)}") - - logger.info(f"Loaded database with {len(self.programs)} programs from {path}") - - def _save_program(self, program: Program, base_path: Optional[str] = None) -> None: - """ - Save a program to disk - - Args: - program: Program to save - base_path: Base path to save to (uses config.db_path if None) - """ - save_path = base_path or self.config.db_path - if not save_path: - return - - # Create programs directory if it doesn't exist - programs_dir = os.path.join(save_path, "programs") - os.makedirs(programs_dir, exist_ok=True) - - # Save program - program_path = os.path.join(programs_dir, f"{program.id}.json") - with open(program_path, "w") as f: - json.dump(program.to_dict(), f) - - def _calculate_feature_coords(self, program: Program) -> List[int]: - """ - Calculate feature coordinates for the MAP-Elites grid - - Args: - program: Program to calculate features for - - Returns: - List of feature coordinates - """ - coords = [] - - for dim in self.config.feature_dimensions: - if dim == "complexity": - # Use code length as complexity measure - complexity = len(program.code) - bin_idx = min(int(complexity / 1000 * self.feature_bins), self.feature_bins - 1) - coords.append(bin_idx) - elif dim == "diversity": - # Use average edit distance to other programs - if len(self.programs) < 5: - bin_idx = 0 - else: - sample_programs = random.sample( - list(self.programs.values()), min(5, len(self.programs)) - ) - avg_distance = sum( - calculate_edit_distance(program.code, other.code) - for other in sample_programs - ) / len(sample_programs) - bin_idx = min( - int(avg_distance / 1000 * self.feature_bins), self.feature_bins - 1 - ) - coords.append(bin_idx) - elif dim == "score": - # Use average of metrics - if not program.metrics: - bin_idx = 0 - else: - avg_score = sum(program.metrics.values()) / len(program.metrics) - bin_idx = min(int(avg_score * self.feature_bins), self.feature_bins - 1) - coords.append(bin_idx) - elif dim in program.metrics: - # Use specific metric - score = program.metrics[dim] - bin_idx = min(int(score * self.feature_bins), self.feature_bins - 1) - coords.append(bin_idx) - else: - # Default to middle bin if feature not found - coords.append(self.feature_bins // 2) - - return coords - - def _feature_coords_to_key(self, coords: List[int]) -> str: - """ - Convert feature coordinates to a string key - - Args: - coords: Feature coordinates - - Returns: - String key - """ - return "-".join(str(c) for c in coords) - - def _is_better(self, program1: Program, program2: Program) -> bool: - """ - Determine if program1 is better than program2 - - Args: - program1: First program - program2: Second program - - Returns: - True if program1 is better than 
program2 - """ - # If no metrics, use newest - if not program1.metrics and not program2.metrics: - return program1.timestamp > program2.timestamp - - # If only one has metrics, it's better - if program1.metrics and not program2.metrics: - return True - if not program1.metrics and program2.metrics: - return False - - # Check for combined_score first (this is the preferred metric) - if "combined_score" in program1.metrics and "combined_score" in program2.metrics: - return program1.metrics["combined_score"] > program2.metrics["combined_score"] - - # Fallback to average of all metrics - avg1 = sum(program1.metrics.values()) / len(program1.metrics) - avg2 = sum(program2.metrics.values()) / len(program2.metrics) - - return avg1 > avg2 - - def _update_archive(self, program: Program) -> None: - """ - Update the archive of elite programs - - Args: - program: Program to consider for archive - """ - # If archive not full, add program - if len(self.archive) < self.config.archive_size: - self.archive.add(program.id) - return - - # Otherwise, find worst program in archive - archive_programs = [self.programs[pid] for pid in self.archive] - worst_program = min( - archive_programs, key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)) - ) - - # Replace if new program is better - if self._is_better(program, worst_program): - self.archive.remove(worst_program.id) - self.archive.add(program.id) - - def _update_best_program(self, program: Program) -> None: - """ - Update the absolute best program tracking - - Args: - program: Program to consider as the new best - """ - # If we don't have a best program yet, this becomes the best - if self.best_program_id is None: - self.best_program_id = program.id - logger.debug(f"Set initial best program to {program.id}") - return - - # Compare with current best program - current_best = self.programs[self.best_program_id] - - # Update if the new program is better - if self._is_better(program, current_best): - old_id = self.best_program_id - self.best_program_id = program.id - - # Log the change - if "combined_score" in program.metrics and "combined_score" in current_best.metrics: - old_score = current_best.metrics["combined_score"] - new_score = program.metrics["combined_score"] - score_diff = new_score - old_score - logger.info( - f"New best program {program.id} replaces {old_id} (combined_score: {old_score:.4f} → {new_score:.4f}, +{score_diff:.4f})" - ) - else: - logger.info(f"New best program {program.id} replaces {old_id}") - - def _sample_parent(self) -> Program: - """ - Sample a parent program for the next evolution step - - Returns: - Parent program - """ - # Decide between exploitation and exploration - if random.random() < self.config.exploitation_ratio and self.archive: - # Exploitation: Use elite program from archive - parent_id = random.choice(list(self.archive)) - return self.programs[parent_id] - - # Exploration: Sample from an island - island_idx = random.randint(0, len(self.islands) - 1) - - if not self.islands[island_idx]: - # If island is empty, use best program - return self.get_best_program() or next(iter(self.programs.values())) - - parent_id = random.choice(list(self.islands[island_idx])) - return self.programs[parent_id] - - def _sample_inspirations(self, parent: Program, n: int = 5) -> List[Program]: - """ - Sample inspiration programs for the next evolution step - - Args: - parent: Parent program - n: Number of inspirations to sample - - Returns: - List of inspiration programs - """ - inspirations = [] - - # Always include the absolute best 
program if available and different from parent - if self.best_program_id is not None and self.best_program_id != parent.id: - best_program = self.programs[self.best_program_id] - inspirations.append(best_program) - logger.debug(f"Including best program {self.best_program_id} in inspirations") - - # Add top programs as inspirations - top_n = max(1, int(n * self.config.elite_selection_ratio)) - top_programs = self.get_top_programs(n=top_n) - for program in top_programs: - if program.id not in [p.id for p in inspirations] and program.id != parent.id: - inspirations.append(program) - - # Add diverse programs - if len(self.programs) > n and len(inspirations) < n: - # Sample from different feature cells - feature_coords = self._calculate_feature_coords(parent) - - # Get programs from nearby feature cells - nearby_programs = [] - for _ in range(n - len(inspirations)): - # Perturb coordinates - perturbed_coords = [ - max(0, min(self.feature_bins - 1, c + random.randint(-1, 1))) - for c in feature_coords - ] - - # Try to get program from this cell - cell_key = self._feature_coords_to_key(perturbed_coords) - if cell_key in self.feature_map: - program_id = self.feature_map[cell_key] - if program_id != parent.id and program_id not in [p.id for p in inspirations]: - nearby_programs.append(self.programs[program_id]) - - # If we need more, add random programs - if len(inspirations) + len(nearby_programs) < n: - remaining = n - len(inspirations) - len(nearby_programs) - all_ids = set(self.programs.keys()) - excluded_ids = ( - {parent.id} - .union(p.id for p in inspirations) - .union(p.id for p in nearby_programs) - ) - available_ids = list(all_ids - excluded_ids) - - if available_ids: - random_ids = random.sample(available_ids, min(remaining, len(available_ids))) - random_programs = [self.programs[pid] for pid in random_ids] - nearby_programs.extend(random_programs) - - inspirations.extend(nearby_programs) - - return inspirations[:n] +""" +Program database for OpenEvolve +""" + +import json +import logging +import os +import random +import time +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Set, Tuple, Union + +import numpy as np + +from openevolve.config import DatabaseConfig +from openevolve.utils.code_utils import calculate_edit_distance + +logger = logging.getLogger(__name__) + + +@dataclass +class Program: + """Represents a program in the database""" + + # Program identification + id: str + code: str + language: str = "python" + + # Evolution information + parent_id: Optional[str] = None + generation: int = 0 + timestamp: float = field(default_factory=time.time) + iteration_found: int = 0 # Track which iteration this program was found + + # Performance metrics + metrics: Dict[str, float] = field(default_factory=dict) + + # Derived features + complexity: float = 0.0 + diversity: float = 0.0 + + # Metadata + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary representation""" + return asdict(self) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "Program": + """Create from dictionary representation""" + return cls(**data) + + +class ProgramDatabase: + """ + Database for storing and sampling programs during evolution + + The database implements a combination of MAP-Elites algorithm and + island-based population model to maintain diversity during evolution. + It also tracks the absolute best program separately to ensure it's never lost. 
+ """ + + def __init__(self, config: DatabaseConfig): + self.config = config + + # In-memory program storage + self.programs: Dict[str, Program] = {} + + # Feature grid for MAP-Elites + self.feature_map: Dict[str, str] = {} + self.feature_bins = config.feature_bins + + # Island populations + self.islands: List[Set[str]] = [set() for _ in range(config.num_islands)] + + # Archive of elite programs + self.archive: Set[str] = set() + + # Track the absolute best program separately + self.best_program_id: Optional[str] = None + + # Track the last iteration number (for resuming) + self.last_iteration: int = 0 + + # Load database from disk if path is provided + if config.db_path and os.path.exists(config.db_path): + self.load(config.db_path) + + logger.info(f"Initialized program database with {len(self.programs)} programs") + + def add(self, program: Program, iteration: int = None) -> str: + """ + Add a program to the database + + Args: + program: Program to add + iteration: Current iteration (defaults to last_iteration) + + Returns: + Program ID + """ + # Store the program + # If iteration is provided, update the program's iteration_found + if iteration is not None: + program.iteration_found = iteration + # Update last_iteration if needed + self.last_iteration = max(self.last_iteration, iteration) + + self.programs[program.id] = program + + # Calculate feature coordinates for MAP-Elites + feature_coords = self._calculate_feature_coords(program) + + # Add to feature map (replacing existing if better) + feature_key = self._feature_coords_to_key(feature_coords) + if feature_key not in self.feature_map or self._is_better( + program, self.programs[self.feature_map[feature_key]] + ): + self.feature_map[feature_key] = program.id + + # Add to an island (randomly) + island_idx = random.randint(0, len(self.islands) - 1) + self.islands[island_idx].add(program.id) + + # Update archive + self._update_archive(program) + + # Update the absolute best program tracking + self._update_best_program(program) + + # Save to disk if configured + if self.config.db_path: + self._save_program(program) + + logger.debug(f"Added program {program.id} to database") + return program.id + + def get(self, program_id: str) -> Optional[Program]: + """ + Get a program by ID + + Args: + program_id: Program ID + + Returns: + Program or None if not found + """ + return self.programs.get(program_id) + + def sample(self) -> Tuple[Program, List[Program]]: + """ + Sample a program and inspirations for the next evolution step + + Returns: + Tuple of (parent_program, inspiration_programs) + """ + # Select parent program + parent = self._sample_parent() + + # Select inspirations + inspirations = self._sample_inspirations(parent, n=5) + + logger.debug(f"Sampled parent {parent.id} and {len(inspirations)} inspirations") + return parent, inspirations + + def get_best_program(self, metric: Optional[str] = None) -> Optional[Program]: + """ + Get the best program based on a metric + + Args: + metric: Metric to use for ranking (uses combined_score or average if None) + + Returns: + Best program or None if database is empty + """ + if not self.programs: + return None + + # If no specific metric and we have a tracked best program, return it + if metric is None and self.best_program_id and self.best_program_id in self.programs: + logger.debug(f"Using tracked best program: {self.best_program_id}") + return self.programs[self.best_program_id] + + if metric: + # Sort by specific metric + sorted_programs = sorted( + [p for p in self.programs.values() if 
metric in p.metrics], + key=lambda p: p.metrics[metric], + reverse=True, + ) + if sorted_programs: + logger.debug(f"Found best program by metric '{metric}': {sorted_programs[0].id}") + elif self.programs and all("combined_score" in p.metrics for p in self.programs.values()): + # Sort by combined_score if it exists (preferred method) + sorted_programs = sorted( + self.programs.values(), key=lambda p: p.metrics["combined_score"], reverse=True + ) + if sorted_programs: + logger.debug(f"Found best program by combined_score: {sorted_programs[0].id}") + else: + # Sort by average of all metrics as fallback + sorted_programs = sorted( + self.programs.values(), + key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)), + reverse=True, + ) + if sorted_programs: + logger.debug(f"Found best program by average metrics: {sorted_programs[0].id}") + + # Update the best program tracking if we found a better program + if sorted_programs and ( + self.best_program_id is None or sorted_programs[0].id != self.best_program_id + ): + old_id = self.best_program_id + self.best_program_id = sorted_programs[0].id + logger.info(f"Updated best program tracking from {old_id} to {self.best_program_id}") + + # Also log the scores to help understand the update + if ( + old_id + and old_id in self.programs + and "combined_score" in self.programs[old_id].metrics + and "combined_score" in self.programs[self.best_program_id].metrics + ): + old_score = self.programs[old_id].metrics["combined_score"] + new_score = self.programs[self.best_program_id].metrics["combined_score"] + logger.info( + f"Score change: {old_score:.4f} → {new_score:.4f} ({new_score-old_score:+.4f})" + ) + + return sorted_programs[0] if sorted_programs else None + + def get_top_programs(self, n: int = 10, metric: Optional[str] = None) -> List[Program]: + """ + Get the top N programs based on a metric + + Args: + n: Number of programs to return + metric: Metric to use for ranking (uses average if None) + + Returns: + List of top programs + """ + if not self.programs: + return [] + + if metric: + # Sort by specific metric + sorted_programs = sorted( + [p for p in self.programs.values() if metric in p.metrics], + key=lambda p: p.metrics[metric], + reverse=True, + ) + else: + # Sort by average of all metrics + sorted_programs = sorted( + self.programs.values(), + key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)), + reverse=True, + ) + + return sorted_programs[:n] + + def save(self, path: Optional[str] = None, iteration: int = 0) -> None: + """ + Save the database to disk + + Args: + path: Path to save to (uses config.db_path if None) + iteration: Current iteration number + """ + save_path = path or self.config.db_path + if not save_path: + logger.warning("No database path specified, skipping save") + return + + # Create directory if it doesn't exist + os.makedirs(save_path, exist_ok=True) + + # Save each program + for program in self.programs.values(): + self._save_program(program, save_path) + + # Save metadata + metadata = { + "feature_map": self.feature_map, + "islands": [list(island) for island in self.islands], + "archive": list(self.archive), + "best_program_id": self.best_program_id, + "last_iteration": iteration or self.last_iteration, + } + + with open(os.path.join(save_path, "metadata.json"), "w") as f: + json.dump(metadata, f) + + logger.info(f"Saved database with {len(self.programs)} programs to {save_path}") + + def load(self, path: str) -> None: + """ + Load the database from disk + + Args: + path: Path to load from + """ + if 
not os.path.exists(path): + logger.warning(f"Database path {path} does not exist, skipping load") + return + + # Load metadata + metadata_path = os.path.join(path, "metadata.json") + if os.path.exists(metadata_path): + with open(metadata_path, "r") as f: + metadata = json.load(f) + + self.feature_map = metadata.get("feature_map", {}) + self.islands = [set(island) for island in metadata.get("islands", [])] + self.archive = set(metadata.get("archive", [])) + self.best_program_id = metadata.get("best_program_id") + self.last_iteration = metadata.get("last_iteration", 0) + + logger.info(f"Loaded database metadata with last_iteration={self.last_iteration}") + + # Load programs + programs_dir = os.path.join(path, "programs") + if os.path.exists(programs_dir): + for program_file in os.listdir(programs_dir): + if program_file.endswith(".json"): + program_path = os.path.join(programs_dir, program_file) + try: + with open(program_path, "r") as f: + program_data = json.load(f) + + program = Program.from_dict(program_data) + self.programs[program.id] = program + except Exception as e: + logger.warning(f"Error loading program {program_file}: {str(e)}") + + logger.info(f"Loaded database with {len(self.programs)} programs from {path}") + + def _save_program(self, program: Program, base_path: Optional[str] = None) -> None: + """ + Save a program to disk + + Args: + program: Program to save + base_path: Base path to save to (uses config.db_path if None) + """ + save_path = base_path or self.config.db_path + if not save_path: + return + + # Create programs directory if it doesn't exist + programs_dir = os.path.join(save_path, "programs") + os.makedirs(programs_dir, exist_ok=True) + + # Save program + program_path = os.path.join(programs_dir, f"{program.id}.json") + with open(program_path, "w") as f: + json.dump(program.to_dict(), f) + + def _calculate_feature_coords(self, program: Program) -> List[int]: + """ + Calculate feature coordinates for the MAP-Elites grid + + Args: + program: Program to calculate features for + + Returns: + List of feature coordinates + """ + coords = [] + + for dim in self.config.feature_dimensions: + if dim == "complexity": + # Use code length as complexity measure + complexity = len(program.code) + bin_idx = min(int(complexity / 1000 * self.feature_bins), self.feature_bins - 1) + coords.append(bin_idx) + elif dim == "diversity": + # Use average edit distance to other programs + if len(self.programs) < 5: + bin_idx = 0 + else: + sample_programs = random.sample( + list(self.programs.values()), min(5, len(self.programs)) + ) + avg_distance = sum( + calculate_edit_distance(program.code, other.code) + for other in sample_programs + ) / len(sample_programs) + bin_idx = min( + int(avg_distance / 1000 * self.feature_bins), self.feature_bins - 1 + ) + coords.append(bin_idx) + elif dim == "score": + # Use average of metrics + if not program.metrics: + bin_idx = 0 + else: + avg_score = sum(program.metrics.values()) / len(program.metrics) + bin_idx = min(int(avg_score * self.feature_bins), self.feature_bins - 1) + coords.append(bin_idx) + elif dim in program.metrics: + # Use specific metric + score = program.metrics[dim] + bin_idx = min(int(score * self.feature_bins), self.feature_bins - 1) + coords.append(bin_idx) + else: + # Default to middle bin if feature not found + coords.append(self.feature_bins // 2) + + return coords + + def _feature_coords_to_key(self, coords: List[int]) -> str: + """ + Convert feature coordinates to a string key + + Args: + coords: Feature coordinates + + 
Returns: + String key + """ + return "-".join(str(c) for c in coords) + + def _is_better(self, program1: Program, program2: Program) -> bool: + """ + Determine if program1 is better than program2 + + Args: + program1: First program + program2: Second program + + Returns: + True if program1 is better than program2 + """ + # If no metrics, use newest + if not program1.metrics and not program2.metrics: + return program1.timestamp > program2.timestamp + + # If only one has metrics, it's better + if program1.metrics and not program2.metrics: + return True + if not program1.metrics and program2.metrics: + return False + + # Check for combined_score first (this is the preferred metric) + if "combined_score" in program1.metrics and "combined_score" in program2.metrics: + return program1.metrics["combined_score"] > program2.metrics["combined_score"] + + # Fallback to average of all metrics + avg1 = sum(program1.metrics.values()) / len(program1.metrics) + avg2 = sum(program2.metrics.values()) / len(program2.metrics) + + return avg1 > avg2 + + def _update_archive(self, program: Program) -> None: + """ + Update the archive of elite programs + + Args: + program: Program to consider for archive + """ + # If archive not full, add program + if len(self.archive) < self.config.archive_size: + self.archive.add(program.id) + return + + # Otherwise, find worst program in archive + archive_programs = [self.programs[pid] for pid in self.archive] + worst_program = min( + archive_programs, key=lambda p: sum(p.metrics.values()) / max(1, len(p.metrics)) + ) + + # Replace if new program is better + if self._is_better(program, worst_program): + self.archive.remove(worst_program.id) + self.archive.add(program.id) + + def _update_best_program(self, program: Program) -> None: + """ + Update the absolute best program tracking + + Args: + program: Program to consider as the new best + """ + # If we don't have a best program yet, this becomes the best + if self.best_program_id is None: + self.best_program_id = program.id + logger.debug(f"Set initial best program to {program.id}") + return + + # Compare with current best program + current_best = self.programs[self.best_program_id] + + # Update if the new program is better + if self._is_better(program, current_best): + old_id = self.best_program_id + self.best_program_id = program.id + + # Log the change + if "combined_score" in program.metrics and "combined_score" in current_best.metrics: + old_score = current_best.metrics["combined_score"] + new_score = program.metrics["combined_score"] + score_diff = new_score - old_score + logger.info( + f"New best program {program.id} replaces {old_id} (combined_score: {old_score:.4f} → {new_score:.4f}, +{score_diff:.4f})" + ) + else: + logger.info(f"New best program {program.id} replaces {old_id}") + + def _sample_parent(self) -> Program: + """ + Sample a parent program for the next evolution step + + Returns: + Parent program + """ + # Decide between exploitation and exploration + if random.random() < self.config.exploitation_ratio and self.archive: + # Exploitation: Use elite program from archive + parent_id = random.choice(list(self.archive)) + return self.programs[parent_id] + + # Exploration: Sample from an island + island_idx = random.randint(0, len(self.islands) - 1) + + if not self.islands[island_idx]: + # If island is empty, use best program + return self.get_best_program() or next(iter(self.programs.values())) + + parent_id = random.choice(list(self.islands[island_idx])) + return self.programs[parent_id] + + def 
_sample_inspirations(self, parent: Program, n: int = 5) -> List[Program]: + """ + Sample inspiration programs for the next evolution step + + Args: + parent: Parent program + n: Number of inspirations to sample + + Returns: + List of inspiration programs + """ + inspirations = [] + + # Always include the absolute best program if available and different from parent + if self.best_program_id is not None and self.best_program_id != parent.id: + best_program = self.programs[self.best_program_id] + inspirations.append(best_program) + logger.debug(f"Including best program {self.best_program_id} in inspirations") + + # Add top programs as inspirations + top_n = max(1, int(n * self.config.elite_selection_ratio)) + top_programs = self.get_top_programs(n=top_n) + for program in top_programs: + if program.id not in [p.id for p in inspirations] and program.id != parent.id: + inspirations.append(program) + + # Add diverse programs + if len(self.programs) > n and len(inspirations) < n: + # Sample from different feature cells + feature_coords = self._calculate_feature_coords(parent) + + # Get programs from nearby feature cells + nearby_programs = [] + for _ in range(n - len(inspirations)): + # Perturb coordinates + perturbed_coords = [ + max(0, min(self.feature_bins - 1, c + random.randint(-1, 1))) + for c in feature_coords + ] + + # Try to get program from this cell + cell_key = self._feature_coords_to_key(perturbed_coords) + if cell_key in self.feature_map: + program_id = self.feature_map[cell_key] + if program_id != parent.id and program_id not in [p.id for p in inspirations]: + nearby_programs.append(self.programs[program_id]) + + # If we need more, add random programs + if len(inspirations) + len(nearby_programs) < n: + remaining = n - len(inspirations) - len(nearby_programs) + all_ids = set(self.programs.keys()) + excluded_ids = ( + {parent.id} + .union(p.id for p in inspirations) + .union(p.id for p in nearby_programs) + ) + available_ids = list(all_ids - excluded_ids) + + if available_ids: + random_ids = random.sample(available_ids, min(remaining, len(available_ids))) + random_programs = [self.programs[pid] for pid in random_ids] + nearby_programs.extend(random_programs) + + inspirations.extend(nearby_programs) + + return inspirations[:n]
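
To make the behavior introduced by this change concrete, here is a minimal usage sketch of the new iteration tracking and checkpoint/resume path. It is illustrative only and not part of the patch: it assumes DatabaseConfig can be constructed with just db_path (its remaining fields taking their defaults), and the program IDs, code strings, and scores are invented for the example.

import tempfile

from openevolve.config import DatabaseConfig
from openevolve.database import Program, ProgramDatabase

with tempfile.TemporaryDirectory() as tmp:
    db = ProgramDatabase(DatabaseConfig(db_path=tmp))

    # Passing iteration= to add() records it in the new iteration_found field
    # and advances last_iteration.
    db.add(Program(id="p1", code="x = 1", metrics={"combined_score": 0.5}), iteration=3)
    db.add(Program(id="p2", code="x = 2", metrics={"combined_score": 0.9}), iteration=7)
    assert db.last_iteration == 7
    assert db.best_program_id == "p2"  # tracked separately via _update_best_program

    # save() writes metadata.json (feature map, islands, archive, best_program_id,
    # last_iteration) plus one JSON file per program under programs/.
    db.save(tmp, iteration=7)

    # A fresh database pointed at the same path picks up where it left off,
    # which is what the controller relies on when resuming from a checkpoint.
    resumed = ProgramDatabase(DatabaseConfig(db_path=tmp))
    assert resumed.last_iteration == 7
    assert resumed.get(resumed.best_program_id).id == "p2"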