diff --git a/causaltune/optimiser.py b/causaltune/optimiser.py index ff5f037..72cd8d3 100644 --- a/causaltune/optimiser.py +++ b/causaltune/optimiser.py @@ -94,6 +94,7 @@ def __init__( test_size=None, num_samples=-1, propensity_model="dummy", + propensity_automl_estimators: Optional[List[str]] = None, outcome_model="nested", components_task="regression", components_verbose=0, @@ -185,6 +186,7 @@ def __init__( self._settings["component_models"]["n_jobs"] = components_njobs self._settings["component_models"]["time_budget"] = components_time_budget self._settings["component_models"]["eval_method"] = "holdout" + self._settings["propensity_automl_estimators"] = propensity_automl_estimators if 0 < train_size < 1: component_test_size = 1 - train_size @@ -224,9 +226,11 @@ def init_propensity_model(self, propensity_model: str): if propensity_model == "dummy": self.propensity_model = DummyClassifier(strategy="prior") elif propensity_model == "auto": - self.propensity_model = AutoML( - **{**self._settings["component_models"], "task": "classification"} - ) + automl_args = {**self._settings["component_models"], "task": "classification"} + if self._settings["propensity_automl_estimators"]: + automl_args["estimator_list"] = self._settings["propensity_automl_estimators"] + + self.propensity_model = AutoML(**automl_args) elif hasattr(propensity_model, "fit") and hasattr(propensity_model, "predict_proba"): self.propensity_model = propensity_model else: diff --git a/causaltune/remote.py b/causaltune/remote.py index e1c1c1b..ddd79b8 100644 --- a/causaltune/remote.py +++ b/causaltune/remote.py @@ -7,6 +7,6 @@ def remote_exec(function, args, use_ray=False): else: from joblib import Parallel, delayed - return Parallel(n_jobs=2, backend="threading")(delayed(function)(*args) for i in range(1))[ + return Parallel(n_jobs=1, backend="threading")(delayed(function)(*args) for i in range(1))[ 0 ] diff --git a/causaltune/score/bite.py b/causaltune/score/bite.py new file mode 100644 index 0000000..b501ec5 --- /dev/null +++ b/causaltune/score/bite.py @@ -0,0 +1,143 @@ +from typing import List, Optional + +import numpy as np +import pandas as pd +from scipy.stats import kendalltau + + +def bite( + working_df: pd.DataFrame, + treatment_name: str, + outcome_name: str, + min_N: int = 10, + max_N: int = 1000, + num_N: int = 20, + N_values: Optional[List[int]] = None, + clip_propensity: float = 0.05, +) -> float: + max_N = int(min(max_N, len(working_df) / 10)) + if N_values is None: + N_values = exponential_spacing(min_N, max_N, num_N) + # Calculate weights with clipping to avoid extremes + working_df["weights"] = np.where( + working_df[treatment_name] == 1, + 1 / np.clip(working_df["propensity"], clip_propensity, 1 - clip_propensity), + 1 / np.clip(1 - working_df["propensity"], clip_propensity, 1 - clip_propensity), + ) + + kendall_tau_values = [] + + for N in N_values: + iter_df = working_df.copy() + + try: + # Ensure enough unique values for binning + unique_ites = np.unique(iter_df["estimated_ITE"]) + if len(unique_ites) < N: + continue + + # Create bins + iter_df["ITE_bin"] = pd.qcut( + iter_df["estimated_ITE"], q=N, labels=False, duplicates="drop" + ) + + # Compute bin statistics + bin_stats = [] + for bin_idx in iter_df["ITE_bin"].unique(): + bin_data = iter_df[iter_df["ITE_bin"] == bin_idx] + + # Skip if bin is too small + if len(bin_data) < 2: + continue + + naive_est = compute_naive_estimate(bin_data, treatment_name, outcome_name) + + # Only compute average ITE if weights are valid + bin_weights = bin_data["weights"].values + if bin_weights.sum() > 0 and not np.isnan(naive_est): + try: + avg_est_ite = np.average(bin_data["estimated_ITE"], weights=bin_weights) + bin_stats.append( + { + "ITE_bin": bin_idx, + "naive_estimate": naive_est, + "average_estimated_ITE": avg_est_ite, + } + ) + except ZeroDivisionError: + continue + + # Calculate Kendall's Tau if we have enough valid bins + bin_stats_df = pd.DataFrame(bin_stats) + if len(bin_stats_df) >= 2: + tau, _ = kendalltau( + bin_stats_df["naive_estimate"], + bin_stats_df["average_estimated_ITE"], + ) + if not np.isnan(tau): + kendall_tau_values.append(tau) + + except (ValueError, ZeroDivisionError): + continue + + # Return final score + if len(kendall_tau_values) == 0: + return -np.inf # Return -inf for failed computations + + # top_3_taus = sorted(kendall_tau_values, reverse=True)[:3] + return np.mean(kendall_tau_values) + + +def compute_naive_estimate( + group_data: pd.DataFrame, treatment_name: str, outcome_name: str +) -> float: + """Compute naive estimate for a group with safeguards against edge cases.""" + treated = group_data[group_data[treatment_name] == 1] + control = group_data[group_data[treatment_name] == 0] + + if len(treated) == 0 or len(control) == 0: + return np.nan + + treated_weights = treated["weights"].values + control_weights = control["weights"].values + + # Check if weights sum to 0 or if all weights are 0 + if ( + treated_weights.sum() == 0 + or control_weights.sum() == 0 + or not (treated_weights > 0).any() + or not (control_weights > 0).any() + ): + return np.nan + + # Weighted averages with explicit handling of edge cases + try: + y1 = np.average(treated[outcome_name], weights=treated_weights) + y0 = np.average(control[outcome_name], weights=control_weights) + return y1 - y0 + except ZeroDivisionError: + return np.nan + + +def exponential_spacing(start, end, num_points): + """ + Generate approximately exponentially spaced integers between start and end. + + Parameters: + start (int): The starting value. + end (int): The ending value. + num_points (int): Number of integers to generate. + + Returns: + list: A list of approximately exponentially spaced integers. + """ + # Use a logarithmic scale for exponential spacing + log_start = np.log(start) + log_end = np.log(end) + log_space = np.linspace(log_start, log_end, num_points) + + # Exponentiate back and round to nearest integers + spaced_integers = np.round(np.exp(log_space)).astype(int) + + # Ensure unique integers + return list(np.unique(spaced_integers)) diff --git a/causaltune/score/frobenius.py b/causaltune/score/frobenius.py new file mode 100644 index 0000000..e69de29 diff --git a/causaltune/score/scoring.py b/causaltune/score/scoring.py index 9aa5cd1..67009c1 100644 --- a/causaltune/score/scoring.py +++ b/causaltune/score/scoring.py @@ -14,17 +14,17 @@ from causaltune.score.thompson import thompson_policy, extract_means_stds from causaltune.thirdparty.causalml import metrics from causaltune.score.erupt import ERUPT +from .bite import bite from causaltune.utils import treatment_values, psw_joint_weights import dcor from scipy.spatial import distance from sklearn.neighbors import NearestNeighbors - -from scipy.stats import kendalltau - from sklearn.preprocessing import StandardScaler +logger = logging.getLogger(__name__) + class DummyEstimator: def __init__(self, cate_estimate: np.ndarray, effect_intervals: Optional[np.ndarray] = None): @@ -93,7 +93,7 @@ def __init__( Access methods and attributes via `CausalTune.scorer`. """ - + logger.info("Initializing Scorer") self.problem = problem self.multivalue = multivalue self.causal_model = copy.deepcopy(causal_model) @@ -142,6 +142,26 @@ def __init__( + self.psw_estimator._observed_common_causes_names, ) + def inverse_propensity_score(self, df: pd.DataFrame, clip: float = 0.05) -> np.ndarray: + """ + Calculate the inverse propensity score weights for the given dataframe. + + Args: + df (pandas.DataFrame): input dataframe + clip (float): clipping value for propensity scores + """ + + propensity_model = self.psw_estimator.estimator.propensity_model + p = propensity_model.predict_proba( + df[self.causal_model.get_effect_modifiers() + self.causal_model.get_common_causes()] + ) + treatment = df[self.psw_estimator._treatment_name].values + ex_ante_p = p[np.arange(p.shape[0]), treatment] + + psw = 1.0 / np.clip(ex_ante_p, clip, 1 - clip) + + return psw + def ate(self, df: pd.DataFrame) -> tuple: """ Calculate the Average Treatment Effect. Provide naive std estimates in @@ -308,6 +328,7 @@ def frobenius_norm_score( # Get data splits and check validity Y0X, treatment_name, split_test_by = self._Y0_X_potential_outcomes(estimate, df) + Y0X_1 = Y0X[Y0X[split_test_by] == 1] Y0X_0 = Y0X[Y0X[split_test_by] == 0] @@ -320,8 +341,8 @@ def frobenius_norm_score( # Normalize features select_cols = estimate.estimator._effect_modifier_names + ["yhat"] scaler = StandardScaler() - Y0X_1_normalized = scaler.fit_transform(Y0X_1[select_cols]) - Y0X_0_normalized = scaler.transform(Y0X_0[select_cols]) + Y0X_0_normalized = scaler.fit_transform(Y0X_0[select_cols]) + Y0X_1_normalized = scaler.transform(Y0X_1[select_cols]) # Calculate pairwise differences differences_xy = Y0X_1_normalized[:, np.newaxis, :] - Y0X_0_normalized[np.newaxis, :, :] @@ -906,7 +927,7 @@ def codec_score(estimate: CausalEstimate, df: pd.DataFrame) -> float: if standard_deviations < 0.01: return np.inf - return Scorer.codec(Y, Z, X) + return abs(Scorer.codec(Y, Z, X)) @staticmethod def auc_make_score( @@ -924,7 +945,7 @@ def auc_make_score( float: area under the uplift curve """ - + print("running auuc_score") est = estimate.estimator new_df = pd.DataFrame() new_df["y"] = df[est._outcome_name] @@ -1041,8 +1062,6 @@ def bite_score( Returns: float: The BITE score. Higher values indicate better model performance. """ - if N_values is None: - N_values = list(range(10, 21)) + list(range(25, 51, 5)) + list(range(60, 101, 10)) est = estimate.estimator treatment_name = est._treatment_name @@ -1068,102 +1087,9 @@ def bite_score( else: raise ValueError("Propensity model is not available.") - # Calculate weights with clipping to avoid extremes - working_df["weights"] = np.where( - working_df[treatment_name] == 1, - 1 / np.clip(working_df["propensity"], 0.05, 0.95), - 1 / np.clip(1 - working_df["propensity"], 0.05, 0.95), - ) - - kendall_tau_values = [] - - def compute_naive_estimate(group_data): - """Compute naive estimate for a group with safeguards against edge cases.""" - treated = group_data[group_data[treatment_name] == 1] - control = group_data[group_data[treatment_name] == 0] - - if len(treated) == 0 or len(control) == 0: - return np.nan - - treated_weights = treated["weights"].values - control_weights = control["weights"].values - - # Check if weights sum to 0 or if all weights are 0 - if ( - treated_weights.sum() == 0 - or control_weights.sum() == 0 - or not (treated_weights > 0).any() - or not (control_weights > 0).any() - ): - return np.nan - - # Weighted averages with explicit handling of edge cases - try: - y1 = np.average(treated[outcome_name], weights=treated_weights) - y0 = np.average(control[outcome_name], weights=control_weights) - return y1 - y0 - except ZeroDivisionError: - return np.nan - - for N in N_values: - iter_df = working_df.copy() - - try: - # Ensure enough unique values for binning - unique_ites = np.unique(iter_df["estimated_ITE"]) - if len(unique_ites) < N: - continue - - # Create bins - iter_df["ITE_bin"] = pd.qcut( - iter_df["estimated_ITE"], q=N, labels=False, duplicates="drop" - ) - - # Compute bin statistics - bin_stats = [] - for bin_idx in iter_df["ITE_bin"].unique(): - bin_data = iter_df[iter_df["ITE_bin"] == bin_idx] - - # Skip if bin is too small - if len(bin_data) < 2: - continue - - naive_est = compute_naive_estimate(bin_data) - - # Only compute average ITE if weights are valid - bin_weights = bin_data["weights"].values - if bin_weights.sum() > 0 and not np.isnan(naive_est): - try: - avg_est_ite = np.average(bin_data["estimated_ITE"], weights=bin_weights) - bin_stats.append( - { - "ITE_bin": bin_idx, - "naive_estimate": naive_est, - "average_estimated_ITE": avg_est_ite, - } - ) - except ZeroDivisionError: - continue - - # Calculate Kendall's Tau if we have enough valid bins - bin_stats_df = pd.DataFrame(bin_stats) - if len(bin_stats_df) >= 2: - tau, _ = kendalltau( - bin_stats_df["naive_estimate"], - bin_stats_df["average_estimated_ITE"], - ) - if not np.isnan(tau): - kendall_tau_values.append(tau) - - except (ValueError, ZeroDivisionError): - continue - - # Return final score - if len(kendall_tau_values) == 0: - return -np.inf # Return -inf for failed computations - - top_3_taus = sorted(kendall_tau_values, reverse=True)[:3] - return np.mean(top_3_taus) + # Calculate the BITE score + bite_score = bite(working_df, treatment_name, outcome_name) + return bite_score def make_scores( self, diff --git a/notebooks/RunExperiments/cluster_config.yaml b/notebooks/RunExperiments/cluster_config.yaml index 9e4dbbf..80d602d 100644 --- a/notebooks/RunExperiments/cluster_config.yaml +++ b/notebooks/RunExperiments/cluster_config.yaml @@ -6,7 +6,7 @@ cluster_name: default # The maximum number of workers nodes to launch in addition to the head # node. -max_workers: 2 +max_workers: 8 # The autoscaler will scale up the cluster faster with higher upscaling speed. # E.g., if the task requires adding more nodes then autoscaler will gradually @@ -41,7 +41,7 @@ idle_timeout_minutes: 5 provider: type: gcp region: us-west1 - availability_zone: us-west1-a + availability_zone: us-west1-b project_id: motleys # Globally unique project id # How Ray will authenticate with newly launched nodes. @@ -60,7 +60,7 @@ auth: available_node_types: ray_head_default: # The resources provided by this node type. - resources: {"CPU": 2} + resources: {"CPU": 0} # Provider-specific config for the head node, e.g. instance type. By default # Ray will auto-configure unspecified fields such as subnets and ssh-keys. # For more documentation on available fields, see: @@ -93,7 +93,7 @@ available_node_types: min_workers: 1 # The maximum number of worker nodes of this type to launch. # This takes precedence over min_workers. - max_workers: 5 + max_workers: 8 # The resources provided by this node type. resources: {"CPU": 2} # Provider-specific config for the head node, e.g. instance type. By default @@ -101,7 +101,7 @@ available_node_types: # For more documentation on available fields, see: # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert node_config: - machineType: n1-standard-2 + machineType: n2-highmem-2 disks: - boot: true autoDelete: true @@ -161,7 +161,7 @@ initialization_commands: [] # List of shell commands to run to set up nodes. setup_commands: - - pip install causaltune catboost + - pip install causaltune catboost ray[tune] flaml[blendsearch] # Note: if you're developing Ray, you probably want to create a Docker image that # has your Ray repo pre-cloned. Then, you can replace the pip installs diff --git a/notebooks/RunExperiments/runners/experiment_plots.py b/notebooks/RunExperiments/runners/experiment_plots.py new file mode 100644 index 0000000..98617c5 --- /dev/null +++ b/notebooks/RunExperiments/runners/experiment_plots.py @@ -0,0 +1,349 @@ +import glob +import os +import pickle +from typing import Union, List + +import matplotlib +import numpy as np + +import pandas as pd +from matplotlib import pyplot as plt + +from causaltune.score.scoring import metrics_to_minimize, supported_metrics + + +def extract_metrics_datasets(out_dir: str): + metrics = set() + datasets = set() + + for file in glob.glob(f"{out_dir}/*.pkl"): + parts = os.path.basename(file).split("-") + metrics.add(parts[0]) + datasets.add(parts[-1].replace(".pkl", "").replace("_", " ")) + + return sorted(list(metrics)), sorted(list(datasets)) + + +def make_filename(metric, dataset, i_run): + return f"{metric}-run-{i_run}-{dataset.replace(' ', '_')}.pkl" + + +def get_all_test_scores(out_dir, dataset_name): + size, ds_type, case = dataset_name.split(" ") + all_scores = [] + for file in glob.glob(f"{out_dir}/*_{ds_type}_{case}.pkl"): + with open(file, "rb") as f: + results = pickle.load(f) + for x in results["all_scores"]: + all_scores.append( + {k: v for k, v in x["test"]["scores"].items() if k not in ["values"]} + ) + out = pd.DataFrame(all_scores) + return out + + +def generate_plots( + out_dir: str, + log_scale: Union[List[str], None] = None, + upper_bounds: Union[dict, None] = None, + lower_bounds: Union[dict, None] = None, + font_size=0, +): + if log_scale is None: + log_scale = ["energy_distance", "psw_energy_distance", "frobenius_norm"] + if upper_bounds is None: + upper_bounds = {} # Use an empty dictionary if None + if lower_bounds is None: + lower_bounds = {} # Use an empty dictionary if None + + metrics, datasets = extract_metrics_datasets(out_dir) + # Remove 'ate' from metrics + metrics = [m for m in metrics if m.lower() not in ["ate", "norm_erupt"]] + + metric_names = { + "psw_frobenius_norm": "PSW\nFrobenius\nNorm", + "frobenius_norm": "Frobenius\nNorm", + "erupt": "ERUPT", + "codec": "CODEC", + "auc": "AUC", + "qini": "Qini", + "bite": "BITE", + "policy_risk": "Policy\nRisk", + "energy_distance": "Energy\nDistance", + "psw_energy_distance": "Energy\nDistance", + "norm_erupt": "Normalized\nERUPT", + } + + colors = ( + [matplotlib.colors.CSS4_COLORS["black"]] + + list(matplotlib.colors.TABLEAU_COLORS) + + [ + matplotlib.colors.CSS4_COLORS["lime"], + matplotlib.colors.CSS4_COLORS["yellow"], + matplotlib.colors.CSS4_COLORS["pink"], + ] + ) + markers = ["o", "s", "D", "^", "v", "<", ">", "P", "*", "h", "X", "|", "_", "8"] + + # Determine the problem type from the dataset name + problem = "iv" if any("IV" in dataset for dataset in datasets) else "backdoor" + + def plot_grid(title): + # Use determined problem type instead of hardcoding "backdoor" + # files = os.listdir(out_dir) + all_metrics = metrics # sorted(list(set([f.split("-")[0] for f in files]))) + if "psw_energy_distance" in all_metrics and "energy_distance" in all_metrics: + all_metrics.remove("energy_distance") + + fig, axs = plt.subplots( + len(all_metrics), len(datasets), figsize=(20, 5 * len(all_metrics)), dpi=300 + ) + + if len(all_metrics) == 1 and len(datasets) == 1: + axs = np.array([[axs]]) + elif len(all_metrics) == 1 or len(datasets) == 1: + axs = axs.reshape(-1, 1) if len(datasets) == 1 else axs.reshape(1, -1) + + # For multiple metrics in args.metrics, use the first one that has a results file + results_files = {} + for dataset in datasets: + for metric in all_metrics: + filename = make_filename(metric, dataset, 1) + filepath = os.path.join(out_dir, filename) + if os.path.exists(filepath): + results_files[dataset] = filepath + break + if dataset not in results_files: + print(f"No results file found for dataset {dataset}") + + for j, dataset in enumerate(datasets): + if dataset not in results_files: + continue + + with open(results_files[dataset], "rb") as f: + results = pickle.load(f) + + print(f"Loading results for Dataset: {dataset}") + + for i, metric in enumerate(all_metrics): + ax = axs[i, j] + + try: + # Find best estimator for this metric + best_estimator = None + best_score = float("inf") if metric in metrics_to_minimize() else float("-inf") + estimator_name = None + + for score in results["all_scores"]: + if "test" in score and metric in score["test"]["scores"]: + current_score = score["test"]["scores"][metric] + if metric in metrics_to_minimize(): + if current_score < best_score: + best_score = current_score + best_estimator = score + estimator_name = score["test"]["scores"]["estimator_name"] + else: + if current_score > best_score: + best_score = current_score + best_estimator = score + estimator_name = score["test"]["scores"]["estimator_name"] + + if best_estimator: + CATE_gt = np.array(best_estimator["test"]["CATE_groundtruth"]).flatten() + CATE_est = np.array(best_estimator["test"]["CATE_estimate"]).flatten() + + # Plotting + ax.scatter(CATE_gt, CATE_est, s=40, alpha=0.5) + ax.plot( + [min(CATE_gt), max(CATE_gt)], + [min(CATE_gt), max(CATE_gt)], + "k-", + linewidth=1.0, + ) + + # Calculate correlation coefficient + corr = np.corrcoef(CATE_gt, CATE_est)[0, 1] + + # Add correlation + ax.text( + 0.05, + 0.95, + f"Corr: {corr:.2f}", + transform=ax.transAxes, + verticalalignment="top", + fontsize=font_size + 12, + fontweight="bold", + ) + + # Add estimator name at bottom center + if estimator_name: + estimator_base = estimator_name.split(".")[-1] + ax.text( + 0.5, + 0.02, + estimator_base, + transform=ax.transAxes, + horizontalalignment="center", + color="blue", + fontsize=font_size + 10, + ) + + except Exception as e: + print(f"Error processing metric {metric} for dataset {dataset}: {e}") + ax.text( + 0.5, + 0.5, + "Error processing data", + ha="center", + va="center", + fontsize=font_size + 12, + ) + + if j == 0: + # Create tight layout for ylabel + ax.set_ylabel( + metric_names.get(metric, metric), + fontsize=font_size + 12, + fontweight="bold", + labelpad=5, # Reduce padding between label and plot + ) + if i == 0: + ax.set_title(dataset, fontsize=font_size + 14, fontweight="bold", pad=15) + ax.set_xticks([]) + ax.set_yticks([]) + + plt.suptitle( + f"Estimated CATEs vs. True CATEs: {title}", + fontsize=font_size + 18, + fontweight="bold", + ) + # Adjust spacing between subplots + plt.tight_layout(rect=[0.1, 0, 1, 0.96], h_pad=1.0, w_pad=0.5) + plt.savefig(os.path.join(out_dir, "CATE_grid.pdf"), format="pdf", bbox_inches="tight") + plt.savefig(os.path.join(out_dir, "CATE_grid.png"), format="png", bbox_inches="tight") + plt.close() + + def plot_mse_grid(title): + df = get_all_test_scores(out_dir, datasets[0]) + est_names = sorted(df["estimator_name"].unique()) + + # Problem type already determined at top level + all_metrics = [ + c + for c in df.columns + if c in supported_metrics(problem, False, False) + and c.lower() not in ["ate", "norm_erupt"] + ] + + if "psw_energy_distance" in all_metrics: + all_metrics.remove("energy_distance") + + fig, axs = plt.subplots( + len(all_metrics), len(datasets), figsize=(20, 5 * len(all_metrics)), dpi=300 + ) + + # Handle single plot cases + if len(all_metrics) == 1 and len(datasets) == 1: + axs = np.array([[axs]]) + elif len(all_metrics) == 1 or len(datasets) == 1: + axs = axs.reshape(-1, 1) if len(datasets) == 1 else axs.reshape(1, -1) + + legend_elements = [] + for j, dataset in enumerate(datasets): + df = get_all_test_scores(out_dir, dataset) + # Apply bounds filtering + for m, value in upper_bounds.items(): + if m in df.columns: + df = df[df[m] < value].copy() + for m, value in lower_bounds.items(): + if m in df.columns: + df = df[df[m] > value].copy() + + for i, metric in enumerate(all_metrics): + ax = axs[i, j] + this_df = df[["estimator_name", metric, "MSE"]].dropna() + this_df = this_df[~np.isinf(this_df[metric].values)] + + if len(this_df): + for idx, est_name in enumerate(est_names): + df_slice = this_df[this_df["estimator_name"] == est_name] + if "Dummy" not in est_name and len(df_slice): + marker = markers[idx % len(markers)] + ax.scatter( + df_slice["MSE"], + df_slice[metric], + color=colors[idx], + s=50, + marker=marker, + linewidths=0.5, + ) + if metric not in metrics_to_minimize(): + ax.invert_yaxis() + + trimmed_est_name = est_name.split(".")[-1] + if i == 0 and j == 0: + legend_elements.append( + plt.Line2D( + [0], + [0], + color=colors[idx], + marker=marker, + label=trimmed_est_name, + linestyle="None", + markersize=6, + ) + ) + + ax.set_xscale("log") + if metric in log_scale: + ax.set_yscale("log") + ax.grid(True) + else: + ax.text( + 0.5, + 0.5, + "No data", + ha="center", + va="center", + fontsize=font_size + 12, + ) + + if j == 0: + # Match ylabel style with plot_grid + ax.set_ylabel( + metric_names.get(metric, metric), + fontsize=font_size + 12, + fontweight="bold", + labelpad=5, + ) + if i == 0: + ax.set_title(dataset, fontsize=font_size + 14, fontweight="bold", pad=15) + + plt.suptitle( + f"MSE vs. Scores: {title}", + fontsize=font_size + 18, + fontweight="bold", + ) + + # # Match spacing style with plot_grid + # plt.tight_layout(rect=[0.1, 0, 1, 0.96], h_pad=1.0, w_pad=0.5) + # + # fig_legend, ax_legend = plt.subplots(figsize=(6, 6)) + # ax_legend.legend(handles=legend_elements, loc="center", fontsize=10) + # ax_legend.axis("off") + + plt.savefig(os.path.join(out_dir, "MSE_grid.pdf"), format="pdf", bbox_inches="tight") + plt.savefig(os.path.join(out_dir, "MSE_grid.png"), format="png", bbox_inches="tight") + plt.close() + + # # Create separate legend + # fig_legend, ax_legend = plt.subplots(figsize=(6, 6)) + # ax_legend.legend(handles=legend_elements, loc="center", fontsize=10) + # ax_legend.axis("off") + # plt.savefig(os.path.join(out_dir, "MSE_legend.pdf"), format="pdf", bbox_inches="tight") + # plt.savefig(os.path.join(out_dir, "MSE_legend.png"), format="png", bbox_inches="tight") + # plt.close() + + # Generate plots + plot_grid("Experiment Results") + plot_mse_grid("Experiment Results") diff --git a/notebooks/RunExperiments/runners/experiment_runner.py b/notebooks/RunExperiments/runners/experiment_runner.py index 3d774e0..4ebb081 100644 --- a/notebooks/RunExperiments/runners/experiment_runner.py +++ b/notebooks/RunExperiments/runners/experiment_runner.py @@ -1,36 +1,42 @@ import argparse import copy -import glob import os import pickle import sys +import time import warnings -from datetime import datetime -from typing import List, Union +from typing import List, Optional -import matplotlib -import matplotlib.pyplot as plt import numpy as np -import pandas as pd +import ray from sklearn.model_selection import train_test_split + +sys.path.insert(0, os.getcwd()) +import causaltune # noqa: E402 + +from causaltune import CausalTune +from causaltune.data_utils import CausalityDataset +from causaltune.models.passthrough import passthrough_model +from experiment_plots import make_filename + # Ensure CausalTune is in the Python path root_path = os.path.realpath("../../../../..") # noqa: E402 sys.path.append(os.path.join(root_path, "causaltune")) # noqa: E402 # Import CausalTune and other custom modules after setting up the path -from causaltune import CausalTune # noqa: E402 from causaltune.datasets import load_dataset # noqa: E402 -from causaltune.models.passthrough import passthrough_model # noqa: E402 from causaltune.search.params import SimpleParamService # noqa: E402 from causaltune.score.scoring import ( metrics_to_minimize, # noqa: E402 - supported_metrics, # noqa: E402 + # noqa: E402 ) # Configure warnings warnings.filterwarnings("ignore") +RAY_NAMESPACE = "causaltune_experiments" + def parse_arguments(): parser = argparse.ArgumentParser(description="Run CausalTune experiments") @@ -47,9 +53,7 @@ def parse_arguments(): help="Datasets to use (format: Size Name, e.g., Small Linear_RCT)", ) parser.add_argument("--n_runs", type=int, default=1, help="Number of runs") - parser.add_argument( - "--num_samples", type=int, default=-1, help="Maximum number of iterations" - ) + parser.add_argument("--num_samples", type=int, default=-1, help="Maximum number of iterations") parser.add_argument("--outcome_model", type=str, default="nested", help="Outcome model type") parser.add_argument( @@ -75,7 +79,14 @@ def parse_arguments(): return parser.parse_args() -def get_estimator_list(dataset_name): +def get_estimator_list( + dataset_name, + include_patterns: Optional[List[str]] = None, + exclude_patterns: Optional[List[str]] = None, +): + assert ( + include_patterns is None or exclude_patterns is None + ), "Cannot specify both include and exclude patterns" if "IV" in dataset_name: problem = "iv" else: @@ -87,10 +98,25 @@ def get_estimator_list(dataset_name): multivalue=False, ) estimator_list = cfg.estimator_names_from_patterns(problem, "all", 1001) - return [est for est in estimator_list if "Dummy" not in est] + out = [est for est in estimator_list if "Dummy" not in est] + + if include_patterns is not None: + out = [est for est in out if any(pat in est for pat in include_patterns)] + + if exclude_patterns is not None: + out = [est for est in out if not any(pat in est for pat in exclude_patterns)] -def run_experiment(args, dataset_path: str, use_ray: bool = False): + return out + + +def run_experiment( + args, + estimators: List[str], + dataset_path: str, + use_ray: bool, + propensity_automl_estimators: Optional[List[str]] = None, +): # Process datasets data_sets = {} for dataset in args.datasets: @@ -103,564 +129,306 @@ def run_experiment(args, dataset_path: str, use_ray: bool = False): name = " ".join(parts[1:]) file_path = f"{dataset_path}/{size}/{name}.pkl" data_sets[f"{size} {name}"] = load_dataset(file_path) + run_kind = dataset.split("_")[1] - if args.timestamp_in_dirname: - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - out_dir = f"EXPERIMENT_RESULTS_{timestamp}_{args.identifier}" - else: - out_dir = f"EXPERIMENT_RESULTS_{args.identifier}" - + out_dir = f"../EXPERIMENT_RESULTS_{args.identifier}" os.makedirs(out_dir, exist_ok=True) out_dir = os.path.realpath(os.path.join(out_dir, size)) os.makedirs(out_dir, exist_ok=True) print(f"Loaded datasets: {list(data_sets.keys())}") - # Set time budgets properly - if args.time_budget is not None and args.components_time_budget is not None: - raise ValueError("Please specify either time_budget or components_time_budget, not both.") - elif args.time_budget is None and args.components_time_budget is None: - args.components_time_budget = 30 # Set default components budget - - # If only time_budget is specified, derive components_time_budget from it - if args.time_budget is not None: - args.components_time_budget = max(30, args.time_budget / 4) # Ensure minimum budget - args.time_budget = None # Use only components_time_budget - - for dataset_name, cd in data_sets.items(): - # Extract case while preserving original string checking logic - if "KCKP" in dataset_name: - case = "KCKP" - elif "KC" in dataset_name: - case = "KC" - elif "IV" in dataset_name: - case = "IV" - else: - case = "RCT" - - os.makedirs(f"{out_dir}/{case}", exist_ok=True) - - for i_run in range(1, args.n_runs + 1): - cd_i = copy.deepcopy(cd) - train_df, test_df = train_test_split(cd_i.data, test_size=args.test_size) - test_df = test_df.reset_index(drop=True) - cd_i.data = train_df - + already_running = False + if use_ray: + try: + runner = ray.get_actor(f"TaskRunner {run_kind}") + print("\n" * 4) + print( + "!!! Found an existing detached TaskRunner. Will assume the tasks have already been submitted." + ) + print( + f"!!! If you want to re-run the experiments from scratch, " + 'run ray.kill(ray.get_actor("TaskRunner {run_kind}", namespace="{RAY_NAMESPACE}")) or recreate the cluster.' + ) + print("\n" * 4) + already_running = True + except ValueError: + print("Ray: no detached TaskRunner found, creating...") + # This thing will be alive even if the host program exits + # Must be killed explicitly: ray.kill(ray.get_actor(f"TaskRunner {run_kind}")) + runner = TaskRunner.options(name=f"TaskRunner {run_kind}", lifetime="detached").remote() + + out = [] + if not already_running: + tasks = [] + i_run = 1 + + for dataset_name, cd in data_sets.items(): + + # Extract case while preserving original string checking logic + if "KCKP" in dataset_name: + case = "KCKP" + elif "KC" in dataset_name: + case = "KC" + elif "IV" in dataset_name: + case = "IV" + else: + case = "RCT" + + os.makedirs(f"{out_dir}/{case}", exist_ok=True) for metric in args.metrics: - if metric == "ate": # this is not something to optimize + fn = make_filename(metric, dataset_name, i_run) + out_fn = os.path.join(out_dir, case, fn) + if os.path.isfile(out_fn): + print(f"File {out_fn} exists, skipping...") continue - - print(f"Optimizing {metric} for {dataset_name} (run {i_run})") - try: - fn = make_filename(metric, dataset_name, i_run) - out_fn = os.path.join(out_dir, case, fn) - if os.path.isfile(out_fn): - print(f"File {out_fn} exists, skipping...") - continue - - # Set propensity model using string checking like original version - if "KCKP" in dataset_name: - print(f"Using passthrough propensity model for {dataset_name}") - propensity_model = passthrough_model( - cd_i.propensity_modifiers, include_control=False + if use_ray: + tasks.append( + runner.remote_single_run.remote( + dataset_name, + cd, + metric, + args.test_size, + args.num_samples, + args.components_time_budget, + out_fn, + estimators, + propensity_automl_estimators, ) - elif "KC" in dataset_name: - print(f"Using auto propensity model for {dataset_name}") - propensity_model = "auto" - else: - print(f"Using dummy propensity model for {dataset_name}") - propensity_model = "dummy" - - ct = CausalTune( - metric=metric, - estimator_list=get_estimator_list(dataset_name), - num_samples=args.num_samples, - components_time_budget=args.components_time_budget, # Use this instead - verbose=1, - components_verbose=1, - store_all_estimators=True, - propensity_model=propensity_model, - outcome_model=args.outcome_model, - use_ray=use_ray, ) - - ct.fit( - data=cd_i, - treatment="treatment", - outcome="outcome", + else: + results = single_run( + dataset_name, + cd, + metric, + args.test_size, + args.num_samples, + args.components_time_budget, + out_fn, + estimators, + propensity_automl_estimators, ) + out.append(results) + + if use_ray: + while True: + completed, in_progress = ray.get(runner.get_progress.remote()) + print(f"Ray: {completed}/{completed + in_progress} tasks completed") + if not in_progress: + print("Ray: all tasks completed!") + break + time.sleep(10) - # Compute scores and save results - results = compute_scores(ct, metric, test_df) + print("Ray: fetching results...") + out = ray.get(runner.get_results.remote()) - with open(out_fn, "wb") as f: - pickle.dump(results, f) - except Exception as e: - print(f"Error processing {dataset_name}_{metric}_{i_run}: {e}") + for out_fn, results in out: + with open(out_fn, "wb") as f: + pickle.dump(results, f) + + if use_ray: + destroy = input("Ray: seems like the results fetched OK. Destroy TaskRunner? ") + if destroy.lower().startswith("y"): + print("Destroying TaskRunner... ", end="") + ray.kill(runner) + print("success!") return out_dir -def compute_scores(ct, metric, test_df): - datasets = {"train": ct.train_df, "validation": ct.test_df, "test": test_df} - estimator_scores = {est: [] for est in ct.scores.keys() if "NewDummy" not in est} +def run_batch( + identifier: str, + kind: str, + metrics: List[str], + estimators: List[str], + dataset_path: str, + use_ray: bool = False, + propensity_automl_estimators: Optional[List[str]] = None, +): + args = parse_arguments() + args.identifier = identifier + args.metrics = metrics + # run_experiment assumes we don't mix large and small datasets in the same call + args.datasets = [f"Large Linear_{kind}", f"Large NonLinear_{kind}"] + args.num_samples = 100 + args.timestamp_in_dirname = False + args.outcome_model = "auto" # or use "nested" for the old-style nested model + args.components_time_budget = 120 - all_scores = [] - for trial in ct.results.trials: - try: - estimator_name = trial.last_result["estimator_name"] - if "estimator" in trial.last_result and trial.last_result["estimator"]: - estimator = trial.last_result["estimator"] - scores = {} - for ds_name, df in datasets.items(): - scores[ds_name] = {} - est_scores = ct.scorer.make_scores( - estimator, - df, - metrics_to_report=ct.metrics_to_report, - ) - est_scores["estimator_name"] = estimator_name + if use_ray: + import ray - scores[ds_name]["CATE_estimate"] = np.squeeze(estimator.estimator.effect(df)) - scores[ds_name]["CATE_groundtruth"] = np.squeeze(df["true_effect"]) - est_scores["MSE"] = np.mean( - (scores[ds_name]["CATE_estimate"] - scores[ds_name]["CATE_groundtruth"]) - ** 2 - ) - scores[ds_name]["scores"] = est_scores - scores["optimization_score"] = trial.last_result.get("optimization_score") - estimator_scores[estimator_name].append(copy.deepcopy(scores)) - # Will use this in the nex - all_scores.append(scores) - except Exception as e: - print(f"Error processing trial: {e}") - - for k in estimator_scores.keys(): - estimator_scores[k] = sorted( - estimator_scores[k], - key=lambda x: x["validation"]["scores"][metric], - reverse=metric not in metrics_to_minimize(), + # Assuming we port-mapped already by running ray dashboard + ray.init( + "ray://localhost:10001", + runtime_env={ + "working_dir": ".", + "pip": ["causaltune", "catboost", "ray[tune]", "flaml[blendsearch]"], + }, + namespace=RAY_NAMESPACE, ) - # Debugging: Log final result structure - print(f"Returning scores for metric {metric}: Best estimator: {ct.best_estimator}") + out_dir = run_experiment( + args, + estimators=estimators, + dataset_path=dataset_path, + use_ray=use_ray, + propensity_automl_estimators=propensity_automl_estimators, + ) + return out_dir - return { - "best_estimator": ct.best_estimator, - "best_config": ct.best_config, - "best_score": ct.best_score, - "optimised_metric": metric, - "scores_per_estimator": estimator_scores, - "all_scores": all_scores, - } +@ray.remote +def remote_single_run(*args): + return single_run(*args) -def extract_metrics_datasets(out_dir: str): - metrics = set() - datasets = set() - for file in glob.glob(f"{out_dir}/*.pkl"): - parts = os.path.basename(file).split("-") - metrics.add(parts[0]) - datasets.add(parts[-1].replace(".pkl", "").replace("_", " ")) +@ray.remote +class TaskRunner: + def __init__(self): + self.futures = {} - return sorted(list(metrics)), sorted(list(datasets)) + def remote_single_run(self, *args, **kwargs): + ref = remote_single_run.remote(*args, **kwargs) + self.futures[ref.hex()] = ref + return ref.hex() + def get_results(self): + return ray.get(list(self.futures.values())) -def make_filename(metric, dataset, i_run): - return f"{metric}-run-{i_run}-{dataset.replace(' ', '_')}.pkl" + def get_single_result(self, ref_hex): + return ray.get(self.futures[ref_hex]) + def is_ready(self, ref_hex): + ready, _ = ray.wait([self.futures[ref_hex]], timeout=0, fetch_local=False) + return bool(ready) -def get_all_test_scores(out_dir, dataset_name): - size, ds_type, case = dataset_name.split(" ") - all_scores = [] - for file in glob.glob(f"{out_dir}/*_{ds_type}_{case}.pkl"): - with open(file, "rb") as f: - results = pickle.load(f) - for x in results["all_scores"]: - all_scores.append( - {k: v for k, v in x["test"]["scores"].items() if k not in ["values"]} - ) - out = pd.DataFrame(all_scores) - return out - + def all_tasks_ready(self): + _, in_progress = ray.wait(list(self.futures.values()), timeout=0, fetch_local=False) + return not bool(in_progress) -def generate_plots( - out_dir: str, - log_scale: Union[List[str], None] = None, - upper_bounds: Union[dict, None] = None, - lower_bounds: Union[dict, None] = None, - font_size=0, + def get_progress(self): + completed, in_progress = ray.wait( + list(self.futures.values()), num_returns=len(self.futures), timeout=0, fetch_local=False + ) + return len(completed), len(in_progress) + + +def single_run( + dataset_name: str, + cd: CausalityDataset, + metric: str, + test_size: float, + num_samples: int, + components_time_budget: int, + out_fn: str, + estimators: List[str], + propensity_automl_estimators: Optional[List[str]] = None, + outcome_model: str = "auto", + i_run: int = 1, ): - if log_scale is None: - log_scale = ["energy_distance", "psw_energy_distance", "frobenius_norm"] - if upper_bounds is None: - upper_bounds = {} # Use an empty dictionary if None - if lower_bounds is None: - lower_bounds = {} # Use an empty dictionary if None - - metrics, datasets = extract_metrics_datasets(out_dir) - # Remove 'ate' from metrics - metrics = [m for m in metrics if m.lower() != "ate"] - - metric_names = { - "psw_frobenius_norm": "PSW\nFrobenius\nNorm", - "frobenius_norm": "Frobenius\nNorm", - "erupt": "ERUPT", - "codec": "CODEC", - "auc": "AUC", - "qini": "Qini", - "bite": "BITE", - "policy_risk": "Policy\nRisk", - "energy_distance": "Energy\nDistance", - "psw_energy_distance": "PSW\nEnergy\nDistance", - "norm_erupt": "Normalized\nERUPT", - } - - colors = ( - [matplotlib.colors.CSS4_COLORS["black"]] - + list(matplotlib.colors.TABLEAU_COLORS) - + [ - matplotlib.colors.CSS4_COLORS["lime"], - matplotlib.colors.CSS4_COLORS["yellow"], - matplotlib.colors.CSS4_COLORS["pink"], - ] - ) - markers = ["o", "s", "D", "^", "v", "<", ">", "P", "*", "h", "X", "|", "_", "8"] - # Determine the problem type from the dataset name - problem = "iv" if any("IV" in dataset for dataset in datasets) else "backdoor" + cd_i = copy.deepcopy(cd) + train_df, test_df = train_test_split(cd_i.data, test_size=test_size) + test_df = test_df.reset_index(drop=True) + cd_i.data = train_df + print(f"Optimizing {metric} for {dataset_name} (run {i_run})") + try: - def plot_grid(title): - # Use determined problem type instead of hardcoding "backdoor" - all_metrics = [ - m - for m in supported_metrics(problem, False, False) - if m.lower() != "ate" and m.lower() != "norm_erupt" - ] - - fig, axs = plt.subplots( - len(all_metrics), len(datasets), figsize=(20, 5 * len(all_metrics)), dpi=300 + # Set propensity model using string checking like original version + if "KCKP" in dataset_name: + print(f"Using passthrough propensity model for {dataset_name}") + propensity_model = passthrough_model(cd_i.propensity_modifiers, include_control=False) + elif "KC" in dataset_name: + print(f"Using auto propensity model for {dataset_name}") + propensity_model = "auto" + else: + print(f"Using dummy propensity model for {dataset_name}") + propensity_model = "dummy" + + ct = CausalTune( + metric=metric, + estimator_list=estimators, + num_samples=num_samples, + components_time_budget=components_time_budget, # Use this instead + verbose=1, + components_verbose=1, + store_all_estimators=True, + propensity_model=propensity_model, + outcome_model=outcome_model, + propensity_automl_estimators=propensity_automl_estimators, + use_ray=False, ) - if len(all_metrics) == 1 and len(datasets) == 1: - axs = np.array([[axs]]) - elif len(all_metrics) == 1 or len(datasets) == 1: - axs = axs.reshape(-1, 1) if len(datasets) == 1 else axs.reshape(1, -1) - - # For multiple metrics in args.metrics, use the first one that has a results file - results_files = {} - for dataset in datasets: - for metric in args.metrics: - filename = make_filename(metric, dataset, 1) - filepath = os.path.join(out_dir, filename) - if os.path.exists(filepath): - results_files[dataset] = filepath - break - if dataset not in results_files: - print(f"No results file found for dataset {dataset}") - - for j, dataset in enumerate(datasets): - if dataset not in results_files: - continue - - with open(results_files[dataset], "rb") as f: - results = pickle.load(f) + ct.fit( + data=cd_i, + treatment="treatment", + outcome="outcome", + ) - print(f"Loading results for Dataset: {dataset}") + # Embedding this so it ships well to Ray remotes - for i, metric in enumerate(all_metrics): - ax = axs[i, j] + def compute_scores(ct, metric, test_df): + datasets = {"train": ct.train_df, "validation": ct.test_df, "test": test_df} + estimator_scores = {est: [] for est in ct.scores.keys() if "NewDummy" not in est} + all_scores = [] + for trial in ct.results.trials: try: - # Find best estimator for this metric - best_estimator = None - best_score = ( - float("inf") - if metric in metrics_to_minimize() - else float("-inf") - ) - estimator_name = None - - for score in results["all_scores"]: - if "test" in score and metric in score["test"]["scores"]: - current_score = score["test"]["scores"][metric] - if metric in metrics_to_minimize(): - if current_score < best_score: - best_score = current_score - best_estimator = score - estimator_name = score["test"]["scores"][ - "estimator_name" - ] - else: - if current_score > best_score: - best_score = current_score - best_estimator = score - estimator_name = score["test"]["scores"][ - "estimator_name" - ] - - if best_estimator: - CATE_gt = np.array( - best_estimator["test"]["CATE_groundtruth"] - ).flatten() - CATE_est = np.array( - best_estimator["test"]["CATE_estimate"] - ).flatten() - - # Plotting - ax.scatter(CATE_gt, CATE_est, s=40, alpha=0.5) - ax.plot( - [min(CATE_gt), max(CATE_gt)], - [min(CATE_gt), max(CATE_gt)], - "k-", - linewidth=1.0, - ) - - # Calculate correlation coefficient - corr = np.corrcoef(CATE_gt, CATE_est)[0, 1] - - # Add correlation - ax.text( - 0.05, - 0.95, - f"Corr: {corr:.2f}", - transform=ax.transAxes, - verticalalignment="top", - fontsize=font_size + 12, - fontweight="bold", - ) - - # Add estimator name at bottom center - if estimator_name: - estimator_base = estimator_name.split(".")[-1] - ax.text( - 0.5, - 0.02, - estimator_base, - transform=ax.transAxes, - horizontalalignment="center", - color="blue", - fontsize=font_size + 10, + estimator_name = trial.last_result["estimator_name"] + if "estimator" in trial.last_result and trial.last_result["estimator"]: + estimator = trial.last_result["estimator"] + scores = {} + for ds_name, df in datasets.items(): + scores[ds_name] = {} + est_scores = ct.scorer.make_scores( + estimator, + df, + metrics_to_report=ct.metrics_to_report, ) + est_scores["estimator_name"] = estimator_name - except Exception as e: - print( - f"Error processing metric {metric} for dataset {dataset}: {e}" - ) - ax.text( - 0.5, - 0.5, - "Error processing data", - ha="center", - va="center", - fontsize=font_size + 12, - ) - - if j == 0: - # Create tight layout for ylabel - ax.set_ylabel( - metric_names.get(metric, metric), - fontsize=font_size + 12, - fontweight="bold", - labelpad=5, # Reduce padding between label and plot - ) - if i == 0: - ax.set_title( - dataset, fontsize=font_size + 14, fontweight="bold", pad=15 - ) - ax.set_xticks([]) - ax.set_yticks([]) - - plt.suptitle( - f"Estimated CATEs vs. True CATEs: {title}", - fontsize=font_size + 18, - fontweight="bold", - ) - # Adjust spacing between subplots - plt.tight_layout(rect=[0.1, 0, 1, 0.96], h_pad=1.0, w_pad=0.5) - plt.savefig(os.path.join(out_dir, "CATE_grid.pdf"), format="pdf", bbox_inches="tight") - plt.savefig(os.path.join(out_dir, "CATE_grid.png"), format="png", bbox_inches="tight") - plt.close() - - def plot_mse_grid(title): - df = get_all_test_scores(out_dir, datasets[0]) - est_names = sorted(df["estimator_name"].unique()) - - # Problem type already determined at top level - all_metrics = [c for c in df.columns if c in supported_metrics(problem, False, False)and c.lower() != "ate" - ] - - fig, axs = plt.subplots( - len(all_metrics), len(datasets), figsize=(20, 5 * len(all_metrics)), dpi=300 - ) - - # Handle single plot cases - if len(all_metrics) == 1 and len(datasets) == 1: - axs = np.array([[axs]]) - elif len(all_metrics) == 1 or len(datasets) == 1: - axs = axs.reshape(-1, 1) if len(datasets) == 1 else axs.reshape(1, -1) - - legend_elements = [] - for j, dataset in enumerate(datasets): - df = get_all_test_scores(out_dir, dataset) - # Apply bounds filtering - for m, value in upper_bounds.items(): - if m in df.columns: - df = df[df[m] < value].copy() - for m, value in lower_bounds.items(): - if m in df.columns: - df = df[df[m] > value].copy() - - for i, metric in enumerate(all_metrics): - ax = axs[i, j] - this_df = df[["estimator_name", metric, "MSE"]].dropna() - this_df = this_df[~np.isinf(this_df[metric].values)] - - if len(this_df): - for idx, est_name in enumerate(est_names): - df_slice = this_df[this_df["estimator_name"] == est_name] - if "Dummy" not in est_name and len(df_slice): - marker = markers[idx % len(markers)] - ax.scatter( - df_slice["MSE"], - df_slice[metric], - color=colors[idx], - s=50, - marker=marker, - linewidths=0.5, + scores[ds_name]["CATE_estimate"] = np.squeeze( + estimator.estimator.effect(df) ) - if metric not in metrics_to_minimize(): - ax.invert_yaxis() - - trimmed_est_name = est_name.split(".")[-1] - if i == 0 and j == 0: - legend_elements.append( - plt.Line2D( - [0], - [0], - color=colors[idx], - marker=marker, - label=trimmed_est_name, - linestyle="None", - markersize=6, - ) + scores[ds_name]["CATE_groundtruth"] = np.squeeze(df["true_effect"]) + est_scores["MSE"] = np.mean( + ( + scores[ds_name]["CATE_estimate"] + - scores[ds_name]["CATE_groundtruth"] ) + ** 2 + ) + scores[ds_name]["scores"] = est_scores + scores["optimization_score"] = trial.last_result.get("optimization_score") + estimator_scores[estimator_name].append(copy.deepcopy(scores)) + # Will use this in the nex + all_scores.append(scores) + except Exception as e: + print(f"Error processing trial: {e}") - ax.set_xscale("log") - if metric in log_scale: - ax.set_yscale("log") - ax.grid(True) - else: - ax.text( - 0.5, - 0.5, - "No data", - ha="center", - va="center", - fontsize=font_size + 12, - ) - - if j == 0: - # Match ylabel style with plot_grid - ax.set_ylabel( - metric_names.get(metric, metric), - fontsize=font_size + 12, - fontweight="bold", - labelpad=5, - ) - if i == 0: - ax.set_title( - dataset, fontsize=font_size + 14, fontweight="bold", pad=15 - ) - - plt.suptitle( - f"MSE vs. Scores: {title}", - fontsize=font_size + 18, - fontweight="bold", - ) - - # Match spacing style with plot_grid - plt.tight_layout(rect=[0.1, 0, 1, 0.96], h_pad=1.0, w_pad=0.5) - plt.savefig(os.path.join(out_dir, "MSE_grid.pdf"), format="pdf", bbox_inches="tight") - plt.savefig(os.path.join(out_dir, "MSE_grid.png"), format="png", bbox_inches="tight") - plt.close() - - # # Create separate legend - # fig_legend, ax_legend = plt.subplots(figsize=(6, 6)) - # ax_legend.legend(handles=legend_elements, loc="center", fontsize=10) - # ax_legend.axis("off") - # plt.savefig(os.path.join(out_dir, "MSE_legend.pdf"), format="pdf", bbox_inches="tight") - # plt.savefig(os.path.join(out_dir, "MSE_legend.png"), format="png", bbox_inches="tight") - # plt.close() - - # Generate plots - plot_grid("Experiment Results") - plot_mse_grid("Experiment Results") - - -def run_batch( - identifier: str, - kind: str, - metrics: List[str], - dataset_path: str, -): - args = parse_arguments() - args.identifier = identifier - args.metrics = metrics - # run_experiment assumes we don't mix large and small datasets in the same call - args.datasets = [f"Large Linear_{kind}", f"Large NonLinear_{kind}"] - args.num_samples = 100 - args.timestamp_in_dirname = False - args.outcome_model = "auto" # or use "nested" for the old-style nested model - - # os.environ["RAY_ADDRESS"] = "ray://127.0.0.1:8265" - - use_ray = True - if use_ray: - import ray - - # Assuming we port-mapped already by running ray dashboard - ray.init( - "ray://localhost:10001", runtime_env={"pip": ["causaltune", "catboost"]} - ) # "34.82.184.148:6379" - out_dir = run_experiment(args, dataset_path=dataset_path, use_ray=use_ray) - return out_dir - - -if __name__ == "__main__": - - args = parse_arguments() - args.identifier = "Egor_test" - args.metrics = supported_metrics("backdoor", False, False) - # run_experiment assumes we don't mix large and small datasets in the same call - args.datasets = ["Large Linear_RCT", "Large NonLinear_RCT"] - args.num_samples = 100 - args.timestamp_in_dirname = False - args.outcome_model = "auto" # or use "nested" for the old-style nested model + for k in estimator_scores.keys(): + estimator_scores[k] = sorted( + estimator_scores[k], + key=lambda x: x["validation"]["scores"][metric], + reverse=metric not in metrics_to_minimize(), + ) - use_ray = True - if use_ray: - import ray + # Debugging: Log final result structure + print(f"Returning scores for metric {metric}: Best estimator: {ct.best_estimator}") - ray.init() - out_dir = run_experiment(args, dataset_path="../RunDatasets", use_ray=use_ray) + return { + "best_estimator": ct.best_estimator, + "best_config": ct.best_config, + "best_score": ct.best_score, + "optimised_metric": metric, + "scores_per_estimator": estimator_scores, + "all_scores": all_scores, + } - # plot results - upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} - lower_bounds = {"erupt": 0.06, "bite": 0.75} + # Compute scores and save results + results = compute_scores(ct, metric, test_df) - # Determine case from datasets - if any("IV" in dataset for dataset in args.datasets): - case = "IV" - elif any("KC" in dataset for dataset in args.datasets): - case = "KC" - elif any("KCKP" in dataset for dataset in args.datasets): - case = "KCKP" - else: - case = "RCT" - # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} - # lower_bounds = {"erupt": 0.06, "bite": 0.75} - generate_plots( - os.path.join(out_dir, case), font_size=8 - ) # , upper_bounds=upper_bounds, lower_bounds=lower_bounds) + return out_fn, results + except Exception as e: + print(f"Error processing {dataset_name}_{metric}_{i_run}: {e}") diff --git a/notebooks/RunExperiments/runners/iv.py b/notebooks/RunExperiments/runners/iv.py index 8e975e8..4889f45 100644 --- a/notebooks/RunExperiments/runners/iv.py +++ b/notebooks/RunExperiments/runners/iv.py @@ -1,12 +1,19 @@ import os +import ray -from experiment_runner import run_batch, generate_plots +from experiment_runner import run_batch +from experiment_plots import generate_plots identifier = "Egor_test" kind = "IV" metrics = ["energy_distance", "frobenius_norm", "codec"] +use_ray = False +remote_function = ray.remote(run_batch) +calls = [] -out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets")) +out_dir = run_batch( + identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"), use_ray=use_ray +) # plot results # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} # lower_bounds = {"erupt": 0.06, "bite": 0.75} diff --git a/notebooks/RunExperiments/runners/kc.py b/notebooks/RunExperiments/runners/kc.py index 3168719..065333e 100644 --- a/notebooks/RunExperiments/runners/kc.py +++ b/notebooks/RunExperiments/runners/kc.py @@ -1,6 +1,7 @@ import os -from experiment_runner import run_batch, generate_plots +from experiment_runner import run_batch +from experiment_plots import generate_plots identifier = "Egor_test" kind = "KC" @@ -15,8 +16,10 @@ "codec", # NEW "bite", # NEW ] - -out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets")) +use_ray = True +out_dir = run_batch( + identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets"), use_ray=use_ray +) # plot results # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} # lower_bounds = {"erupt": 0.06, "bite": 0.75} diff --git a/notebooks/RunExperiments/runners/kc_no_meta.py b/notebooks/RunExperiments/runners/kc_no_meta.py new file mode 100644 index 0000000..7be9466 --- /dev/null +++ b/notebooks/RunExperiments/runners/kc_no_meta.py @@ -0,0 +1,39 @@ +import os + +from experiment_runner import run_batch, get_estimator_list +from experiment_plots import generate_plots + +identifier = "Egor_test" +kind = "KC" +metrics = [ + "erupt", + # "greedy_erupt", # regular erupt was made probabilistic, + "policy_risk", # NEW + "qini", + "auc", + "psw_energy_distance", + "frobenius_norm", # NEW + "codec", # NEW + "bite", # NEW +] +estimators = get_estimator_list(kind, exclude_patterns=["SLearner", "TLearner", "XLearner"]) +ptt_estimators = [ + "lgbm", + "lrl2", +] + +use_ray = True +out_dir = run_batch( + identifier, + kind, + metrics, + estimators=estimators, + propensity_automl_estimators=ptt_estimators, + dataset_path=os.path.realpath("../RunDatasets"), + use_ray=use_ray, +) +# plot results +# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} +# lower_bounds = {"erupt": 0.06, "bite": 0.75} +generate_plots(os.path.join(out_dir, kind)) # , upper_bounds, lower_bounds) +print("yay!") diff --git a/notebooks/RunExperiments/runners/kckp.py b/notebooks/RunExperiments/runners/kckp.py index 79955ca..32f5b1d 100644 --- a/notebooks/RunExperiments/runners/kckp.py +++ b/notebooks/RunExperiments/runners/kckp.py @@ -1,6 +1,7 @@ import os -from experiment_runner import run_batch, generate_plots +from experiment_runner import run_batch, get_estimator_list +from experiment_plots import generate_plots identifier = "Egor_test" kind = "KCKP" @@ -15,8 +16,16 @@ "codec", # NEW "bite", # NEW ] - -out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets")) +use_ray = True +estimators = get_estimator_list(kind) +out_dir = run_batch( + identifier, + kind, + metrics, + estimators=estimators, + dataset_path=os.path.realpath("../RunDatasets"), + use_ray=use_ray, +) # plot results # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} # lower_bounds = {"erupt": 0.06, "bite": 0.75} diff --git a/notebooks/RunExperiments/runners/kckp_no_meta.py b/notebooks/RunExperiments/runners/kckp_no_meta.py new file mode 100644 index 0000000..13f2bd7 --- /dev/null +++ b/notebooks/RunExperiments/runners/kckp_no_meta.py @@ -0,0 +1,35 @@ +import os + +from experiment_runner import run_batch, get_estimator_list +from experiment_plots import generate_plots + +identifier = "Egor_test_no_meta" +kind = "KCKP" +metrics = [ + "erupt", + # "greedy_erupt", # regular erupt was made probabilistic, + "policy_risk", # NEW + "qini", + "auc", + "psw_energy_distance", + "frobenius_norm", # NEW + "codec", # NEW + "bite", # NEW +] + +estimators = get_estimator_list(kind, exclude_patterns=["SLearner", "TLearner", "XLearner"]) + +use_ray = True +out_dir = run_batch( + identifier, + kind, + metrics, + estimators=estimators, + dataset_path=os.path.realpath("../RunDatasets"), + use_ray=use_ray, +) +# plot results +# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} +# lower_bounds = {"erupt": 0.06, "bite": 0.75} +generate_plots(os.path.join(out_dir, kind)) # , upper_bounds, lower_bounds) +print("yay!") diff --git a/notebooks/RunExperiments/runners/rct.py b/notebooks/RunExperiments/runners/rct.py index 797bc69..41e1c9c 100644 --- a/notebooks/RunExperiments/runners/rct.py +++ b/notebooks/RunExperiments/runners/rct.py @@ -1,9 +1,11 @@ import os -from experiment_runner import run_batch, generate_plots +from experiment_runner import run_batch, get_estimator_list +from experiment_plots import generate_plots identifier = "Egor_test" kind = "RCT" + metrics = [ "erupt", # "greedy_erupt", # regular erupt was made probabilistic, @@ -15,8 +17,16 @@ "codec", # NEW "bite", # NEW ] - -out_dir = run_batch(identifier, kind, metrics, dataset_path=os.path.realpath("../RunDatasets")) +estimators = get_estimator_list(kind) +use_ray = True +out_dir = run_batch( + identifier, + kind, + metrics, + estimators=estimators, + dataset_path=os.path.realpath("../RunDatasets"), + use_ray=use_ray, +) # plot results # upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} # lower_bounds = {"erupt": 0.06, "bite": 0.75} diff --git a/notebooks/RunExperiments/runners/rct_no_meta.py b/notebooks/RunExperiments/runners/rct_no_meta.py new file mode 100644 index 0000000..fae51e9 --- /dev/null +++ b/notebooks/RunExperiments/runners/rct_no_meta.py @@ -0,0 +1,35 @@ +import os + +from experiment_runner import run_batch, get_estimator_list +from experiment_plots import generate_plots + +identifier = "Egor_test_no_meta" +kind = "RCT" +metrics = [ + "erupt", + # "greedy_erupt", # regular erupt was made probabilistic, + "policy_risk", # NEW + "qini", + "auc", + "psw_energy_distance", + "frobenius_norm", # NEW + "codec", # NEW + "bite", # NEW +] + +estimators = get_estimator_list(kind, exclude_patterns=["SLearner", "TLearner", "XLearner"]) + +use_ray = True +out_dir = run_batch( + identifier, + kind, + metrics, + estimators=estimators, + dataset_path=os.path.realpath("../RunDatasets"), + use_ray=use_ray, +) +# plot results +# upper_bounds = {"MSE": 1e2, "policy_risk": 0.2} +# lower_bounds = {"erupt": 0.06, "bite": 0.75} +generate_plots(os.path.join(out_dir, kind)) # , upper_bounds, lower_bounds) +print("yay!")