diff --git a/README.md b/README.md index 0b957b3a..7f6cfb1d 100644 --- a/README.md +++ b/README.md @@ -56,23 +56,42 @@ For more information on how to use the Causal Testing Framework, please refer to >[!NOTE] >We recommend you use a 64 bit OS (standard in most modern machines) as we have had reports of the installation crashing on some 32 bit Debian installations. +## Usage +>[!NOTE] +> Example usage can be found in the `examples` directory. + +1. To run the causal testing framework, you need some runtime data from your system, some causal test cases, and a causal DAG that specifies the expected causal relationships between the variables in your runtime data (and any other relevant variables that are _not_ recorded in the data but are known to be relevant). + +2. If you do not already have causal test cases, you can convert your causal DAG to causal tests by running the following command. + +``` +python causal_testing/testing/metamorphic_relation.py --dag_path $PATH_TO_DAG --output_path $PATH_TO_TESTS +``` + +3. You can now execute your tests by running the following command. +``` +python -m causal_testing --dag_path $PATH_TO_DAG --data_paths $PATH_TO_DATA --test_config $PATH_TO_TESTS --output $OUTPUT +``` +The results will be saved for inspection in a JSON file located at `$OUTPUT`. +In the future, we hope to add a visualisation tool to assist with this. + ## How to Cite If you use our framework in your work, please cite the following: -``This research has used version X.Y.Z (software citation) of the +``This research has used version X.Y.Z (software citation) of the Causal Testing Framework (paper citation).`` -The paper citation should be the Causal Testing Framework [paper](https://dl.acm.org/doi/10.1145/3607184), +The paper citation should be the Causal Testing Framework [paper](https://dl.acm.org/doi/10.1145/3607184), and the software citation should contain the specific Figshare [DOI](https://orda.shef.ac.uk/articles/software/CITCOM_Software_Release/24427516) of the version used in your work.
BibTeX Citations - +
Paper - + ``` @ARTICLE{Clark_etal_2023, author = {Clark, Andrew G. and Foster, Michael and Prifling, Benedikt and Walkinshaw, Neil and Hierons, Robert M. @@ -89,10 +108,10 @@ and the software citation should contain the specific Figshare [DOI](https://ord ```
- +
Software (example) - + ``` @ARTICLE{Wild2023, author = {Foster, Michael and Clark, Andrew G. and Somers, Richard and Wild, Christopher and Allian, Farhad and Hierons, Robert M. and Wagg, David and Walkinshaw, Neil}, @@ -114,15 +133,15 @@ To contribute to our work, please ensure the following: 1. [Fork the repository](https://help.github.com/articles/fork-a-repo/) into your own GitHub account, and [clone](https://docs.github.com/en/repositories/creating-and-managing-repositories/cloning-a-repository) it to your local machine. 2. [Create a new branch](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-and-deleting-branches-within-your-repository) in your forked repository. Give this branch an appropriate name, and create commits that describe the changes. 3. [Push your changes](https://docs.github.com/en/get-started/using-git/pushing-commits-to-a-remote-repository) to your new branch in your remote fork, compare with `CausalTestingFramework/main`, and ensure any conflicts are resolved. -4. Create a draft [pull request](https://docs.github.com/en/get-started/quickstart/hello-world#opening-a-pull-request) from your branch, and ensure you have [linked](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/autolinked-references-and-urls) it to any relevant issues in your description. +4. Create a draft [pull request](https://docs.github.com/en/get-started/quickstart/hello-world#opening-a-pull-request) from your branch, and ensure you have [linked](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/autolinked-references-and-urls) it to any relevant issues in your description. We use the [unittest]() module to develop our tests and the [pytest](https://pytest.org/en/latest/) framework as our test discovery, [pylint](https://pypi.org/project/pylint/) for our code analyser, and [black](https://pypi.org/project/black/) for our code formatting. To find the other (optional) developer dependencies, please check `pyproject.toml`. -## Acknowledgements +## Acknowledgements The Causal Testing Framework is supported by the UK's Engineering and Physical Sciences Research Council (EPSRC), -with the project name [CITCOM](https://gow.epsrc.ukri.org/NGBOViewGrant.aspx?GrantRef=EP/T030526/1) - "_Causal Inference for Testing of Computational Models_" +with the project name [CITCOM](https://gow.epsrc.ukri.org/NGBOViewGrant.aspx?GrantRef=EP/T030526/1) - "_Causal Inference for Testing of Computational Models_" under the grant EP/T030526/1. 
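For users who prefer to drive the framework from Python rather than via `python -m causal_testing`, the `causal_testing.main` module introduced in this change exposes the same workflow programmatically. The sketch below simply mirrors what the new `__main__.py` does with the classes and methods defined in `causal_testing/main.py` (`CausalTestingPaths`, `CausalTestingFramework`, `setup`, `load_tests`, `run_tests`, `save_results`); the file paths are illustrative placeholders only.

```python
# Minimal sketch of programmatic use of the new entrypoint classes.
# Paths below are placeholders; swap in your own DAG, data, and test config.
from causal_testing.main import CausalTestingPaths, CausalTestingFramework

paths = CausalTestingPaths(
    dag_path="dag.dot",                    # causal DAG in DOT format
    data_paths=["runtime_data.csv"],       # one or more .csv/.pqt data files
    test_config_path="causal_tests.json",  # hand-written or generated test cases
    output_path="results/test_results.json",
)

framework = CausalTestingFramework(paths, ignore_cycles=False, query=None)
framework.setup()        # loads the DAG and data, builds the causal specification
framework.load_tests()   # reads the test config into CausalTestCase objects
results = framework.run_tests(silent=True)  # silent=True records errors as results
framework.save_results(results)             # writes the JSON report to output_path
```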
diff --git a/causal_testing/__main__.py b/causal_testing/__main__.py new file mode 100644 index 00000000..433358d3 --- /dev/null +++ b/causal_testing/__main__.py @@ -0,0 +1,49 @@ +"""This module contains the main entrypoint functionality to the Causal Testing Framework.""" + +import logging +from .main import setup_logging, parse_args, CausalTestingPaths, CausalTestingFramework + + +def main() -> None: + """ + + Main entry point for the Causal Testing Framework + + """ + + # Parse arguments + args = parse_args() + + # Setup logging + setup_logging(args.verbose) + + # Create paths object + paths = CausalTestingPaths( + dag_path=args.dag_path, + data_paths=args.data_paths, + test_config_path=args.test_config, + output_path=args.output, + ) + + # Create and setup framework + framework = CausalTestingFramework(paths, ignore_cycles=args.ignore_cycles, query=args.query) + framework.setup() + + # Load and run tests + framework.load_tests() + + if args.batch_size > 0: + logging.info(f"Running tests in batches of size {args.batch_size}") + results = framework.run_tests_in_batches(batch_size=args.batch_size, silent=args.silent) + else: + logging.info("Running tests in regular mode") + results = framework.run_tests(silent=args.silent) + + # Save results + framework.save_results(results) + + logging.info("Causal testing completed successfully.") + + +if __name__ == "__main__": + main() diff --git a/causal_testing/main.py b/causal_testing/main.py new file mode 100644 index 00000000..c2a78977 --- /dev/null +++ b/causal_testing/main.py @@ -0,0 +1,483 @@ +"""This module contains the classes that executes the Causal Testing Framework.""" + +import argparse +import json +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Any, Optional, List, Union, Sequence +from tqdm import tqdm + +import pandas as pd +import numpy as np + +from causal_testing.specification.causal_dag import CausalDAG +from causal_testing.specification.scenario import Scenario +from causal_testing.specification.variable import Input, Output +from causal_testing.specification.causal_specification import CausalSpecification +from causal_testing.testing.causal_test_case import CausalTestCase +from causal_testing.testing.base_test_case import BaseTestCase +from causal_testing.testing.causal_test_outcome import NoEffect, SomeEffect, Positive, Negative +from causal_testing.testing.causal_test_result import CausalTestResult, TestValue +from causal_testing.estimation.linear_regression_estimator import LinearRegressionEstimator +from causal_testing.estimation.logistic_regression_estimator import LogisticRegressionEstimator + +logger = logging.getLogger(__name__) + + +@dataclass +class CausalTestingPaths: + """ + Class for managing paths for causal testing inputs and outputs. 
+ + :param dag_path: Path to the DAG definition file + :param data_paths: List of paths to input data files + :param test_config_path: Path to the test configuration file + :param output_path: Path where test results will be written + """ + + dag_path: Path + data_paths: List[Path] + test_config_path: Path + output_path: Path + + def __init__( + self, + dag_path: Union[str, Path], + data_paths: List[Union[str, Path]], + test_config_path: Union[str, Path], + output_path: Union[str, Path], + ): + self.dag_path = Path(dag_path) + self.data_paths = [Path(p) for p in data_paths] + self.test_config_path = Path(test_config_path) + self.output_path = Path(output_path) + + def validate_paths(self) -> None: + """ + Validate existence of all input paths and writability of output path. + + :raises: FileNotFoundError if any required input file is missing. + """ + + if not self.dag_path.exists(): + raise FileNotFoundError(f"DAG file not found: {self.dag_path}") + + for data_path in self.data_paths: + if not data_path.exists(): + raise FileNotFoundError(f"Data file not found: {data_path}") + + if not self.test_config_path.exists(): + raise FileNotFoundError(f"Test configuration file not found: {self.test_config_path}") + + if not self.output_path.parent.exists(): + self.output_path.parent.mkdir(parents=True) + + +class CausalTestingFramework: + # pylint: disable=too-many-instance-attributes + """ + Main class for running causal tests. + + :param paths: CausalTestingPaths object containing required file paths + :param ignore_cycles: Flag to ignore cycles in the DAG + :param query: Optional query string to filter the input dataframe + + """ + + def __init__(self, paths: CausalTestingPaths, ignore_cycles: bool = False, query: Optional[str] = None): + self.paths = paths + self.ignore_cycles = ignore_cycles + self.query = query + + # These will be populated during setup + self.dag: Optional[CausalDAG] = None + self.data: Optional[pd.DataFrame] = None + self.variables: Dict[str, Any] = {"inputs": {}, "outputs": {}, "metas": {}} + self.scenario: Optional[Scenario] = None + self.causal_specification: Optional[CausalSpecification] = None + self.test_cases: Optional[List[CausalTestCase]] = None + + def setup(self) -> None: + """ + Set up the framework by loading DAG, runtime csv data, creating the scenario and causal specification. + + :raises: FileNotFoundError if required files are missing + """ + + logger.info("Setting up Causal Testing Framework...") + + # Load and validate all paths + self.paths.validate_paths() + + # Load DAG + self.dag = self.load_dag() + + # Load data + self.data = self.load_data(self.query) + + # Create variables from DAG + self.create_variables() + + # Create scenario and specification + self.create_scenario_and_specification() + + logger.info("Setup completed successfully") + + def load_dag(self) -> CausalDAG: + """ + Load the causal DAG from the specified file path. + """ + logger.info(f"Loading DAG from {self.paths.dag_path}") + dag = CausalDAG(str(self.paths.dag_path), ignore_cycles=self.ignore_cycles) + logger.info(f"DAG loaded with {len(dag.graph.nodes)} nodes and {len(dag.graph.edges)} edges") + return dag + + def _read_dataframe(self, data_path): + if str(data_path).endswith(".csv"): + return pd.read_csv(data_path) + if str(data_path).endswith(".pqt"): + return pd.read_parquet(data_path) + raise ValueError(f"Invalid file type {data_path}. 
Can only read CSV (.csv) or parquet (.pqt) files.") + + def load_data(self, query: Optional[str] = None) -> pd.DataFrame: + """Load and combine all data sources with optional filtering. + + :param query: Optional pandas query string to filter the loaded data + :return: Combined pandas DataFrame containing all loaded and filtered data + """ + logger.info(f"Loading data from {len(self.paths.data_paths)} source(s)") + + dfs = [self._read_dataframe(data_path) for data_path in self.paths.data_paths] + data = pd.concat(dfs, axis=0, ignore_index=True) + logger.info(f"Initial data shape: {data.shape}") + + if query: + logger.info(f"Attempting to apply query: '{query}'") + data = data.query(query) + + return data + + def create_variables(self) -> None: + """ + Create variable objects from DAG nodes based on their connectivity. + """ + for node_name, node_data in self.dag.graph.nodes(data=True): + if node_name not in self.data.columns and not node_data.get("hidden", False): + raise ValueError(f"Node {node_name} missing from data. Should it be marked as hidden?") + + dtype = self.data.dtypes.get(node_name) + + # If node has no incoming edges, it's an input + if self.dag.graph.in_degree(node_name) == 0: + self.variables["inputs"][node_name] = Input(name=node_name, datatype=dtype) + + # Otherwise it's an output + if self.dag.graph.in_degree(node_name) > 0: + self.variables["outputs"][node_name] = Output(name=node_name, datatype=dtype) + + def create_scenario_and_specification(self) -> None: + """Create scenario and causal specification objects from loaded data.""" + # Create scenario + all_variables = list(self.variables["inputs"].values()) + list(self.variables["outputs"].values()) + self.scenario = Scenario(variables=all_variables) + + # Set up treatment variables + self.scenario.setup_treatment_variables() + + # Create causal specification + self.causal_specification = CausalSpecification(scenario=self.scenario, causal_dag=self.dag) + + def load_tests(self) -> None: + """ + Load and prepare test configurations from file. + """ + logger.info(f"Loading test configurations from {self.paths.test_config_path}") + + with open(self.paths.test_config_path, "r", encoding="utf-8") as f: + test_configs = json.load(f) + + self.test_cases = self.create_test_cases(test_configs) + + def create_base_test(self, test: dict) -> BaseTestCase: + """ + Create base test case from test configuration. + + :param test: Dictionary containing test configuration parameters + + :return: BaseTestCase object + :raises: KeyError if required variables are not found in inputs or outputs + """ + treatment_name = test["treatment_variable"] + outcome_name = next(iter(test["expected_effect"].keys())) + + # Look for treatment variable in both inputs and outputs + treatment_var = self.variables["inputs"].get(treatment_name) or self.variables["outputs"].get(treatment_name) + if not treatment_var: + raise KeyError(f"Treatment variable '{treatment_name}' not found in inputs or outputs") + + # Look for outcome variable in both inputs and outputs + outcome_var = self.variables["inputs"].get(outcome_name) or self.variables["outputs"].get(outcome_name) + if not outcome_var: + raise KeyError(f"Outcome variable '{outcome_name}' not found in inputs or outputs") + + return BaseTestCase( + treatment_variable=treatment_var, outcome_variable=outcome_var, effect=test.get("effect", "total") + ) + + def create_test_cases(self, test_configs: dict) -> List[CausalTestCase]: + """Create test case objects from configuration dictionary. 
+ + :param test_configs: Dictionary containing test configurations + + :return: List of CausalTestCase objects containing the initialised test cases + :raises: KeyError if required variables are not found + :raises: ValueError if invalid test configuration is provided + """ + test_cases = [] + + for test in test_configs.get("tests", []): + if test.get("skip", False): + continue + + # Create base test case + base_test = self.create_base_test(test) + + # Create causal test case + causal_test = self.create_causal_test(test, base_test) + test_cases.append(causal_test) + + return test_cases + + def create_causal_test(self, test: dict, base_test: BaseTestCase) -> CausalTestCase: + """ + Create causal test case from test configuration and base test. + + :param test: Dictionary containing test configuration parameters + :param base_test: BaseTestCase object + + :return: CausalTestCase object + :raises: ValueError if invalid estimator or configuration is provided + """ + # Map effect string to effect class + effect_map = { + "NoEffect": NoEffect(), + "SomeEffect": SomeEffect(), + "Positive": Positive(), + "Negative": Negative(), + } + + # Map estimator string to estimator class + estimator_map = { + "LinearRegressionEstimator": LinearRegressionEstimator, + "LogisticRegressionEstimator": LogisticRegressionEstimator, + } + + if "estimator" not in test: + raise ValueError("Test configuration must specify an estimator") + + # Get the estimator class + estimator_class = estimator_map.get(test["estimator"]) + if estimator_class is None: + raise ValueError(f"Unknown estimator: {test['estimator']}") + + # Create the estimator with correct parameters + estimator = estimator_class( + base_test_case=base_test, + treatment_value=test.get("treatment_value"), + control_value=test.get("control_value"), + adjustment_set=test.get("adjustment_set", self.causal_specification.causal_dag.identification(base_test)), + df=self.data, + effect_modifiers=None, + formula=test.get("formula"), + alpha=test.get("alpha", 0.05), + query="", + ) + + # Get effect type and create expected effect + effect_type = test["expected_effect"][base_test.outcome_variable.name] + expected_effect = effect_map[effect_type] + + return CausalTestCase( + base_test_case=base_test, + expected_causal_effect=expected_effect, + estimate_type=test.get("estimate_type", "ate"), + estimate_params=test.get("estimate_params"), + effect_modifier_configuration=test.get("effect_modifier_configuration"), + estimator=estimator, + ) + + def run_tests_in_batches(self, batch_size: int = 100, silent: bool = False) -> List[CausalTestResult]: + """ + Run tests in batches to reduce memory usage. + + :param batch_size: Number of tests to run in each batch + :param silent: Whether to suppress errors + :return: List of all test results + :raises: ValueError if no tests are loaded + """ + logger.info("Running causal tests in batches...") + + if not self.test_cases: + raise ValueError("No tests loaded. 
Call load_tests() first.") + + num_tests = len(self.test_cases) + num_batches = int(np.ceil(num_tests / batch_size)) + + logger.info(f"Processing {num_tests} tests in {num_batches} batches of up to {batch_size} tests each") + all_results = [] + with tqdm(total=num_tests, desc="Overall progress", mininterval=0.1) as progress: + # Process each batch + for batch_idx in range(num_batches): + start_idx = batch_idx * batch_size + end_idx = min(start_idx + batch_size, num_tests) + + logger.info(f"Processing batch {batch_idx + 1} of {num_batches} (tests {start_idx} to {end_idx - 1})") + + # Get current batch of tests + current_batch = self.test_cases[start_idx:end_idx] + + # Process the current batch + batch_results = [] + for test_case in current_batch: + try: + result = test_case.execute_test() + batch_results.append(result) + except (TypeError, AttributeError) as e: + if not silent: + logger.error(f"Type or attribute error in test: {str(e)}") + raise + result = CausalTestResult( + estimator=test_case.estimator, + test_value=TestValue("Error", str(e)), + ) + batch_results.append(result) + + progress.update(1) + + all_results.extend(batch_results) + + logger.info(f"Completed batch {batch_idx + 1} of {num_batches}") + + logger.info(f"Completed processing all {len(all_results)} tests in {num_batches} batches") + return all_results + + def run_tests(self, silent=False) -> List[CausalTestResult]: + """ + Run all test cases and return their results. + + :return: List of CausalTestResult objects + :raises: ValueError if no tests are loaded + :raises: Exception if test execution fails + """ + logger.info("Running causal tests...") + + if not self.test_cases: + raise ValueError("No tests loaded. Call load_tests() first.") + + results = [] + for test_case in tqdm(self.test_cases): + try: + result = test_case.execute_test() + results.append(result) + logger.info(f"Test completed: {test_case}") + # pylint: disable=broad-exception-caught + except Exception as e: + if not silent: + logger.error(f"Error running test {test_case}: {str(e)}") + raise + result = CausalTestResult( + estimator=test_case.estimator, + test_value=TestValue("Error", str(e)), + ) + results.append(result) + logger.info(f"Test errored: {test_case}") + + return results + + def save_results(self, results: List[CausalTestResult]) -> None: + """Save test results to JSON file in the expected format.""" + logger.info(f"Saving results to {self.paths.output_path}") + + # Load original test configs to preserve test metadata + with open(self.paths.test_config_path, "r", encoding="utf-8") as f: + test_configs = json.load(f) + + # Combine test configs with their results + json_results = [] + for test_config, test_case, result in zip(test_configs["tests"], self.test_cases, results): + # Handle effect estimate - could be a Series or other format + effect_estimate = result.test_value.value + if isinstance(effect_estimate, pd.Series): + effect_estimate = effect_estimate.to_dict() + + # Handle confidence intervals - convert to list if needed + ci_low = result.ci_low() + ci_high = result.ci_high() + + # Determine if test failed based on expected vs actual effect + test_passed = test_case.expected_causal_effect.apply(result) if result.test_value.type != "Error" else False + + output = { + "name": test_config["name"], + "estimate_type": test_config["estimate_type"], + "effect": test_config.get("effect", "direct"), + "treatment_variable": test_config["treatment_variable"], + "expected_effect": test_config["expected_effect"], + "formula": 
test_config.get("formula"), + "alpha": test_config.get("alpha", 0.05), + "skip": test_config.get("skip", False), + "passed": test_passed, + "result": { + "treatment": result.estimator.base_test_case.treatment_variable.name, + "outcome": result.estimator.base_test_case.outcome_variable.name, + "adjustment_set": list(result.adjustment_set) if result.adjustment_set else [], + "effect_measure": result.test_value.type, + "effect_estimate": effect_estimate, + "ci_low": ci_low, + "ci_high": ci_high, + }, + } + json_results.append(output) + + # Save to file + with open(self.paths.output_path, "w", encoding="utf-8") as f: + json.dump(json_results, f, indent=2) + + logger.info("Results saved successfully") + return json_results + + +def setup_logging(verbose: bool = False) -> None: + """Set up logging configuration.""" + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + + +def parse_args(args: Optional[Sequence[str]] = None) -> argparse.Namespace: + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="Causal Testing Framework") + parser.add_argument("-D", "--dag_path", help="Path to the DAG file (.dot)", required=True) + parser.add_argument("-d", "--data_paths", help="Paths to data files (.csv)", nargs="+", required=True) + parser.add_argument("-t", "--test_config", help="Path to test configuration file (.json)", required=True) + parser.add_argument("-o", "--output", help="Path for output file (.json)", required=True) + parser.add_argument("-v", "--verbose", help="Enable verbose logging", action="store_true", default=False) + parser.add_argument("-i", "--ignore-cycles", help="Ignore cycles in DAG", action="store_true", default=False) + parser.add_argument("-q", "--query", help="Query string to filter data (e.g. 'age > 18')", type=str) + parser.add_argument( + "-s", + "--silent", + action="store_true", + help="Do not crash on error. If set to true, errors are recorded as test results.", + default=False, + ) + parser.add_argument( + "--batch-size", + type=int, + default=0, + help="Run tests in batches of the specified size (default: 0, which means no batching)", + ) + + return parser.parse_args(args) diff --git a/causal_testing/specification/causal_dag.py b/causal_testing/specification/causal_dag.py index 33fba9c8..07fe42b6 100644 --- a/causal_testing/specification/causal_dag.py +++ b/causal_testing/specification/causal_dag.py @@ -132,6 +132,7 @@ class CausalDAG(nx.DiGraph): def __init__(self, dot_path: str = None, ignore_cycles: bool = False, **attr): super().__init__(**attr) + self.ignore_cycles = ignore_cycles if dot_path: with open(dot_path, "r", encoding="utf-8") as file: dot_content = file.read().replace("\n", "") @@ -556,6 +557,8 @@ def identification(self, base_test_case: BaseTestCase, scenario: Scenario = None :return minimal_adjustment_set: The smallest set of variables which can be adjusted for to obtain a causal estimate as opposed to a purely associational estimate. 
""" + if self.ignore_cycles: + return set(self.graph.predecessors(base_test_case.treatment_variable.name)) minimal_adjustment_sets = [] if base_test_case.effect == "total": minimal_adjustment_sets = self.enumerate_minimal_adjustment_sets( @@ -575,7 +578,7 @@ def identification(self, base_test_case: BaseTestCase, scenario: Scenario = None return set() minimal_adjustment_set = min(minimal_adjustment_sets, key=len) - return minimal_adjustment_set + return set(minimal_adjustment_set) def to_dot_string(self) -> str: """Return a string of the DOT representation of the causal DAG. diff --git a/causal_testing/testing/metamorphic_relation.py b/causal_testing/testing/metamorphic_relation.py index 8165e0fb..c901ff01 100644 --- a/causal_testing/testing/metamorphic_relation.py +++ b/causal_testing/testing/metamorphic_relation.py @@ -45,7 +45,7 @@ def to_json_stub(self, skip=True) -> dict: "estimator": "LinearRegressionEstimator", "estimate_type": "coefficient", "effect": "direct", - "mutations": [self.base_test_case.treatment_variable], + "treatment_variable": self.base_test_case.treatment_variable, "expected_effect": {self.base_test_case.outcome_variable: "SomeEffect"}, "formula": ( f"{self.base_test_case.outcome_variable} ~ " @@ -71,7 +71,7 @@ def to_json_stub(self, skip=True) -> dict: "estimator": "LinearRegressionEstimator", "estimate_type": "coefficient", "effect": "direct", - "mutations": [self.base_test_case.treatment_variable], + "treatment_variable": self.base_test_case.treatment_variable, "expected_effect": {self.base_test_case.outcome_variable: "NoEffect"}, "formula": ( f"{self.base_test_case.outcome_variable} ~ " diff --git a/examples/poisson-line-process/README.md b/examples/poisson-line-process/README.md index de28a98e..e50ff26f 100644 --- a/examples/poisson-line-process/README.md +++ b/examples/poisson-line-process/README.md @@ -9,3 +9,10 @@ To run this case study: 3. Run the command `python example_pure_python.py` to demonstrate causal testing using pure python. This should print a series of causal test results and produce two CSV files. `intensity_num_shapes_results_random_1000.csv` corresponds to table 1, and `width_num_shapes_results_random_1000.csv` relates to our findings regarding the relationship of width and `P_u`. 
+ +## Running using the main entrypoint +You should be able to run the main entrypoint by simply running the following command from within this directory: + +``` +python -m causal_testing --dag_path dag.dot --data_paths data/random/data_random_1000.csv --test_config causal_tests.json --output results/test_results.json +``` diff --git a/examples/poisson-line-process/causal_tests.json b/examples/poisson-line-process/causal_tests.json index 69b107bf..3e30194e 100644 --- a/examples/poisson-line-process/causal_tests.json +++ b/examples/poisson-line-process/causal_tests.json @@ -1,240 +1,226 @@ { "tests": [ { - "name": "width__num_lines_abs", - "mutations": {"width": "Increase"}, - "estimator": "LinearRegressionEstimator", - "formula": "num_lines_abs ~ I(intensity * (width + height))", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "height"], - "expected_effect": {"num_lines_abs": "PoissonWidthHeight"}, - "skip": true - }, - { - "name": "width__num_shapes_abs", - "mutations": {"width": "Increase"}, - "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "height"], - "expected_effect": {"num_shapes_abs": "Positive"}, - "skip": true - }, - { - "name": "width__num_lines_unit", - "mutations": {"width": "Increase"}, - "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "height"], - "expected_effect": {"num_lines_unit": "Negative"}, - "skip": true - }, - { - "name": "width__num_shapes_unit", - "mutations": {"width": "Increase"}, - "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "height"], - "expected_effect": {"num_shapes_unit": "Negative"}, - "skip": true - }, - { - "name": "width__height", - "mutations": {"width": "Increase"}, - "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"height": "NoEffect"}, - "skip": true - }, - { - "name": "width__intensity", - "mutations": {"width": "Increase"}, - "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"intensity": "NoEffect"}, - "skip": true - }, - { - "name": "num_lines_abs__num_shapes_abs", - "mutations": {"num_lines_abs": "Increase"}, - "estimator": "CausalForestEstimator", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "width"], - "expected_effect": {"num_shapes_abs": "Positive"}, - "skip": true - }, - { - "name": "num_lines_abs__num_lines_unit", - "mutations": {"num_lines_abs": "Increase"}, - "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"num_lines_unit": "Positive"}, - "skip": true - }, - { - "name": "num_lines_abs__num_shapes_unit", - "mutations": {"num_lines_abs": "Increase"}, - "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"num_shapes_unit": "NoEffect"}, - "skip": true + "name": "width --> num_lines_abs", + "estimator": "LinearRegressionEstimator", + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "width", + "expected_effect": { + "num_lines_abs": "SomeEffect" + }, + "formula": "num_lines_abs ~ width", + "skip": false }, { - "name": "num_shapes_abs__num_lines_unit", - "mutations": {"num_shapes_abs": "Increase"}, + "name": "width --> num_shapes_abs | ['height', 'num_lines_abs']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": 
[], - "expected_effect": {"num_lines_unit": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "width", + "expected_effect": { + "num_shapes_abs": "SomeEffect" + }, + "formula": "num_shapes_abs ~ width + height + num_lines_abs", + "skip": false }, { - "name": "num_shapes_abs__num_shapes_unit", - "mutations": {"num_shapes_abs": "Increase"}, + "name": "width --> num_lines_unit | ['height', 'num_lines_abs']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"num_shapes_unit": "Positive"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "width", + "expected_effect": { + "num_lines_unit": "SomeEffect" + }, + "formula": "num_lines_unit ~ width + height + num_lines_abs", + "skip": false }, { - "name": "num_lines_unit__num_shapes_abs", - "mutations": {"num_lines_unit": "Increase"}, + "name": "width --> num_shapes_unit | ['height', 'num_shapes_abs']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"num_shapes_abs": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "width", + "expected_effect": { + "num_shapes_unit": "SomeEffect" + }, + "formula": "num_shapes_unit ~ width + height + num_shapes_abs", + "skip": false }, { - "name": "num_lines_unit__num_shapes_unit", - "mutations": {"num_lines_unit": "Increase"}, + "name": "num_lines_abs --> num_shapes_abs | ['height', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"num_shapes_unit": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "num_lines_abs", + "expected_effect": { + "num_shapes_abs": "SomeEffect" + }, + "formula": "num_shapes_abs ~ num_lines_abs + height + width", + "skip": false }, { - "name": "num_shapes_unit__num_lines_unit", - "mutations": {"num_shapes_unit": "Increase"}, + "name": "num_lines_abs --> num_lines_unit | ['height', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"num_lines_unit": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "num_lines_abs", + "expected_effect": { + "num_lines_unit": "SomeEffect" + }, + "formula": "num_lines_unit ~ num_lines_abs + height + width", + "skip": false }, { - "name": "height__width", - "mutations": {"height": "Increase"}, + "name": "num_lines_abs _||_ num_shapes_unit | ['height', 'num_shapes_abs', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"width": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "num_lines_abs", + "expected_effect": { + "num_shapes_unit": "NoEffect" + }, + "formula": "num_shapes_unit ~ num_lines_abs + height + num_shapes_abs + width", + "alpha": 0.05, + "skip": false }, { - "name": "height__num_lines_abs", - "mutations": {"height": "Increase"}, + "name": "height --> num_lines_abs", "estimator": "LinearRegressionEstimator", - "formula": "num_lines_abs ~ I(intensity * (width + height))", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "width"], - "expected_effect": {"num_lines_abs": "PoissonWidthHeight"}, - "skip": true + "estimate_type": "coefficient", + "effect": 
"direct", + "treatment_variable": "height", + "expected_effect": { + "num_lines_abs": "SomeEffect" + }, + "formula": "num_lines_abs ~ height", + "skip": false }, { - "name": "height__num_shapes_abs", - "mutations": {"height": "Increase"}, + "name": "intensity --> num_lines_abs", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "width"], - "expected_effect": {"num_shapes_abs": "Positive"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "intensity", + "expected_effect": { + "num_lines_abs": "SomeEffect" + }, + "formula": "num_lines_abs ~ intensity", + "skip": false }, { - "name": "height__num_lines_unit", - "mutations": {"height": "Increase"}, + "name": "num_shapes_abs _||_ num_lines_unit | ['height', 'width', 'num_lines_abs']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "width"], - "expected_effect": {"num_lines_unit": "Negative"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "num_shapes_abs", + "expected_effect": { + "num_lines_unit": "NoEffect" + }, + "formula": "num_lines_unit ~ num_shapes_abs + height + width + num_lines_abs", + "alpha": 0.05, + "skip": false }, { - "name": "height__num_shapes_unit", - "mutations": {"height": "Increase"}, + "name": "num_shapes_abs --> num_shapes_unit | ['height', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["intensity", "width"], - "expected_effect": {"num_shapes_unit": "Negative"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "num_shapes_abs", + "expected_effect": { + "num_shapes_unit": "SomeEffect" + }, + "formula": "num_shapes_unit ~ num_shapes_abs + height + width", + "skip": false }, { - "name": "height__intensity", - "mutations": {"height": "Increase"}, + "name": "height --> num_shapes_abs | ['num_lines_abs', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"intensity": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "height", + "expected_effect": { + "num_shapes_abs": "SomeEffect" + }, + "formula": "num_shapes_abs ~ height + num_lines_abs + width", + "skip": false }, { - "name": "intensity__width", - "mutations": {"intensity": "Increase"}, + "name": "intensity _||_ num_shapes_abs | ['height', 'width', 'num_lines_abs']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"width": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "intensity", + "expected_effect": { + "num_shapes_abs": "NoEffect" + }, + "formula": "num_shapes_abs ~ intensity + height + width + num_lines_abs", + "alpha": 0.05, + "skip": false }, { - "name": "intensity__num_lines_abs", - "mutations": {"intensity": "Increase"}, + "name": "num_lines_unit _||_ num_shapes_unit | ['height', 'num_shapes_abs', 'width']", "estimator": "LinearRegressionEstimator", - "formula": "num_lines_abs ~ I(intensity * (width + height))", - "effect_modifiers": ["height", "width"], - "estimate_type": "ate", - "expected_effect": {"num_lines_abs": "PoissonIntensity"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "num_lines_unit", + "expected_effect": { + "num_shapes_unit": "NoEffect" + }, 
+ "formula": "num_shapes_unit ~ num_lines_unit + height + num_shapes_abs + width", + "alpha": 0.05, + "skip": false }, { - "name": "intensity__num_shapes_abs", - "mutations": {"intensity": "Increase"}, + "name": "height --> num_lines_unit | ['num_lines_abs', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["height", "width"], - "expected_effect": {"num_shapes_abs": "Positive"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "height", + "expected_effect": { + "num_lines_unit": "SomeEffect" + }, + "formula": "num_lines_unit ~ height + num_lines_abs + width", + "skip": false }, { - "name": "intensity__num_lines_unit", - "mutations": {"intensity": "Increase"}, + "name": "intensity _||_ num_lines_unit | ['height', 'width', 'num_lines_abs']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": ["height", "width"], - "expected_effect": {"num_lines_unit": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "intensity", + "expected_effect": { + "num_lines_unit": "NoEffect" + }, + "formula": "num_lines_unit ~ intensity + height + width + num_lines_abs", + "alpha": 0.05, + "skip": false }, { - "name": "intensity__num_shapes_unit", - "mutations": {"intensity": "ChangeByFactor(2)"}, + "name": "height --> num_shapes_unit | ['num_shapes_abs', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "risk_ratio", - "expected_effect": {"num_shapes_unit": "ExactValue4_05"}, + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "height", + "expected_effect": { + "num_shapes_unit": "SomeEffect" + }, + "formula": "num_shapes_unit ~ height + num_shapes_abs + width", "skip": false }, { - "name": "intensity__height", - "mutations": {"intensity": "Increase"}, + "name": "intensity _||_ num_shapes_unit | ['height', 'num_shapes_abs', 'width']", "estimator": "LinearRegressionEstimator", - "estimate_type": "ate", - "effect_modifiers": [], - "expected_effect": {"height": "NoEffect"}, - "skip": true + "estimate_type": "coefficient", + "effect": "direct", + "treatment_variable": "intensity", + "expected_effect": { + "num_shapes_unit": "NoEffect" + }, + "formula": "num_shapes_unit ~ intensity + height + num_shapes_abs + width", + "alpha": 0.05, + "skip": false } ] -} +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 5810ce97..0eaee0ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ dependencies = [ "deap~=1.4.1", "sympy~=1.13.1", "deap~=1.4.1", + "pyarrow~=19.0.1", + "fastparquet~=2024.11.0", ] dynamic = ["version"] diff --git a/tests/main_tests/test_main.py b/tests/main_tests/test_main.py new file mode 100644 index 00000000..1a0538f1 --- /dev/null +++ b/tests/main_tests/test_main.py @@ -0,0 +1,167 @@ +import unittest +import shutil +import json +import pandas as pd +from pathlib import Path +from causal_testing.main import CausalTestingPaths, CausalTestingFramework, parse_args +from causal_testing.__main__ import main +from unittest.mock import patch + + +class TestCausalTestingPaths(unittest.TestCase): + + def setUp(self): + self.dag_path = "tests/resources/data/dag.dot" + self.data_paths = ["tests/resources/data/data.csv"] + self.test_config_path = "tests/resources/data/tests.json" + self.output_path = Path("results/results.json") + + def test_missing_dag(self): + with self.assertRaises(FileNotFoundError) as e: + 
CausalTestingPaths("missing.dot", self.data_paths, self.test_config_path, self.output_path).validate_paths() + self.assertEqual("DAG file not found: missing.dot", str(e.exception)) + + def test_missing_data(self): + with self.assertRaises(FileNotFoundError) as e: + CausalTestingPaths(self.dag_path, ["missing.csv"], self.test_config_path, self.output_path).validate_paths() + self.assertEqual("Data file not found: missing.csv", str(e.exception)) + + def test_missing_tests(self): + with self.assertRaises(FileNotFoundError) as e: + CausalTestingPaths(self.dag_path, self.data_paths, "missing.json", self.output_path).validate_paths() + self.assertEqual("Test configuration file not found: missing.json", str(e.exception)) + + def test_output_file_created(self): + self.assertFalse(self.output_path.parent.exists()) + CausalTestingPaths(self.dag_path, self.data_paths, self.test_config_path, self.output_path).validate_paths() + self.assertTrue(self.output_path.parent.exists()) + + def tearDown(self): + if self.output_path.parent.exists(): + shutil.rmtree(self.output_path.parent) + + +class TestCausalTestingFramework(unittest.TestCase): + def setUp(self): + self.dag_path = "tests/resources/data/dag.dot" + self.data_paths = ["tests/resources/data/data.csv"] + self.test_config_path = "tests/resources/data/tests.json" + self.output_path = Path("results/results.json") + self.paths = CausalTestingPaths( + dag_path=self.dag_path, + data_paths=self.data_paths, + test_config_path=self.test_config_path, + output_path=self.output_path, + ) + + def test_load_data(self): + csv_framework = CausalTestingFramework(self.paths) + csv_df = csv_framework.load_data() + + pqt_framework = CausalTestingFramework( + CausalTestingPaths( + dag_path=self.dag_path, + data_paths=["tests/resources/data/data.pqt"], + test_config_path=self.test_config_path, + output_path=self.output_path, + ) + ) + pqt_df = pqt_framework.load_data() + pd.testing.assert_frame_equal(csv_df, pqt_df) + + def test_load_data_invalid(self): + framework = CausalTestingFramework( + CausalTestingPaths( + dag_path=self.dag_path, + data_paths=[self.dag_path], + test_config_path=self.test_config_path, + output_path=self.output_path, + ) + ) + with self.assertRaises(ValueError): + framework.load_data() + + def test_load_data_query(self): + framework = CausalTestingFramework(self.paths) + self.assertFalse((framework.load_data()["test_input"] > 4).all()) + self.assertTrue((framework.load_data("test_input > 4")["test_input"] > 4).all()) + + def test_load_dag_missing_node(self): + framework = CausalTestingFramework(self.paths) + framework.setup() + framework.dag.graph.add_node("missing") + with self.assertRaises(ValueError): + framework.create_variables() + + def test_create_base_test_case_missing_treatment(self): + framework = CausalTestingFramework(self.paths) + framework.setup() + with self.assertRaises(KeyError) as e: + framework.create_base_test( + {"treatment_variable": "missing", "expected_effect": {"test_outcome": "NoEffect"}} + ) + self.assertEqual("\"Treatment variable 'missing' not found in inputs or outputs\"", str(e.exception)) + + def test_create_base_test_case_missing_estimator(self): + framework = CausalTestingFramework(self.paths) + framework.setup() + with self.assertRaises(ValueError) as e: + framework.create_causal_test({}, None) + self.assertEqual("Test configuration must specify an estimator", str(e.exception)) + + def test_create_base_test_case_invalid_estimator(self): + framework = CausalTestingFramework(self.paths) + framework.setup() + with 
self.assertRaises(ValueError) as e: + framework.create_causal_test({"estimator": "InvalidEstimator"}, None) + self.assertEqual("Unknown estimator: InvalidEstimator", str(e.exception)) + + def test_create_base_test_case_missing_outcome(self): + framework = CausalTestingFramework(self.paths) + framework.setup() + with self.assertRaises(KeyError) as e: + framework.create_base_test({"treatment_variable": "test_input", "expected_effect": {"missing": "NoEffect"}}) + self.assertEqual("\"Outcome variable 'missing' not found in inputs or outputs\"", str(e.exception)) + + def test_ctf(self): + framework = CausalTestingFramework(self.paths) + framework.setup() + + # Load and run tests + framework.load_tests() + results = framework.run_tests() + + # Save results + framework.save_results(results) + + with open(self.test_config_path, "r", encoding="utf-8") as f: + test_configs = json.load(f) + + tests_passed = [ + test_case.expected_causal_effect.apply(result) if result.test_value.type != "Error" else False + for test_config, test_case, result in zip(test_configs["tests"], framework.test_cases, results) + ] + + self.assertEqual(tests_passed, [True]) + + def test_parse_args(self): + with unittest.mock.patch( + "sys.argv", + [ + "causal_testing", + "--dag_path", + str(self.dag_path), + "--data_paths", + str(self.data_paths[0]), + "--test_config", + str(self.test_config_path), + "--output", + str(self.output_path.parent / "main.json"), + ], + ): + main() + self.assertTrue((self.output_path.parent / "main.json").exists()) + + def tearDown(self): + if self.output_path.parent.exists(): + shutil.rmtree(self.output_path.parent) diff --git a/tests/resources/data/data.csv b/tests/resources/data/data.csv index e95cae74..ec2d6002 100644 --- a/tests/resources/data/data.csv +++ b/tests/resources/data/data.csv @@ -1,2 +1,6 @@ -index,test_input,test_input_no_dist,test_output -0,1.0,1.0,2.0 +test_input,test_input_no_dist,test_output,B,C +1.0,1.1,2.2,0,0 +2.0,1.1,2.8,0,0 +3.0,1.0,1.0,0,0 +4.0,1.2,6.0,0,0 +5.0,0.9,2.5,0,0 diff --git a/tests/resources/data/data.pqt b/tests/resources/data/data.pqt new file mode 100644 index 00000000..526a04e6 Binary files /dev/null and b/tests/resources/data/data.pqt differ diff --git a/tests/resources/data/tests.json b/tests/resources/data/tests.json index afbce326..43d875a6 100644 --- a/tests/resources/data/tests.json +++ b/tests/resources/data/tests.json @@ -1,11 +1,20 @@ { "tests": [{ "name": "test1", - "mutations": {}, - "estimator": null, - "estimate_type": null, + "treatment_variable": "test_input", + "estimator": "LinearRegressionEstimator", + "estimate_type": "coefficient", "effect_modifiers": [], - "expected_effect": {}, + "expected_effect": {"test_output": "NoEffect"}, "skip": false + }, + { + "name": "test2", + "treatment_variable": "test_input", + "estimator": "LinearRegressionEstimator", + "estimate_type": "coefficient", + "effect_modifiers": [], + "expected_effect": {"test_output": "NoEffect"}, + "skip": true }] } diff --git a/tests/specification_tests/test_causal_dag.py b/tests/specification_tests/test_causal_dag.py index 28551dc3..a619122d 100644 --- a/tests/specification_tests/test_causal_dag.py +++ b/tests/specification_tests/test_causal_dag.py @@ -85,7 +85,6 @@ def setUp(self) -> None: def test_valid_causal_dag(self): """Test whether the Causal DAG is valid.""" causal_dag = CausalDAG(self.dag_dot_path) - print(causal_dag) assert list(causal_dag.nodes) == ["A", "B", "C", "D"] and list(causal_dag.edges) == [ ("A", "B"), ("B", "C"), @@ -127,6 +126,11 @@ def setUp(self) -> 
None: def test_invalid_causal_dag(self): self.assertRaises(nx.HasACycle, CausalDAG, self.dag_dot_path) + def test_ignore_cycles(self): + dag = CausalDAG(self.dag_dot_path, ignore_cycles=True) + base_test_case = BaseTestCase(Output("B", float), Output("C", float)) + self.assertEqual(dag.identification(base_test_case), {"A"}) + def tearDown(self) -> None: shutil.rmtree(self.temp_dir_path) diff --git a/tests/testing_tests/test_metamorphic_relations.py b/tests/testing_tests/test_metamorphic_relations.py index a3f5495a..723285b4 100644 --- a/tests/testing_tests/test_metamorphic_relations.py +++ b/tests/testing_tests/test_metamorphic_relations.py @@ -55,7 +55,7 @@ def test_should_not_cause_json_stub(self): "estimate_type": "coefficient", "estimator": "LinearRegressionEstimator", "expected_effect": {"Z": "NoEffect"}, - "mutations": ["X1"], + "treatment_variable": "X1", "name": "X1 _||_ Z", "formula": "Z ~ X1", "alpha": 0.05, @@ -78,7 +78,7 @@ def test_should_cause_json_stub(self): "estimator": "LinearRegressionEstimator", "expected_effect": {"Z": "SomeEffect"}, "formula": "Z ~ X1", - "mutations": ["X1"], + "treatment_variable": "X1", "name": "X1 --> Z", "skip": True, },
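Since this change adds a binary parquet fixture (`tests/resources/data/data.pqt`) alongside the CSV used by `test_load_data`, contributors who later modify `data.csv` will likely need to regenerate the parquet file to keep the two in sync. A minimal sketch, assuming the CSV is treated as the source of truth and using the pyarrow/fastparquet dependencies added to `pyproject.toml`:

```python
import pandas as pd

# Regenerate the parquet fixture from the CSV so both test fixtures stay in sync.
# Requires pyarrow or fastparquet (both added as dependencies in pyproject.toml).
df = pd.read_csv("tests/resources/data/data.csv")
df.to_parquet("tests/resources/data/data.pqt", index=False)
```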