diff --git a/causal_testing/estimation/ipcw_estimator.py b/causal_testing/estimation/ipcw_estimator.py index 584182ff..a7ff15cc 100644 --- a/causal_testing/estimation/ipcw_estimator.py +++ b/causal_testing/estimation/ipcw_estimator.py @@ -1,14 +1,15 @@ """This module contains the IPCWEstimator class, for estimating the time to a particular event""" import logging -from math import ceil +from typing import Any +from uuid import uuid4 + import numpy as np import pandas as pd import statsmodels.formula.api as smf from lifelines import CoxPHFitter -from causal_testing.specification.capabilities import TreatmentSequence, Capability from causal_testing.estimation.abstract_estimator import Estimator logger = logging.getLogger(__name__) @@ -16,29 +17,48 @@ class IPCWEstimator(Estimator): """ - Class to perform inverse probability of censoring weighting (IPCW) estimation + Class to perform Inverse Probability of Censoring Weighting (IPCW) estimation for sequences of treatments over time-varying data. """ # pylint: disable=too-many-arguments + # pylint: disable=too-many-locals # pylint: disable=too-many-instance-attributes def __init__( self, df: pd.DataFrame, - timesteps_per_intervention: int, - control_strategy: TreatmentSequence, - treatment_strategy: TreatmentSequence, + timesteps_per_observation: int, + control_strategy: list[tuple[int, str, Any]], + treatment_strategy: list[tuple[int, str, Any]], outcome: str, - fault_column: str, + status_column: str, fit_bl_switch_formula: str, fit_bltd_switch_formula: str, eligibility=None, alpha: float = 0.05, + total_time: float = None, ): + """ + Initialise IPCWEstimator. + + :param: df: Input DataFrame containing time-varying data. + :param: timesteps_per_observation: Number of timesteps per observation. + :param: control_strategy: The control strategy, with entries of the form (timestep, variable, value). + :param: treatment_strategy: The treatment strategy, with entries of the form (timestep, variable, value). + :param: outcome: Name of the outcome column in the DataFrame. + :param: status_column: Name of the status column in the DataFrame, which should be True for operating normally, + False for a fault. + :param: fit_bl_switch_formula: Formula for fitting the baseline switch model. + :param: fit_bltd_switch_formula: Formula for fitting the baseline time-dependent switch model. + :param: eligibility: Function to determine eligibility for treatment. Defaults to None for "always eligible". + :param: alpha: Significance level for hypothesis testing. Defaults to 0.05. + :param: total_time: Total time for the analysis. Defaults to one plus the length of of the strategy (control or + treatment) with the most elements multiplied by `timesteps_per_observation`. + """ super().__init__( - [c.variable for c in treatment_strategy.capabilities], - [c.value for c in treatment_strategy.capabilities], - [c.value for c in control_strategy.capabilities], + [var for _, var, _ in treatment_strategy], + [val for _, _, val in treatment_strategy], + [val for _, _, val in control_strategy], None, outcome, df, @@ -46,137 +66,224 @@ def __init__( alpha=alpha, query="", ) - self.timesteps_per_intervention = timesteps_per_intervention + self.timesteps_per_observation = timesteps_per_observation self.control_strategy = control_strategy self.treatment_strategy = treatment_strategy self.outcome = outcome - self.fault_column = fault_column - self.timesteps_per_intervention = timesteps_per_intervention + self.status_column = status_column self.fit_bl_switch_formula = fit_bl_switch_formula self.fit_bltd_switch_formula = fit_bltd_switch_formula self.eligibility = eligibility - self.df = df + self.df = df.sort_values(["id", "time"]) + self.len_control_group = None + self.len_treatment_group = None + + if total_time is None: + total_time = ( + max(len(self.control_strategy), len(self.treatment_strategy)) + 1 + ) * self.timesteps_per_observation + self.total_time = total_time self.preprocess_data() def add_modelling_assumptions(self): self.modelling_assumptions.append("The variables in the data vary over time.") - def setup_xo_t_do(self, strategy_assigned: list, strategy_followed: list, eligible: pd.Series): + def setup_xo_t_do(self, individual: pd.DataFrame, strategy_assigned: list): """ Return a binary sequence with each bit representing whether the current index is the time point at which the individual diverted from the assigned treatment strategy (and thus should be censored). - :param strategy_assigned - the assigned treatment strategy - :param strategy_followed - the strategy followed by the individual - :param eligible - binary sequence represnting the eligibility of the individual at each time step + :param individual: DataFrame representing the individual. + :param strategy_assigned: The assigned treatment strategy. """ - strategy_assigned = [1] + strategy_assigned + [1] - strategy_followed = [1] + strategy_followed + [1] + + default = {t: (-1, -1) for t in individual["time"].values} + + strategy_assigned = default | {t: (var, val) for t, var, val in strategy_assigned} + strategy_followed = default | { + t: (var, individual.loc[individual["time"] == t, var].values[0]) + for t, var, val in self.treatment_strategy + if t in individual["time"].values + } + + strategy_assigned = sorted( + [(t, var, val) for t, (var, val) in strategy_assigned.items() if t in individual["time"].values] + ) + strategy_followed = sorted( + [(t, var, val) for t, (var, val) in strategy_followed.items() if t in individual["time"].values] + ) mask = ( - pd.Series(strategy_assigned, index=eligible.index) != pd.Series(strategy_followed, index=eligible.index) + pd.Series(strategy_assigned, index=individual.index) != pd.Series(strategy_followed, index=individual.index) ).astype("boolean") - mask = mask | ~eligible + mask = mask | ~individual["eligible"] mask.reset_index(inplace=True, drop=True) false = mask.loc[mask] if false.empty: - return np.zeros(len(mask)) + return pd.DataFrame( + { + "id": [str(uuid4())] * len(individual), + "xo_t_do": np.zeros(len(mask)), + } + ) mask = (mask * 1).tolist() cutoff = false.index[0] + 1 - return mask[:cutoff] + ([None] * (len(mask) - cutoff)) + + return pd.DataFrame( + { + "id": [str(uuid4())] * len(individual), + "xo_t_do": pd.Series(mask[:cutoff] + ([None] * (len(mask) - cutoff)), index=individual.index), + } + ) def setup_fault_t_do(self, individual: pd.DataFrame): """ Return a binary sequence with each bit representing whether the current - index is the time point at which the event of interest (i.e. a fault) - occurred. + index is the time point at which the event of interest (i.e. a fault) occurred. + + N.B. This is rounded _up_ to the nearest multiple of `self.timesteps_per_observation`. + That is, if the fault occurs at time 22, and `self.timesteps_per_observation == 5`, then + `fault_t_do` will be 25. """ - fault = individual[~individual[self.fault_column]] - fault_t_do = pd.Series(np.zeros(len(individual)), index=individual.index) + + fault = individual[~individual[self.status_column]] + individual["fault_t_do"] = 0 if not fault.empty: - fault_time = individual["time"].loc[fault.index[0]] - # Ceiling to nearest observation point - fault_time = ceil(fault_time / self.timesteps_per_intervention) * self.timesteps_per_intervention - # Set the correct observation point to be the fault time of doing (fault_t_do) - observations = individual.loc[ - (individual["time"] % self.timesteps_per_intervention == 0) & (individual["time"] < fault_time) - ] - if not observations.empty: - fault_t_do.loc[observations.index[0]] = 1 - assert sum(fault_t_do) <= 1, f"Multiple fault times for\n{individual}" - - return pd.DataFrame({"fault_t_do": fault_t_do}) + time_fault_observed = ( + max(0, np.ceil(fault["time"].min() / self.timesteps_per_observation) - 1) + ) * self.timesteps_per_observation + individual.loc[individual["time"] == time_fault_observed, "fault_t_do"] = 1 + + assert sum(individual["fault_t_do"]) <= 1, f"Multiple fault times for\n{individual}" + + return pd.DataFrame({"fault_t_do": individual["fault_t_do"]}) def setup_fault_time(self, individual: pd.DataFrame, perturbation: float = -0.001): """ Return the time at which the event of interest (i.e. a fault) occurred. """ - fault = individual[~individual[self.fault_column]] + fault = individual[~individual[self.status_column]] fault_time = ( individual["time"].loc[fault.index[0]] if not fault.empty - else (individual["time"].max() + self.timesteps_per_intervention) + else (self.total_time + self.timesteps_per_observation) + ) + return pd.DataFrame( + { + "fault_time": np.repeat(fault_time + perturbation, len(individual)), + } ) - return pd.DataFrame({"fault_time": np.repeat(fault_time + perturbation, len(individual))}) def preprocess_data(self): """ Set up the treatment-specific columns in the data that are needed to estimate the hazard ratio. """ + self.df["trtrand"] = None # treatment/control arm self.df["xo_t_do"] = None # did the individual deviate from the treatment of interest here? self.df["eligible"] = self.df.eval(self.eligibility) if self.eligibility is not None else True # when did a fault occur? - self.df["fault_time"] = self.df.groupby("id")[[self.fault_column, "time"]].apply(self.setup_fault_time).values - self.df["fault_t_do"] = ( - self.df.groupby("id")[["id", "time", self.fault_column]].apply(self.setup_fault_t_do).values + fault_time_df = self.df.groupby("id", sort=False)[[self.status_column, "time", "id"]].apply( + self.setup_fault_time + ) + + assert len(fault_time_df) == len(self.df), "Fault times error" + self.df["fault_time"] = fault_time_df["fault_time"].values + + assert ( + self.df.groupby("id", sort=False).apply(lambda x: len(set(x["fault_time"])) == 1).all() + ), "Each individual must have a unique fault time." + + fault_t_do_df = self.df.groupby("id", sort=False)[["id", "time", self.status_column]].apply( + self.setup_fault_t_do ) - assert not pd.isnull(self.df["fault_time"]).any() + assert len(fault_t_do_df) == len(self.df), "Fault t_do error" + self.df["fault_t_do"] = fault_t_do_df["fault_t_do"].values living_runs = self.df.query("fault_time > 0").loc[ - (self.df["time"] % self.timesteps_per_intervention == 0) - & (self.df["time"] <= self.control_strategy.total_time()) + (self.df["time"] % self.timesteps_per_observation == 0) & (self.df["time"] <= self.total_time) ] - individuals = [] - new_id = 0 logging.debug(" Preprocessing groups") - for _, individual in living_runs.groupby("id"): - assert sum(individual["fault_t_do"]) <= 1, ( - f"Error initialising fault_t_do for individual\n" - f"{individual[['id', 'time', 'fault_time', 'fault_t_do']]}\n" - "with fault at {individual.fault_time.iloc[0]}" - ) - strategy_followed = [ - Capability( - c.variable, - individual.loc[individual["time"] == c.start_time, c.variable].values[0], - c.start_time, - c.end_time, + ctrl_time_0, ctrl_var_0, ctrl_val_0 = self.control_strategy[0] + ctrl_time, ctrl_var, ctrl_val = min( + set(map(tuple, self.control_strategy)).difference(map(tuple, self.treatment_strategy)) + ) + control_group = ( + living_runs.groupby("id", sort=False) + .filter(lambda gp: len(gp.loc[(gp["time"] == ctrl_time) & (gp[ctrl_var] == ctrl_val)]) > 0) + .groupby("id", sort=False) + .filter(lambda gp: len(gp.loc[(gp["time"] == ctrl_time_0) & (gp[ctrl_var_0] == ctrl_val_0)]) > 0) + .copy() + ) + control_group["trtrand"] = 0 + ctrl_xo_t_do_df = control_group.groupby("id", sort=False).apply( + self.setup_xo_t_do, strategy_assigned=self.control_strategy + ) + control_group["xo_t_do"] = ctrl_xo_t_do_df["xo_t_do"].values + control_group["old_id"] = control_group["id"] + # control_group["id"] = ctrl_xo_t_do_df["id"].values + control_group["id"] = [f"c-{id}" for id in control_group["id"]] + assert not control_group["id"].isnull().any(), "Null control IDs" + + trt_time_0, trt_var_0, trt_val_0 = self.treatment_strategy[0] + trt_time, trt_var, trt_val = min( + set(map(tuple, self.treatment_strategy)).difference(map(tuple, self.control_strategy)) + ) + treatment_group = ( + living_runs.groupby("id", sort=False) + .filter(lambda gp: len(gp.loc[(gp["time"] == trt_time) & (gp[trt_var] == trt_val)]) > 0) + .groupby("id", sort=False) + .filter(lambda gp: len(gp.loc[(gp["time"] == trt_time_0) & (gp[trt_var_0] == trt_val_0)]) > 0) + .copy() + ) + treatment_group["trtrand"] = 1 + trt_xo_t_do_df = treatment_group.groupby("id", sort=False).apply( + self.setup_xo_t_do, strategy_assigned=self.treatment_strategy + ) + treatment_group["xo_t_do"] = trt_xo_t_do_df["xo_t_do"].values + treatment_group["old_id"] = treatment_group["id"] + # treatment_group["id"] = trt_xo_t_do_df["id"].values + treatment_group["id"] = [f"t-{id}" for id in treatment_group["id"]] + assert not treatment_group["id"].isnull().any(), "Null treatment IDs" + + premature_failures = living_runs.groupby("id", sort=False).filter(lambda gp: gp["time"].max() < trt_time) + logger.debug( + f"{len(control_group.groupby('id'))} control individuals " + f"{len(treatment_group.groupby('id'))} treatment individuals " + f"{len(premature_failures.groupby('id'))} premature failures" + ) + + self.len_control_group = len(control_group.groupby("id")) + self.len_treatment_group = len(treatment_group.groupby("id")) + individuals = pd.concat([control_group, treatment_group]) + individuals = individuals.loc[ + ( + ( + individuals["time"] + < np.ceil(individuals["fault_time"] / self.timesteps_per_observation) + * self.timesteps_per_observation ) - for c in self.treatment_strategy.capabilities - ] - - # Control flow: - # Individuals that start off in both arms, need cloning (hence incrementing the ID within the if statement) - # Individuals that don't start off in either arm are left out - for inx, strategy_assigned in [(0, self.control_strategy), (1, self.treatment_strategy)]: - if strategy_assigned.capabilities[0] == strategy_followed[0] and individual.eligible.iloc[0]: - individual["id"] = new_id - new_id += 1 - individual["trtrand"] = inx - individual["xo_t_do"] = self.setup_xo_t_do( - strategy_assigned.capabilities, strategy_followed, individual["eligible"] - ) - individuals.append(individual.loc[individual["time"] <= individual["fault_time"]].copy()) + & (~individuals["xo_t_do"].isnull()) + ) + ] + if len(individuals) == 0: raise ValueError("No individuals followed either strategy.") + self.df = individuals.loc[ + individuals["time"] + < np.ceil(individuals["fault_time"] / self.timesteps_per_observation) * self.timesteps_per_observation + ].reset_index() + logger.debug(f"{len(individuals.groupby('id'))} individuals") - self.df = pd.concat(individuals) + if len(self.df.loc[self.df["trtrand"] == 0]) == 0: + raise ValueError(f"No individuals began the control strategy {self.control_strategy}") + if len(self.df.loc[self.df["trtrand"] == 1]) == 0: + raise ValueError(f"No individuals began the treatment strategy {self.treatment_strategy}") def estimate_hazard_ratio(self): """ @@ -186,23 +293,51 @@ def estimate_hazard_ratio(self): if self.df["fault_t_do"].sum() == 0: raise ValueError("No recorded faults") - preprocessed_data = self.df.loc[self.df["xo_t_do"] == 0].copy() + preprocessed_data = self.df.copy() # Use logistic regression to predict switching given baseline covariates - fit_bl_switch = smf.logit(self.fit_bl_switch_formula, data=self.df).fit() + logger.debug("Use logistic regression to predict switching given baseline covariates") + fit_bl_switch_c = smf.logit(self.fit_bl_switch_formula, data=self.df.loc[self.df.trtrand == 0]).fit( + method="bfgs" + ) + fit_bl_switch_t = smf.logit(self.fit_bl_switch_formula, data=self.df.loc[self.df.trtrand == 1]).fit( + method="bfgs" + ) - preprocessed_data["pxo1"] = fit_bl_switch.predict(preprocessed_data) + preprocessed_data.loc[preprocessed_data["trtrand"] == 0, "pxo1"] = fit_bl_switch_c.predict( + self.df.loc[self.df.trtrand == 0] + ) + preprocessed_data.loc[preprocessed_data["trtrand"] == 1, "pxo1"] = fit_bl_switch_t.predict( + self.df.loc[self.df.trtrand == 1] + ) # Use logistic regression to predict switching given baseline and time-updated covariates (model S12) - fit_bltd_switch = smf.logit( + logger.debug( + "Use logistic regression to predict switching given baseline and time-updated covariates (model S12)" + ) + fit_bltd_switch_c = smf.logit( self.fit_bltd_switch_formula, - data=self.df, - ).fit() + data=self.df.loc[self.df.trtrand == 0], + ).fit(method="bfgs") + fit_bltd_switch_t = smf.logit( + self.fit_bltd_switch_formula, + data=self.df.loc[self.df.trtrand == 1], + ).fit(method="bfgs") - preprocessed_data["pxo2"] = fit_bltd_switch.predict(preprocessed_data) + preprocessed_data.loc[preprocessed_data["trtrand"] == 0, "pxo2"] = fit_bltd_switch_c.predict( + self.df.loc[self.df.trtrand == 0] + ) + preprocessed_data.loc[preprocessed_data["trtrand"] == 1, "pxo2"] = fit_bltd_switch_t.predict( + self.df.loc[self.df.trtrand == 1] + ) + if (preprocessed_data["pxo2"] == 1).any(): + raise ValueError( + "Probability of switching given baseline and time-varying confounders (pxo2) cannot be one." + ) # IPCW step 3: For each individual at each time, compute the inverse probability of remaining uncensored - # Estimate the probabilities of remaining ‘un-switched’ and hence the weights + # Estimate the probabilities of remaining 'un-switched' and hence the weights + logger.debug("Estimate the probabilities of remaining 'un-switched' and hence the weights") preprocessed_data["num"] = 1 - preprocessed_data["pxo1"] preprocessed_data["denom"] = 1 - preprocessed_data["pxo2"] @@ -222,17 +357,16 @@ def estimate_hazard_ratio(self): preprocessed_data["tin"] = preprocessed_data["time"] preprocessed_data["tout"] = pd.concat( - [(preprocessed_data["time"] + self.timesteps_per_intervention), preprocessed_data["fault_time"]], + [(preprocessed_data["time"] + self.timesteps_per_observation), preprocessed_data["fault_time"]], axis=1, ).min(axis=1) - assert (preprocessed_data["tin"] <= preprocessed_data["tout"]).all(), ( - f"Left before joining\n" f"{preprocessed_data.loc[preprocessed_data['tin'] >= preprocessed_data['tout']]}" - ) + assert (preprocessed_data["tin"] <= preprocessed_data["tout"]).all(), "Individuals left before joining." # IPCW step 4: Use these weights in a weighted analysis of the outcome model # Estimate the KM graph and IPCW hazard ratio using Cox regression. - cox_ph = CoxPHFitter() + logger.debug("Estimate the KM graph and IPCW hazard ratio using Cox regression.") + cox_ph = CoxPHFitter(penalizer=0.2, alpha=self.alpha) cox_ph.fit( df=preprocessed_data, duration_col="tout", diff --git a/causal_testing/specification/capabilities.py b/causal_testing/specification/capabilities.py deleted file mode 100644 index 1cb6d30b..00000000 --- a/causal_testing/specification/capabilities.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -This module contains the Capability and TreatmentSequence classes to implement -treatment sequences that operate over time. -""" - -from typing import Any -from causal_testing.specification.variable import Variable - - -class Capability: - """ - Data class to encapsulate temporal interventions. - """ - - def __init__(self, variable: Variable, value: Any, start_time: int, end_time: int): - self.variable = variable - self.value = value - self.start_time = start_time - self.end_time = end_time - - def __eq__(self, other): - return ( - isinstance(other, type(self)) - and self.variable == other.variable - and self.value == other.value - and self.start_time == other.start_time - and self.end_time == other.end_time - ) - - def __repr__(self): - return f"({self.variable}, {self.value}, {self.start_time}-{self.end_time})" - - -class TreatmentSequence: - """ - Class to represent a list of capabilities, i.e. a treatment regime. - """ - - def __init__(self, timesteps_per_intervention, capabilities): - self.timesteps_per_intervention = timesteps_per_intervention - self.capabilities = [ - Capability(var, val, t, t + timesteps_per_intervention) - for (var, val), t in zip( - capabilities, - range( - timesteps_per_intervention, - (len(capabilities) * timesteps_per_intervention) + 1, - timesteps_per_intervention, - ), - ) - ] - # This is a bodge so that causal test adequacy works - self.name = tuple(c.variable for c in self.capabilities) - - def set_value(self, index: int, value: float): - """ - Set the value of capability at the given index. - :param index - the index of the element to update. - :param value - the desired value of the capability. - """ - self.capabilities[index].value = value - - def copy(self): - """ - Return a deep copy of the capability list. - """ - strategy = TreatmentSequence( - self.timesteps_per_intervention, - [(c.variable, c.value) for c in self.capabilities], - ) - return strategy - - def total_time(self): - """ - Calculate the total duration of the treatment strategy. - """ - return (len(self.capabilities) + 1) * self.timesteps_per_intervention diff --git a/causal_testing/testing/causal_test_case.py b/causal_testing/testing/causal_test_case.py index 08ad54f1..ea3795f2 100644 --- a/causal_testing/testing/causal_test_case.py +++ b/causal_testing/testing/causal_test_case.py @@ -2,7 +2,6 @@ import logging from typing import Any -import numpy as np from causal_testing.specification.variable import Variable from causal_testing.testing.causal_test_outcome import CausalTestOutcome @@ -81,21 +80,13 @@ def _return_causal_test_results(self, estimator) -> CausalTestResult: if not hasattr(estimator, f"estimate_{self.estimate_type}"): raise AttributeError(f"{estimator.__class__} has no {self.estimate_type} method.") estimate_effect = getattr(estimator, f"estimate_{self.estimate_type}") - try: - effect, confidence_intervals = estimate_effect(**self.estimate_params) - return CausalTestResult( - estimator=estimator, - test_value=TestValue(self.estimate_type, effect), - effect_modifier_configuration=self.effect_modifier_configuration, - confidence_intervals=confidence_intervals, - ) - except np.linalg.LinAlgError: - return CausalTestResult( - estimator=estimator, - test_value=TestValue(self.estimate_type, None), - effect_modifier_configuration=self.effect_modifier_configuration, - confidence_intervals=None, - ) + effect, confidence_intervals = estimate_effect(**self.estimate_params) + return CausalTestResult( + estimator=estimator, + test_value=TestValue(self.estimate_type, effect), + effect_modifier_configuration=self.effect_modifier_configuration, + confidence_intervals=confidence_intervals, + ) def __str__(self): treatment_config = {self.treatment_variable.name: self.treatment_value} diff --git a/tests/estimation_tests/test_cubic_spline_estimator.py b/tests/estimation_tests/test_cubic_spline_estimator.py index e8560b88..b55b0a5a 100644 --- a/tests/estimation_tests/test_cubic_spline_estimator.py +++ b/tests/estimation_tests/test_cubic_spline_estimator.py @@ -4,7 +4,6 @@ import matplotlib.pyplot as plt from causal_testing.specification.variable import Input from causal_testing.utils.validation import CausalValidator -from causal_testing.specification.capabilities import TreatmentSequence from causal_testing.estimation.cubic_spline_estimator import CubicSplineRegressionEstimator from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator diff --git a/tests/estimation_tests/test_instrumental_variable_estimator.py b/tests/estimation_tests/test_instrumental_variable_estimator.py index 4cfa7dfe..c166b75e 100644 --- a/tests/estimation_tests/test_instrumental_variable_estimator.py +++ b/tests/estimation_tests/test_instrumental_variable_estimator.py @@ -4,7 +4,6 @@ import matplotlib.pyplot as plt from causal_testing.specification.variable import Input from causal_testing.utils.validation import CausalValidator -from causal_testing.specification.capabilities import TreatmentSequence from causal_testing.estimation.instrumental_variable_estimator import InstrumentalVariableEstimator diff --git a/tests/estimation_tests/test_ipcw_estimator.py b/tests/estimation_tests/test_ipcw_estimator.py index d4383d6e..1ab37dce 100644 --- a/tests/estimation_tests/test_ipcw_estimator.py +++ b/tests/estimation_tests/test_ipcw_estimator.py @@ -4,7 +4,6 @@ import matplotlib.pyplot as plt from causal_testing.specification.variable import Input from causal_testing.utils.validation import CausalValidator -from causal_testing.specification.capabilities import TreatmentSequence from causal_testing.estimation.ipcw_estimator import IPCWEstimator @@ -16,8 +15,8 @@ class TestIPCWEstimator(unittest.TestCase): def test_estimate_hazard_ratio(self): timesteps_per_intervention = 1 - control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) - treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + control_strategy = [[t, "t", 0] for t in range(1, 4, timesteps_per_intervention)] + treatment_strategy = [[t, "t", 1] for t in range(1, 4, timesteps_per_intervention)] outcome = "outcome" fit_bl_switch_formula = "xo_t_do ~ time" df = pd.read_csv("tests/resources/data/temporal_data.csv") @@ -34,12 +33,12 @@ def test_estimate_hazard_ratio(self): eligibility=None, ) estimate, intervals = estimation_model.estimate_hazard_ratio() - self.assertEqual(estimate["trtrand"], 1.0) + self.assertEqual(round(estimate["trtrand"], 3), 1.351) def test_invalid_treatment_strategies(self): timesteps_per_intervention = 1 - control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) - treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + control_strategy = [[t, "t", 0] for t in range(1, 4, timesteps_per_intervention)] + treatment_strategy = [[t, "t", 1] for t in range(1, 4, timesteps_per_intervention)] outcome = "outcome" fit_bl_switch_formula = "xo_t_do ~ time" df = pd.read_csv("tests/resources/data/temporal_data.csv") @@ -60,8 +59,8 @@ def test_invalid_treatment_strategies(self): def test_invalid_fault_t_do(self): timesteps_per_intervention = 1 - control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) - treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + control_strategy = [[t, "t", 0] for t in range(1, 4, timesteps_per_intervention)] + treatment_strategy = [[t, "t", 1] for t in range(1, 4, timesteps_per_intervention)] outcome = "outcome" fit_bl_switch_formula = "xo_t_do ~ time" df = pd.read_csv("tests/resources/data/temporal_data.csv") @@ -80,3 +79,47 @@ def test_invalid_fault_t_do(self): estimation_model.df["fault_t_do"] = 0 with self.assertRaises(ValueError): estimate, intervals = estimation_model.estimate_hazard_ratio() + + def test_no_individual_began_control_strategy(self): + timesteps_per_intervention = 1 + control_strategy = [[t, "t", 0] for t in range(1, 4, timesteps_per_intervention)] + treatment_strategy = [[t, "t", 1] for t in range(1, 4, timesteps_per_intervention)] + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["t"] = 1 + df["ok"] = df["outcome"] == 1 + with self.assertRaises(ValueError): + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) + + def test_no_individual_began_treatment_strategy(self): + timesteps_per_intervention = 1 + control_strategy = [[t, "t", 0] for t in range(1, 4, timesteps_per_intervention)] + treatment_strategy = [[t, "t", 1] for t in range(1, 4, timesteps_per_intervention)] + outcome = "outcome" + fit_bl_switch_formula = "xo_t_do ~ time" + df = pd.read_csv("tests/resources/data/temporal_data.csv") + df["t"] = 0 + df["ok"] = df["outcome"] == 1 + with self.assertRaises(ValueError): + estimation_model = IPCWEstimator( + df, + timesteps_per_intervention, + control_strategy, + treatment_strategy, + outcome, + "ok", + fit_bl_switch_formula=fit_bl_switch_formula, + fit_bltd_switch_formula=fit_bl_switch_formula, + eligibility=None, + ) diff --git a/tests/estimation_tests/test_linear_regression_estimator.py b/tests/estimation_tests/test_linear_regression_estimator.py index c3749fdb..db2c3e48 100644 --- a/tests/estimation_tests/test_linear_regression_estimator.py +++ b/tests/estimation_tests/test_linear_regression_estimator.py @@ -4,7 +4,6 @@ import matplotlib.pyplot as plt from causal_testing.specification.variable import Input from causal_testing.utils.validation import CausalValidator -from causal_testing.specification.capabilities import TreatmentSequence from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator from causal_testing.estimation.genetic_programming_regression_fitter import reciprocal diff --git a/tests/estimation_tests/test_logistic_regression_estimator.py b/tests/estimation_tests/test_logistic_regression_estimator.py index a5d104e3..ac9688ae 100644 --- a/tests/estimation_tests/test_logistic_regression_estimator.py +++ b/tests/estimation_tests/test_logistic_regression_estimator.py @@ -4,7 +4,6 @@ import matplotlib.pyplot as plt from causal_testing.specification.variable import Input from causal_testing.utils.validation import CausalValidator -from causal_testing.specification.capabilities import TreatmentSequence from causal_testing.estimation.logistic_regression_estimator import LogisticRegressionEstimator diff --git a/tests/specification_tests/test_capabilities.py b/tests/specification_tests/test_capabilities.py deleted file mode 100644 index 98fd466c..00000000 --- a/tests/specification_tests/test_capabilities.py +++ /dev/null @@ -1,36 +0,0 @@ -import unittest -from causal_testing.specification.capabilities import Capability, TreatmentSequence - - -class TestCapability(unittest.TestCase): - """ - Test the Capability class for basic methods. - """ - - def setUp(self) -> None: - pass - - def test_repr(self): - cap = Capability("v", 1, 0, 1) - self.assertEqual(str(cap), "(v, 1, 0-1)") - - -class TestTreatmentSequence(unittest.TestCase): - """ - Test the TreatmentSequence class for basic methods. - """ - - def setUp(self) -> None: - self.timesteps_per_intervention = 1 - - def test_set_value(self): - treatment_strategy = TreatmentSequence(self.timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) - treatment_strategy.set_value(0, 0) - self.assertEqual([x.value for x in treatment_strategy.capabilities], [0, 1, 1]) - - def test_copy(self): - control_strategy = TreatmentSequence(self.timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) - treatment_strategy = control_strategy.copy() - treatment_strategy.set_value(0, 0) - self.assertEqual([x.value for x in control_strategy.capabilities], [1, 1, 1]) - self.assertEqual([x.value for x in treatment_strategy.capabilities], [0, 1, 1]) diff --git a/tests/testing_tests/test_causal_test_adequacy.py b/tests/testing_tests/test_causal_test_adequacy.py index d5aeb5b1..208bb007 100644 --- a/tests/testing_tests/test_causal_test_adequacy.py +++ b/tests/testing_tests/test_causal_test_adequacy.py @@ -1,8 +1,7 @@ +import os import unittest from pathlib import Path -from statistics import StatisticsError import scipy -import os import pandas as pd from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator @@ -11,12 +10,9 @@ from causal_testing.testing.causal_test_case import CausalTestCase from causal_testing.testing.causal_test_suite import CausalTestSuite from causal_testing.testing.causal_test_adequacy import DAGAdequacy -from causal_testing.testing.causal_test_outcome import NoEffect, Positive, SomeEffect +from causal_testing.testing.causal_test_outcome import NoEffect, SomeEffect from causal_testing.json_front.json_class import JsonUtility, CausalVariables -from causal_testing.specification.variable import Input, Output, Meta from causal_testing.specification.scenario import Scenario -from causal_testing.specification.causal_specification import CausalSpecification -from causal_testing.specification.capabilities import TreatmentSequence from causal_testing.testing.causal_test_adequacy import DataAdequacy @@ -112,8 +108,8 @@ def test_data_adequacy_cateogorical(self): def test_data_adequacy_group_by(self): timesteps_per_intervention = 1 - control_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 0), ("t", 0), ("t", 0)]) - treatment_strategy = TreatmentSequence(timesteps_per_intervention, [("t", 1), ("t", 1), ("t", 1)]) + control_strategy = [[t, "t", 0] for t in range(1, 4, timesteps_per_intervention)] + treatment_strategy = [[t, "t", 1] for t in range(1, 4, timesteps_per_intervention)] outcome = "outcome" fit_bl_switch_formula = "xo_t_do ~ time" df = pd.read_csv("tests/resources/data/temporal_data.csv") @@ -145,11 +141,12 @@ def test_data_adequacy_group_by(self): causal_test_result = causal_test_case.execute_test(estimation_model, None) adequacy_metric = DataAdequacy(causal_test_case, estimation_model, group_by="id") adequacy_metric.measure_adequacy() - causal_test_result.adequacy = adequacy_metric - print(causal_test_result.adequacy.to_dict()) + adequacy_dict = adequacy_metric.to_dict() + self.assertEqual(round(adequacy_dict["kurtosis"]["trtrand"], 3), -0.857) + adequacy_dict.pop("kurtosis") self.assertEqual( - causal_test_result.adequacy.to_dict(), - {"kurtosis": {"trtrand": 0.0}, "bootstrap_size": 100, "passing": 0, "successful": 95}, + adequacy_dict, + {"bootstrap_size": 100, "passing": 32, "successful": 100}, ) def test_dag_adequacy_dependent(self):