diff --git a/causal_testing/__main__.py b/causal_testing/__main__.py index 433358d3..a02d8b49 100644 --- a/causal_testing/__main__.py +++ b/causal_testing/__main__.py @@ -1,6 +1,10 @@ """This module contains the main entrypoint functionality to the Causal Testing Framework.""" import logging +import tempfile +import json +import os + from .main import setup_logging, parse_args, CausalTestingPaths, CausalTestingFramework @@ -34,13 +38,27 @@ def main() -> None: if args.batch_size > 0: logging.info(f"Running tests in batches of size {args.batch_size}") - results = framework.run_tests_in_batches(batch_size=args.batch_size, silent=args.silent) + with tempfile.TemporaryDirectory() as tmpdir: + output_files = [] + for i, results in enumerate(framework.run_tests_in_batches(batch_size=args.batch_size, silent=args.silent)): + temp_file_path = os.path.join(tmpdir, f"output_{i}.json") + framework.save_results(results, temp_file_path) + output_files.append(temp_file_path) + del results + + # Now stitch the results together from the temporary files + all_results = [] + for file_path in output_files: + with open(file_path, "r", encoding="utf-8") as f: + all_results.extend(json.load(f)) + + # Save the final stitched results to your desired location + with open(args.output, "w", encoding="utf-8") as f: + json.dump(all_results, f, indent=4) else: logging.info("Running tests in regular mode") results = framework.run_tests(silent=args.silent) - - # Save results - framework.save_results(results) + framework.save_results(results) logging.info("Causal testing completed successfully.") diff --git a/causal_testing/estimation/abstract_regression_estimator.py b/causal_testing/estimation/abstract_regression_estimator.py index 4b0fba80..64f6c77c 100644 --- a/causal_testing/estimation/abstract_regression_estimator.py +++ b/causal_testing/estimation/abstract_regression_estimator.py @@ -45,7 +45,6 @@ def __init__( query=query, ) - self.model = None if effect_modifiers is None: effect_modifiers = [] if adjustment_set is None: @@ -79,7 +78,7 @@ def add_modelling_assumptions(self): "do not need to be linear." ) - def _run_regression(self, data=None) -> RegressionResultsWrapper: + def fit_model(self, data=None) -> RegressionResultsWrapper: """Run logistic regression of the treatment and adjustment set against the outcome and return the model. :return: The model after fitting to data. @@ -87,7 +86,6 @@ def _run_regression(self, data=None) -> RegressionResultsWrapper: if data is None: data = self.df model = self.regressor(formula=self.formula, data=data).fit(disp=0) - self.model = model return model def _predict(self, data=None, adjustment_config: dict = None) -> pd.DataFrame: @@ -102,7 +100,7 @@ def _predict(self, data=None, adjustment_config: dict = None) -> pd.DataFrame: if adjustment_config is None: adjustment_config = {} - model = self._run_regression(data) + model = self.fit_model(data) x = pd.DataFrame(columns=self.df.columns) x["Intercept"] = 1 # self.intercept diff --git a/causal_testing/estimation/cubic_spline_estimator.py b/causal_testing/estimation/cubic_spline_estimator.py index c32fecca..19f48a11 100644 --- a/causal_testing/estimation/cubic_spline_estimator.py +++ b/causal_testing/estimation/cubic_spline_estimator.py @@ -59,7 +59,7 @@ def estimate_ate_calculated(self, adjustment_config: dict = None) -> pd.Series: :return: The average treatment effect. 
""" - model = self._run_regression() + model = self.fit_model() x = {"Intercept": 1, self.base_test_case.treatment_variable.name: self.treatment_value} if adjustment_config is not None: diff --git a/causal_testing/estimation/linear_regression_estimator.py b/causal_testing/estimation/linear_regression_estimator.py index e1b0a774..d818a9e9 100644 --- a/causal_testing/estimation/linear_regression_estimator.py +++ b/causal_testing/estimation/linear_regression_estimator.py @@ -98,7 +98,7 @@ def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]: :return: The unit average treatment effect and the 95% Wald confidence intervals. """ - model = self._run_regression() + model = self.fit_model() newline = "\n" patsy_md = ModelDesc.from_formula(self.base_test_case.treatment_variable.name) @@ -129,7 +129,7 @@ def estimate_ate(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]: :return: The average treatment effect and the 95% Wald confidence intervals. """ - model = self._run_regression() + model = self.fit_model() # Create an empty individual for the control and treated individuals = pd.DataFrame(1, index=["control", "treated"], columns=model.params.index) diff --git a/causal_testing/estimation/logistic_regression_estimator.py b/causal_testing/estimation/logistic_regression_estimator.py index 55f79f25..091d9c30 100644 --- a/causal_testing/estimation/logistic_regression_estimator.py +++ b/causal_testing/estimation/logistic_regression_estimator.py @@ -38,7 +38,7 @@ def estimate_unit_odds_ratio(self) -> tuple[pd.Series, list[pd.Series, pd.Series :return: The odds ratio. Confidence intervals are not yet supported. """ - model = self._run_regression(self.df) + model = self.fit_model(self.df) ci_low, ci_high = np.exp(model.conf_int(self.alpha).loc[self.base_test_case.treatment_variable.name]) return pd.Series(np.exp(model.params[self.base_test_case.treatment_variable.name])), [ pd.Series(ci_low), diff --git a/causal_testing/main.py b/causal_testing/main.py index 89ae0e44..aed161a8 100644 --- a/causal_testing/main.py +++ b/causal_testing/main.py @@ -8,6 +8,7 @@ from typing import Dict, Any, Optional, List, Union, Sequence from tqdm import tqdm + import pandas as pd import numpy as np @@ -344,7 +345,6 @@ def run_tests_in_batches(self, batch_size: int = 100, silent: bool = False) -> L num_batches = int(np.ceil(num_tests / batch_size)) logger.info(f"Processing {num_tests} tests in {num_batches} batches of up to {batch_size} tests each") - all_results = [] with tqdm(total=num_tests, desc="Overall progress", mininterval=0.1) as progress: # Process each batch for batch_idx in range(num_batches): @@ -360,26 +360,23 @@ def run_tests_in_batches(self, batch_size: int = 100, silent: bool = False) -> L batch_results = [] for test_case in current_batch: try: - result = test_case.execute_test() - batch_results.append(result) - except (TypeError, AttributeError) as e: + batch_results.append(test_case.execute_test()) + # pylint: disable=broad-exception-caught + except Exception as e: if not silent: logger.error(f"Type or attribute error in test: {str(e)}") raise - result = CausalTestResult( - estimator=test_case.estimator, - test_value=TestValue("Error", str(e)), + batch_results.append( + CausalTestResult( + estimator=test_case.estimator, + test_value=TestValue("Error", str(e)), + ) ) - batch_results.append(result) progress.update(1) - all_results.extend(batch_results) - - logger.info(f"Completed batch {batch_idx + 1} of {num_batches}") - - logger.info(f"Completed processing all {len(all_results)} 
tests in {num_batches} batches") - return all_results + yield batch_results + logger.info(f"Completed processing in {num_batches} batches") def run_tests(self, silent=False) -> List[CausalTestResult]: """ @@ -399,7 +396,6 @@ def run_tests(self, silent=False) -> List[CausalTestResult]: try: result = test_case.execute_test() results.append(result) - logger.info(f"Test completed: {test_case}") # pylint: disable=broad-exception-caught except Exception as e: if not silent: @@ -414,9 +410,11 @@ def run_tests(self, silent=False) -> List[CausalTestResult]: return results - def save_results(self, results: List[CausalTestResult]) -> None: + def save_results(self, results: List[CausalTestResult], output_path: str = None) -> None: """Save test results to JSON file in the expected format.""" - logger.info(f"Saving results to {self.paths.output_path}") + if output_path is None: + output_path = self.paths.output_path + logger.info(f"Saving results to {output_path}") # Load original test configs to preserve test metadata with open(self.paths.test_config_path, "r", encoding="utf-8") as f: @@ -460,7 +458,7 @@ def save_results(self, results: List[CausalTestResult]) -> None: json_results.append(output) # Save to file - with open(self.paths.output_path, "w", encoding="utf-8") as f: + with open(output_path, "w", encoding="utf-8") as f: json.dump(json_results, f, indent=2) logger.info("Results saved successfully") diff --git a/tests/estimation_tests/test_cubic_spline_estimator.py b/tests/estimation_tests/test_cubic_spline_estimator.py index 2f7ecaef..38b230d7 100644 --- a/tests/estimation_tests/test_cubic_spline_estimator.py +++ b/tests/estimation_tests/test_cubic_spline_estimator.py @@ -1,19 +1,13 @@ import unittest -import pandas as pd -import numpy as np -import matplotlib.pyplot as plt -from causal_testing.specification.variable import Input -from causal_testing.utils.validation import CausalValidator from causal_testing.estimation.cubic_spline_estimator import CubicSplineRegressionEstimator -from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator - -from tests.estimation_tests.test_linear_regression_estimator import TestLinearRegressionEstimator from causal_testing.testing.base_test_case import BaseTestCase from causal_testing.specification.variable import Input, Output +from tests.estimation_tests.test_linear_regression_estimator import load_chapter_11_df + -class TestCubicSplineRegressionEstimator(TestLinearRegressionEstimator): +class TestCubicSplineRegressionEstimator(unittest.TestCase): @classmethod def setUpClass(cls): super().setUpClass() @@ -24,7 +18,7 @@ def test_program_11_3_cublic_spline(self): Slightly modified as Hernan et al. use linear regression for this example. 
""" - df = self.chapter_11_df.copy() + df = load_chapter_11_df() base_test_case = BaseTestCase(Input("treatments", float), Output("outcomes", float)) @@ -32,14 +26,6 @@ def test_program_11_3_cublic_spline(self): ate_1 = cublic_spline_estimator.estimate_ate_calculated() - self.assertEqual( - round( - cublic_spline_estimator.model.predict({"Intercept": 1, "treatments": 90}).iloc[0], - 1, - ), - 195.6, - ) - cublic_spline_estimator.treatment_value = 2 ate_2 = cublic_spline_estimator.estimate_ate_calculated() diff --git a/tests/estimation_tests/test_linear_regression_estimator.py b/tests/estimation_tests/test_linear_regression_estimator.py index 0aa121ed..a1af4c9a 100644 --- a/tests/estimation_tests/test_linear_regression_estimator.py +++ b/tests/estimation_tests/test_linear_regression_estimator.py @@ -1,13 +1,11 @@ import unittest import pandas as pd import numpy as np -import matplotlib.pyplot as plt -from causal_testing.specification.variable import Input +from causal_testing.specification.variable import Input, Output from causal_testing.utils.validation import CausalValidator from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator from causal_testing.testing.base_test_case import BaseTestCase -from causal_testing.specification.variable import Input, Output def load_nhefs_df(): @@ -77,7 +75,7 @@ def test_linear_regression_categorical_ate(self): df = self.scarf_df.copy() base_test_case = BaseTestCase(Input("color", float), Output("completed", float)) logistic_regression_estimator = LinearRegressionEstimator(base_test_case, None, None, set(), df) - ate, confidence = logistic_regression_estimator.estimate_coefficient() + _, confidence = logistic_regression_estimator.estimate_coefficient() self.assertTrue(all([ci_low < 0 < ci_high for ci_low, ci_high in zip(confidence[0], confidence[1])])) def test_program_11_2(self): @@ -86,22 +84,8 @@ def test_program_11_2(self): linear_regression_estimator = LinearRegressionEstimator(self.base_test_case, None, None, set(), df) ate, _ = linear_regression_estimator.estimate_coefficient() - self.assertEqual( - round( - linear_regression_estimator.model.params["Intercept"] - + 90 * linear_regression_estimator.model.params["treatments"], - 1, - ), - 216.9, - ) - # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE - self.assertTrue( - all( - round(linear_regression_estimator.model.params["treatments"], 1) == round(ate_single, 1) - for ate_single in ate - ) - ) + self.assertTrue(all(round(ate["treatments"], 1) == round(ate_single, 1) for ate_single in ate)) def test_program_11_3(self): """Test whether our linear regression implementation produces the same results as program 11.3 (p. 
144).""" @@ -110,23 +94,8 @@ def test_program_11_3(self): self.base_test_case, None, None, set(), df, formula="outcomes ~ treatments + I(treatments ** 2)" ) ate, _ = linear_regression_estimator.estimate_coefficient() - print(linear_regression_estimator.model.summary()) - self.assertEqual( - round( - linear_regression_estimator.model.params["Intercept"] - + 90 * linear_regression_estimator.model.params["treatments"] - + 90 * 90 * linear_regression_estimator.model.params["I(treatments ** 2)"], - 1, - ), - 197.1, - ) # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE - self.assertTrue( - all( - round(linear_regression_estimator.model.params["treatments"], 3) == round(ate_single, 3) - for ate_single in ate - ) - ) + self.assertTrue(all(round(ate["treatments"], 3) == round(ate_single, 3) for ate_single in ate)) def test_program_15_1A(self): """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184).""" @@ -161,15 +130,9 @@ def test_program_15_1A(self): I(smokeyrs ** 2) + (qsmk * smokeintensity)""", ) - # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"] - # terms_to_product = [("qsmk", "smokeintensity")] - # for term_to_square in terms_to_square: - # for term_a, term_b in terms_to_product: - # linear_regression_estimator.add_product_term_to_df(term_a, term_b) - linear_regression_estimator.estimate_coefficient() - self.assertEqual(round(linear_regression_estimator.model.params["qsmk"], 1), 2.6) - self.assertEqual(round(linear_regression_estimator.model.params["qsmk:smokeintensity"], 2), 0.05) + coefficient, _ = linear_regression_estimator.estimate_coefficient() + self.assertEqual(round(coefficient["qsmk"], 1), 2.6) def test_program_15_no_interaction(self): """Test whether our linear regression implementation produces the same results as program 15.1 (p. 
163, 184) @@ -281,10 +244,11 @@ def test_program_11_2_with_robustness_validation(self): """Test whether our linear regression estimator, as used in test_program_11_2 can correctly estimate robustness.""" df = self.chapter_11_df.copy() linear_regression_estimator = LinearRegressionEstimator(self.base_test_case, 100, 90, set(), df) - linear_regression_estimator.estimate_coefficient() cv = CausalValidator() - self.assertEqual(round(cv.estimate_robustness(linear_regression_estimator.model)["treatments"], 4), 0.7353) + self.assertEqual( + round(cv.estimate_robustness(linear_regression_estimator.fit_model())["treatments"], 4), 0.7353 + ) def test_gp(self): df = pd.DataFrame() diff --git a/tests/main_tests/test_main.py b/tests/main_tests/test_main.py index 81e46742..07d864c4 100644 --- a/tests/main_tests/test_main.py +++ b/tests/main_tests/test_main.py @@ -1,12 +1,16 @@ import unittest +from pathlib import Path +import tempfile +import os + import shutil import json import pandas as pd -from pathlib import Path -from causal_testing.main import CausalTestingPaths, CausalTestingFramework, parse_args -from causal_testing.__main__ import main from unittest.mock import patch +from causal_testing.main import CausalTestingPaths, CausalTestingFramework +from causal_testing.__main__ import main + class TestCausalTestingPaths(unittest.TestCase): @@ -144,6 +148,97 @@ def test_ctf(self): self.assertEqual(tests_passed, [True]) + def test_ctf_batches(self): + framework = CausalTestingFramework(self.paths) + framework.setup() + + # Load and run tests + framework.load_tests() + + output_files = [] + with tempfile.TemporaryDirectory() as tmpdir: + for i, results in enumerate(framework.run_tests_in_batches()): + temp_file_path = os.path.join(tmpdir, f"output_{i}.json") + framework.save_results(results, temp_file_path) + output_files.append(temp_file_path) + del results + + # Now stitch the results together from the temporary files + all_results = [] + for file_path in output_files: + with open(file_path, "r", encoding="utf-8") as f: + all_results.extend(json.load(f)) + + self.assertEqual([result["passed"] for result in all_results], [True]) + + def test_ctf_batches_exception_silent(self): + framework = CausalTestingFramework(self.paths, query="test_input < 0") + framework.setup() + + # Load and run tests + framework.load_tests() + + output_files = [] + with tempfile.TemporaryDirectory() as tmpdir: + for i, results in enumerate(framework.run_tests_in_batches(silent=True)): + temp_file_path = os.path.join(tmpdir, f"output_{i}.json") + framework.save_results(results, temp_file_path) + output_files.append(temp_file_path) + del results + + # Now stitch the results together from the temporary files + all_results = [] + for file_path in output_files: + with open(file_path, "r", encoding="utf-8") as f: + all_results.extend(json.load(f)) + + self.assertEqual([result["passed"] for result in all_results], [False]) + self.assertEqual([result["result"]["effect_measure"] for result in all_results], ["Error"]) + + def test_ctf_batches_exception(self): + framework = CausalTestingFramework(self.paths, query="test_input < 0") + framework.setup() + + # Load and run tests + framework.load_tests() + with self.assertRaises(ValueError): + list(framework.run_tests_in_batches()) + + def test_ctf_batches_matches_run_tests(self): + # Run the tests normally + framework = CausalTestingFramework(self.paths) + framework.setup() + framework.load_tests() + normale_results = framework.run_tests() + + # Run the tests in batches + output_files = [] 
+ with tempfile.TemporaryDirectory() as tmpdir: + for i, results in enumerate(framework.run_tests_in_batches()): + temp_file_path = os.path.join(tmpdir, f"output_{i}.json") + framework.save_results(results, temp_file_path) + output_files.append(temp_file_path) + del results + + # Now stitch the results together from the temporary files + all_results = [] + for file_path in output_files: + with open(file_path, "r", encoding="utf-8") as f: + all_results.extend(json.load(f)) + + with tempfile.TemporaryDirectory() as tmpdir: + normal_output = os.path.join(tmpdir, f"normal.json") + framework.save_results(normale_results, normal_output) + with open(normal_output) as f: + normal_results = json.load(f) + + batch_output = os.path.join(tmpdir, f"batch.json") + with open(batch_output, "w") as f: + json.dump(all_results, f) + with open(batch_output) as f: + batch_results = json.load(f) + self.assertEqual(normal_results, batch_results) + def test_global_query(self): framework = CausalTestingFramework(self.paths) framework.setup() @@ -223,6 +318,26 @@ def test_parse_args(self): main() self.assertTrue((self.output_path.parent / "main.json").exists()) + def test_parse_args_batches(self): + with unittest.mock.patch( + "sys.argv", + [ + "causal_testing", + "--dag_path", + str(self.dag_path), + "--data_paths", + str(self.data_paths[0]), + "--test_config", + str(self.test_config_path), + "--output", + str(self.output_path.parent / "main_batch.json"), + "--batch-size", + "5", + ], + ): + main() + self.assertTrue((self.output_path.parent / "main_batch.json").exists()) + def tearDown(self): if self.output_path.parent.exists(): shutil.rmtree(self.output_path.parent)
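
Note on the new batched API: this patch turns run_tests_in_batches into a generator that yields one list of CausalTestResult objects per batch, and save_results gains an optional output_path, so each batch can be flushed to disk as it completes and the per-batch JSON stitched together afterwards. A minimal usage sketch of that pattern, mirroring __main__.py and the new tests above (it assumes a `framework` on which setup() and load_tests() have already been called; the final "results.json" path is only a placeholder):

import json
import os
import tempfile

# Assumes `framework` is a CausalTestingFramework that has already been
# configured via framework.setup() and framework.load_tests().
with tempfile.TemporaryDirectory() as tmpdir:
    output_files = []
    # Each iteration yields one batch of CausalTestResult objects. Writing the
    # batch out and dropping the reference keeps peak memory proportional to
    # the batch size rather than to the whole test suite.
    for i, batch in enumerate(framework.run_tests_in_batches(batch_size=100)):
        batch_path = os.path.join(tmpdir, f"output_{i}.json")
        framework.save_results(batch, batch_path)
        output_files.append(batch_path)
        del batch

    # Stitch the per-batch JSON files back into a single results list while
    # the temporary directory still exists.
    stitched = []
    for path in output_files:
        with open(path, "r", encoding="utf-8") as f:
            stitched.extend(json.load(f))

# Placeholder destination; __main__.py writes to args.output instead.
with open("results.json", "w", encoding="utf-8") as f:
    json.dump(stitched, f, indent=2)

Because every batch is serialised and released before the next one is requested, only one batch of results is ever held in memory at a time, which is what the temporary-file handling in __main__.py and the test_ctf_batches* tests exercise.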