diff --git a/causal_testing/json_front/json_class.py b/causal_testing/json_front/json_class.py
index cca98a20..8c6c265c 100644
--- a/causal_testing/json_front/json_class.py
+++ b/causal_testing/json_front/json_class.py
@@ -276,7 +276,7 @@ def _execute_test_case(
         test_passes = causal_test_case.expected_causal_effect.apply(causal_test_result)

         if "coverage" in test and test["coverage"]:
-            adequacy_metric = DataAdequacy(causal_test_case, estimation_model, self.data_collector)
+            adequacy_metric = DataAdequacy(causal_test_case, estimation_model)
             adequacy_metric.measure_adequacy()
             causal_test_result.adequacy = adequacy_metric
diff --git a/causal_testing/specification/capabilities.py b/causal_testing/specification/capabilities.py
new file mode 100644
index 00000000..ed692e6d
--- /dev/null
+++ b/causal_testing/specification/capabilities.py
@@ -0,0 +1,76 @@
+"""
+This module contains the Capability and TreatmentSequence classes to implement
+treatment sequences that operate over time.
+"""
+from typing import Any
+from causal_testing.specification.variable import Variable
+
+
+class Capability:
+    """
+    Data class to encapsulate temporal interventions.
+    """
+
+    def __init__(self, variable: Variable, value: Any, start_time: int, end_time: int):
+        self.variable = variable
+        self.value = value
+        self.start_time = start_time
+        self.end_time = end_time
+
+    def __eq__(self, other):
+        return (
+            isinstance(other, type(self))
+            and self.variable == other.variable
+            and self.value == other.value
+            and self.start_time == other.start_time
+            and self.end_time == other.end_time
+        )
+
+    def __repr__(self):
+        return f"({self.variable}, {self.value}, {self.start_time}-{self.end_time})"
+
+
+class TreatmentSequence:
+    """
+    Class to represent a list of capabilities, i.e. a treatment regime.
+    """
+
+    def __init__(self, timesteps_per_intervention, capabilities):
+        self.timesteps_per_intervention = timesteps_per_intervention
+        self.capabilities = [
+            Capability(var, val, t, t + timesteps_per_intervention)
+            for (var, val), t in zip(
+                capabilities,
+                range(
+                    timesteps_per_intervention,
+                    (len(capabilities) * timesteps_per_intervention) + 1,
+                    timesteps_per_intervention,
+                ),
+            )
+        ]
+        # This is a bodge so that causal test adequacy works
+        self.name = tuple(c.variable for c in self.capabilities)
+
+    def set_value(self, index: int, value: float):
+        """
+        Set the value of the capability at the given index.
+        :param index: the index of the element to update.
+        :param value: the desired value of the capability.
+        """
+        self.capabilities[index].value = value
+
+    def copy(self):
+        """
+        Return a deep copy of the treatment sequence.
+        """
+        strategy = TreatmentSequence(
+            self.timesteps_per_intervention,
+            [(c.variable, c.value) for c in self.capabilities],
+        )
+        return strategy
+
+    def total_time(self):
+        """
+        Calculate the total duration of the treatment strategy.
+        """
+        return (len(self.capabilities) + 1) * self.timesteps_per_intervention
diff --git a/causal_testing/testing/causal_test_adequacy.py b/causal_testing/testing/causal_test_adequacy.py
index 2a9bff93..740bba5e 100644
--- a/causal_testing/testing/causal_test_adequacy.py
+++ b/causal_testing/testing/causal_test_adequacy.py
@@ -2,16 +2,20 @@
 This module contains code to measure various aspects of causal test adequacy.
""" +import logging from itertools import combinations from copy import deepcopy import pandas as pd +from numpy.linalg import LinAlgError +from lifelines.exceptions import ConvergenceError from causal_testing.testing.causal_test_suite import CausalTestSuite -from causal_testing.data_collection.data_collector import DataCollector from causal_testing.specification.causal_dag import CausalDAG from causal_testing.testing.estimators import Estimator from causal_testing.testing.causal_test_case import CausalTestCase +logger = logging.getLogger(__name__) + class DAGAdequacy: """ @@ -70,15 +74,21 @@ class DataAdequacy: - Zero kurtosis is optimal. """ + # pylint: disable=too-many-instance-attributes def __init__( - self, test_case: CausalTestCase, estimator: Estimator, data_collector: DataCollector, bootstrap_size: int = 100 + self, + test_case: CausalTestCase, + estimator: Estimator, + bootstrap_size: int = 100, + group_by=None, ): self.test_case = test_case self.estimator = estimator - self.data_collector = data_collector self.kurtosis = None self.outcomes = None + self.successful = None self.bootstrap_size = bootstrap_size + self.group_by = group_by def measure_adequacy(self): """ @@ -87,11 +97,24 @@ def measure_adequacy(self): results = [] for i in range(self.bootstrap_size): estimator = deepcopy(self.estimator) - estimator.df = estimator.df.sample(len(estimator.df), replace=True, random_state=i) - # try: - results.append(self.test_case.execute_test(estimator, self.data_collector)) - # except np.LinAlgError: - # continue + + if self.group_by is not None: + ids = pd.Series(estimator.df[self.group_by].unique()) + ids = ids.sample(len(ids), replace=True, random_state=i) + estimator.df = estimator.df[estimator.df[self.group_by].isin(ids)] + else: + estimator.df = estimator.df.sample(len(estimator.df), replace=True, random_state=i) + try: + results.append(self.test_case.execute_test(estimator, None)) + except LinAlgError: + logger.warning("Adequacy LinAlgError") + continue + except ConvergenceError: + logger.warning("Adequacy ConvergenceError") + continue + except ValueError as e: + logger.warning(f"Adequacy ValueError: {e}") + continue outcomes = [self.test_case.expected_causal_effect.apply(c) for c in results] results = pd.DataFrame(c.to_dict() for c in results)[["effect_estimate", "ci_low", "ci_high"]] @@ -111,8 +134,14 @@ def convert_to_df(field): effect_estimate = pd.concat(results["effect_estimate"].tolist(), axis=1).transpose().reset_index(drop=True) self.kurtosis = effect_estimate.kurtosis() - self.outcomes = sum(outcomes) + self.outcomes = sum(filter(lambda x: x is not None, outcomes)) + self.successful = sum(x is not None for x in outcomes) def to_dict(self): "Returns the adequacy object as a dictionary." 
- return {"kurtosis": self.kurtosis.to_dict(), "bootstrap_size": self.bootstrap_size, "passing": self.outcomes} + return { + "kurtosis": self.kurtosis.to_dict(), + "bootstrap_size": self.bootstrap_size, + "passing": self.outcomes, + "successful": self.successful, + } diff --git a/causal_testing/testing/causal_test_case.py b/causal_testing/testing/causal_test_case.py index da47c126..52daf040 100644 --- a/causal_testing/testing/causal_test_case.py +++ b/causal_testing/testing/causal_test_case.py @@ -92,7 +92,7 @@ def _return_causal_test_results(self, estimator) -> CausalTestResult: except np.linalg.LinAlgError: return CausalTestResult( estimator=estimator, - test_value=TestValue(self.estimate_type, "LinAlgError"), + test_value=TestValue(self.estimate_type, None), effect_modifier_configuration=self.effect_modifier_configuration, confidence_intervals=None, ) diff --git a/causal_testing/testing/causal_test_outcome.py b/causal_testing/testing/causal_test_outcome.py index 3846b514..be10566b 100644 --- a/causal_testing/testing/causal_test_outcome.py +++ b/causal_testing/testing/causal_test_outcome.py @@ -27,7 +27,9 @@ class SomeEffect(CausalTestOutcome): """An extension of TestOutcome representing that the expected causal effect should not be zero.""" def apply(self, res: CausalTestResult) -> bool: - if res.test_value.type == "risk_ratio": + if res.ci_low() is None or res.ci_high() is None: + return None + if res.test_value.type in ("risk_ratio", "hazard_ratio"): return any( 1 < ci_low < ci_high or ci_low < ci_high < 1 for ci_low, ci_high in zip(res.ci_low(), res.ci_high()) ) @@ -52,7 +54,7 @@ def __init__(self, atol: float = 1e-10, ctol: float = 0.05): self.ctol = ctol def apply(self, res: CausalTestResult) -> bool: - if res.test_value.type == "risk_ratio": + if res.test_value.type in ("risk_ratio", "hazard_ratio"): return any( ci_low < 1 < ci_high or np.isclose(value, 1.0, atol=self.atol) for ci_low, ci_high, value in zip(res.ci_low(), res.ci_high(), res.test_value.value) diff --git a/causal_testing/testing/estimators.py b/causal_testing/testing/estimators.py index fdd834bc..3920e1f2 100644 --- a/causal_testing/testing/estimators.py +++ b/causal_testing/testing/estimators.py @@ -14,8 +14,10 @@ from patsy import ModelDesc from statsmodels.regression.linear_model import RegressionResultsWrapper from statsmodels.tools.sm_exceptions import PerfectSeparationError +from lifelines import CoxPHFitter from causal_testing.specification.variable import Variable +from causal_testing.specification.capabilities import TreatmentSequence, Capability logger = logging.getLogger(__name__) @@ -360,7 +362,6 @@ def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]: if factor.name() in self.df.dtypes ) ): - design_info = dmatrix(self.formula.split("~")[1], self.df).design_info treatment = design_info.column_names[design_info.term_name_slices[self.treatment]] else: @@ -602,3 +603,238 @@ def estimate_coefficient(self, bootstrap_size=100) -> tuple[pd.Series, list[pd.S ci_high = pd.Series(bootstraps[bootstrap_size - bound]) return pd.Series(self.estimate_iv_coefficient(self.df)), [ci_low, ci_high] + + +class IPCWEstimator(Estimator): + """ + Class to perform inverse probability of censoring weighting (IPCW) estimation + for sequences of treatments over time-varying data. 
+ """ + + # pylint: disable=too-many-arguments + # pylint: disable=too-many-instance-attributes + def __init__( + self, + df: pd.DataFrame, + timesteps_per_intervention: int, + control_strategy: TreatmentSequence, + treatment_strategy: TreatmentSequence, + outcome: str, + fault_column: str, + fit_bl_switch_formula: str, + fit_bltd_switch_formula: str, + eligibility=None, + alpha: float = 0.05, + ): + super().__init__( + [c.variable for c in treatment_strategy.capabilities], + [c.value for c in treatment_strategy.capabilities], + [c.value for c in control_strategy.capabilities], + None, + outcome, + df, + None, + alpha=alpha, + query="", + ) + self.timesteps_per_intervention = timesteps_per_intervention + self.control_strategy = control_strategy + self.treatment_strategy = treatment_strategy + self.outcome = outcome + self.fault_column = fault_column + self.timesteps_per_intervention = timesteps_per_intervention + self.fit_bl_switch_formula = fit_bl_switch_formula + self.fit_bltd_switch_formula = fit_bltd_switch_formula + self.eligibility = eligibility + self.df = df + self.preprocess_data() + + def add_modelling_assumptions(self): + self.modelling_assumptions.append("The variables in the data vary over time.") + + def setup_xo_t_do(self, strategy_assigned: list, strategy_followed: list, eligible: pd.Series): + """ + Return a binary sequence with each bit representing whether the current + index is the time point at which the individual diverted from the + assigned treatment strategy (and thus should be censored). + + :param strategy_assigned - the assigned treatment strategy + :param strategy_followed - the strategy followed by the individual + :param eligible - binary sequence represnting the eligibility of the individual at each time step + """ + strategy_assigned = [1] + strategy_assigned + [1] + strategy_followed = [1] + strategy_followed + [1] + + mask = ( + pd.Series(strategy_assigned, index=eligible.index) != pd.Series(strategy_followed, index=eligible.index) + ).astype("boolean") + mask = mask | ~eligible + mask.reset_index(inplace=True, drop=True) + false = mask.loc[mask] + if false.empty: + return np.zeros(len(mask)) + mask = (mask * 1).tolist() + cutoff = false.index[0] + 1 + return mask[:cutoff] + ([None] * (len(mask) - cutoff)) + + def setup_fault_t_do(self, individual: pd.DataFrame): + """ + Return a binary sequence with each bit representing whether the current + index is the time point at which the event of interest (i.e. a fault) + occurred. + """ + fault = individual[~individual[self.fault_column]] + fault_t_do = pd.Series(np.zeros(len(individual)), index=individual.index) + + if not fault.empty: + fault_time = individual["time"].loc[fault.index[0]] + # Ceiling to nearest observation point + fault_time = ceil(fault_time / self.timesteps_per_intervention) * self.timesteps_per_intervention + # Set the correct observation point to be the fault time of doing (fault_t_do) + observations = individual.loc[ + (individual["time"] % self.timesteps_per_intervention == 0) & (individual["time"] < fault_time) + ] + if not observations.empty: + fault_t_do.loc[observations.index[0]] = 1 + assert sum(fault_t_do) <= 1, f"Multiple fault times for\n{individual}" + + return pd.DataFrame({"fault_t_do": fault_t_do}) + + def setup_fault_time(self, individual: pd.DataFrame, perturbation: float = -0.001): + """ + Return the time at which the event of interest (i.e. a fault) occurred. 
+ """ + fault = individual[~individual[self.fault_column]] + fault_time = ( + individual["time"].loc[fault.index[0]] + if not fault.empty + else (individual["time"].max() + self.timesteps_per_intervention) + ) + return pd.DataFrame({"fault_time": np.repeat(fault_time + perturbation, len(individual))}) + + def preprocess_data(self): + """ + Set up the treatment-specific columns in the data that are needed to estimate the hazard ratio. + """ + self.df["trtrand"] = None # treatment/control arm + self.df["xo_t_do"] = None # did the individual deviate from the treatment of interest here? + self.df["eligible"] = self.df.eval(self.eligibility) if self.eligibility is not None else True + + # when did a fault occur? + self.df["fault_time"] = self.df.groupby("id")[[self.fault_column, "time"]].apply(self.setup_fault_time).values + self.df["fault_t_do"] = ( + self.df.groupby("id")[["id", "time", self.fault_column]].apply(self.setup_fault_t_do).values + ) + assert not pd.isnull(self.df["fault_time"]).any() + + living_runs = self.df.query("fault_time > 0").loc[ + (self.df["time"] % self.timesteps_per_intervention == 0) + & (self.df["time"] <= self.control_strategy.total_time()) + ] + + individuals = [] + new_id = 0 + logging.debug(" Preprocessing groups") + for _, individual in living_runs.groupby("id"): + assert sum(individual["fault_t_do"]) <= 1, ( + f"Error initialising fault_t_do for individual\n" + f"{individual[['id', 'time', 'fault_time', 'fault_t_do']]}\n" + "with fault at {individual.fault_time.iloc[0]}" + ) + + strategy_followed = [ + Capability( + c.variable, + individual.loc[individual["time"] == c.start_time, c.variable].values[0], + c.start_time, + c.end_time, + ) + for c in self.treatment_strategy.capabilities + ] + + # Control flow: + # Individuals that start off in both arms, need cloning (hence incrementing the ID within the if statement) + # Individuals that don't start off in either arm are left out + for inx, strategy_assigned in [(0, self.control_strategy), (1, self.treatment_strategy)]: + if strategy_assigned.capabilities[0] == strategy_followed[0] and individual.eligible.iloc[0]: + individual["id"] = new_id + new_id += 1 + individual["trtrand"] = inx + individual["xo_t_do"] = self.setup_xo_t_do( + strategy_assigned.capabilities, strategy_followed, individual["eligible"] + ) + individuals.append(individual.loc[individual["time"] <= individual["fault_time"]].copy()) + if len(individuals) == 0: + raise ValueError("No individuals followed either strategy.") + + self.df = pd.concat(individuals) + + def estimate_hazard_ratio(self): + """ + Estimate the hazard ratio. 
+ """ + + if self.df["fault_t_do"].sum() == 0: + raise ValueError("No recorded faults") + + preprocessed_data = self.df.loc[self.df["xo_t_do"] == 0].copy() + + # Use logistic regression to predict switching given baseline covariates + fit_bl_switch = smf.logit(self.fit_bl_switch_formula, data=self.df).fit() + + preprocessed_data["pxo1"] = fit_bl_switch.predict(preprocessed_data) + + # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) + fit_bltd_switch = smf.logit( + self.fit_bltd_switch_formula, + data=self.df, + ).fit() + + preprocessed_data["pxo2"] = fit_bltd_switch.predict(preprocessed_data) + + # IPCW step 3: For each individual at each time, compute the inverse probability of remaining uncensored + # Estimate the probabilities of remaining ‘un-switched’ and hence the weights + + preprocessed_data["num"] = 1 - preprocessed_data["pxo1"] + preprocessed_data["denom"] = 1 - preprocessed_data["pxo2"] + preprocessed_data[["num", "denom"]] = ( + preprocessed_data.sort_values(["id", "time"]).groupby("id")[["num", "denom"]].cumprod() + ) + + assert ( + not preprocessed_data["num"].isnull().any() + ), f"{len(preprocessed_data['num'].isnull())} null numerator values" + assert ( + not preprocessed_data["denom"].isnull().any() + ), f"{len(preprocessed_data['denom'].isnull())} null denom values" + + preprocessed_data["weight"] = 1 / preprocessed_data["denom"] + preprocessed_data["sweight"] = preprocessed_data["num"] / preprocessed_data["denom"] + + preprocessed_data["tin"] = preprocessed_data["time"] + preprocessed_data["tout"] = pd.concat( + [(preprocessed_data["time"] + self.timesteps_per_intervention), preprocessed_data["fault_time"]], + axis=1, + ).min(axis=1) + + assert (preprocessed_data["tin"] <= preprocessed_data["tout"]).all(), ( + f"Left before joining\n" f"{preprocessed_data.loc[preprocessed_data['tin'] >= preprocessed_data['tout']]}" + ) + + # IPCW step 4: Use these weights in a weighted analysis of the outcome model + # Estimate the KM graph and IPCW hazard ratio using Cox regression. 
+        cox_ph = CoxPHFitter()
+        cox_ph.fit(
+            df=preprocessed_data,
+            duration_col="tout",
+            event_col="fault_t_do",
+            weights_col="weight",
+            cluster_col="id",
+            robust=True,
+            formula="trtrand",
+            entry_col="tin",
+        )
+
+        ci_low, ci_high = [np.exp(cox_ph.confidence_intervals_)[col] for col in cox_ph.confidence_intervals_.columns]
+
+        return (cox_ph.hazard_ratios_, (ci_low, ci_high))
diff --git a/pyproject.toml b/pyproject.toml
index cf8ca872..e8e9fc68 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,10 +17,11 @@ keywords = ["causal inference", "verification"]
 dependencies = [
     "z3_solver~=4.11.2",  # z3_solver does not follow semantic versioning and tying to 4.11 introduces problems
     "fitter~=1.7",
+    "lifelines~=0.29.0",
     "lhsmdu~=1.1",
     "networkx~=2.6",
     "numpy~=1.26",
-    "pandas~=1.5",
+    "pandas>=2.1",
     "scikit_learn~=1.4",
     "scipy~=1.7",
     "statsmodels~=0.14",
diff --git a/tests/resources/data/data.csv b/tests/resources/data/data.csv
index 2a8e8e3e..e95cae74 100644
--- a/tests/resources/data/data.csv
+++ b/tests/resources/data/data.csv
@@ -1,2 +1,2 @@
 index,test_input,test_input_no_dist,test_output
-0,1,1,2
+0,1.0,1.0,2.0
diff --git a/tests/resources/data/data_with_meta.csv b/tests/resources/data/data_with_meta.csv
index 3c7d7e15..23ed882e 100644
--- a/tests/resources/data/data_with_meta.csv
+++ b/tests/resources/data/data_with_meta.csv
@@ -1,2 +1,2 @@
 index,test_input,test_input_no_dist,test_output,test_meta
-0,1,1,2,3
+0,1.0,1.0,2.0,3
diff --git a/tests/data/nhefs.csv b/tests/resources/data/nhefs.csv
similarity index 100%
rename from tests/data/nhefs.csv
rename to tests/resources/data/nhefs.csv
diff --git a/tests/data/scarf_data.csv b/tests/resources/data/scarf_data.csv
similarity index 100%
rename from tests/data/scarf_data.csv
rename to tests/resources/data/scarf_data.csv
diff --git a/tests/resources/data/temporal_data.csv b/tests/resources/data/temporal_data.csv
new file mode 100644
index 00000000..a239e041
--- /dev/null
+++ b/tests/resources/data/temporal_data.csv
@@ -0,0 +1,66 @@
+t,outcome,id,time
+0,1,0,0
+0,1,0,1
+0,1,0,2
+0,0,0,3
+0,0,0,4
+0,1,1,0
+0,1,1,1
+0,1,1,2
+0,0,1,3
+0,0,1,4
+0,1,2,0
+0,1,2,1
+0,1,2,2
+0,0,2,3
+0,0,2,4
+0,1,3,0
+0,1,3,1
+0,1,3,2
+0,0,3,3
+0,0,3,4
+0,1,4,0
+0,1,4,1
+0,1,4,2
+0,0,4,3
+0,0,4,4
+0,1,5,0
+0,1,5,1
+0,1,5,2
+0,0,5,3
+0,0,5,4
+0,1,6,0
+0,1,6,1
+0,0,6,2
+0,0,6,3
+0,0,6,4
+1,1,7,0
+1,1,7,1
+1,0,7,2
+1,0,7,3
+1,0,7,4
+1,1,8,0
+1,1,8,1
+1,0,8,2
+1,0,8,3
+1,0,8,4
+1,1,9,0
+1,1,9,1
+1,0,9,2
+1,0,9,3
+1,0,9,4
+1,1,10,0
+1,1,10,1
+1,0,10,2
+1,0,10,3
+1,0,10,4
+1,1,11,0
+1,1,11,1
+1,0,11,2
+1,0,11,3
+1,0,11,4
+0,1,12,0
+1,1,12,1
+0,1,12,2
+1,0,12,3
+0,0,12,4
diff --git a/tests/specification_tests/test_capabilities.py b/tests/specification_tests/test_capabilities.py
new file mode 100644
index 00000000..505c7eff
--- /dev/null
+++ b/tests/specification_tests/test_capabilities.py
@@ -0,0 +1,38 @@
+import unittest
+from causal_testing.specification.capabilities import Capability, TreatmentSequence
+
+
+class TestCapability(unittest.TestCase):
+
+    """
+    Test the Capability class for basic methods.
+    """
+
+    def setUp(self) -> None:
+        pass
+
+    def test_repr(self):
+        cap = Capability("v", 1, 0, 1)
+        self.assertEqual(str(cap), "(v, 1, 0-1)")
+
+
+class TestTreatmentSequence(unittest.TestCase):
+
+    """
+    Test the TreatmentSequence class for basic methods.
+ """ + + def setUp(self) -> None: + self.timesteps_per_intervention = 1 + + def test_set_value(self): + treatment_strategy = TreatmentSequence(self.timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + treatment_strategy.set_value(0, 0) + self.assertEqual([x.value for x in treatment_strategy.capabilities], [0, 1, 1]) + + def test_copy(self): + control_strategy = TreatmentSequence(self.timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + treatment_strategy = control_strategy.copy() + treatment_strategy.set_value(0, 0) + self.assertEqual([x.value for x in control_strategy.capabilities], [1, 1, 1]) + self.assertEqual([x.value for x in treatment_strategy.capabilities], [0, 1, 1]) diff --git a/tests/testing_tests/test_causal_test_adequacy.py b/tests/testing_tests/test_causal_test_adequacy.py index 1f8d2ffa..e29a8207 100644 --- a/tests/testing_tests/test_causal_test_adequacy.py +++ b/tests/testing_tests/test_causal_test_adequacy.py @@ -3,17 +3,20 @@ from statistics import StatisticsError import scipy import os +import pandas as pd -from causal_testing.testing.estimators import LinearRegressionEstimator +from causal_testing.testing.estimators import LinearRegressionEstimator, IPCWEstimator from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.testing.causal_test_case import CausalTestCase from causal_testing.testing.causal_test_suite import CausalTestSuite from causal_testing.testing.causal_test_adequacy import DAGAdequacy -from causal_testing.testing.causal_test_outcome import NoEffect, Positive +from causal_testing.testing.causal_test_outcome import NoEffect, Positive, SomeEffect from causal_testing.json_front.json_class import JsonUtility, CausalVariables from causal_testing.specification.variable import Input, Output, Meta from causal_testing.specification.scenario import Scenario from causal_testing.specification.causal_specification import CausalSpecification +from causal_testing.specification.capabilities import TreatmentSequence +from causal_testing.testing.causal_test_adequacy import DataAdequacy class TestCausalTestAdequacy(unittest.TestCase): @@ -70,7 +73,7 @@ def test_data_adequacy_numeric(self): ) self.assertEqual( test_results[0]["result"].adequacy.to_dict(), - {"kurtosis": {"test_input": 0.0}, "bootstrap_size": 100, "passing": 100}, + {"kurtosis": {"test_input": 0.0}, "bootstrap_size": 100, "passing": 100, "successful": 100}, ) def test_data_adequacy_cateogorical(self): @@ -103,7 +106,49 @@ def test_data_adequacy_cateogorical(self): print(test_results[0]["result"]) self.assertEqual( test_results[0]["result"].adequacy.to_dict(), - {"kurtosis": {"test_input_no_dist[T.b]": 0.0}, "bootstrap_size": 100, "passing": 100}, + {"kurtosis": {"test_input_no_dist[T.b]": 0.0}, "bootstrap_size": 100, "passing": 100, "successful": 100}, + ) + + def test_data_adequacy_group_by(self): + timesteps_per_intervention = 1 + control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) + treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["ok"] = df["outcome"] == 1 + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + base_test_case = BaseTestCase( + 
+            treatment_variable=control_strategy,
+            outcome_variable=outcome,
+            effect="temporal",
+        )
+
+        causal_test_case = CausalTestCase(
+            base_test_case=base_test_case,
+            expected_causal_effect=SomeEffect(),
+            control_value=control_strategy,
+            treatment_value=treatment_strategy,
+            estimate_type="hazard_ratio",
+        )
+        causal_test_result = causal_test_case.execute_test(estimation_model, None)
+        adequacy_metric = DataAdequacy(causal_test_case, estimation_model, group_by="id")
+        adequacy_metric.measure_adequacy()
+        causal_test_result.adequacy = adequacy_metric
+        print(causal_test_result.adequacy.to_dict())
+        self.assertEqual(
+            causal_test_result.adequacy.to_dict(),
+            {"kurtosis": {"trtrand": 0.0}, "bootstrap_size": 100, "passing": 0, "successful": 95},
+        )

     def test_dag_adequacy_dependent(self):
diff --git a/tests/testing_tests/test_estimators.py b/tests/testing_tests/test_estimators.py
index cac19afc..b7604b86 100644
--- a/tests/testing_tests/test_estimators.py
+++ b/tests/testing_tests/test_estimators.py
@@ -7,9 +7,11 @@
     LogisticRegressionEstimator,
     InstrumentalVariableEstimator,
     CubicSplineRegressionEstimator,
+    IPCWEstimator,
 )
 from causal_testing.specification.variable import Input
 from causal_testing.utils.validation import CausalValidator
+from causal_testing.specification.capabilities import TreatmentSequence


 def plot_results_df(df):
@@ -33,7 +35,7 @@ def load_nhefs_df():
     """Get the NHEFS data from chapter 12 and put into a dataframe. NHEFS = National Health and Nutrition Examination
     Survey Data I Epidemiological Follow-up Study."""

-    nhefs_df = pd.read_csv("tests/data/nhefs.csv")
+    nhefs_df = pd.read_csv("tests/resources/data/nhefs.csv")
     nhefs_df["one"] = 1
     nhefs_df["zero"] = 0
     edu_dummies = pd.get_dummies(nhefs_df.education, prefix="edu")
@@ -77,7 +79,7 @@ class TestLogisticRegressionEstimator(unittest.TestCase):

     @classmethod
     def setUpClass(cls) -> None:
-        cls.scarf_df = pd.read_csv("tests/data/scarf_data.csv")
+        cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv")

     # Yes, this probably shouldn't be in here, but it uses the scarf data so it makes more sense to put it
     # here than duplicating the scarf data for a single test
@@ -404,11 +406,9 @@ def test_program_11_2_with_robustness_validation(self):

 class TestCubicSplineRegressionEstimator(TestLinearRegressionEstimator):
     @classmethod
     def setUpClass(cls):
-
         super().setUpClass()

     def test_program_11_3_cublic_spline(self):
-
         """Test whether the cublic_spline regression implementation produces the same results as program 11.3 (p. 162).
         https://www.hsph.harvard.edu/miguel-hernan/wp-content/uploads/sites/1268/2023/10/hernanrobins_WhatIf_30sep23.pdf
         Slightly modified as Hernan et al. use linear regression for this example.
@@ -446,7 +446,7 @@ def setUpClass(cls) -> None:
         df = pd.DataFrame({"X1": np.random.uniform(-1000, 1000, 1000), "X2": np.random.uniform(-1000, 1000, 1000)})
         df["Y"] = 2 * df["X1"] - 3 * df["X2"] + 2 * df["X1"] * df["X2"] + 10
         cls.df = df
-        cls.scarf_df = pd.read_csv("tests/data/scarf_data.csv")
+        cls.scarf_df = pd.read_csv("tests/resources/data/scarf_data.csv")

     def test_X1_effect(self):
         """When we fix the value of X2 to 0, the effect of X1 on Y should become ~2 (because X2 terms are cancelled)."""
@@ -472,3 +472,76 @@ def test_categorical_confidence_intervals(self):
         self.assertTrue(coefficients.round(2).equals(pd.Series({"color[T.grey]": 0.92, "color[T.orange]": -4.25})))
         self.assertTrue(ci_low.round(2).equals(pd.Series({"color[T.grey]": -22.12, "color[T.orange]": -25.58})))
         self.assertTrue(ci_high.round(2).equals(pd.Series({"color[T.grey]": 23.95, "color[T.orange]": 17.08})))
+
+
+class TestIPCWEstimator(unittest.TestCase):
+    """
+    Test the IPCW estimator class
+    """
+
+    def test_estimate_hazard_ratio(self):
+        timesteps_per_intervention = 1
+        control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)])
+        treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)])
+        outcome = "outcome"
+        fit_bl_switch_formula = "xo_t_do ~ time"
+        df = pd.read_csv("tests/resources/data/temporal_data.csv")
+        df["ok"] = df["outcome"] == 1
+        estimation_model = IPCWEstimator(
+            df,
+            timesteps_per_intervention,
+            control_strategy,
+            treatment_strategy,
+            outcome,
+            "ok",
+            fit_bl_switch_formula=fit_bl_switch_formula,
+            fit_bltd_switch_formula=fit_bl_switch_formula,
+            eligibility=None,
+        )
+        estimate, intervals = estimation_model.estimate_hazard_ratio()
+        self.assertEqual(estimate["trtrand"], 1.0)
+
+    def test_invalid_treatment_strategies(self):
+        timesteps_per_intervention = 1
+        control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)])
+        treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)])
+        outcome = "outcome"
+        fit_bl_switch_formula = "xo_t_do ~ time"
+        df = pd.read_csv("tests/resources/data/temporal_data.csv")
+        df["t"] = (["1", "0"] * len(df))[: len(df)]
+        df["ok"] = df["outcome"] == 1
+        with self.assertRaises(ValueError):
+            estimation_model = IPCWEstimator(
+                df,
+                timesteps_per_intervention,
+                control_strategy,
+                treatment_strategy,
+                outcome,
+                "ok",
+                fit_bl_switch_formula=fit_bl_switch_formula,
+                fit_bltd_switch_formula=fit_bl_switch_formula,
+                eligibility=None,
+            )
+
+    def test_invalid_fault_t_do(self):
+        timesteps_per_intervention = 1
+        control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)])
+        treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)])
+        outcome = "outcome"
+        fit_bl_switch_formula = "xo_t_do ~ time"
+        df = pd.read_csv("tests/resources/data/temporal_data.csv")
+        df["ok"] = df["outcome"] == 1
+        estimation_model = IPCWEstimator(
+            df,
+            timesteps_per_intervention,
+            control_strategy,
+            treatment_strategy,
+            outcome,
+            "ok",
+            fit_bl_switch_formula=fit_bl_switch_formula,
+            fit_bltd_switch_formula=fit_bl_switch_formula,
+            eligibility=None,
+        )
+        estimation_model.df["fault_t_do"] = 0
+        with self.assertRaises(ValueError):
+            estimate, intervals = estimation_model.estimate_hazard_ratio()
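
---

For reviewers who want to try the new estimator outside the test suite, here is a minimal usage sketch. It is assembled entirely from the fixtures this PR adds (`temporal_data.csv` and the three-step treatment sequences in `TestIPCWEstimator`) and introduces no API beyond what the diff defines:

```python
# Usage sketch, mirroring TestIPCWEstimator.test_estimate_hazard_ratio.
import pandas as pd

from causal_testing.specification.capabilities import TreatmentSequence
from causal_testing.testing.estimators import IPCWEstimator

timesteps_per_intervention = 1
# Control arm: never apply treatment "t"; treatment arm: apply it at every observation point.
control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)])
treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)])

df = pd.read_csv("tests/resources/data/temporal_data.csv")
df["ok"] = df["outcome"] == 1  # fault_column is True while the run has not yet faulted

estimator = IPCWEstimator(
    df,
    timesteps_per_intervention,
    control_strategy,
    treatment_strategy,
    outcome="outcome",
    fault_column="ok",
    fit_bl_switch_formula="xo_t_do ~ time",
    fit_bltd_switch_formula="xo_t_do ~ time",
    eligibility=None,
)

# Hazard ratio of the treatment arm relative to control, with confidence intervals.
estimate, (ci_low, ci_high) = estimator.estimate_hazard_ratio()
print(estimate["trtrand"], ci_low["trtrand"], ci_high["trtrand"])
```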
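A note on the new `group_by` parameter in `DataAdequacy`: when it is set, the bootstrap resamples whole individuals rather than rows, so each resampled dataset keeps every individual's time series intact. Below is a sketch of just that resampling step, extracted from `measure_adequacy`; the function name `cluster_bootstrap` is ours, for illustration only:

```python
import pandas as pd


def cluster_bootstrap(df: pd.DataFrame, group_col: str, seed: int) -> pd.DataFrame:
    """Resample whole groups (e.g. individuals) with replacement, as
    DataAdequacy.measure_adequacy does when group_by is set."""
    ids = pd.Series(df[group_col].unique())
    ids = ids.sample(len(ids), replace=True, random_state=seed)
    # Note: isin() keeps each sampled id at most once, so ids drawn more than
    # once are not duplicated in the resampled frame - this matches the
    # behaviour of the code in this diff.
    return df[df[group_col].isin(ids)]


# Example against the temporal test fixture:
# df = pd.read_csv("tests/resources/data/temporal_data.csv")
# resampled = cluster_bootstrap(df, "id", seed=0)
```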