diff --git a/carps/analysis/calc_hypervolume.py b/carps/analysis/calc_hypervolume.py index 8de08545d..a0912d308 100644 --- a/carps/analysis/calc_hypervolume.py +++ b/carps/analysis/calc_hypervolume.py @@ -17,6 +17,7 @@ from carps.analysis.utils import get_ids_mo from carps.utils.loggingutils import get_logger, setup_logging +# from carps.analysis.gather_data import convert_mixed_types_to_str setup_logging() logger = get_logger(__file__) @@ -52,63 +53,40 @@ def gather_trajectory(x: pd.DataFrame) -> pd.DataFrame: data.append(D) return pd.DataFrame(data) +def get_pareto_front(costs): + """Return all Pareto-optimal rows from the given array. Assumes minimization.""" + is_efficient = np.ones(len(costs), dtype=bool) + for i, c in enumerate(costs): + if is_efficient[i]: + is_efficient[is_efficient] = np.any(costs[is_efficient] < c, axis=1) | np.all(costs[is_efficient] == c, axis=1) + is_efficient[i] = True + return costs[is_efficient] -def get_reference_point(x: pd.DataFrame, on_key: str = "trial_value__cost") -> np.ndarray: - """Get reference point from the dataframe. - Dataframe should only contain data from one task. The reference point is the maximum - of the costs over all trials. This is the worst case scenario for the hypervolume - calculation. The reference point is needed to define the bound of the hypervolume. +def add_running_pareto_front(group): + """Adds the pareto front of all costs up until the current trial to the group. Args: - x (pd.DataFrame): Dataframe with the trajectory. - on_key (str, optional): Column to use for the reference point. Defaults to "trial_value__cost". - Can also be "trial_value__cost_inc". + group (_type_): _description_ Returns: - np.ndarray: Reference point. + _type_: _description_ """ - if "task_id" in x.columns: - assert x["task_id"].nunique() == 1, "Cannot get reference point for multiple tasks" # noqa: PD101 - costs = get_costs(x, on_key) - return np.max(costs, axis=0) - + group = group.sort_values("n_trials").reset_index(drop=True) + costs = np.stack(group["trial_value__cost_normalized"].to_numpy()) + pareto_fronts = [] -def get_cost_min(x: pd.DataFrame, on_key: str = "trial_value__cost") -> np.ndarray: - """Get the minimum objective values from the dataframe. - - Dataframe should only contain data from one task. The point is the minimum - of the costs over all trials. This is the best case scenario for the hypervolume - calculation. The minimum point is needed for normalization. - - Args: - x (pd.DataFrame): Dataframe with the trajectory. - on_key (str, optional): Column to use for the reference point. Defaults to "trial_value__cost". - Can also be "trial_value__cost_inc". - - Returns: - np.ndarray: Minimum cost. - """ - if "task_id" in x.columns: - assert x["task_id"].nunique() == 1, "Cannot get reference point for multiple tasks" # noqa: PD101 - costs = get_costs(x, on_key) - return np.min(costs, axis=0) + for i in range(len(group)): + current_costs = costs[:i+1] + front = get_pareto_front(current_costs) + pareto_fronts.append(tuple(map(tuple, front))) + group["pareto_front"] = pareto_fronts + return group -def get_costs(x: pd.DataFrame, on_key: str = "trial_value__cost") -> np.ndarray: - """Get costs from the dataframe. - Here, it is expected that the costs are vectors (in the case of multi-objective optimization). - Args: - x (pd.DataFrame): Dataframe with the trajectory. - on_key (str, optional): Column to use for the costs. Defaults to "trial_value__cost". - Can also be "trial_value__cost_raw". - """ - return np.array(x[on_key].to_list()) - - -def add_reference_point(x: pd.DataFrame, on_key: str = "trial_value__cost") -> pd.DataFrame: +def add_reference_point(x: pd.DataFrame) -> pd.DataFrame: """Add reference point to the dataframe. The reference point is needed to define the bound of the hypervolume. @@ -121,14 +99,31 @@ def add_reference_point(x: pd.DataFrame, on_key: str = "trial_value__cost") -> p Returns: pd.DataFrame: Dataframe with the reference point. """ - reference_point = get_reference_point(x, on_key) + # Flatten and stack all cost vectors + costs = np.vstack([np.array(c) for c in x["trial_value__cost_raw"]]) + + # Sanity check for consistent dimensionality + if len(set(cost.shape[0] for cost in costs)) != 1: + raise ValueError("Inconsistent number of objectives in cost vectors.") + + # Reference point is max across all objectives + reference_point = np.max(costs, axis=0) + 1e-4 + + # Set reference point per row x["reference_point"] = [reference_point] * len(x) - minimum_cost = get_cost_min(x, on_key) - x["minimum_cost"] = [minimum_cost] * len(x) return x +def normalize_objectives(x: pd.DataFrame) -> pd.DataFrame: + costs = np.vstack(x["trial_value__cost_raw"]) + min_vals, max_vals = costs.min(0), costs.max(0) + denom = np.where(max_vals - min_vals == 0, 1, max_vals - min_vals) + normalized = (costs - min_vals) / denom + x["trial_value__cost_normalized"] = list(normalized) + return x + + -def calc_hv(x: pd.DataFrame, on_key: str = "trial_value__cost") -> pd.DataFrame: +def calc_hv(x: pd.DataFrame) -> pd.DataFrame: """Calculate hypervolume per trajectory step. Args: @@ -139,8 +134,9 @@ def calc_hv(x: pd.DataFrame, on_key: str = "trial_value__cost") -> pd.DataFrame: Returns: pd.DataFrame: Dataframe with the hypervolume. """ - F = get_costs(x, on_key) - ind = HV(ref_point=x["reference_point"].iloc[0], pf=None, nds=False) + F = np.vstack([np.array(p) for p in x["pareto_front"]]) + + ind = HV(ref_point=[1.000001]*F.shape[1], pf=None, nds=False) x["hypervolume"] = ind(F) return x @@ -219,7 +215,7 @@ def add_hypervolume_to_df(logs: pd.DataFrame, on_key: str = "trial_value__cost") """ tqdm.pandas(desc="Calc hypervolume...") ids_mo = get_ids_mo(logs) - add_reference_point_partial = partial(add_reference_point, on_key=on_key) + add_reference_point_partial = partial(add_reference_point) mo_cols = ["hypervolume", "reference_point"] for mo_col in mo_cols: if mo_col not in logs.columns: diff --git a/carps/analysis/gather_data.py b/carps/analysis/gather_data.py index e19a17269..f5510fe67 100644 --- a/carps/analysis/gather_data.py +++ b/carps/analysis/gather_data.py @@ -24,6 +24,7 @@ from carps.utils.loggingutils import get_logger, setup_logging from carps.utils.task import Task from carps.utils.trials import TrialInfo +from carps.analysis.calc_hypervolume import calc_hv, add_reference_point, run_id, add_running_pareto_front, normalize_objectives if TYPE_CHECKING: from carps.objective_functions.objective_function import ObjectiveFunction @@ -393,11 +394,11 @@ def maybe_postadd_task(logs: pd.DataFrame, overwrite: bool = False) -> pd.DataFr task_cfg = load_task_cfg(task_id=gid[0], task_index=task_index) task_cfg_yaml = OmegaConf.to_yaml(task_cfg) - if "${seed}" in task_cfg_yaml: - # Add seed to config to make it resolvable - assert gdf["seed"].nunique() == 1 # noqa: PD101 - seed = gdf["seed"].iloc[0] - task_cfg.seed = int(seed) + # if "${seed}" in task_cfg_yaml: + # # Add seed to config to make it resolvable + # assert gdf["seed"].nunique() == 1 # noqa: PD101 + # seed = gdf["seed"].iloc[0] + # task_cfg.seed = int(seed) task_cfg = OmegaConf.to_container(task_cfg, resolve=False) task_columns = [c for c in gdf.columns if c.startswith("task.")] if overwrite: @@ -469,7 +470,7 @@ def maybe_convert_cost_to_so(x: float | list | np.ndarray) -> float: float: Single-objective cost or aggregated cost. """ if isinstance(x, list | np.ndarray): - return np.sum(x) + return np.sum(x) # TODO change to HV here if isinstance(x, dict): assert len(x.values()) == 1 # Most likely comes from database @@ -478,7 +479,7 @@ def maybe_convert_cost_to_so(x: float | list | np.ndarray) -> float: if isinstance(value, str): value = ast.literal_eval(value) if isinstance(value, list): - return np.sum(value) + return np.sum(value) # TODO Change to HV here if isinstance(value, float | int): return value if isinstance(x, float): @@ -547,7 +548,12 @@ def process_logs(logs: pd.DataFrame, keep_task_columns: list[str] | None = None) logger.debug("Handle MO costs...") logs["trial_value__cost_raw"] = logs["trial_value__cost"].apply(maybe_convert_cost_dtype) - logs["trial_value__cost"] = logs["trial_value__cost_raw"].apply(maybe_convert_cost_to_so) + # trial_value__cost_raw for add_reference_point and to calc_hv + logs = logs.groupby(by=["task_type", "task_id"]).apply(normalize_objectives).reset_index(drop=True) + logs = logs.groupby(by=[*run_id]).apply(add_running_pareto_front).reset_index(drop=True) + logs = logs.groupby(by=[*run_id, "n_trials"]).apply(calc_hv).reset_index(drop=True) + logs["trial_value__cost"] = logs["hypervolume"] #logs["trial_value__cost_raw"].apply(maybe_convert_cost_to_so) + print(logs.head()) logger.debug("Determine incumbent cost...") logs["trial_value__cost_inc"] = logs.groupby(by=grouper_keys)["trial_value__cost"].transform("cummin") @@ -606,9 +612,9 @@ def normalize_logs(logs: pd.DataFrame) -> pd.DataFrame: logs["trial_value__cost_raw"] = logs["trial_value__cost"].apply(maybe_convert_cost_dtype) else: logs["trial_value__cost_raw"] = logs["trial_value__cost_raw"].apply(maybe_convert_cost_dtype) - logs = add_hypervolume_to_df(logs, on_key="trial_value__cost_raw") + # logs = add_hypervolume_to_df(logs, on_key="trial_value__cost_raw") # IDs have changed, so we need to recalculate - ids_mo = get_ids_mo(logs) + # ids_mo = get_ids_mo(logs) hv = logs.loc[ids_mo, "hypervolume"] logs.loc[ids_mo, "trial_value__cost"] = -hv # higher is better logs["trial_value__cost"] = logs["trial_value__cost"].astype("float64") @@ -785,10 +791,7 @@ def rename_legacy(logs: pd.DataFrame) -> pd.DataFrame: # NOTE(eddiebergman): Use `n_processes=None` as default, which uses `os.cpu_count()` in `Pool` def filelogs_to_df( - rundir: str | list[str], - log_fn: str = "trial_logs.jsonl", - n_processes: int | None = None, - outdir: str | Path | None = None, + rundir: str | list[str] = "results/", log_fn: str = "trial_logs.jsonl", n_processes: int | None = None ) -> tuple[pd.DataFrame, pd.DataFrame]: """Load logs from file and preprocess. diff --git a/carps/analysis/run_autorank.py b/carps/analysis/run_autorank.py index 023a2e1c9..5dcc84b57 100644 --- a/carps/analysis/run_autorank.py +++ b/carps/analysis/run_autorank.py @@ -437,14 +437,14 @@ def cd_evaluation( alpha=alpha, alpha_normality=alpha_normality, num_samples=len(rank_data), + sample_matrix=None, posterior_matrix=None, decision_matrix=None, rope=None, rope_mode=None, effect_size=res.effect_size, force_mode=None, - sample_matrix=None, - plot_order=None, + # plot_order=None, ) is_significant = True if result.pvalue >= result.alpha: diff --git a/carps/experimenter/create_cluster_configs.py b/carps/experimenter/create_cluster_configs.py index 59be7856e..a349f8f62 100644 --- a/carps/experimenter/create_cluster_configs.py +++ b/carps/experimenter/create_cluster_configs.py @@ -13,6 +13,7 @@ from py_experimenter.experimenter import PyExperimenter from carps.utils.loggingutils import CustomEncoder +import pickle as pckl logger = logging.getLogger("create experiments") @@ -79,7 +80,7 @@ def get_experiment_definition(cfg: OmegaConf) -> dict: cfg_dict = OmegaConf.to_container(cfg=cfg, resolve=True) cfg_str = json.dumps(cfg_dict, cls=CustomEncoder) - cfg_hash = create_config_hash(cfg) + cfg_hash = create_config_hash_from_full_cfg(cfg) return { "config": cfg_str, @@ -107,6 +108,7 @@ def fill_database(cfg: DictConfig, experimenter: PyExperimenter) -> None: DatabaseConnectionError: If there is an error with the database connection. """ experiment_definition = get_experiment_definition(cfg) + column_names = list(experimenter.db_connector.database_configuration.keyfields.keys()) exists = False @@ -131,7 +133,7 @@ def fill_database(cfg: DictConfig, experimenter: PyExperimenter) -> None: # experimenter.close_ssh() -@hydra.main(config_path="../configs", config_name="base.yaml", version_base=None) # type: ignore[misc] +@hydra.main(config_path="../configs", config_name="base.yaml", version_base=None, save_as_pckl=True, folder_path="configs_pckl") # type: ignore[misc] def main(cfg: DictConfig) -> None: """Store experiment config in database. @@ -141,23 +143,31 @@ def main(cfg: DictConfig) -> None: Global configuration. """ - fill_database(cfg, experimenter) + if save_as_pckl: + experiment_definition = get_experiment_definition(cfg) + files = list(Path(folder_path).glob("*.pkl")) + + if experiment_definition['config_hash'] not in files: + with open(f"{folder_path}{experiment_definition['config_hash']}.pkl", "wb") as f: + pckl.dump(experiment_definition, f) + else: + experiment_configuration_file_path = Path(__file__).parent / "py_experimenter.yaml" + + database_credential_file_path = Path(__file__).parent / "credentials.yaml" + if database_credential_file_path is not None and not database_credential_file_path.exists(): + database_credential_file_path = None # type: ignore[assignment] + + experimenter = PyExperimenter( + experiment_configuration_file_path=experiment_configuration_file_path, + name="carps", + database_credential_file_path=database_credential_file_path, + log_level=logging.INFO, + use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel, + use_codecarbon=False + ) + fill_database(cfg, experimenter) if __name__ == "__main__": # TODO make experiment_configuration_file_path and database_credential_file_path a commandline arg - experiment_configuration_file_path = Path(__file__).parent / "py_experimenter.yaml" - - database_credential_file_path = Path(__file__).parent / "credentials.yaml" - if database_credential_file_path is not None and not database_credential_file_path.exists(): - database_credential_file_path = None # type: ignore[assignment] - - experimenter = PyExperimenter( - experiment_configuration_file_path=experiment_configuration_file_path, - name="carps", - database_credential_file_path=database_credential_file_path, - log_level=logging.INFO, - use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel, - ) - main() diff --git a/carps/experimenter/database/download_results.py b/carps/experimenter/database/download_results.py index 60a285182..54c25f2fe 100644 --- a/carps/experimenter/database/download_results.py +++ b/carps/experimenter/database/download_results.py @@ -18,6 +18,7 @@ def main( pyexperimenter_configuration_file_path: str | None = None, database_credential_file_path: str | Path | None = None, outdir: str | Path | None = None, + codecarbon: bool = False ) -> None: """Download results from the database and save them to outdir. @@ -49,6 +50,7 @@ def main( database_credential_file_path=database_credential_file_path, log_file="logs/reset_experiments.log", use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel, + use_codecarbon=codecarbon ) experiment_config_table = experimenter.get_table() @@ -64,12 +66,14 @@ def main( logger.info(f"\tFrom them, found {n_errored} errored runs of type {task_type}. ❌") trajectory_table = experimenter.get_logtable("trajectory") trials_table = experimenter.get_logtable("trials") - codecarbon_table = experimenter.get_codecarbon_table() + if codecarbon: + codecarbon_table = experimenter.get_codecarbon_table() experiment_config_table.to_parquet(outdir / "experiment_config.parquet", index=False) trajectory_table.to_parquet(outdir / "trajectory.parquet", index=False) trials_table.to_parquet(outdir / "trials.parquet", index=False) - codecarbon_table.to_parquet(outdir / "codecarbon.parquet", index=False) + if codecarbon: + codecarbon_table.to_parquet(outdir / "codecarbon.parquet", index=False) logger.info( "Downloaded results from the database. " f"Saved to '{outdir}'. " diff --git a/carps/experimenter/database/process_logs.py b/carps/experimenter/database/process_logs.py index 54e6d3f5a..d8b55a2a9 100644 --- a/carps/experimenter/database/process_logs.py +++ b/carps/experimenter/database/process_logs.py @@ -63,6 +63,7 @@ def add_metadata( "config", "config_hash", "name", + "n_trials" ] metadata_columns = [c for c in experiment_config_table.columns if c not in ignore_columns] @@ -100,9 +101,9 @@ def process_single_run_from_database( if logs_from_one_run["experiment_id"].nunique() != 1: # noqa: PD101 raise ValueError("Multiple values for `experiment_id` found in the logs. Something is suspicious.") experiment_id = logs_from_one_run["experiment_id"].iloc[0] - logs_from_one_run = process_logs(logs_from_one_run) - if only_incumbents: - logs_from_one_run = filter_non_incumbent_entries(logs=logs_from_one_run) + # logs_from_one_run = process_logs(logs_from_one_run) + # if only_incumbents: + # logs_from_one_run = filter_non_incumbent_entries(logs=logs_from_one_run) return add_metadata( logs_from_one_run=logs_from_one_run, experiment_id=experiment_id, @@ -197,9 +198,9 @@ def process_logs_from_database( # Combine the results into a single DataFrame processed_logs = pd.concat(result, ignore_index=True).reset_index(drop=True) - processed_logs.to_parquet(output_filename, index=False) + processed_logs = process_logs(processed_logs) + processed_logs.to_parquet(output_filename, index=False, engine="fastparquet") logger.info(f"Processed logs saved to {output_filename} 💌.") - return processed_logs if __name__ == "__main__": diff --git a/carps/experimenter/py_experimenter.yaml b/carps/experimenter/py_experimenter.yaml index d74570216..1fd7140d7 100644 --- a/carps/experimenter/py_experimenter.yaml +++ b/carps/experimenter/py_experimenter.yaml @@ -55,6 +55,7 @@ PY_EXPERIMENTER: trial_value__additional_info: JSON trajectory: n_trials: INT + n_function_calls: INT trial_info__config: JSON trial_info__instance: INT trial_info__seed: INT diff --git a/carps/experimenter/scrape_results_to_db.py b/carps/experimenter/scrape_results_to_db.py index 902963e63..0cce9ef28 100644 --- a/carps/experimenter/scrape_results_to_db.py +++ b/carps/experimenter/scrape_results_to_db.py @@ -35,6 +35,7 @@ database_credential_file_path=database_credential_file_path, log_level=logging.INFO, use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel, + use_codecarbon=False ) diff --git a/carps/experimenter/write_to_db.py b/carps/experimenter/write_to_db.py new file mode 100644 index 000000000..964b841cc --- /dev/null +++ b/carps/experimenter/write_to_db.py @@ -0,0 +1,85 @@ +from __future__ import annotations +from omegaconf import OmegaConf +from py_experimenter.experimenter import PyExperimenter +from pathlib import Path +import logging +from multiprocessing import Pool +from tqdm import tqdm +import numpy as np +from concurrent.futures import ThreadPoolExecutor +import pickle as pckl +from hydra.core.utils import setup_globals + + +setup_globals() + + +experiment_identifiers = ["optimizer_id", "task_id", "seed", "benchmark_id", "n_trials", "time_budget"] + + +def check_existance_by_keys(experiment_definition: dict, existing_rows: list, identifier_keys: list[str]) -> bool: + """Check existance of experiment in database by the identifier keys. + + Args: + experiment_definition (dict): Experiment definition. + existing_rows (list): List of existing rows in the database. + identifier_keys (list[str]): List of keys to check for existance. + + Returns: + bool: True if the experiment exists, False otherwise. + """ + return any(all(experiment_definition[k] == e[k] for k in identifier_keys) for e in existing_rows) + + + +folder_path = Path("configs_pckl") +pkl_files = list(folder_path.glob("*.pkl")) +print('length of pkl_files', len(pkl_files)) + +def load_pickle(file_path): + with open(file_path, 'rb') as f: + return pckl.load(f) + +with ThreadPoolExecutor() as executor: + exp_defs = list(executor.map(load_pickle, pkl_files)) + + +# CONNECT TO DATABASE and get existing experiments +experiment_configuration_file_path = "carps/experimenter/py_experimenter.yaml" +database_credential_file_path = "carps/experimenter/credentials.yaml" + +experimenter = PyExperimenter( + experiment_configuration_file_path=experiment_configuration_file_path, + name="carps", + database_credential_file_path=database_credential_file_path, + log_level=logging.INFO, + use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel, + use_codecarbon=False +) + + +column_names = list(experimenter.db_connector.database_configuration.keyfields.keys()) +existing_rows = experimenter.db_connector._get_existing_rows(column_names) + +# Check if experiments exists +print("Checking if experiments already exist...") +rows_exist = [ + check_existance_by_keys(exp_def, existing_rows, experiment_identifiers) + for exp_def in tqdm(exp_defs, total=len(exp_defs)) +] + + +print(f"This number of experiments already exists: {np.sum(rows_exist)}") + +experiments_to_add = [exp_def for exp_def, exists in zip(exp_defs, rows_exist, strict=True) if not exists] +print( + f"number of existing rows {len(existing_rows)}, previous length: " + f"{len(exp_defs)}, length now {len(experiments_to_add)}" +) + + +BATCH_SIZE = 5000 +for i in range(0, len(experiments_to_add), BATCH_SIZE): + batch = experiments_to_add[i:i+BATCH_SIZE] + experimenter.fill_table_with_rows(batch) + diff --git a/carps/loggers/database_logger.py b/carps/loggers/database_logger.py index b6cddd748..54b031123 100644 --- a/carps/loggers/database_logger.py +++ b/carps/loggers/database_logger.py @@ -8,6 +8,7 @@ from carps.loggers.abstract_logger import AbstractLogger from carps.utils.loggingutils import CustomEncoder, get_logger, setup_logging +import time if TYPE_CHECKING: from py_experimenter.result_processor import ResultProcessor @@ -98,12 +99,20 @@ def log_trial( table_name: str, default "trials" The name of the table to log the trial to. """ - info = convert_trial_info(trial_info, trial_value) - info["n_trials"] = n_trials - info["n_function_calls"] = n_function_calls if n_function_calls else n_trials - - if self.result_processor: - self.result_processor.process_logs({table_name: info}) + for i in range(5): + try: + info = convert_trial_info(trial_info, trial_value) + info["n_trials"] = n_trials + info["n_function_calls"] = n_function_calls if n_function_calls else n_trials + + if self.result_processor: + logger.info(f"Logging trial to {table_name}: {info}") + self.result_processor.process_logs({table_name: info}) + break + except Exception as e: + if i == 4: + raise e + time.sleep(10) def log_incumbent(self, n_trials: int | float, incumbent: Incumbent, n_function_calls: int | None = None) -> None: """Log the incumbent. @@ -142,5 +151,14 @@ def log_arbitrary(self, data: dict, entity: str) -> None: entity : str The entity to log the data to. This is the table name in the database. """ - if self.result_processor: - self.result_processor.process_logs({entity: data}) + + for i in range(5): + try: + if self.result_processor: + self.result_processor.process_logs({entity: data}) + break + except Exception as e: + if i == 4: + raise e + time.sleep(10) + diff --git a/carps/run_from_db.py b/carps/run_from_db.py index ddca3774c..d70daf285 100644 --- a/carps/run_from_db.py +++ b/carps/run_from_db.py @@ -95,6 +95,7 @@ def main( database_credential_file_path=database_credential_file_path, log_file=f"logs/{slurm_job_id}.log", use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel, + use_codecarbon=False ) experimenter.execute(py_experimenter_evaluate, max_experiments=1)