Batch test execution #324

Merged 13 commits on Jun 2, 2025
26 changes: 22 additions & 4 deletions causal_testing/__main__.py
@@ -1,6 +1,10 @@
"""This module contains the main entrypoint functionality to the Causal Testing Framework."""

import logging
import tempfile
import json
import os

from .main import setup_logging, parse_args, CausalTestingPaths, CausalTestingFramework


@@ -34,13 +38,27 @@ def main() -> None:

     if args.batch_size > 0:
         logging.info(f"Running tests in batches of size {args.batch_size}")
-        results = framework.run_tests_in_batches(batch_size=args.batch_size, silent=args.silent)
+        with tempfile.TemporaryDirectory() as tmpdir:
+            output_files = []
+            for i, results in enumerate(framework.run_tests_in_batches(batch_size=args.batch_size, silent=args.silent)):
+                temp_file_path = os.path.join(tmpdir, f"output_{i}.json")
+                framework.save_results(results, temp_file_path)
+                output_files.append(temp_file_path)
+                del results
+
+            # Now stitch the results together from the temporary files
+            all_results = []
+            for file_path in output_files:
+                with open(file_path, "r", encoding="utf-8") as f:
+                    all_results.extend(json.load(f))
+
+            # Save the final stitched results to your desired location
+            with open(args.output, "w", encoding="utf-8") as f:
+                json.dump(all_results, f, indent=4)
     else:
         logging.info("Running tests in regular mode")
         results = framework.run_tests(silent=args.silent)
-
-    # Save results
-    framework.save_results(results)
+        framework.save_results(results)
 
     logging.info("Causal testing completed successfully.")
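
The pattern above bounds peak memory: each batch of results is written to a temporary JSON file as soon as it is produced, and the per-batch files are stitched together only at the end. A self-contained sketch of the same spill-and-stitch idea, with a squaring function standing in for test execution:

    import json
    import os
    import tempfile

    def run_in_batches(items, batch_size):
        # Yield one batch of "results" at a time instead of accumulating them all.
        for start in range(0, len(items), batch_size):
            yield [x * x for x in items[start:start + batch_size]]  # stand-in for execute_test()

    with tempfile.TemporaryDirectory() as tmpdir:
        output_files = []
        for i, batch in enumerate(run_in_batches(list(range(10)), batch_size=4)):
            path = os.path.join(tmpdir, f"output_{i}.json")
            with open(path, "w", encoding="utf-8") as f:
                json.dump(batch, f)  # spill the batch to disk
            output_files.append(path)

        stitched = []
        for file_path in output_files:
            with open(file_path, "r", encoding="utf-8") as f:
                stitched.extend(json.load(f))  # merge the per-batch files back together

    print(stitched)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
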
6 changes: 2 additions & 4 deletions causal_testing/estimation/abstract_regression_estimator.py
@@ -45,7 +45,6 @@ def __init__(
             query=query,
         )
 
-        self.model = None
         if effect_modifiers is None:
             effect_modifiers = []
         if adjustment_set is None:
@@ -79,15 +78,14 @@ def add_modelling_assumptions(self):
"do not need to be linear."
)

def _run_regression(self, data=None) -> RegressionResultsWrapper:
def fit_model(self, data=None) -> RegressionResultsWrapper:
"""Run logistic regression of the treatment and adjustment set against the outcome and return the model.

:return: The model after fitting to data.
"""
if data is None:
data = self.df
model = self.regressor(formula=self.formula, data=data).fit(disp=0)
self.model = model
return model

def _predict(self, data=None, adjustment_config: dict = None) -> pd.DataFrame:
@@ -102,7 +100,7 @@ def _predict(self, data=None, adjustment_config: dict = None) -> pd.DataFrame:
         if adjustment_config is None:
             adjustment_config = {}
 
-        model = self._run_regression(data)
+        model = self.fit_model(data)
 
         x = pd.DataFrame(columns=self.df.columns)
         x["Intercept"] = 1  # self.intercept
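
With the cached self.model attribute removed, fit_model (formerly _run_regression) is now the way to obtain the fitted statsmodels results object, and callers keep the return value themselves. A minimal sketch of what that call wraps, with synthetic data standing in for the estimator's self.df and self.formula:

    import pandas as pd
    import statsmodels.formula.api as smf

    # Synthetic stand-in for the estimator's self.df.
    df = pd.DataFrame({"treatments": [0.0, 1.0, 2.0, 3.0, 4.0],
                       "outcomes": [1.1, 2.9, 5.2, 7.0, 8.8]})

    # fit_model() does essentially self.regressor(formula=self.formula, data=data).fit();
    # for the linear estimator the regressor is ordinary least squares.
    model = smf.ols(formula="outcomes ~ treatments", data=df).fit()

    # The results object is returned rather than stored, so keep a reference to it.
    print(model.params["treatments"])  # fitted coefficient for the treatment term
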
2 changes: 1 addition & 1 deletion causal_testing/estimation/cubic_spline_estimator.py
@@ -59,7 +59,7 @@ def estimate_ate_calculated(self, adjustment_config: dict = None) -> pd.Series:

         :return: The average treatment effect.
         """
-        model = self._run_regression()
+        model = self.fit_model()
 
         x = {"Intercept": 1, self.base_test_case.treatment_variable.name: self.treatment_value}
         if adjustment_config is not None:
4 changes: 2 additions & 2 deletions causal_testing/estimation/linear_regression_estimator.py
@@ -98,7 +98,7 @@ def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]:

         :return: The unit average treatment effect and the 95% Wald confidence intervals.
         """
-        model = self._run_regression()
+        model = self.fit_model()
         newline = "\n"
         patsy_md = ModelDesc.from_formula(self.base_test_case.treatment_variable.name)
 
@@ -129,7 +129,7 @@ def estimate_ate(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]:

         :return: The average treatment effect and the 95% Wald confidence intervals.
         """
-        model = self._run_regression()
+        model = self.fit_model()
 
         # Create an empty individual for the control and treated
         individuals = pd.DataFrame(1, index=["control", "treated"], columns=model.params.index)
2 changes: 1 addition & 1 deletion causal_testing/estimation/logistic_regression_estimator.py
@@ -38,7 +38,7 @@ def estimate_unit_odds_ratio(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]:

         :return: The odds ratio. Confidence intervals are not yet supported.
         """
-        model = self._run_regression(self.df)
+        model = self.fit_model(self.df)
         ci_low, ci_high = np.exp(model.conf_int(self.alpha).loc[self.base_test_case.treatment_variable.name])
         return pd.Series(np.exp(model.params[self.base_test_case.treatment_variable.name])), [
             pd.Series(ci_low),
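
The rename leaves the odds-ratio arithmetic unchanged: exponentiating the fitted logit coefficient gives the unit odds ratio, and exponentiating the coefficient's confidence bounds gives its interval. A self-contained sketch of that computation (synthetic data, not the framework's fixtures):

    import numpy as np
    import pandas as pd
    import statsmodels.formula.api as smf

    # Synthetic binary-outcome data standing in for the estimator's self.df.
    rng = np.random.default_rng(0)
    df = pd.DataFrame({"treatments": rng.normal(size=200)})
    df["outcomes"] = (df["treatments"] + rng.normal(size=200) > 0).astype(int)

    model = smf.logit(formula="outcomes ~ treatments", data=df).fit(disp=0)
    odds_ratio = np.exp(model.params["treatments"])                   # unit odds ratio
    ci_low, ci_high = np.exp(model.conf_int(0.05).loc["treatments"])  # 95% interval
    print(odds_ratio, ci_low, ci_high)
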
34 changes: 16 additions & 18 deletions causal_testing/main.py
@@ -8,6 +8,7 @@
 from typing import Dict, Any, Optional, List, Union, Sequence
 from tqdm import tqdm
 
+
 import pandas as pd
 import numpy as np
 
@@ -344,7 +345,6 @@ def run_tests_in_batches(self, batch_size: int = 100, silent: bool = False) -> L
         num_batches = int(np.ceil(num_tests / batch_size))
 
         logger.info(f"Processing {num_tests} tests in {num_batches} batches of up to {batch_size} tests each")
-        all_results = []
         with tqdm(total=num_tests, desc="Overall progress", mininterval=0.1) as progress:
             # Process each batch
             for batch_idx in range(num_batches):
@@ -360,26 +360,23 @@
                 batch_results = []
                 for test_case in current_batch:
                     try:
-                        result = test_case.execute_test()
-                        batch_results.append(result)
-                    except (TypeError, AttributeError) as e:
+                        batch_results.append(test_case.execute_test())
+                    # pylint: disable=broad-exception-caught
+                    except Exception as e:
                         if not silent:
                             logger.error(f"Type or attribute error in test: {str(e)}")
                             raise
-                        result = CausalTestResult(
-                            estimator=test_case.estimator,
-                            test_value=TestValue("Error", str(e)),
+                        batch_results.append(
+                            CausalTestResult(
+                                estimator=test_case.estimator,
+                                test_value=TestValue("Error", str(e)),
+                            )
                         )
-                        batch_results.append(result)
 
                     progress.update(1)
 
-                all_results.extend(batch_results)
-
                 logger.info(f"Completed batch {batch_idx + 1} of {num_batches}")
-
-            logger.info(f"Completed processing all {len(all_results)} tests in {num_batches} batches")
-            return all_results
+                yield batch_results
+        logger.info(f"Completed processing in {num_batches} batches")
 
     def run_tests(self, silent=False) -> List[CausalTestResult]:
         """
@@ -399,7 +396,6 @@
             try:
                 result = test_case.execute_test()
                 results.append(result)
-                logger.info(f"Test completed: {test_case}")
             # pylint: disable=broad-exception-caught
             except Exception as e:
                 if not silent:
@@ -414,9 +410,11 @@

         return results
 
-    def save_results(self, results: List[CausalTestResult]) -> None:
+    def save_results(self, results: List[CausalTestResult], output_path: str = None) -> None:
         """Save test results to JSON file in the expected format."""
-        logger.info(f"Saving results to {self.paths.output_path}")
+        if output_path is None:
+            output_path = self.paths.output_path
+        logger.info(f"Saving results to {output_path}")
 
         # Load original test configs to preserve test metadata
         with open(self.paths.test_config_path, "r", encoding="utf-8") as f:
@@ -460,7 +458,7 @@ def save_results(self, results: List[CausalTestResult]) -> None:
             json_results.append(output)
 
         # Save to file
-        with open(self.paths.output_path, "w", encoding="utf-8") as f:
+        with open(output_path, "w", encoding="utf-8") as f:
             json.dump(json_results, f, indent=2)
 
         logger.info("Results saved successfully")
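
Because run_tests_in_batches is now a generator yielding one list of CausalTestResults per batch, and save_results accepts an optional output_path, a caller can persist each batch as soon as it is produced and release it. A sketch of that calling pattern, assuming an already-configured CausalTestingFramework (only names that appear in this diff are used):

    # `framework` is assumed to be a configured CausalTestingFramework instance.
    for i, batch_results in enumerate(framework.run_tests_in_batches(batch_size=100)):
        framework.save_results(batch_results, f"results_batch_{i}.json")  # per-batch output path
        del batch_results  # no aggregate list is kept, so memory stays bounded by one batch
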
22 changes: 4 additions & 18 deletions tests/estimation_tests/test_cubic_spline_estimator.py
@@ -1,19 +1,13 @@
 import unittest
-import pandas as pd
-import numpy as np
-import matplotlib.pyplot as plt
-from causal_testing.specification.variable import Input
-from causal_testing.utils.validation import CausalValidator
 
 from causal_testing.estimation.cubic_spline_estimator import CubicSplineRegressionEstimator
-from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator
-
-from tests.estimation_tests.test_linear_regression_estimator import TestLinearRegressionEstimator
 from causal_testing.testing.base_test_case import BaseTestCase
 from causal_testing.specification.variable import Input, Output
 
+from tests.estimation_tests.test_linear_regression_estimator import load_chapter_11_df
+
 
-class TestCubicSplineRegressionEstimator(TestLinearRegressionEstimator):
+class TestCubicSplineRegressionEstimator(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         super().setUpClass()
@@ -24,22 +18,14 @@ def test_program_11_3_cublic_spline(self):
         Slightly modified as Hernan et al. use linear regression for this example.
         """
 
-        df = self.chapter_11_df.copy()
+        df = load_chapter_11_df()
 
         base_test_case = BaseTestCase(Input("treatments", float), Output("outcomes", float))
 
         cublic_spline_estimator = CubicSplineRegressionEstimator(base_test_case, 1, 0, set(), 3, df)
 
         ate_1 = cublic_spline_estimator.estimate_ate_calculated()
 
-        self.assertEqual(
-            round(
-                cublic_spline_estimator.model.predict({"Intercept": 1, "treatments": 90}).iloc[0],
-                1,
-            ),
-            195.6,
-        )
-
         cublic_spline_estimator.treatment_value = 2
         ate_2 = cublic_spline_estimator.estimate_ate_calculated()
54 changes: 9 additions & 45 deletions tests/estimation_tests/test_linear_regression_estimator.py
@@ -1,13 +1,11 @@
 import unittest
 import pandas as pd
 import numpy as np
-import matplotlib.pyplot as plt
-from causal_testing.specification.variable import Input
+from causal_testing.specification.variable import Input, Output
 from causal_testing.utils.validation import CausalValidator
 
 from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator
 from causal_testing.testing.base_test_case import BaseTestCase
-from causal_testing.specification.variable import Input, Output
 
 
 def load_nhefs_df():
def load_nhefs_df():
@@ -77,7 +75,7 @@ def test_linear_regression_categorical_ate(self):
         df = self.scarf_df.copy()
         base_test_case = BaseTestCase(Input("color", float), Output("completed", float))
         logistic_regression_estimator = LinearRegressionEstimator(base_test_case, None, None, set(), df)
-        ate, confidence = logistic_regression_estimator.estimate_coefficient()
+        _, confidence = logistic_regression_estimator.estimate_coefficient()
         self.assertTrue(all([ci_low < 0 < ci_high for ci_low, ci_high in zip(confidence[0], confidence[1])]))
 
     def test_program_11_2(self):
@@ -86,22 +84,8 @@ def test_program_11_2(self):
         linear_regression_estimator = LinearRegressionEstimator(self.base_test_case, None, None, set(), df)
         ate, _ = linear_regression_estimator.estimate_coefficient()
 
-        self.assertEqual(
-            round(
-                linear_regression_estimator.model.params["Intercept"]
-                + 90 * linear_regression_estimator.model.params["treatments"],
-                1,
-            ),
-            216.9,
-        )
-
         # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE
-        self.assertTrue(
-            all(
-                round(linear_regression_estimator.model.params["treatments"], 1) == round(ate_single, 1)
-                for ate_single in ate
-            )
-        )
+        self.assertTrue(all(round(ate["treatments"], 1) == round(ate_single, 1) for ate_single in ate))
 
     def test_program_11_3(self):
         """Test whether our linear regression implementation produces the same results as program 11.3 (p. 144)."""
@@ -110,23 +94,8 @@
             self.base_test_case, None, None, set(), df, formula="outcomes ~ treatments + I(treatments ** 2)"
         )
         ate, _ = linear_regression_estimator.estimate_coefficient()
-        print(linear_regression_estimator.model.summary())
-        self.assertEqual(
-            round(
-                linear_regression_estimator.model.params["Intercept"]
-                + 90 * linear_regression_estimator.model.params["treatments"]
-                + 90 * 90 * linear_regression_estimator.model.params["I(treatments ** 2)"],
-                1,
-            ),
-            197.1,
-        )
         # Increasing treatments from 90 to 100 should be the same as 10 times the unit ATE
-        self.assertTrue(
-            all(
-                round(linear_regression_estimator.model.params["treatments"], 3) == round(ate_single, 3)
-                for ate_single in ate
-            )
-        )
+        self.assertTrue(all(round(ate["treatments"], 3) == round(ate_single, 3) for ate_single in ate))
 
     def test_program_15_1A(self):
         """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184)."""
@@ -161,15 +130,9 @@ def test_program_15_1A(self):
                 I(smokeyrs ** 2) +
                 (qsmk * smokeintensity)""",
         )
-        # terms_to_square = ["age", "wt71", "smokeintensity", "smokeyrs"]
-        # terms_to_product = [("qsmk", "smokeintensity")]
-        # for term_to_square in terms_to_square:
-        #     for term_a, term_b in terms_to_product:
-        #         linear_regression_estimator.add_product_term_to_df(term_a, term_b)
 
-        linear_regression_estimator.estimate_coefficient()
-        self.assertEqual(round(linear_regression_estimator.model.params["qsmk"], 1), 2.6)
-        self.assertEqual(round(linear_regression_estimator.model.params["qsmk:smokeintensity"], 2), 0.05)
+        coefficient, _ = linear_regression_estimator.estimate_coefficient()
+        self.assertEqual(round(coefficient["qsmk"], 1), 2.6)
 
     def test_program_15_no_interaction(self):
         """Test whether our linear regression implementation produces the same results as program 15.1 (p. 163, 184)
@@ -281,10 +244,11 @@ def test_program_11_2_with_robustness_validation(self):
"""Test whether our linear regression estimator, as used in test_program_11_2 can correctly estimate robustness."""
df = self.chapter_11_df.copy()
linear_regression_estimator = LinearRegressionEstimator(self.base_test_case, 100, 90, set(), df)
linear_regression_estimator.estimate_coefficient()

cv = CausalValidator()
self.assertEqual(round(cv.estimate_robustness(linear_regression_estimator.model)["treatments"], 4), 0.7353)
self.assertEqual(
round(cv.estimate_robustness(linear_regression_estimator.fit_model())["treatments"], 4), 0.7353
)

def test_gp(self):
df = pd.DataFrame()
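
Across these test updates, assertions that previously reached into the cached estimator.model now go through the public API: either the pd.Series returned by estimate_coefficient() or the results object returned by fit_model(). A small self-contained illustration of the new assertion style, with a synthetic Series standing in for a real estimate:

    import pandas as pd

    # Synthetic stand-in for the Series returned by estimate_coefficient().
    ate = pd.Series({"treatments": 2.064})

    # New style: index the returned Series by term name...
    assert round(ate["treatments"], 1) == 2.1
    # ...rather than reading estimator.model.params["treatments"] from a cached model.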