From 495102919a2a17ee6d7e0f1dd47a361b20fd0486 Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Tue, 23 Jul 2024 16:32:42 +0100 Subject: [PATCH 01/12] Temporal estimation in --- causal_testing/specification/capabilities.py | 71 ++++++ .../testing/causal_test_adequacy.py | 30 ++- causal_testing/testing/causal_test_case.py | 2 +- causal_testing/testing/causal_test_outcome.py | 6 +- causal_testing/testing/estimators.py | 241 +++++++++++++++++- pyproject.toml | 3 +- 6 files changed, 341 insertions(+), 12 deletions(-) create mode 100644 causal_testing/specification/capabilities.py diff --git a/causal_testing/specification/capabilities.py b/causal_testing/specification/capabilities.py new file mode 100644 index 00000000..b382ac88 --- /dev/null +++ b/causal_testing/specification/capabilities.py @@ -0,0 +1,71 @@ +from causal_testing.specification.variable import Variable + + +class Capability: + """ + Data class to encapsulate temporal interventions. + """ + + def __init__(self, variable: Variable, value: any, start_time: int, end_time: float): + self.variable = variable + self.value = value + self.start_time = start_time + self.end_time = end_time + + def __eq__(self, other): + return ( + type(other) == type(self) + and self.variable == other.variable + and self.value == other.value + and self.start_time == other.start_time + and self.end_time == other.end_time + ) + + def __repr__(self): + return f"({self.variable}, {self.value}, {self.start_time}-{self.end_time})" + + +class TreatmentSequence: + """ + Class to represent a list of capabilities, i.e. a treatment regime. + """ + + def __init__(self, timesteps_per_intervention, capabilities): + self.timesteps_per_intervention = timesteps_per_intervention + self.capabilities = [ + Capability(var, val, t, t + timesteps_per_intervention) + for (var, val), t in zip( + capabilities, + range( + timesteps_per_intervention, + (len(capabilities) * timesteps_per_intervention) + 1, + timesteps_per_intervention, + ), + ) + ] + # This is a bodge so that causal test adequacy works + self.name = tuple([c.variable for c in self.capabilities]) + + def set_value(self, index: int, value: float): + """ + Set the value of capability at the given index. + :param index - the index of the element to update. + :param value - the desired value of the capability. + """ + self.capabilities[index].value = value + + def copy(self): + """ + Return a deep copy of the capability list. + """ + strategy = TreatmentSequence( + self.timesteps_per_intervention, + [(c.variable, c.value) for c in self.capabilities], + ) + return strategy + + def total_time(self): + """ + Calculate the total duration of the treatment strategy. + """ + return (len(self.capabilities) + 1) * self.timesteps_per_intervention diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py index 2a9bff93..bd639b12 100644 --- a/causal_testing/testing/causal_test_adequacy.py +++ b/causal_testing/testing/causal_test_adequacy.py @@ -71,14 +71,21 @@ class DataAdequacy: """ def __init__( - self, test_case: CausalTestCase, estimator: Estimator, data_collector: DataCollector, bootstrap_size: int = 100 + self, + test_case: CausalTestCase, + estimator: Estimator, + data_collector: DataCollector, + bootstrap_size: int = 100, + group_by=None, ): self.test_case = test_case self.estimator = estimator self.data_collector = data_collector self.kurtosis = None self.outcomes = None + self.successful = None self.bootstrap_size = bootstrap_size + self.group_by = group_by def measure_adequacy(self): """ @@ -87,11 +94,14 @@ def measure_adequacy(self): results = [] for i in range(self.bootstrap_size): estimator = deepcopy(self.estimator) - estimator.df = estimator.df.sample(len(estimator.df), replace=True, random_state=i) - # try: + + if self.group_by is not None: + ids = pd.Series(estimator.df[self.group_by].unique()) + ids = ids.sample(len(ids), replace=True, random_state=i) + estimator.df = estimator.df[estimator.df[self.group_by].isin(ids)] + else: + estimator.df = estimator.df.sample(len(estimator.df), replace=True, random_state=i) results.append(self.test_case.execute_test(estimator, self.data_collector)) - # except np.LinAlgError: - # continue outcomes = [self.test_case.expected_causal_effect.apply(c) for c in results] results = pd.DataFrame(c.to_dict() for c in results)[["effect_estimate", "ci_low", "ci_high"]] @@ -111,8 +121,14 @@ def convert_to_df(field): effect_estimate = pd.concat(results["effect_estimate"].tolist(), axis=1).transpose().reset_index(drop=True) self.kurtosis = effect_estimate.kurtosis() - self.outcomes = sum(outcomes) + self.outcomes = sum(filter(lambda x: x is not None, outcomes)) + self.successful = sum([x is not None for x in outcomes]) def to_dict(self): "Returns the adequacy object as a dictionary." - return {"kurtosis": self.kurtosis.to_dict(), "bootstrap_size": self.bootstrap_size, "passing": self.outcomes} + return { + "kurtosis": self.kurtosis.to_dict(), + "bootstrap_size": self.bootstrap_size, + "passing": self.outcomes, + "successful": self.successful, + } diff --git a/causal_testing/testing/causal_test_case.py b/causal_testing/testing/causal_test_case.py index da47c126..52daf040 100644 --- a/causal_testing/testing/causal_test_case.py +++ b/causal_testing/testing/causal_test_case.py @@ -92,7 +92,7 @@ def _return_causal_test_results(self, estimator) -> CausalTestResult: except np.linalg.LinAlgError: return CausalTestResult( estimator=estimator, - test_value=TestValue(self.estimate_type, "LinAlgError"), + test_value=TestValue(self.estimate_type, None), effect_modifier_configuration=self.effect_modifier_configuration, confidence_intervals=None, ) diff --git a/causal_testing/testing/causal_test_outcome.py b/causal_testing/testing/causal_test_outcome.py index 3846b514..be10566b 100644 --- a/causal_testing/testing/causal_test_outcome.py +++ b/causal_testing/testing/causal_test_outcome.py @@ -27,7 +27,9 @@ class SomeEffect(CausalTestOutcome): """An extension of TestOutcome representing that the expected causal effect should not be zero.""" def apply(self, res: CausalTestResult) -> bool: - if res.test_value.type == "risk_ratio": + if res.ci_low() is None or res.ci_high() is None: + return None + if res.test_value.type in ("risk_ratio", "hazard_ratio"): return any( 1 < ci_low < ci_high or ci_low < ci_high < 1 for ci_low, ci_high in zip(res.ci_low(), res.ci_high()) ) @@ -52,7 +54,7 @@ def __init__(self, atol: float = 1e-10, ctol: float = 0.05): self.ctol = ctol def apply(self, res: CausalTestResult) -> bool: - if res.test_value.type == "risk_ratio": + if res.test_value.type in ("risk_ratio", "hazard_ratio"): return any( ci_low < 1 < ci_high or np.isclose(value, 1.0, atol=self.atol) for ci_low, ci_high, value in zip(res.ci_low(), res.ci_high(), res.test_value.value) diff --git a/causal_testing/testing/estimators.py b/causal_testing/testing/estimators.py index 5133744b..86d6c571 100644 --- a/causal_testing/testing/estimators.py +++ b/causal_testing/testing/estimators.py @@ -14,8 +14,10 @@ from patsy import ModelDesc from statsmodels.regression.linear_model import RegressionResultsWrapper from statsmodels.tools.sm_exceptions import PerfectSeparationError +from lifelines import CoxPHFitter from causal_testing.specification.variable import Variable +from causal_testing.specification.capabilities import TreatmentSequence, Capability logger = logging.getLogger(__name__) @@ -360,7 +362,6 @@ def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]: if factor.name() in self.df.dtypes ) ): - design_info = dmatrix(self.formula.split("~")[1], self.df).design_info treatment = design_info.column_names[design_info.term_name_slices[self.treatment]] else: @@ -599,3 +600,241 @@ def estimate_coefficient(self, bootstrap_size=100) -> tuple[pd.Series, list[pd.S ci_high = pd.Series(bootstraps[bootstrap_size - bound]) return pd.Series(self.estimate_iv_coefficient(self.df)), [ci_low, ci_high] + + +class IPCWEstimator(Estimator): + """ + Class to perform inverse probability of censoring weighting (IPCW) estimation + for sequences of treatments over time-varying data. + """ + + def __init__( + self, + df: pd.DataFrame, + timesteps_per_intervention: int, + control_strategy: TreatmentSequence, + treatment_strategy: TreatmentSequence, + outcome: str, + min: float, + max: float, + fitBLswitch_formula: str, + fitBLTDswitch_formula: str, + eligibility=None, + alpha: float = 0.05, + query: str = "", + ): + super().__init__( + [c.variable for c in treatment_strategy.capabilities], + [c.value for c in treatment_strategy.capabilities], + [c.value for c in control_strategy.capabilities], + None, + outcome, + df, + None, + alpha=alpha, + query=query, + ) + self.timesteps_per_intervention = timesteps_per_intervention + self.control_strategy = control_strategy + self.treatment_strategy = treatment_strategy + self.outcome = outcome + self.min = min + self.max = max + self.timesteps_per_intervention = timesteps_per_intervention + self.fitBLswitch_formula = fitBLswitch_formula + self.fitBLTDswitch_formula = fitBLTDswitch_formula + self.eligibility = eligibility + self.df = self.preprocess_data(df) + + def add_modelling_assumptions(self): + self.modelling_assumptions.append("The variables in the data vary over time.") + + def setup_xo_t_do(self, strategy_assigned: list, strategy_followed: list, eligible: pd.Series): + """ + Return a binary sequence with each bit representing whether the current + index is the time point at which the individual diverted from the + assigned treatment strategy (and thus should be censored). + + :param strategy_assigned - the assigned treatment strategy + :param strategy_followed - the strategy followed by the individual + :param eligible - binary sequence represnting the eligibility of the individual at each time step + """ + strategy_assigned = [1] + strategy_assigned + [1] + strategy_followed = [1] + strategy_followed + [1] + + mask = ( + pd.Series(strategy_assigned, index=eligible.index) != pd.Series(strategy_followed, index=eligible.index) + ).astype("boolean") + mask = mask | ~eligible + mask.reset_index(inplace=True, drop=True) + false = mask.loc[mask == True] + if false.empty: + return np.zeros(len(mask)) + else: + mask = (mask * 1).tolist() + cutoff = false.index[0] + 1 + return mask[:cutoff] + ([None] * (len(mask) - cutoff)) + + def setup_fault_t_do(self, individual: pd.DataFrame): + """ + Return a binary sequence with each bit representing whether the current + index is the time point at which the event of interest (i.e. a fault) + occurred. + """ + fault = individual[individual["within_safe_range"] == False] + fault_t_do = pd.Series(np.zeros(len(individual)), index=individual.index) + + if not fault.empty: + fault_time = individual["time"].loc[fault.index[0]] + # Ceiling to nearest observation point + fault_time = ceil(fault_time / self.timesteps_per_intervention) * self.timesteps_per_intervention + # Set the correct observation point to be the fault time of doing (fault_t_do) + observations = individual.loc[ + (individual["time"] % self.timesteps_per_intervention == 0) & (individual["time"] < fault_time) + ] + if not observations.empty: + fault_t_do.loc[observations.index[0]] = 1 + assert sum(fault_t_do) <= 1, f"Multiple fault times for\n{individual}" + + return pd.DataFrame({"fault_t_do": fault_t_do}) + + def setup_fault_time(self, individual, perturbation=-0.001): + """ + Return the time at which the event of interest (i.e. a fault) occurred. + """ + fault = individual[individual["within_safe_range"] == False] + fault_time = ( + individual["time"].loc[fault.index[0]] + if not fault.empty + else (individual["time"].max() + self.timesteps_per_intervention) + ) + return pd.DataFrame({"fault_time": np.repeat(fault_time + perturbation, len(individual))}) + + def preprocess_data(self, df): + """ + Set up the treatment-specific columns in the data that are needed to estimate the hazard ratio. + """ + df["trtrand"] = None # treatment/control arm + df["xo_t_do"] = None # did the individual deviate from the treatment of interest here? + df["eligible"] = df.eval(self.eligibility) if self.eligibility is not None else True + + # when did a fault occur? + df["within_safe_range"] = df[self.outcome].between(self.min, self.max) + df["fault_time"] = df.groupby("id")[["within_safe_range", "time"]].apply(self.setup_fault_time).values + df["fault_t_do"] = df.groupby("id")[["id", "time", "within_safe_range"]].apply(self.setup_fault_t_do).values + assert not pd.isnull(df["fault_time"]).any() + + living_runs = df.query("fault_time > 0").loc[ + (df["time"] % self.timesteps_per_intervention == 0) & (df["time"] <= self.control_strategy.total_time()) + ] + + individuals = [] + new_id = 0 + logging.debug(" Preprocessing groups") + for id, individual in living_runs.groupby("id"): + assert ( + sum(individual["fault_t_do"]) <= 1 + ), f"Error initialising fault_t_do for individual\n{individual[['id', 'time', 'fault_time', 'fault_t_do']]}\nwith fault at {individual.fault_time.iloc[0]}" + + strategy_followed = [ + Capability( + c.variable, + individual.loc[individual["time"] == c.start_time, c.variable].values[0], + c.start_time, + c.end_time, + ) + for c in self.treatment_strategy.capabilities + ] + + # Control flow: + # Individuals that start off in both arms, need cloning (hence incrementing the ID within the if statement) + # Individuals that don't start off in either arm are left out + for inx, strategy_assigned in [(0, self.control_strategy), (1, self.treatment_strategy)]: + if strategy_assigned.capabilities[0] == strategy_followed[0] and individual.eligible.iloc[0]: + individual["id"] = new_id + new_id += 1 + individual["trtrand"] = inx + individual["xo_t_do"] = self.setup_xo_t_do( + strategy_assigned.capabilities, strategy_followed, individual["eligible"] + ) + individuals.append(individual.loc[individual["time"] <= individual["fault_time"]].copy()) + if len(individuals) == 0: + raise ValueError("No individuals followed either strategy.") + return pd.concat(individuals) + + def estimate_hazard_ratio(self): + """ + Estimate the hazard ratio. + """ + + if self.df["fault_t_do"].sum() == 0: + raise ValueError("No recorded faults") + + logging.debug(f" {int(self.df['fault_t_do'].sum())}/{len(self.df.groupby('id'))} faulty runs observed") + + # Use logistic regression to predict switching given baseline covariates + logging.debug(f" predict switching given baseline covariates: {self.fitBLswitch_formula}") + fitBLswitch = smf.logit(self.fitBLswitch_formula, data=self.df).fit() + + # Estimate the probability of switching for each patient-observation included in the regression. + novCEA = pd.DataFrame() + novCEA["pxo1"] = fitBLswitch.predict(self.df) + + # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) + logging.debug(f" predict switching given baseline and time-updated covariates: {self.fitBLTDswitch_formula}") + + fitBLTDswitch = smf.logit( + self.fitBLTDswitch_formula, + data=self.df, + ).fit() + + # Estimate the probability of switching for each patient-observation included in the regression. + novCEA["pxo2"] = fitBLTDswitch.predict(self.df) + + # IPCW step 3: For each individual at each time, compute the inverse probability of remaining uncensored + # Estimate the probabilities of remaining ‘un-switched’ and hence the weights + + novCEA["num"] = 1 - novCEA["pxo1"] + novCEA["denom"] = 1 - novCEA["pxo2"] + prod = ( + pd.concat([self.df, novCEA], axis=1).sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() + ) + novCEA["num"] = prod["num"] + novCEA["denom"] = prod["denom"] + + assert not novCEA["num"].isnull().any(), f"{len(novCEA['num'].isnull())} null numerator values" + assert not novCEA["denom"].isnull().any(), f"{len(novCEA['denom'].isnull())} null denom values" + + novCEA["weight"] = 1 / novCEA["denom"] + novCEA["sweight"] = novCEA["num"] / novCEA["denom"] + + novCEA_KM = novCEA.loc[self.df["xo_t_do"] == 0].copy() + novCEA_KM["tin"] = self.df["time"] + novCEA_KM["tout"] = pd.concat( + [(self.df["time"] + self.timesteps_per_intervention), self.df["fault_time"]], axis=1 + ).min(axis=1) + + assert ( + novCEA_KM["tin"] <= novCEA_KM["tout"] + ).all(), f"Left before joining\n{novCEA_KM.loc[novCEA_KM['tin'] >= novCEA_KM['tout']]}" + + novCEA_KM.dropna(axis=1, inplace=True) + novCEA_KM.replace([float("inf")], 100, inplace=True) + + # IPCW step 4: Use these weights in a weighted analysis of the outcome model + # Estimate the KM graph and IPCW hazard ratio using Cox regression. + cox_ph = CoxPHFitter(alpha=self.alpha) + + cox_ph.fit( + df=pd.concat([self.df, novCEA_KM], axis=1), + duration_col="tout", + event_col="fault_t_do", + weights_col="weight", + cluster_col="id", + robust=True, + formula="trtrand", + entry_col="tin", + ) + ci_low, ci_high = sorted(np.exp(cox_ph.confidence_intervals_).T["trtrand"].tolist()) + + return (cox_ph.hazard_ratios_["trtrand"], ([ci_low], [ci_high])) diff --git a/pyproject.toml b/pyproject.toml index cf8ca872..141e2206 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,10 +17,11 @@ keywords = ["causal inference", "verification"] dependencies = [ "z3_solver~=4.11.2", # z3_solver does not follow semantic versioning and tying to 4.11 introduces problems "fitter~=1.7", + "lifelines~=0.29.0" "lhsmdu~=1.1", "networkx~=2.6", "numpy~=1.26", - "pandas~=1.5", + "pandas>=2.1", "scikit_learn~=1.4", "scipy~=1.7", "statsmodels~=0.14", From b321170592b43f5e1f4cdbb34eb3519ab8cdf8c9 Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Wed, 24 Jul 2024 16:06:42 +0100 Subject: [PATCH 02/12] Fixed pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 141e2206..e8e9fc68 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ keywords = ["causal inference", "verification"] dependencies = [ "z3_solver~=4.11.2", # z3_solver does not follow semantic versioning and tying to 4.11 introduces problems "fitter~=1.7", - "lifelines~=0.29.0" + "lifelines~=0.29.0", "lhsmdu~=1.1", "networkx~=2.6", "numpy~=1.26", From 0d76e927968b6fbf68802abf4692c5de0f86f5d9 Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Thu, 25 Jul 2024 13:46:48 +0100 Subject: [PATCH 03/12] Fixed estimation bug --- .../testing/causal_test_adequacy.py | 9 +++- causal_testing/testing/estimators.py | 54 ++++++++----------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py index bd639b12..3d7953dd 100644 --- a/causal_testing/testing/causal_test_adequacy.py +++ b/causal_testing/testing/causal_test_adequacy.py @@ -5,6 +5,8 @@ from itertools import combinations from copy import deepcopy import pandas as pd +from numpy.linalg import LinAlgError +from lifelines.exceptions import ConvergenceError from causal_testing.testing.causal_test_suite import CausalTestSuite from causal_testing.data_collection.data_collector import DataCollector @@ -101,7 +103,12 @@ def measure_adequacy(self): estimator.df = estimator.df[estimator.df[self.group_by].isin(ids)] else: estimator.df = estimator.df.sample(len(estimator.df), replace=True, random_state=i) - results.append(self.test_case.execute_test(estimator, self.data_collector)) + try: + results.append(self.test_case.execute_test(estimator, self.data_collector)) + except LinAlgError: + continue + except ConvergenceError: + continue outcomes = [self.test_case.expected_causal_effect.apply(c) for c in results] results = pd.DataFrame(c.to_dict() for c in results)[["effect_estimate", "ci_low", "ci_high"]] diff --git a/causal_testing/testing/estimators.py b/causal_testing/testing/estimators.py index 86d6c571..5269d491 100644 --- a/causal_testing/testing/estimators.py +++ b/causal_testing/testing/estimators.py @@ -760,47 +760,31 @@ def preprocess_data(self, df): individuals.append(individual.loc[individual["time"] <= individual["fault_time"]].copy()) if len(individuals) == 0: raise ValueError("No individuals followed either strategy.") - return pd.concat(individuals) - def estimate_hazard_ratio(self): - """ - Estimate the hazard ratio. - """ + novCEA = pd.concat(individuals) - if self.df["fault_t_do"].sum() == 0: + if novCEA["fault_t_do"].sum() == 0: raise ValueError("No recorded faults") - logging.debug(f" {int(self.df['fault_t_do'].sum())}/{len(self.df.groupby('id'))} faulty runs observed") - # Use logistic regression to predict switching given baseline covariates - logging.debug(f" predict switching given baseline covariates: {self.fitBLswitch_formula}") - fitBLswitch = smf.logit(self.fitBLswitch_formula, data=self.df).fit() + fitBLswitch = smf.logit(self.fitBLswitch_formula, data=novCEA).fit() - # Estimate the probability of switching for each patient-observation included in the regression. - novCEA = pd.DataFrame() - novCEA["pxo1"] = fitBLswitch.predict(self.df) + novCEA["pxo1"] = fitBLswitch.predict(novCEA) # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) - logging.debug(f" predict switching given baseline and time-updated covariates: {self.fitBLTDswitch_formula}") - fitBLTDswitch = smf.logit( self.fitBLTDswitch_formula, - data=self.df, + data=novCEA, ).fit() - # Estimate the probability of switching for each patient-observation included in the regression. - novCEA["pxo2"] = fitBLTDswitch.predict(self.df) + novCEA["pxo2"] = fitBLTDswitch.predict(novCEA) # IPCW step 3: For each individual at each time, compute the inverse probability of remaining uncensored # Estimate the probabilities of remaining ‘un-switched’ and hence the weights novCEA["num"] = 1 - novCEA["pxo1"] novCEA["denom"] = 1 - novCEA["pxo2"] - prod = ( - pd.concat([self.df, novCEA], axis=1).sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() - ) - novCEA["num"] = prod["num"] - novCEA["denom"] = prod["denom"] + novCEA[["num", "denom"]] = novCEA.sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() assert not novCEA["num"].isnull().any(), f"{len(novCEA['num'].isnull())} null numerator values" assert not novCEA["denom"].isnull().any(), f"{len(novCEA['denom'].isnull())} null denom values" @@ -808,25 +792,28 @@ def estimate_hazard_ratio(self): novCEA["weight"] = 1 / novCEA["denom"] novCEA["sweight"] = novCEA["num"] / novCEA["denom"] - novCEA_KM = novCEA.loc[self.df["xo_t_do"] == 0].copy() - novCEA_KM["tin"] = self.df["time"] + novCEA_KM = novCEA.loc[novCEA["xo_t_do"] == 0].copy() + novCEA_KM["tin"] = novCEA_KM["time"] novCEA_KM["tout"] = pd.concat( - [(self.df["time"] + self.timesteps_per_intervention), self.df["fault_time"]], axis=1 + [(novCEA_KM["time"] + self.timesteps_per_intervention), novCEA_KM["fault_time"]], axis=1 ).min(axis=1) assert ( novCEA_KM["tin"] <= novCEA_KM["tout"] ).all(), f"Left before joining\n{novCEA_KM.loc[novCEA_KM['tin'] >= novCEA_KM['tout']]}" - novCEA_KM.dropna(axis=1, inplace=True) - novCEA_KM.replace([float("inf")], 100, inplace=True) + return novCEA_KM + + def estimate_hazard_ratio(self): + """ + Estimate the hazard ratio. + """ # IPCW step 4: Use these weights in a weighted analysis of the outcome model # Estimate the KM graph and IPCW hazard ratio using Cox regression. - cox_ph = CoxPHFitter(alpha=self.alpha) - + cox_ph = CoxPHFitter() cox_ph.fit( - df=pd.concat([self.df, novCEA_KM], axis=1), + df=self.df, duration_col="tout", event_col="fault_t_do", weights_col="weight", @@ -835,6 +822,7 @@ def estimate_hazard_ratio(self): formula="trtrand", entry_col="tin", ) - ci_low, ci_high = sorted(np.exp(cox_ph.confidence_intervals_).T["trtrand"].tolist()) - return (cox_ph.hazard_ratios_["trtrand"], ([ci_low], [ci_high])) + ci_low, ci_high = [np.exp(cox_ph.confidence_intervals_)[col] for col in cox_ph.confidence_intervals_.columns] + + return (cox_ph.hazard_ratios_, (ci_low, ci_high)) From e2aef7562ea234ac4e15a4e606c42abbdcaa39ae Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Thu, 25 Jul 2024 13:59:59 +0100 Subject: [PATCH 04/12] Fixed causal test adequacy unit tests --- tests/testing_tests/test_causal_test_adequacy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/testing_tests/test_causal_test_adequacy.py b/tests/testing_tests/test_causal_test_adequacy.py index 1f8d2ffa..ee13d8aa 100644 --- a/tests/testing_tests/test_causal_test_adequacy.py +++ b/tests/testing_tests/test_causal_test_adequacy.py @@ -70,7 +70,7 @@ def test_data_adequacy_numeric(self): ) self.assertEqual( test_results[0]["result"].adequacy.to_dict(), - {"kurtosis": {"test_input": 0.0}, "bootstrap_size": 100, "passing": 100}, + {"kurtosis": {"test_input": 0.0}, "bootstrap_size": 100, "passing": 100, "successful": 100}, ) def test_data_adequacy_cateogorical(self): @@ -103,7 +103,7 @@ def test_data_adequacy_cateogorical(self): print(test_results[0]["result"]) self.assertEqual( test_results[0]["result"].adequacy.to_dict(), - {"kurtosis": {"test_input_no_dist[T.b]": 0.0}, "bootstrap_size": 100, "passing": 100}, + {"kurtosis": {"test_input_no_dist[T.b]": 0.0}, "bootstrap_size": 100, "passing": 100, "successful": 100}, ) def test_dag_adequacy_dependent(self): From 392387cebc0537a3937bae50346595bfe2fd4a4d Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Thu, 25 Jul 2024 15:50:15 +0100 Subject: [PATCH 05/12] pytest and linting --- causal_testing/json_front/json_class.py | 2 +- causal_testing/specification/capabilities.py | 8 +- .../testing/causal_test_adequacy.py | 8 +- causal_testing/testing/estimators.py | 131 ++++++++++-------- tests/data/temporal_data.csv | 61 ++++++++ tests/testing_tests/test_estimators.py | 32 ++++- 6 files changed, 172 insertions(+), 70 deletions(-) create mode 100644 tests/data/temporal_data.csv diff --git a/causal_testing/json_front/json_class.py b/causal_testing/json_front/json_class.py index cca98a20..8c6c265c 100644 --- a/causal_testing/json_front/json_class.py +++ b/causal_testing/json_front/json_class.py @@ -276,7 +276,7 @@ def _execute_test_case( test_passes = causal_test_case.expected_causal_effect.apply(causal_test_result) if "coverage" in test and test["coverage"]: - adequacy_metric = DataAdequacy(causal_test_case, estimation_model, self.data_collector) + adequacy_metric = DataAdequacy(causal_test_case, estimation_model) adequacy_metric.measure_adequacy() causal_test_result.adequacy = adequacy_metric diff --git a/causal_testing/specification/capabilities.py b/causal_testing/specification/capabilities.py index b382ac88..699b77c6 100644 --- a/causal_testing/specification/capabilities.py +++ b/causal_testing/specification/capabilities.py @@ -1,3 +1,7 @@ +""" +This module contains the Capability and TreatmentSequence classes to implement +treatment sequences that operate over time. +""" from causal_testing.specification.variable import Variable @@ -14,7 +18,7 @@ def __init__(self, variable: Variable, value: any, start_time: int, end_time: fl def __eq__(self, other): return ( - type(other) == type(self) + isinstance(other, type(self)) and self.variable == other.variable and self.value == other.value and self.start_time == other.start_time @@ -44,7 +48,7 @@ def __init__(self, timesteps_per_intervention, capabilities): ) ] # This is a bodge so that causal test adequacy works - self.name = tuple([c.variable for c in self.capabilities]) + self.name = tuple(c.variable for c in self.capabilities) def set_value(self, index: int, value: float): """ diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py index 3d7953dd..2eb0d597 100644 --- a/causal_testing/testing/causal_test_adequacy.py +++ b/causal_testing/testing/causal_test_adequacy.py @@ -9,7 +9,6 @@ from lifelines.exceptions import ConvergenceError from causal_testing.testing.causal_test_suite import CausalTestSuite -from causal_testing.data_collection.data_collector import DataCollector from causal_testing.specification.causal_dag import CausalDAG from causal_testing.testing.estimators import Estimator from causal_testing.testing.causal_test_case import CausalTestCase @@ -72,17 +71,16 @@ class DataAdequacy: - Zero kurtosis is optimal. """ + # pylint: disable=too-many-instance-attributes def __init__( self, test_case: CausalTestCase, estimator: Estimator, - data_collector: DataCollector, bootstrap_size: int = 100, group_by=None, ): self.test_case = test_case self.estimator = estimator - self.data_collector = data_collector self.kurtosis = None self.outcomes = None self.successful = None @@ -104,7 +102,7 @@ def measure_adequacy(self): else: estimator.df = estimator.df.sample(len(estimator.df), replace=True, random_state=i) try: - results.append(self.test_case.execute_test(estimator, self.data_collector)) + results.append(self.test_case.execute_test(estimator, None)) except LinAlgError: continue except ConvergenceError: @@ -129,7 +127,7 @@ def convert_to_df(field): effect_estimate = pd.concat(results["effect_estimate"].tolist(), axis=1).transpose().reset_index(drop=True) self.kurtosis = effect_estimate.kurtosis() self.outcomes = sum(filter(lambda x: x is not None, outcomes)) - self.successful = sum([x is not None for x in outcomes]) + self.successful = sum(x is not None for x in outcomes) def to_dict(self): "Returns the adequacy object as a dictionary." diff --git a/causal_testing/testing/estimators.py b/causal_testing/testing/estimators.py index 5269d491..e701e664 100644 --- a/causal_testing/testing/estimators.py +++ b/causal_testing/testing/estimators.py @@ -608,6 +608,8 @@ class IPCWEstimator(Estimator): for sequences of treatments over time-varying data. """ + # pylint: disable=too-many-arguments + # pylint: disable=too-many-instance-attributes def __init__( self, df: pd.DataFrame, @@ -615,10 +617,9 @@ def __init__( control_strategy: TreatmentSequence, treatment_strategy: TreatmentSequence, outcome: str, - min: float, - max: float, - fitBLswitch_formula: str, - fitBLTDswitch_formula: str, + fault_column: str, + fit_bl_switch_formula: str, + fit_bltd_switch_formula: str, eligibility=None, alpha: float = 0.05, query: str = "", @@ -638,13 +639,12 @@ def __init__( self.control_strategy = control_strategy self.treatment_strategy = treatment_strategy self.outcome = outcome - self.min = min - self.max = max + self.fault_column = fault_column self.timesteps_per_intervention = timesteps_per_intervention - self.fitBLswitch_formula = fitBLswitch_formula - self.fitBLTDswitch_formula = fitBLTDswitch_formula + self.fit_bl_switch_formula = fit_bl_switch_formula + self.fit_bltd_switch_formula = fit_bltd_switch_formula self.eligibility = eligibility - self.df = self.preprocess_data(df) + self.df = df def add_modelling_assumptions(self): self.modelling_assumptions.append("The variables in the data vary over time.") @@ -667,13 +667,12 @@ def setup_xo_t_do(self, strategy_assigned: list, strategy_followed: list, eligib ).astype("boolean") mask = mask | ~eligible mask.reset_index(inplace=True, drop=True) - false = mask.loc[mask == True] + false = mask.loc[mask] if false.empty: return np.zeros(len(mask)) - else: - mask = (mask * 1).tolist() - cutoff = false.index[0] + 1 - return mask[:cutoff] + ([None] * (len(mask) - cutoff)) + mask = (mask * 1).tolist() + cutoff = false.index[0] + 1 + return mask[:cutoff] + ([None] * (len(mask) - cutoff)) def setup_fault_t_do(self, individual: pd.DataFrame): """ @@ -681,7 +680,7 @@ def setup_fault_t_do(self, individual: pd.DataFrame): index is the time point at which the event of interest (i.e. a fault) occurred. """ - fault = individual[individual["within_safe_range"] == False] + fault = individual[~individual[self.fault_column]] fault_t_do = pd.Series(np.zeros(len(individual)), index=individual.index) if not fault.empty: @@ -702,7 +701,7 @@ def setup_fault_time(self, individual, perturbation=-0.001): """ Return the time at which the event of interest (i.e. a fault) occurred. """ - fault = individual[individual["within_safe_range"] == False] + fault = individual[~individual[self.fault_column]] fault_time = ( individual["time"].loc[fault.index[0]] if not fault.empty @@ -710,31 +709,35 @@ def setup_fault_time(self, individual, perturbation=-0.001): ) return pd.DataFrame({"fault_time": np.repeat(fault_time + perturbation, len(individual))}) - def preprocess_data(self, df): + def preprocess_data(self): """ Set up the treatment-specific columns in the data that are needed to estimate the hazard ratio. """ - df["trtrand"] = None # treatment/control arm - df["xo_t_do"] = None # did the individual deviate from the treatment of interest here? - df["eligible"] = df.eval(self.eligibility) if self.eligibility is not None else True + self.df["trtrand"] = None # treatment/control arm + self.df["xo_t_do"] = None # did the individual deviate from the treatment of interest here? + self.df["eligible"] = self.df.eval(self.eligibility) if self.eligibility is not None else True # when did a fault occur? - df["within_safe_range"] = df[self.outcome].between(self.min, self.max) - df["fault_time"] = df.groupby("id")[["within_safe_range", "time"]].apply(self.setup_fault_time).values - df["fault_t_do"] = df.groupby("id")[["id", "time", "within_safe_range"]].apply(self.setup_fault_t_do).values - assert not pd.isnull(df["fault_time"]).any() + self.df["fault_time"] = self.df.groupby("id")[[self.fault_column, "time"]].apply(self.setup_fault_time).values + self.df["fault_t_do"] = ( + self.df.groupby("id")[["id", "time", self.fault_column]].apply(self.setup_fault_t_do).values + ) + assert not pd.isnull(self.df["fault_time"]).any() - living_runs = df.query("fault_time > 0").loc[ - (df["time"] % self.timesteps_per_intervention == 0) & (df["time"] <= self.control_strategy.total_time()) + living_runs = self.df.query("fault_time > 0").loc[ + (self.df["time"] % self.timesteps_per_intervention == 0) + & (self.df["time"] <= self.control_strategy.total_time()) ] individuals = [] new_id = 0 logging.debug(" Preprocessing groups") - for id, individual in living_runs.groupby("id"): - assert ( - sum(individual["fault_t_do"]) <= 1 - ), f"Error initialising fault_t_do for individual\n{individual[['id', 'time', 'fault_time', 'fault_t_do']]}\nwith fault at {individual.fault_time.iloc[0]}" + for _, individual in living_runs.groupby("id"): + assert sum(individual["fault_t_do"]) <= 1, ( + f"Error initialising fault_t_do for individual\n" + f"{individual[['id', 'time', 'fault_time', 'fault_t_do']]}\n" + "with fault at {individual.fault_time.iloc[0]}" + ) strategy_followed = [ Capability( @@ -761,59 +764,67 @@ def preprocess_data(self, df): if len(individuals) == 0: raise ValueError("No individuals followed either strategy.") - novCEA = pd.concat(individuals) + return pd.concat(individuals) - if novCEA["fault_t_do"].sum() == 0: + def estimate_hazard_ratio(self): + """ + Estimate the hazard ratio. + """ + + preprocessed_data = self.preprocess_data() + + if preprocessed_data["fault_t_do"].sum() == 0: raise ValueError("No recorded faults") # Use logistic regression to predict switching given baseline covariates - fitBLswitch = smf.logit(self.fitBLswitch_formula, data=novCEA).fit() + fit_bl_switch = smf.logit(self.fit_bl_switch_formula, data=preprocessed_data).fit() - novCEA["pxo1"] = fitBLswitch.predict(novCEA) + preprocessed_data["pxo1"] = fit_bl_switch.predict(preprocessed_data) # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) - fitBLTDswitch = smf.logit( - self.fitBLTDswitch_formula, - data=novCEA, + fit_bltd_switch = smf.logit( + self.fit_bltd_switch_formula, + data=preprocessed_data, ).fit() - novCEA["pxo2"] = fitBLTDswitch.predict(novCEA) + preprocessed_data["pxo2"] = fit_bltd_switch.predict(preprocessed_data) # IPCW step 3: For each individual at each time, compute the inverse probability of remaining uncensored # Estimate the probabilities of remaining ‘un-switched’ and hence the weights - novCEA["num"] = 1 - novCEA["pxo1"] - novCEA["denom"] = 1 - novCEA["pxo2"] - novCEA[["num", "denom"]] = novCEA.sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() + preprocessed_data["num"] = 1 - preprocessed_data["pxo1"] + preprocessed_data["denom"] = 1 - preprocessed_data["pxo2"] + preprocessed_data[["num", "denom"]] = ( + preprocessed_data.sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() + ) - assert not novCEA["num"].isnull().any(), f"{len(novCEA['num'].isnull())} null numerator values" - assert not novCEA["denom"].isnull().any(), f"{len(novCEA['denom'].isnull())} null denom values" + assert ( + not preprocessed_data["num"].isnull().any() + ), f"{len(preprocessed_data['num'].isnull())} null numerator values" + assert ( + not preprocessed_data["denom"].isnull().any() + ), f"{len(preprocessed_data['denom'].isnull())} null denom values" - novCEA["weight"] = 1 / novCEA["denom"] - novCEA["sweight"] = novCEA["num"] / novCEA["denom"] + preprocessed_data["weight"] = 1 / preprocessed_data["denom"] + preprocessed_data["sweight"] = preprocessed_data["num"] / preprocessed_data["denom"] - novCEA_KM = novCEA.loc[novCEA["xo_t_do"] == 0].copy() - novCEA_KM["tin"] = novCEA_KM["time"] - novCEA_KM["tout"] = pd.concat( - [(novCEA_KM["time"] + self.timesteps_per_intervention), novCEA_KM["fault_time"]], axis=1 + preprocessed_data_km = preprocessed_data.loc[preprocessed_data["xo_t_do"] == 0].copy() + preprocessed_data_km["tin"] = preprocessed_data_km["time"] + preprocessed_data_km["tout"] = pd.concat( + [(preprocessed_data_km["time"] + self.timesteps_per_intervention), preprocessed_data_km["fault_time"]], + axis=1, ).min(axis=1) - assert ( - novCEA_KM["tin"] <= novCEA_KM["tout"] - ).all(), f"Left before joining\n{novCEA_KM.loc[novCEA_KM['tin'] >= novCEA_KM['tout']]}" - - return novCEA_KM - - def estimate_hazard_ratio(self): - """ - Estimate the hazard ratio. - """ + assert (preprocessed_data_km["tin"] <= preprocessed_data_km["tout"]).all(), ( + f"Left before joining\n" + f"{preprocessed_data_km.loc[preprocessed_data_km['tin'] >= preprocessed_data_km['tout']]}" + ) # IPCW step 4: Use these weights in a weighted analysis of the outcome model # Estimate the KM graph and IPCW hazard ratio using Cox regression. cox_ph = CoxPHFitter() cox_ph.fit( - df=self.df, + df=preprocessed_data_km, duration_col="tout", event_col="fault_t_do", weights_col="weight", diff --git a/tests/data/temporal_data.csv b/tests/data/temporal_data.csv new file mode 100644 index 00000000..12749b0b --- /dev/null +++ b/tests/data/temporal_data.csv @@ -0,0 +1,61 @@ +t,outcome,id,time +0,1,0,0 +0,1,0,1 +0,1,0,2 +0,0,0,3 +0,0,0,4 +0,1,1,0 +0,1,1,1 +0,1,1,2 +0,0,1,3 +0,0,1,4 +0,1,2,0 +0,1,2,1 +0,1,2,2 +0,0,2,3 +0,0,2,4 +0,1,3,0 +0,1,3,1 +0,1,3,2 +0,0,3,3 +0,0,3,4 +0,1,4,0 +0,1,4,1 +0,1,4,2 +0,0,4,3 +0,0,4,4 +0,1,5,0 +0,1,5,1 +0,1,5,2 +0,0,5,3 +0,0,5,4 +0,1,6,0 +0,1,6,1 +0,0,6,2 +0,0,6,3 +0,0,6,4 +1,1,7,0 +1,1,7,1 +1,0,7,2 +1,0,7,3 +1,0,7,4 +1,1,8,0 +1,1,8,1 +1,0,8,2 +1,0,8,3 +1,0,8,4 +1,1,9,0 +1,1,9,1 +1,0,9,2 +1,0,9,3 +1,0,9,4 +1,1,10,0 +1,1,10,1 +1,0,10,2 +1,0,10,3 +1,0,10,4 +1,1,11,0 +1,1,11,1 +1,0,11,2 +1,0,11,3 +1,0,11,4 diff --git a/tests/testing_tests/test_estimators.py b/tests/testing_tests/test_estimators.py index cac19afc..6fb52322 100644 --- a/tests/testing_tests/test_estimators.py +++ b/tests/testing_tests/test_estimators.py @@ -7,9 +7,11 @@ LogisticRegressionEstimator, InstrumentalVariableEstimator, CubicSplineRegressionEstimator, + IPCWEstimator, ) from causal_testing.specification.variable import Input from causal_testing.utils.validation import CausalValidator +from causal_testing.specification.capabilities import TreatmentSequence def plot_results_df(df): @@ -404,11 +406,9 @@ def test_program_11_2_with_robustness_validation(self): class TestCubicSplineRegressionEstimator(TestLinearRegressionEstimator): @classmethod def setUpClass(cls): - super().setUpClass() def test_program_11_3_cublic_spline(self): - """Test whether the cublic_spline regression implementation produces the same results as program 11.3 (p. 162). https://www.hsph.harvard.edu/miguel-hernan/wp-content/uploads/sites/1268/2023/10/hernanrobins_WhatIf_30sep23.pdf Slightly modified as Hernan et al. use linear regression for this example. @@ -472,3 +472,31 @@ def test_categorical_confidence_intervals(self): self.assertTrue(coefficients.round(2).equals(pd.Series({"color[T.grey]": 0.92, "color[T.orange]": -4.25}))) self.assertTrue(ci_low.round(2).equals(pd.Series({"color[T.grey]": -22.12, "color[T.orange]": -25.58}))) self.assertTrue(ci_high.round(2).equals(pd.Series({"color[T.grey]": 23.95, "color[T.orange]": 17.08}))) + + +class TestIPCWEstimator(unittest.TestCase): + """ + Test the IPCW estimator class + """ + + def test_estimate_hazard_ratio(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/data/temporal_data.csv") + df["ok"] = df["outcome"] == 1 + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + estimate, intervals = estimation_model.estimate_hazard_ratio() + self.assertEqual(estimate["trtrand"], 1.0) From bffe8276804592d2c85b24bc1b7710dfe91283b8 Mon Sep 17 00:00:00 2001 From: Farhad Allian Date: Fri, 26 Jul 2024 13:48:49 +0100 Subject: [PATCH 06/12] fix: tests in pandas version > 2 --- causal_testing/surrogate/causal_surrogate_assisted.py | 11 ++++++++--- .../surrogate_tests/test_causal_surrogate_assisted.py | 5 ++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/causal_testing/surrogate/causal_surrogate_assisted.py b/causal_testing/surrogate/causal_surrogate_assisted.py index 74f309be..8c0f157f 100644 --- a/causal_testing/surrogate/causal_surrogate_assisted.py +++ b/causal_testing/surrogate/causal_surrogate_assisted.py @@ -8,7 +8,7 @@ from causal_testing.specification.causal_specification import CausalSpecification from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.testing.estimators import CubicSplineRegressionEstimator - +import pandas as pd @dataclass class SimulationResult: @@ -18,6 +18,11 @@ class SimulationResult: fault: bool relationship: str + def to_dataframe(self) -> pd.DataFrame: + """Convert the simulation result data to a pandas DataFrame""" + data_as_lists = {k: v if isinstance(v, list) else [v] for k,v in self.data.items()} + return pd.DataFrame(data_as_lists) + class SearchAlgorithm(ABC): # pylint: disable=too-few-public-methods """Class to be inherited with the search algorithm consisting of a search function and the fitness function of the @@ -87,14 +92,14 @@ def execute( self.simulator.startup() test_result = self.simulator.run_with_config(candidate_test_case) + test_result_df = test_result.to_dataframe() self.simulator.shutdown() if custom_data_aggregator is not None: if data_collector.data is not None: data_collector.data = custom_data_aggregator(data_collector.data, test_result.data) else: - data_collector.data = data_collector.data.append(test_result.data, ignore_index=True) - + data_collector.data = pd.concat([data_collector.data, test_result_df], ignore_index=True) if test_result.fault: print( f"Fault found between {surrogate.treatment} causing {surrogate.outcome}. Contradiction with " diff --git a/tests/surrogate_tests/test_causal_surrogate_assisted.py b/tests/surrogate_tests/test_causal_surrogate_assisted.py index c5eb6e2c..54c93af1 100644 --- a/tests/surrogate_tests/test_causal_surrogate_assisted.py +++ b/tests/surrogate_tests/test_causal_surrogate_assisted.py @@ -231,4 +231,7 @@ def shutdown(self): pass def data_double_aggregator(data, new_data): - return data.append(new_data, ignore_index=True).append(new_data, ignore_index=True) \ No newline at end of file + """Previously used data.append(new_data), however, pandas version >2 requires pd.concat() since append is now a private method. + Converting new_data to a pd.DataFrame is required to use pd.concat(). """ + new_data = pd.DataFrame([new_data]) + return pd.concat([data, new_data, new_data], ignore_index=True) From fbfe76a89ca86e6578c2ecfce6ff30eaeac49b33 Mon Sep 17 00:00:00 2001 From: Farhad Allian Date: Fri, 26 Jul 2024 14:03:33 +0100 Subject: [PATCH 07/12] fix: linting --- causal_testing/surrogate/causal_surrogate_assisted.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/causal_testing/surrogate/causal_surrogate_assisted.py b/causal_testing/surrogate/causal_surrogate_assisted.py index 8c0f157f..c30b2086 100644 --- a/causal_testing/surrogate/causal_surrogate_assisted.py +++ b/causal_testing/surrogate/causal_surrogate_assisted.py @@ -3,12 +3,11 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Callable - +import pandas as pd from causal_testing.data_collection.data_collector import ObservationalDataCollector from causal_testing.specification.causal_specification import CausalSpecification from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.testing.estimators import CubicSplineRegressionEstimator -import pandas as pd @dataclass class SimulationResult: From 6ceef1a83ed849b78c9f22fb7169aff0d4151fc2 Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Mon, 29 Jul 2024 08:38:50 +0100 Subject: [PATCH 08/12] Some more linting --- .../testing/causal_test_adequacy.py | 2 ++ causal_testing/testing/estimators.py | 30 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py index 2eb0d597..fc837d23 100644 --- a/causal_testing/testing/causal_test_adequacy.py +++ b/causal_testing/testing/causal_test_adequacy.py @@ -107,6 +107,8 @@ def measure_adequacy(self): continue except ConvergenceError: continue + except ValueError: + continue outcomes = [self.test_case.expected_causal_effect.apply(c) for c in results] results = pd.DataFrame(c.to_dict() for c in results)[["effect_estimate", "ci_low", "ci_high"]] diff --git a/causal_testing/testing/estimators.py b/causal_testing/testing/estimators.py index e701e664..44eb0c6d 100644 --- a/causal_testing/testing/estimators.py +++ b/causal_testing/testing/estimators.py @@ -622,7 +622,6 @@ def __init__( fit_bltd_switch_formula: str, eligibility=None, alpha: float = 0.05, - query: str = "", ): super().__init__( [c.variable for c in treatment_strategy.capabilities], @@ -633,7 +632,7 @@ def __init__( df, None, alpha=alpha, - query=query, + query="", ) self.timesteps_per_intervention = timesteps_per_intervention self.control_strategy = control_strategy @@ -645,6 +644,7 @@ def __init__( self.fit_bltd_switch_formula = fit_bltd_switch_formula self.eligibility = eligibility self.df = df + self.preprocess_data() def add_modelling_assumptions(self): self.modelling_assumptions.append("The variables in the data vary over time.") @@ -764,27 +764,27 @@ def preprocess_data(self): if len(individuals) == 0: raise ValueError("No individuals followed either strategy.") - return pd.concat(individuals) + self.df = pd.concat(individuals) def estimate_hazard_ratio(self): """ Estimate the hazard ratio. """ - preprocessed_data = self.preprocess_data() - - if preprocessed_data["fault_t_do"].sum() == 0: + if self.df["fault_t_do"].sum() == 0: raise ValueError("No recorded faults") + preprocessed_data = self.df.loc[self.df["xo_t_do"] == 0].copy() + # Use logistic regression to predict switching given baseline covariates - fit_bl_switch = smf.logit(self.fit_bl_switch_formula, data=preprocessed_data).fit() + fit_bl_switch = smf.logit(self.fit_bl_switch_formula, data=self.df).fit() preprocessed_data["pxo1"] = fit_bl_switch.predict(preprocessed_data) # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) fit_bltd_switch = smf.logit( self.fit_bltd_switch_formula, - data=preprocessed_data, + data=self.df, ).fit() preprocessed_data["pxo2"] = fit_bltd_switch.predict(preprocessed_data) @@ -808,23 +808,21 @@ def estimate_hazard_ratio(self): preprocessed_data["weight"] = 1 / preprocessed_data["denom"] preprocessed_data["sweight"] = preprocessed_data["num"] / preprocessed_data["denom"] - preprocessed_data_km = preprocessed_data.loc[preprocessed_data["xo_t_do"] == 0].copy() - preprocessed_data_km["tin"] = preprocessed_data_km["time"] - preprocessed_data_km["tout"] = pd.concat( - [(preprocessed_data_km["time"] + self.timesteps_per_intervention), preprocessed_data_km["fault_time"]], + preprocessed_data["tin"] = preprocessed_data["time"] + preprocessed_data["tout"] = pd.concat( + [(preprocessed_data["time"] + self.timesteps_per_intervention), preprocessed_data["fault_time"]], axis=1, ).min(axis=1) - assert (preprocessed_data_km["tin"] <= preprocessed_data_km["tout"]).all(), ( - f"Left before joining\n" - f"{preprocessed_data_km.loc[preprocessed_data_km['tin'] >= preprocessed_data_km['tout']]}" + assert (preprocessed_data["tin"] <= preprocessed_data["tout"]).all(), ( + f"Left before joining\n" f"{preprocessed_data.loc[preprocessed_data['tin'] >= preprocessed_data['tout']]}" ) # IPCW step 4: Use these weights in a weighted analysis of the outcome model # Estimate the KM graph and IPCW hazard ratio using Cox regression. cox_ph = CoxPHFitter() cox_ph.fit( - df=preprocessed_data_km, + df=preprocessed_data, duration_col="tout", event_col="fault_t_do", weights_col="weight", From 41ea94973511f17581be594dda59fad55991d75b Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Mon, 29 Jul 2024 09:58:05 +0100 Subject: [PATCH 09/12] pytest --- tests/resources/data/data.csv | 2 +- tests/resources/data/data_with_meta.csv | 2 +- tests/{ => resources}/data/nhefs.csv | 0 tests/{ => resources}/data/scarf_data.csv | 0 tests/{ => resources}/data/temporal_data.csv | 5 ++ .../test_causal_test_adequacy.py | 49 ++++++++++++++++- tests/testing_tests/test_estimators.py | 53 +++++++++++++++++-- 7 files changed, 103 insertions(+), 8 deletions(-) rename tests/{ => resources}/data/nhefs.csv (100%) rename tests/{ => resources}/data/scarf_data.csv (100%) rename tests/{ => resources}/data/temporal_data.csv (91%) diff --git a/tests/resources/data/data.csv b/tests/resources/data/data.csv index 2a8e8e3e..e95cae74 100644 --- a/tests/resources/data/data.csv +++ b/tests/resources/data/data.csv @@ -1,2 +1,2 @@ index,test_input,test_input_no_dist,test_output -0,1,1,2 +0,1.0,1.0,2.0 diff --git a/tests/resources/data/data_with_meta.csv b/tests/resources/data/data_with_meta.csv index 3c7d7e15..23ed882e 100644 --- a/tests/resources/data/data_with_meta.csv +++ b/tests/resources/data/data_with_meta.csv @@ -1,2 +1,2 @@ index,test_input,test_input_no_dist,test_output,test_meta -0,1,1,2,3 +0,1.0,1.0,2.0,3 diff --git a/tests/data/nhefs.csv b/tests/resources/data/nhefs.csv similarity index 100% rename from tests/data/nhefs.csv rename to tests/resources/data/nhefs.csv diff --git a/tests/data/scarf_data.csv b/tests/resources/data/scarf_data.csv similarity index 100% rename from tests/data/scarf_data.csv rename to tests/resources/data/scarf_data.csv diff --git a/tests/data/temporal_data.csv b/tests/resources/data/temporal_data.csv similarity index 91% rename from tests/data/temporal_data.csv rename to tests/resources/data/temporal_data.csv index 12749b0b..a239e041 100644 --- a/tests/data/temporal_data.csv +++ b/tests/resources/data/temporal_data.csv @@ -59,3 +59,8 @@ t,outcome,id,time 1,0,11,2 1,0,11,3 1,0,11,4 +0,1,12,0 +1,1,12,1 +0,1,12,2 +1,0,12,3 +0,0,12,4 diff --git a/tests/testing_tests/test_causal_test_adequacy.py b/tests/testing_tests/test_causal_test_adequacy.py index ee13d8aa..e29a8207 100644 --- a/tests/testing_tests/test_causal_test_adequacy.py +++ b/tests/testing_tests/test_causal_test_adequacy.py @@ -3,17 +3,20 @@ from statistics import StatisticsError import scipy import os +import pandas as pd -from causal_testing.testing.estimators import LinearRegressionEstimator +from causal_testing.testing.estimators import LinearRegressionEstimator, IPCWEstimator from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.testing.causal_test_case import CausalTestCase from causal_testing.testing.causal_test_suite import CausalTestSuite from causal_testing.testing.causal_test_adequacy import DAGAdequacy -from causal_testing.testing.causal_test_outcome import NoEffect, Positive +from causal_testing.testing.causal_test_outcome import NoEffect, Positive, SomeEffect from causal_testing.json_front.json_class import JsonUtility, CausalVariables from causal_testing.specification.variable import Input, Output, Meta from causal_testing.specification.scenario import Scenario from causal_testing.specification.causal_specification import CausalSpecification +from causal_testing.specification.capabilities import TreatmentSequence +from causal_testing.testing.causal_test_adequacy import DataAdequacy class TestCausalTestAdequacy(unittest.TestCase): @@ -106,6 +109,48 @@ def test_data_adequacy_cateogorical(self): {"kurtosis": {"test_input_no_dist[T.b]": 0.0}, "bootstrap_size": 100, "passing": 100, "successful": 100}, ) + def test_data_adequacy_group_by(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["ok"] = df["outcome"] == 1 + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + base_test_case = BaseTestCase( + treatment_variable=control_strategy, + outcome_variable=outcome, + effect="temporal", + ) + + causal_test_case = CausalTestCase( + base_test_case=base_test_case, + expected_causal_effect=SomeEffect(), + control_value=control_strategy, + treatment_value=treatment_strategy, + estimate_type="hazard_ratio", + ) + causal_test_result = causal_test_case.execute_test(estimation_model, None) + adequacy_metric = DataAdequacy(causal_test_case, estimation_model, group_by="id") + adequacy_metric.measure_adequacy() + causal_test_result.adequacy = adequacy_metric + print(causal_test_result.adequacy.to_dict()) + self.assertEqual( + causal_test_result.adequacy.to_dict(), + {"kurtosis": {"trtrand": 0.0}, "bootstrap_size": 100, "passing": 0, "successful": 95}, + ) + def test_dag_adequacy_dependent(self): base_test_case = BaseTestCase( treatment_variable="test_input", diff --git a/tests/testing_tests/test_estimators.py b/tests/testing_tests/test_estimators.py index 6fb52322..b7604b86 100644 --- a/tests/testing_tests/test_estimators.py +++ b/tests/testing_tests/test_estimators.py @@ -35,7 +35,7 @@ def load_nhefs_df(): """Get the NHEFS data from chapter 12 and put into a dataframe. NHEFS = National Health and Nutrition Examination Survey Data I Epidemiological Follow-up Study.""" - nhefs_df = pd.read_csv("tests/data/nhefs.csv") + nhefs_df = pd.read_csv("tests/resources/data/nhefs.csv") nhefs_df["one"] = 1 nhefs_df["zero"] = 0 edu_dummies = pd.get_dummies(nhefs_df.education, prefix="edu") @@ -79,7 +79,7 @@ class TestLogisticRegressionEstimator(unittest.TestCase): @classmethod def setUpClass(cls) -> None: - cls.scarf_df = pd.read_csv("tests/data/scarf_data.csv") + cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv") # Yes, this probably shouldn't be in here, but it uses the scarf data so it makes more sense to put it # here than duplicating the scarf data for a single test @@ -446,7 +446,7 @@ def setUpClass(cls) -> None: df = pd.DataFrame({"X1": np.random.uniform(-1000, 1000, 1000), "X2": np.random.uniform(-1000, 1000, 1000)}) df["Y"] = 2 * df["X1"] - 3 * df["X2"] + 2 * df["X1"] * df["X2"] + 10 cls.df = df - cls.scarf_df = pd.read_csv("tests/data/scarf_data.csv") + cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv") def test_X1_effect(self): """When we fix the value of X2 to 0, the effect of X1 on Y should become ~2 (because X2 terms are cancelled).""" @@ -485,7 +485,7 @@ def test_estimate_hazard_ratio(self): treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) outcome = "outcome" fit_bl_switch_formula = "xo_t_do ~ time" - df = pd.read_csv("tests/data/temporal_data.csv") + df = pd.read_csv("tests/resources/data/temporal_data.csv") df["ok"] = df["outcome"] == 1 estimation_model = IPCWEstimator( df, @@ -500,3 +500,48 @@ def test_estimate_hazard_ratio(self): ) estimate, intervals = estimation_model.estimate_hazard_ratio() self.assertEqual(estimate["trtrand"], 1.0) + + def test_invalid_treatment_strategies(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["t"] = (["1", "0"] * len(df))[: len(df)] + df["ok"] = df["outcome"] == 1 + with self.assertRaises(ValueError): + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + + def test_invalid_fault_t_do(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["ok"] = df["outcome"] == 1 + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + estimation_model.df["fault_t_do"] = 0 + with self.assertRaises(ValueError): + estimate, intervals = estimation_model.estimate_hazard_ratio() From a61906975c7db0b5ae760d585542eab5148ba6ce Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Mon, 29 Jul 2024 10:39:49 +0100 Subject: [PATCH 10/12] Test capabilities --- .../specification_tests/test_capabilities.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 tests/specification_tests/test_capabilities.py diff --git a/tests/specification_tests/test_capabilities.py b/tests/specification_tests/test_capabilities.py new file mode 100644 index 00000000..a471d9b9 --- /dev/null +++ b/tests/specification_tests/test_capabilities.py @@ -0,0 +1,42 @@ +import unittest +from enum import Enum +import z3 +from scipy.stats import norm, kstest + +from causal_testing.specification.capabilities import Capability, TreatmentSequence + + +class TestCapability(unittest.TestCase): + + """ + Test the Capability class for basic methods. + """ + + def setUp(self) -> None: + pass + + def test_repr(self): + cap = Capability("v", 1, 0, 1) + self.assertEqual(str(cap), "(v, 1, 0-1)") + + +class TestTreatmentSequence(unittest.TestCase): + + """ + Test the TreatmentSequence class for basic methods. + """ + + def setUp(self) -> None: + self.timesteps_per_intervention = 1 + + def test_set_value(self): + treatment_strategy = TreatmentSequence(self.timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + treatment_strategy.set_value(0, 0) + self.assertEqual([x.value for x in treatment_strategy.capabilities], [0, 1, 1]) + + def test_copy(self): + control_strategy = TreatmentSequence(self.timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + treatment_strategy = control_strategy.copy() + treatment_strategy.set_value(0, 0) + self.assertEqual([x.value for x in control_strategy.capabilities], [1, 1, 1]) + self.assertEqual([x.value for x in treatment_strategy.capabilities], [0, 1, 1]) From 4abe735750daabc6959a0390065c3b52690cde15 Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Wed, 31 Jul 2024 08:10:24 +0100 Subject: [PATCH 11/12] Minor corrections --- causal_testing/specification/capabilities.py | 3 ++- causal_testing/testing/causal_test_adequacy.py | 8 +++++++- causal_testing/testing/estimators.py | 2 +- tests/specification_tests/test_capabilities.py | 4 ---- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/causal_testing/specification/capabilities.py b/causal_testing/specification/capabilities.py index 699b77c6..60d248b0 100644 --- a/causal_testing/specification/capabilities.py +++ b/causal_testing/specification/capabilities.py @@ -3,6 +3,7 @@ treatment sequences that operate over time. """ from causal_testing.specification.variable import Variable +from typing import Any class Capability: @@ -10,7 +11,7 @@ class Capability: Data class to encapsulate temporal interventions. """ - def __init__(self, variable: Variable, value: any, start_time: int, end_time: float): + def __init__(self, variable: Variable, value: Any, start_time: int, end_time: int): self.variable = variable self.value = value self.start_time = start_time diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py index fc837d23..cd7b2735 100644 --- a/causal_testing/testing/causal_test_adequacy.py +++ b/causal_testing/testing/causal_test_adequacy.py @@ -12,6 +12,9 @@ from causal_testing.specification.causal_dag import CausalDAG from causal_testing.testing.estimators import Estimator from causal_testing.testing.causal_test_case import CausalTestCase +import logging + +logger = logging.getLogger(__name__) class DAGAdequacy: @@ -104,10 +107,13 @@ def measure_adequacy(self): try: results.append(self.test_case.execute_test(estimator, None)) except LinAlgError: + logger.warning("Adequacy LinAlgError") continue except ConvergenceError: + logger.warning("Adequacy ConvergenceError") continue - except ValueError: + except ValueError as e: + logger.warning(f"Adequacy ValueError: {e}") continue outcomes = [self.test_case.expected_causal_effect.apply(c) for c in results] results = pd.DataFrame(c.to_dict() for c in results)[["effect_estimate", "ci_low", "ci_high"]] diff --git a/causal_testing/testing/estimators.py b/causal_testing/testing/estimators.py index 44eb0c6d..102bc697 100644 --- a/causal_testing/testing/estimators.py +++ b/causal_testing/testing/estimators.py @@ -697,7 +697,7 @@ def setup_fault_t_do(self, individual: pd.DataFrame): return pd.DataFrame({"fault_t_do": fault_t_do}) - def setup_fault_time(self, individual, perturbation=-0.001): + def setup_fault_time(self, individual: pd.DataFrame, perturbation: float = -0.001): """ Return the time at which the event of interest (i.e. a fault) occurred. """ diff --git a/tests/specification_tests/test_capabilities.py b/tests/specification_tests/test_capabilities.py index a471d9b9..505c7eff 100644 --- a/tests/specification_tests/test_capabilities.py +++ b/tests/specification_tests/test_capabilities.py @@ -1,8 +1,4 @@ import unittest -from enum import Enum -import z3 -from scipy.stats import norm, kstest - from causal_testing.specification.capabilities import Capability, TreatmentSequence From 2b76d3cf37a49ba05e964e1ce7ddb55dad103271 Mon Sep 17 00:00:00 2001 From: Michael Foster Date: Wed, 31 Jul 2024 08:13:34 +0100 Subject: [PATCH 12/12] pylint --- causal_testing/specification/capabilities.py | 2 +- causal_testing/testing/causal_test_adequacy.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/causal_testing/specification/capabilities.py b/causal_testing/specification/capabilities.py index 60d248b0..ed692e6d 100644 --- a/causal_testing/specification/capabilities.py +++ b/causal_testing/specification/capabilities.py @@ -2,8 +2,8 @@ This module contains the Capability and TreatmentSequence classes to implement treatment sequences that operate over time. """ -from causal_testing.specification.variable import Variable from typing import Any +from causal_testing.specification.variable import Variable class Capability: diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py index cd7b2735..740bba5e 100644 --- a/causal_testing/testing/causal_test_adequacy.py +++ b/causal_testing/testing/causal_test_adequacy.py @@ -2,6 +2,7 @@ This module contains code to measure various aspects of causal test adequacy. """ +import logging from itertools import combinations from copy import deepcopy import pandas as pd @@ -12,7 +13,6 @@ from causal_testing.specification.causal_dag import CausalDAG from causal_testing.testing.estimators import Estimator from causal_testing.testing.causal_test_case import CausalTestCase -import logging logger = logging.getLogger(__name__)