diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 929c555d..407a381f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -48,3 +48,10 @@ repos: additional_dependencies: # Support pyproject.toml configuration - tomli + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.18.2 + hooks: + - id: mypy + args: [--ignore-missing-imports] + files: ^causalpy/ + additional_dependencies: [numpy>=1.20, pandas-stubs] diff --git a/AGENTS.md b/AGENTS.md index 5bc7c16e..e1094297 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -37,3 +37,13 @@ - **Formulas**: Use patsy for formula parsing (via `dmatrices()`) - **Custom exceptions**: Use project-specific exceptions from `causalpy.custom_exceptions`: `FormulaException`, `DataException`, `BadIndexException` - **File organization**: Experiments in `causalpy/experiments/`, PyMC models in `causalpy/pymc_models.py`, scikit-learn models in `causalpy/skl_models.py` + +## Type Checking + +- **Tool**: MyPy +- **Configuration**: Integrated as a pre-commit hook. +- **Scope**: Checks Python files within the `causalpy/` directory. +- **Settings**: + - `ignore-missing-imports`: Enabled to allow for gradual adoption of type hints without requiring all third-party libraries to have stubs. + - `additional_dependencies`: Includes `numpy` and `pandas-stubs` to provide type information for these libraries. +- **Execution**: Run automatically via `pre-commit run --all-files` or on commit. diff --git a/causalpy/data/datasets.py b/causalpy/data/datasets.py index 3102bdf0..40085799 100644 --- a/causalpy/data/datasets.py +++ b/causalpy/data/datasets.py @@ -43,15 +43,28 @@ } -def _get_data_home() -> pathlib.PosixPath: +def _get_data_home() -> pathlib.Path: """Return the path of the data directory""" return pathlib.Path(cp.__file__).parents[1] / "causalpy" / "data" -def load_data(dataset: str = None) -> pd.DataFrame: - """Loads the requested dataset and returns a pandas DataFrame. +def load_data(dataset: str | None = None) -> pd.DataFrame: + """Load the requested dataset and return a pandas DataFrame. - :param dataset: The desired dataset to load + Parameters + ---------- + dataset : str, optional + The desired dataset to load. If None, raises ValueError. + + Returns + ------- + pd.DataFrame + The loaded dataset as a pandas DataFrame. + + Raises + ------ + ValueError + If the requested dataset is not found. """ if dataset in DATASETS: diff --git a/causalpy/data/simulate_data.py b/causalpy/data/simulate_data.py index c21b98a2..de253420 100644 --- a/causalpy/data/simulate_data.py +++ b/causalpy/data/simulate_data.py @@ -15,8 +15,6 @@ Functions that generate data sets used in examples """ -from typing import Any - import numpy as np import pandas as pd from scipy.stats import dirichlet, gamma, norm, uniform @@ -31,7 +29,7 @@ def _smoothed_gaussian_random_walk( gaussian_random_walk_mu: float, gaussian_random_walk_sigma: float, N: int, - lowess_kwargs: dict[str, Any], + lowess_kwargs: dict, ) -> tuple[np.ndarray, np.ndarray]: """ Generates Gaussian random walk data and applies LOWESS. @@ -57,7 +55,7 @@ def generate_synthetic_control_data( treatment_time: int = 70, grw_mu: float = 0.25, grw_sigma: float = 1, - lowess_kwargs: dict[str, Any] | None = None, + lowess_kwargs: dict = default_lowess_kwargs, ) -> tuple[pd.DataFrame, np.ndarray]: """ Generates data for synthetic control example. @@ -78,9 +76,6 @@ def generate_synthetic_control_data( >>> from causalpy.data.simulate_data import generate_synthetic_control_data >>> df, weightings_true = generate_synthetic_control_data(treatment_time=70) """ - if lowess_kwargs is None: - lowess_kwargs = default_lowess_kwargs - # 1. Generate non-treated variables df = pd.DataFrame( { @@ -166,7 +161,9 @@ def generate_time_series_data( return df -def generate_time_series_data_seasonal(treatment_time: pd.Timestamp) -> pd.DataFrame: +def generate_time_series_data_seasonal( + treatment_time: pd.Timestamp, +) -> pd.DataFrame: """ Generates 10 years of monthly data with seasonality """ @@ -180,11 +177,13 @@ def generate_time_series_data_seasonal(treatment_time: pd.Timestamp) -> pd.DataF t=df.index, ).set_index("date", drop=True) month_effect = np.array([11, 13, 12, 15, 19, 23, 21, 28, 20, 17, 15, 12]) - df["y"] = 0.2 * df["t"] + 2 * month_effect[df.month.values - 1] + df["y"] = 0.2 * df["t"] + 2 * month_effect[np.asarray(df.month.values) - 1] N = df.shape[0] idx = np.arange(N)[df.index > treatment_time] - df["causal effect"] = 100 * gamma(10).pdf(np.arange(0, N, 1) - np.min(idx)) + df["causal effect"] = 100 * gamma(10).pdf( + np.array(np.arange(0, N, 1)) - int(np.min(idx)) + ) df["y"] += df["causal effect"] df["y"] += norm(0, 2).rvs(N) @@ -263,13 +262,13 @@ def outcome( df["post_treatment"] = df["t"] > intervention_time df["y"] = outcome( - df["t"], + np.asarray(df["t"]), control_intercept, treat_intercept_delta, trend, Δ, - df["group"], - df["post_treatment"], + np.asarray(df["group"]), + np.asarray(df["post_treatment"]), ) df["y"] += rng.normal(0, 0.1, df.shape[0]) return df @@ -310,8 +309,8 @@ def impact(x: np.ndarray) -> np.ndarray: def generate_ancova_data( N: int = 200, pre_treatment_means: np.ndarray = np.array([10, 12]), - treatment_effect: float = 2, - sigma: float = 1, + treatment_effect: int = 2, + sigma: int = 1, ) -> pd.DataFrame: """ Generate ANCOVA example data @@ -445,7 +444,7 @@ def generate_multicell_geolift_data() -> pd.DataFrame: def generate_seasonality( - n: int = 12, amplitude: float = 1, length_scale: float = 0.5 + n: int = 12, amplitude: int = 1, length_scale: float = 0.5 ) -> np.ndarray: """Generate monthly seasonality by sampling from a Gaussian process with a Gaussian kernel, using numpy code""" @@ -463,9 +462,9 @@ def generate_seasonality( def periodic_kernel( x1: np.ndarray, x2: np.ndarray, - period: float = 1, - length_scale: float = 1, - amplitude: float = 1, + period: int = 1, + length_scale: float = 1.0, + amplitude: int = 1, ) -> np.ndarray: """Generate a periodic kernel for gaussian process""" return amplitude**2 * np.exp( @@ -475,10 +474,10 @@ def periodic_kernel( def create_series( n: int = 52, - amplitude: float = 1, - length_scale: float = 2, + amplitude: int = 1, + length_scale: int = 2, n_years: int = 4, - intercept: float = 3, + intercept: int = 3, ) -> np.ndarray: """ Returns numpy tile with generated seasonality data repeated over diff --git a/causalpy/experiments/base.py b/causalpy/experiments/base.py index f24cc69a..b3d3d80d 100644 --- a/causalpy/experiments/base.py +++ b/causalpy/experiments/base.py @@ -16,6 +16,7 @@ """ from abc import abstractmethod +from typing import Any, Union import arviz as az import matplotlib.pyplot as plt @@ -29,10 +30,12 @@ class BaseExperiment: """Base class for quasi experimental designs.""" + labels: list[str] + supports_bayes: bool supports_ols: bool - def __init__(self, model=None): + def __init__(self, model: Union[PyMCModel, RegressorMixin] | None = None) -> None: # Ensure we've made any provided Scikit Learn model (as identified as being type # RegressorMixin) compatible with CausalPy by appending our custom methods. if isinstance(model, RegressorMixin): @@ -50,16 +53,26 @@ def __init__(self, model=None): if self.model is None: raise ValueError("model not set or passed.") + def fit(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("fit method not implemented") + @property - def idata(self): + def idata(self) -> az.InferenceData: """Return the InferenceData object of the model. Only relevant for PyMC models.""" return self.model.idata - def print_coefficients(self, round_to=None): - """Ask the model to print its coefficients.""" + def print_coefficients(self, round_to: int | None = None) -> None: + """Ask the model to print its coefficients. + + Parameters + ---------- + round_to : int, optional + Number of significant figures to round to. Defaults to None, + in which case 2 significant figures are used. + """ self.model.print_coefficients(self.labels, round_to) - def plot(self, *args, **kwargs) -> tuple: + def plot(self, *args: Any, **kwargs: Any) -> tuple: """Plot the model. Internally, this function dispatches to either `_bayesian_plot` or `_ols_plot` @@ -75,16 +88,16 @@ def plot(self, *args, **kwargs) -> tuple: raise ValueError("Unsupported model type") @abstractmethod - def _bayesian_plot(self, *args, **kwargs): + def _bayesian_plot(self, *args: Any, **kwargs: Any) -> tuple: """Abstract method for plotting the model.""" raise NotImplementedError("_bayesian_plot method not yet implemented") @abstractmethod - def _ols_plot(self, *args, **kwargs): + def _ols_plot(self, *args: Any, **kwargs: Any) -> tuple: """Abstract method for plotting the model.""" raise NotImplementedError("_ols_plot method not yet implemented") - def get_plot_data(self, *args, **kwargs) -> pd.DataFrame: + def get_plot_data(self, *args: Any, **kwargs: Any) -> pd.DataFrame: """Recover the data of an experiment along with the prediction and causal impact information. Internally, this function dispatches to either :func:`get_plot_data_bayesian` or :func:`get_plot_data_ols` @@ -98,11 +111,11 @@ def get_plot_data(self, *args, **kwargs) -> pd.DataFrame: raise ValueError("Unsupported model type") @abstractmethod - def get_plot_data_bayesian(self, *args, **kwargs): + def get_plot_data_bayesian(self, *args: Any, **kwargs: Any) -> pd.DataFrame: """Abstract method for recovering plot data.""" raise NotImplementedError("get_plot_data_bayesian method not yet implemented") @abstractmethod - def get_plot_data_ols(self, *args, **kwargs): + def get_plot_data_ols(self, *args: Any, **kwargs: Any) -> pd.DataFrame: """Abstract method for recovering plot data.""" raise NotImplementedError("get_plot_data_ols method not yet implemented") diff --git a/causalpy/experiments/diff_in_diff.py b/causalpy/experiments/diff_in_diff.py index 7a453a5d..02cb8e6c 100644 --- a/causalpy/experiments/diff_in_diff.py +++ b/causalpy/experiments/diff_in_diff.py @@ -15,6 +15,8 @@ Difference in differences """ +from typing import Union + import arviz as az import numpy as np import pandas as pd @@ -47,20 +49,24 @@ class DifferenceInDifferences(BaseExperiment): .. note:: - There is no pre/post intervention data distinction for DiD, we fit all the - data available. - :param data: - A pandas dataframe - :param formula: - A statistical model formula - :param time_variable_name: - Name of the data column for the time variable - :param group_variable_name: - Name of the data column for the group variable - :param post_treatment_variable_name: - Name of the data column indicating post-treatment period (default: "post_treatment") - :param model: - A PyMC model for difference in differences + There is no pre/post intervention data distinction for DiD, we fit + all the data available. + + Parameters + ---------- + data : pd.DataFrame + A pandas dataframe. + formula : str + A statistical model formula. + time_variable_name : str + Name of the data column for the time variable. + group_variable_name : str + Name of the data column for the group variable. + post_treatment_variable_name : str, optional + Name of the data column indicating post-treatment period. + Defaults to "post_treatment". + model : PyMCModel or RegressorMixin, optional + A PyMC model for difference in differences. Defaults to None. Example -------- @@ -92,10 +98,11 @@ def __init__( time_variable_name: str, group_variable_name: str, post_treatment_variable_name: str = "post_treatment", - model=None, - **kwargs, + model: Union[PyMCModel, RegressorMixin] | None = None, + **kwargs: dict, ) -> None: super().__init__(model=model) + self.causal_impact: xr.DataArray | float | None # rename the index to "obs_ind" data.index.name = "obs_ind" self.data = data @@ -213,6 +220,7 @@ def __init__( # calculate causal impact if isinstance(self.model, PyMCModel): + assert self.model.idata is not None # This is the coefficient on the interaction term coeff_names = self.model.idata.posterior.coords["coeffs"].data for i, label in enumerate(coeff_names): @@ -232,14 +240,14 @@ def __init__( f"{self.group_variable_name}:{self.post_treatment_variable_name}" ) matched_key = next((k for k in coef_map if interaction_term in k), None) - att = coef_map.get(matched_key) + att = coef_map.get(matched_key) if matched_key is not None else None self.causal_impact = att else: raise ValueError("Model type not recognized") return - def input_validation(self): + def input_validation(self) -> None: # Validate formula structure and interaction interaction terms self._validate_formula_interaction_terms() @@ -267,7 +275,7 @@ def input_validation(self): coded. Consisting of 0's and 1's only.""" ) - def _validate_formula_interaction_terms(self): + def _validate_formula_interaction_terms(self) -> None: """ Validate that the formula contains at most one interaction term and no three-way or higher-order interactions. Raises FormulaException if more than one interaction term is found or if any interaction term has more than 2 variables. @@ -297,7 +305,7 @@ def _validate_formula_interaction_terms(self): "Multiple interaction terms are not currently supported as they complicate interpretation of the causal effect." ) - def summary(self, round_to=None) -> None: + def summary(self, round_to: int | None = 2) -> None: """Print summary of main results and model coefficients. :param round_to: @@ -309,11 +317,13 @@ def summary(self, round_to=None) -> None: print(self._causal_impact_summary_stat(round_to)) self.print_coefficients(round_to) - def _causal_impact_summary_stat(self, round_to=None) -> str: + def _causal_impact_summary_stat(self, round_to: int | None = None) -> str: """Computes the mean and 94% credible interval bounds for the causal impact.""" return f"Causal impact = {convert_to_string(self.causal_impact, round_to=round_to)}" - def _bayesian_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes]: + def _bayesian_plot( + self, round_to: int | None = None, **kwargs: dict + ) -> tuple[plt.Figure, plt.Axes]: """ Plot the results @@ -461,9 +471,10 @@ def _plot_causal_impact_arrow(results, ax): ) return fig, ax - def _ols_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes]: + def _ols_plot( + self, round_to: int | None = 2, **kwargs: dict + ) -> tuple[plt.Figure, plt.Axes]: """Generate plot for difference-in-differences""" - round_to = kwargs.get("round_to") fig, ax = plt.subplots() # Plot raw data @@ -526,11 +537,15 @@ def _ols_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes]: va="center", ) # formatting + # In OLS context, causal_impact should be a float, but mypy doesn't know this + causal_impact_value = ( + float(self.causal_impact) if self.causal_impact is not None else 0.0 + ) ax.set( xlim=[-0.05, 1.1], xticks=[0, 1], xticklabels=["pre", "post"], - title=f"Causal impact = {round_num(self.causal_impact, round_to)}", + title=f"Causal impact = {round_num(causal_impact_value, round_to)}", ) ax.legend(fontsize=LEGEND_FONT_SIZE) return fig, ax diff --git a/causalpy/experiments/instrumental_variable.py b/causalpy/experiments/instrumental_variable.py index 001ce9af..15427b40 100644 --- a/causalpy/experiments/instrumental_variable.py +++ b/causalpy/experiments/instrumental_variable.py @@ -27,31 +27,30 @@ class InstrumentalVariable(BaseExperiment): - """ - A class to analyse instrumental variable style experiments. - - :param instruments_data: A pandas dataframe of instruments - for our treatment variable. Should contain - instruments Z, and treatment t - :param data: A pandas dataframe of covariates for fitting - the focal regression of interest. Should contain covariates X - including treatment t and outcome y - :param instruments_formula: A statistical model formula for - the instrumental stage regression - e.g. t ~ 1 + z1 + z2 + z3 - :param formula: A statistical model formula for the \n - focal regression e.g. y ~ 1 + t + x1 + x2 + x3 - :param model: A PyMC model - :param priors: An optional dictionary of priors for the - mus and sigmas of both regressions. If priors are not - specified we will substitute MLE estimates for the beta - coefficients. Greater control can be achieved - by specifying the priors directly e.g. priors = { - "mus": [0, 0], - "sigmas": [1, 1], - "eta": 2, - "lkj_sd": 2, - } + """A class to analyse instrumental variable style experiments. + + Parameters + ---------- + instruments_data : pd.DataFrame + A pandas dataframe of instruments for our treatment variable. + Should contain instruments Z, and treatment t. + data : pd.DataFrame + A pandas dataframe of covariates for fitting the focal regression + of interest. Should contain covariates X including treatment t and + outcome y. + instruments_formula : str + A statistical model formula for the instrumental stage regression, + e.g. ``t ~ 1 + z1 + z2 + z3``. + formula : str + A statistical model formula for the focal regression, + e.g. ``y ~ 1 + t + x1 + x2 + x3``. + model : BaseExperiment, optional + A PyMC model. Defaults to None. + priors : dict, optional + Dictionary of priors for the mus and sigmas of both regressions. + If priors are not specified we will substitute MLE estimates for + the beta coefficients. Example: ``priors = {"mus": [0, 0], + "sigmas": [1, 1], "eta": 2, "lkj_sd": 2}``. Example -------- @@ -97,10 +96,10 @@ def __init__( data: pd.DataFrame, instruments_formula: str, formula: str, - model=None, - priors=None, - **kwargs, - ): + model: BaseExperiment | None = None, + priors: dict | None = None, + **kwargs: dict, + ) -> None: super().__init__(model=model) self.expt_type = "Instrumental Variable Regression" self.data = data @@ -138,11 +137,11 @@ def __init__( "lkj_sd": 1, } self.priors = priors - self.model.fit( + self.model.fit( # type: ignore[call-arg,union-attr] X=self.X, Z=self.Z, y=self.y, t=self.t, coords=COORDS, priors=self.priors ) - def input_validation(self): + def input_validation(self) -> None: """Validate the input data and model formula for correctness""" treatment = self.instruments_formula.split("~")[0] test = treatment.strip() in self.instruments_data.columns @@ -165,7 +164,7 @@ def input_validation(self): The coefficients should be interpreted appropriately.""" ) - def get_2SLS_fit(self): + def get_2SLS_fit(self) -> None: """ Two Stage Least Squares Fit @@ -187,7 +186,7 @@ def get_2SLS_fit(self): self.first_stage_reg = first_stage_reg self.second_stage_reg = second_stage_reg - def get_naive_OLS_fit(self): + def get_naive_OLS_fit(self) -> None: """ Naive Ordinary Least Squares @@ -199,7 +198,7 @@ def get_naive_OLS_fit(self): self.ols_beta_params = dict(zip(self._x_design_info.column_names, beta_params)) self.ols_reg = ols_reg - def plot(self, round_to=None): + def plot(self, *args, **kwargs) -> None: # type: ignore[override] """ Plot the results @@ -208,7 +207,7 @@ def plot(self, round_to=None): """ raise NotImplementedError("Plot method not implemented.") - def summary(self, round_to=None) -> None: + def summary(self, round_to: int | None = None) -> None: """Print summary of main results and model coefficients. :param round_to: diff --git a/causalpy/experiments/interrupted_time_series.py b/causalpy/experiments/interrupted_time_series.py index dce9d710..25ff1932 100644 --- a/causalpy/experiments/interrupted_time_series.py +++ b/causalpy/experiments/interrupted_time_series.py @@ -89,10 +89,12 @@ def __init__( data: pd.DataFrame, treatment_time: Union[int, float, pd.Timestamp], formula: str, - model=None, - **kwargs, + model: Union[PyMCModel, RegressorMixin] | None = None, + **kwargs: dict, ) -> None: super().__init__(model=model) + self.pre_y: xr.DataArray + self.post_y: xr.DataArray # rename the index to "obs_ind" data.index.name = "obs_ind" self.input_validation(data, treatment_time) @@ -187,7 +189,9 @@ def __init__( self.post_impact ) - def input_validation(self, data, treatment_time): + def input_validation( + self, data: pd.DataFrame, treatment_time: Union[int, float, pd.Timestamp] + ) -> None: """Validate the input data and model formula for correctness""" if isinstance(data.index, pd.DatetimeIndex) and not isinstance( treatment_time, pd.Timestamp @@ -202,7 +206,7 @@ def input_validation(self, data, treatment_time): "If data.index is not DatetimeIndex, treatment_time must be pd.Timestamp." # noqa: E501 ) - def summary(self, round_to=None) -> None: + def summary(self, round_to: int | None = None) -> None: """Print summary of main results and model coefficients. :param round_to: @@ -213,7 +217,7 @@ def summary(self, round_to=None) -> None: self.print_coefficients(round_to) def _bayesian_plot( - self, round_to=None, **kwargs + self, round_to: int | None = 2, **kwargs: dict ) -> tuple[plt.Figure, List[plt.Axes]]: """ Plot the results @@ -338,7 +342,9 @@ def _bayesian_plot( return fig, ax - def _ols_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, List[plt.Axes]]: + def _ols_plot( + self, round_to: int | None = 2, **kwargs: dict + ) -> tuple[plt.Figure, List[plt.Axes]]: """ Plot the results @@ -361,7 +367,7 @@ def _ols_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, List[plt.Axes] c="k", ) ax[0].set( - title=f"$R^2$ on pre-intervention data = {round_num(self.score, round_to)}" + title=f"$R^2$ on pre-intervention data = {round_num(float(self.score), round_to)}" ) ax[1].plot(self.datapre.index, self.pre_impact, "k.") diff --git a/causalpy/experiments/inverse_propensity_weighting.py b/causalpy/experiments/inverse_propensity_weighting.py index 1a04c070..92933203 100644 --- a/causalpy/experiments/inverse_propensity_weighting.py +++ b/causalpy/experiments/inverse_propensity_weighting.py @@ -31,22 +31,23 @@ class InversePropensityWeighting(BaseExperiment): - """ - A class to analyse inverse propensity weighting experiments. + """A class to analyse inverse propensity weighting experiments. - :param data: - A pandas dataframe - :param formula: - A statistical model formula for the propensity model - :param outcome_variable - A string denoting the outcome variable in datq to be reweighted - :param weighting_scheme: - A string denoting which weighting scheme to use among: 'raw', 'robust', - 'doubly robust' or 'overlap'. See Aronow and Miller "Foundations - of Agnostic Statistics" for discussion and computation of these - weighting schemes. - :param model: - A PyMC model + Parameters + ---------- + data : pd.DataFrame + A pandas dataframe. + formula : str + A statistical model formula for the propensity model. + outcome_variable : str + A string denoting the outcome variable in data to be reweighted. + weighting_scheme : str + A string denoting which weighting scheme to use among: 'raw', + 'robust', 'doubly robust' or 'overlap'. See Aronow and Miller + "Foundations of Agnostic Statistics" for discussion and computation + of these weighting schemes. + model : BaseExperiment, optional + A PyMC model. Defaults to None. Example -------- @@ -78,9 +79,9 @@ def __init__( formula: str, outcome_variable: str, weighting_scheme: str, - model=None, - **kwargs, - ): + model: BaseExperiment | None = None, + **kwargs: dict, + ) -> None: super().__init__(model=model) self.expt_type = "Inverse Propensity Score Weighting" self.data = data @@ -98,12 +99,12 @@ def __init__( COORDS = {"obs_ind": list(range(self.X.shape[0])), "coeffs": self.labels} self.coords = COORDS - self.X_outcome = pd.DataFrame(self.X, columns=self.coords["coeffs"]) + self.X_outcome = pd.DataFrame(self.X, columns=self.labels) self.X_outcome["trt"] = self.t self.coords["outcome_coeffs"] = self.X_outcome.columns - self.model.fit(X=self.X, t=self.t, coords=COORDS) + self.model.fit(X=self.X, t=self.t, coords=COORDS) # type: ignore[call-arg] - def input_validation(self): + def input_validation(self) -> None: """Validate the input data and model formula for correctness""" treatment = self.formula.split("~")[0] test = treatment.strip() in self.data.columns @@ -125,7 +126,9 @@ def input_validation(self): """ ) - def make_robust_adjustments(self, ps): + def make_robust_adjustments( + self, ps: np.ndarray + ) -> tuple[pd.Series, pd.Series, int, int]: """This estimator is discussed in Aronow and Miller's book as being related to the Horvitz Thompson method""" @@ -145,7 +148,9 @@ def make_robust_adjustments(self, ps): weighted_outcome0 = outcome_ntrt * i_propensity0 return weighted_outcome0, weighted_outcome1, n_ntrt, n_trt - def make_raw_adjustments(self, ps): + def make_raw_adjustments( + self, ps: np.ndarray + ) -> tuple[pd.Series, pd.Series, int, int]: """This estimator is discussed in Aronow and Miller as the simplest of base form of inverse propensity weighting schemes""" @@ -164,7 +169,9 @@ def make_raw_adjustments(self, ps): weighted_outcome0 = outcome_ntrt * i_propensity0 return weighted_outcome0, weighted_outcome1, n_ntrt, n_trt - def make_overlap_adjustments(self, ps): + def make_overlap_adjustments( + self, ps: np.ndarray + ) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]: """This weighting scheme was adapted from Lucy D’Agostino McGowan's blog on Propensity Score Weights referenced in @@ -184,7 +191,9 @@ def make_overlap_adjustments(self, ps): weighted_outcome0 = (1 - t[t == 0]) * outcome_ntrt * i_propensity0 return weighted_outcome0, weighted_outcome1, n_ntrt, n_trt - def make_doubly_robust_adjustment(self, ps): + def make_doubly_robust_adjustment( + self, ps: np.ndarray + ) -> tuple[pd.Series, pd.Series, None, None]: """The doubly robust weighting scheme is also discussed in Aronow and Miller, but a bit more generally than our implementation here. Here we have specified @@ -203,7 +212,9 @@ def make_doubly_robust_adjustment(self, ps): weighted_outcome1 = t * (self.y - m1_pred) / X["ps"] + m1_pred return weighted_outcome0, weighted_outcome1, None, None - def get_ate(self, i, idata, method="doubly_robust"): + def get_ate( + self, i: int, idata: az.InferenceData, method: str = "doubly_robust" + ) -> list[float]: ### Post processing the sample posterior distribution for propensity scores ### One sample at a time. ps = idata["posterior"]["p"].stack(z=("chain", "draw"))[:, i].values @@ -231,23 +242,27 @@ def get_ate(self, i, idata, method="doubly_robust"): weighted_outcome_trt, n_ntrt, n_trt, - ) = self.make_overlap_adjustments(ps) - ntrt = np.sum(weighted_outcome_ntrt) / np.sum(n_ntrt) - trt = np.sum(weighted_outcome_trt) / np.sum(n_trt) + ) = self.make_overlap_adjustments(ps) # type: ignore[assignment] + ntrt = np.sum(weighted_outcome_ntrt) / np.sum(n_ntrt) # type: ignore[arg-type] + trt = np.sum(weighted_outcome_trt) / np.sum(n_trt) # type: ignore[arg-type] else: ( weighted_outcome_ntrt, weighted_outcome_trt, n_ntrt, n_trt, - ) = self.make_doubly_robust_adjustment(ps) + ) = self.make_doubly_robust_adjustment(ps) # type: ignore[assignment] trt = np.mean(weighted_outcome_trt) ntrt = np.mean(weighted_outcome_ntrt) ate = trt - ntrt return [ate, trt, ntrt] def plot_ate( - self, idata=None, method=None, prop_draws=100, ate_draws=300 + self, + idata: az.InferenceData | None = None, + method: str | None = None, + prop_draws: int = 100, + ate_draws: int = 300, ) -> tuple[plt.Figure, List[plt.Axes]]: if idata is None: idata = self.model.idata @@ -376,7 +391,9 @@ def make_hists(idata, i, axs, method=method): return fig, axs - def weighted_percentile(self, data, weights, perc): + def weighted_percentile( + self, data: np.ndarray, weights: np.ndarray, perc: float + ) -> float: """ perc : percentile in [0-1]! """ @@ -391,7 +408,10 @@ def weighted_percentile(self, data, weights, perc): return np.interp(perc, cdf, data) def plot_balance_ecdf( - self, covariate, idata=None, weighting_scheme=None + self, + covariate: str, + idata: az.InferenceData | None = None, + weighting_scheme: str | None = None, ) -> tuple[plt.Figure, List[plt.Axes]]: """ Plotting function takes a single covariate and shows the diff --git a/causalpy/experiments/prepostnegd.py b/causalpy/experiments/prepostnegd.py index 32c1ceb1..f67b0008 100644 --- a/causalpy/experiments/prepostnegd.py +++ b/causalpy/experiments/prepostnegd.py @@ -94,10 +94,14 @@ def __init__( formula: str, group_variable_name: str, pretreatment_variable_name: str, - model=None, - **kwargs, - ): + model: PyMCModel | None = None, + **kwargs: dict, + ) -> None: super().__init__(model=model) + self.causal_impact: xr.DataArray + self.pred_xi: np.ndarray + self.pred_untreated: az.InferenceData + self.pred_treated: az.InferenceData self.data = data self.expt_type = "Pretest/posttest Nonequivalent Group Design" self.formula = formula @@ -140,6 +144,7 @@ def __init__( else: raise ValueError("Model type not recognized") + assert self.model.idata is not None # Calculate the posterior predictive for the treatment and control for an # interpolated set of pretest values # get the model predictions of the observed data @@ -197,7 +202,7 @@ def _get_treatment_effect_coeff(self) -> str: raise NameError("Unable to find coefficient name for the treatment effect") - def _causal_impact_summary_stat(self, round_to) -> str: + def _causal_impact_summary_stat(self, round_to: int | None = 2) -> str: """Computes the mean and 94% credible interval bounds for the causal impact.""" percentiles = self.causal_impact.quantile([0.03, 1 - 0.03]).values ci = ( @@ -207,7 +212,7 @@ def _causal_impact_summary_stat(self, round_to) -> str: causal_impact = f"{round_num(self.causal_impact.mean(), round_to)}, " return f"Causal impact = {causal_impact + ci}" - def summary(self, round_to=None) -> None: + def summary(self, round_to: int | None = None) -> None: """Print summary of main results and model coefficients. :param round_to: @@ -221,7 +226,7 @@ def summary(self, round_to=None) -> None: self.print_coefficients(round_to) def _bayesian_plot( - self, round_to=None, **kwargs + self, round_to: int | None = None, **kwargs: dict ) -> tuple[plt.Figure, List[plt.Axes]]: """Generate plot for ANOVA-like experiments with non-equivalent group designs.""" fig, ax = plt.subplots( diff --git a/causalpy/experiments/regression_discontinuity.py b/causalpy/experiments/regression_discontinuity.py index ec24ba0b..8f4d45bb 100644 --- a/causalpy/experiments/regression_discontinuity.py +++ b/causalpy/experiments/regression_discontinuity.py @@ -16,6 +16,8 @@ """ import warnings # noqa: I001 +from typing import Union + import numpy as np import pandas as pd @@ -86,12 +88,12 @@ def __init__( data: pd.DataFrame, formula: str, treatment_threshold: float, - model=None, + model: Union[PyMCModel, RegressorMixin] | None = None, running_variable_name: str = "x", epsilon: float = 0.001, bandwidth: float = np.inf, - **kwargs, - ): + **kwargs: dict, + ) -> None: super().__init__(model=model) self.expt_type = "Regression Discontinuity" self.data = data @@ -198,7 +200,7 @@ def __init__( ) - np.squeeze(self.pred_discon[0]) # ****************************************************************************** - def input_validation(self): + def input_validation(self) -> None: """Validate the input data and model formula for correctness""" if "treated" not in self.formula: raise FormulaException( @@ -216,7 +218,7 @@ def input_validation(self): self.data = self.data.copy() self.data["treated"] = self.data["treated"].astype(bool) - def _is_treated(self, x): + def _is_treated(self, x: Union[np.ndarray, pd.Series]) -> np.ndarray: """Returns ``True`` if `x` is greater than or equal to the treatment threshold. .. warning:: @@ -225,7 +227,7 @@ def _is_treated(self, x): """ return np.greater_equal(x, self.treatment_threshold) - def summary(self, round_to=None) -> None: + def summary(self, round_to: int | None = None) -> None: """ Print summary of main results and model coefficients @@ -243,7 +245,9 @@ def summary(self, round_to=None) -> None: print("\n") self.print_coefficients(round_to) - def _bayesian_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes]: + def _bayesian_plot( + self, round_to: int | None = 2, **kwargs: dict + ) -> tuple[plt.Figure, plt.Axes]: """Generate plot for regression discontinuity designs.""" fig, ax = plt.subplots() # Plot raw data @@ -292,7 +296,9 @@ def _bayesian_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes] ) return (fig, ax) - def _ols_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes]: + def _ols_plot( + self, round_to: int | None = None, **kwargs: dict + ) -> tuple[plt.Figure, plt.Axes]: """Generate plot for regression discontinuity designs.""" fig, ax = plt.subplots() # Plot raw data @@ -312,7 +318,7 @@ def _ols_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes]: label="model fit", ) # create strings to compose title - r2 = f"$R^2$ on all data = {round_num(self.score, round_to)}" + r2 = f"$R^2$ on all data = {round_num(float(self.score), round_to)}" discon = f"Discontinuity at threshold = {round_num(self.discontinuity_at_threshold, round_to)}" ax.set(title=r2 + "\n" + discon) # Intervention line diff --git a/causalpy/experiments/regression_kink.py b/causalpy/experiments/regression_kink.py index 9e1f3aa5..fe58e079 100644 --- a/causalpy/experiments/regression_kink.py +++ b/causalpy/experiments/regression_kink.py @@ -17,6 +17,8 @@ """ import warnings # noqa: I001 +from typing import Union + from matplotlib import pyplot as plt import numpy as np @@ -49,12 +51,12 @@ def __init__( data: pd.DataFrame, formula: str, kink_point: float, - model=None, + model: BaseExperiment | None = None, running_variable_name: str = "x", epsilon: float = 0.001, bandwidth: float = np.inf, - **kwargs, - ): + **kwargs: dict, + ) -> None: super().__init__(model=model) self.expt_type = "Regression Kink" self.data = data @@ -130,7 +132,7 @@ def __init__( mu_kink_left, mu_kink, mu_kink_right, epsilon ) - def input_validation(self): + def input_validation(self) -> None: """Validate the input data and model formula for correctness""" if "treated" not in self.formula: raise FormulaException( @@ -149,7 +151,12 @@ def input_validation(self): raise ValueError("Epsilon must be greater than zero.") @staticmethod - def _eval_gradient_change(mu_kink_left, mu_kink, mu_kink_right, epsilon): + def _eval_gradient_change( + mu_kink_left: xr.DataArray, + mu_kink: xr.DataArray, + mu_kink_right: xr.DataArray, + epsilon: float, + ) -> xr.DataArray: """Evaluate the gradient change at the kink point. It works by evaluating the model below the kink point, at the kink point, and above the kink point. @@ -160,7 +167,7 @@ def _eval_gradient_change(mu_kink_left, mu_kink, mu_kink_right, epsilon): gradient_change = gradient_right - gradient_left return gradient_change - def _probe_kink_point(self): + def _probe_kink_point(self) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]: """Probe the kink point to evaluate the predicted outcome at the kink point and either side.""" # Create a dataframe to evaluate predicted outcome at the kink point and either @@ -185,11 +192,11 @@ def _probe_kink_point(self): mu_kink_right = predicted["posterior_predictive"].sel(obs_ind=2)["mu"] return mu_kink_left, mu_kink, mu_kink_right - def _is_treated(self, x): + def _is_treated(self, x: Union[np.ndarray, pd.Series]) -> np.ndarray: """Returns ``True`` if `x` is greater than or equal to the treatment threshold.""" # noqa: E501 return np.greater_equal(x, self.kink_point) - def summary(self, round_to=None) -> None: + def summary(self, round_to: int | None = 2) -> None: """Print summary of main results and model coefficients. :param round_to: @@ -208,7 +215,9 @@ def summary(self, round_to=None) -> None: ) self.print_coefficients(round_to) - def _bayesian_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes]: + def _bayesian_plot( + self, round_to: int | None = 2, **kwargs: dict + ) -> tuple[plt.Figure, plt.Axes]: """Generate plot for regression kink designs.""" fig, ax = plt.subplots() # Plot raw data @@ -231,15 +240,15 @@ def _bayesian_plot(self, round_to=None, **kwargs) -> tuple[plt.Figure, plt.Axes] labels = ["Posterior mean"] # create strings to compose title - title_info = f"{round_num(self.score['unit_0_r2'], round_to)} (std = {round_num(self.score['unit_0_r2_std'], round_to)})" + title_info = f"{round_num(self.score['unit_0_r2'], round_to if round_to is not None else 2)} (std = {round_num(self.score['unit_0_r2_std'], round_to if round_to is not None else 2)})" r2 = f"Bayesian $R^2$ on all data = {title_info}" percentiles = self.gradient_change.quantile([0.03, 1 - 0.03]).values ci = ( r"$CI_{94\%}$" - + f"[{round_num(percentiles[0], round_to)}, {round_num(percentiles[1], round_to)}]" + + f"[{round_num(percentiles[0], round_to if round_to is not None else 2)}, {round_num(percentiles[1], round_to if round_to is not None else 2)}]" ) grad_change = f""" - Change in gradient = {round_num(self.gradient_change.mean(), round_to)}, + Change in gradient = {round_num(self.gradient_change.mean(), round_to if round_to is not None else 2)}, """ ax.set(title=r2 + "\n" + grad_change + ci) # Intervention line diff --git a/causalpy/experiments/synthetic_control.py b/causalpy/experiments/synthetic_control.py index c1060638..3a1692e0 100644 --- a/causalpy/experiments/synthetic_control.py +++ b/causalpy/experiments/synthetic_control.py @@ -15,7 +15,7 @@ Synthetic Control Experiment """ -from typing import List, Optional, Union +from typing import List, Union import arviz as az import numpy as np @@ -86,8 +86,8 @@ def __init__( treatment_time: Union[int, float, pd.Timestamp], control_units: list[str], treated_units: list[str], - model=None, - **kwargs, + model: Union[PyMCModel, RegressorMixin] | None = None, + **kwargs: dict, ) -> None: super().__init__(model=model) # rename the index to "obs_ind" @@ -185,7 +185,9 @@ def __init__( self.post_impact ) - def input_validation(self, data, treatment_time): + def input_validation( + self, data: pd.DataFrame, treatment_time: Union[int, float, pd.Timestamp] + ) -> None: """Validate the input data and model formula for correctness""" if isinstance(data.index, pd.DatetimeIndex) and not isinstance( treatment_time, pd.Timestamp @@ -200,7 +202,7 @@ def input_validation(self, data, treatment_time): "If data.index is not DatetimeIndex, treatment_time must be pd.Timestamp." # noqa: E501 ) - def summary(self, round_to=None) -> None: + def summary(self, round_to: int | None = None) -> None: """Print summary of main results and model coefficients. :param round_to: @@ -215,7 +217,10 @@ def summary(self, round_to=None) -> None: self.print_coefficients(round_to) def _bayesian_plot( - self, round_to=None, treated_unit: str | None = None, **kwargs + self, + round_to: int | None = None, + treated_unit: str | None = None, + **kwargs: dict, ) -> tuple[plt.Figure, List[plt.Axes]]: """ Plot the results for a specific treated unit @@ -366,7 +371,10 @@ def _bayesian_plot( return fig, ax def _ols_plot( - self, round_to=None, treated_unit: str | None = None, **kwargs + self, + round_to: int | None = None, + treated_unit: str | None = None, + **kwargs: dict, ) -> tuple[plt.Figure, List[plt.Axes]]: """ Plot the results for OLS model for a specific treated unit @@ -569,16 +577,20 @@ def get_plot_data_bayesian( return self.plot_data - def _get_score_title( - self, treated_unit: str, round_to: Optional[int] = None - ) -> str: + def _get_score_title(self, treated_unit: str, round_to: int | None = 2) -> str: """Generate appropriate score title for the specified treated unit""" if isinstance(self.model, PyMCModel): # Bayesian model - get unit-specific R² scores using unified format unit_index = self.treated_units.index(treated_unit) - r2_val = round_num(self.score[f"unit_{unit_index}_r2"], round_to) - r2_std_val = round_num(self.score[f"unit_{unit_index}_r2_std"], round_to) + r2_val = round_num( + self.score[f"unit_{unit_index}_r2"], + round_to if round_to is not None else 2, + ) + r2_std_val = round_num( + self.score[f"unit_{unit_index}_r2_std"], + round_to if round_to is not None else 2, + ) return f"Pre-intervention Bayesian $R^2$: {r2_val} (std = {r2_std_val})" else: # OLS model - simple float score - return f"$R^2$ on pre-intervention data = {round_num(self.score, round_to)}" + return f"$R^2$ on pre-intervention data = {round_num(float(self.score), round_to if round_to is not None else 2)}" diff --git a/causalpy/plot_utils.py b/causalpy/plot_utils.py index 4eefa173..2a65d62a 100644 --- a/causalpy/plot_utils.py +++ b/causalpy/plot_utils.py @@ -15,7 +15,7 @@ Plotting utility functions. """ -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, Tuple, Union import arviz as az import matplotlib.pyplot as plt @@ -24,31 +24,39 @@ import xarray as xr from matplotlib.collections import PolyCollection from matplotlib.lines import Line2D +from pandas.api.extensions import ExtensionArray def plot_xY( - x: Union[pd.DatetimeIndex, np.array], + x: Union[pd.DatetimeIndex, np.ndarray, pd.Index, pd.Series, ExtensionArray], Y: xr.DataArray, ax: plt.Axes, - plot_hdi_kwargs: Optional[Dict[str, Any]] = None, + plot_hdi_kwargs: Dict[str, Any] | None = None, hdi_prob: float = 0.94, - label: Union[str, None] = None, + label: str | None = None, ) -> Tuple[Line2D, PolyCollection]: - """ - Utility function to plot HDI intervals. - - :param x: - Pandas datetime index or numpy array of x-axis values - :param y: - Xarray data array of y-axis data - :param ax: - Matplotlib ax object - :param plot_hdi_kwargs: - Dictionary of keyword arguments passed to ax.plot() - :param hdi_prob: - The size of the HDI, default is 0.94 - :param label: - The plot label + """Plot HDI intervals. + + Parameters + ---------- + x : pd.DatetimeIndex, np.ndarray, pd.Index, pd.Series, or ExtensionArray + Pandas datetime index or numpy array of x-axis values. + Y : xr.DataArray + Xarray data array of y-axis data. + ax : plt.Axes + Matplotlib axes object. + plot_hdi_kwargs : dict, optional + Dictionary of keyword arguments passed to ax.plot(). + hdi_prob : float, optional + The size of the HDI. Default is 0.94. + label : str, optional + The plot label. + + Returns + ------- + tuple + Tuple of (Line2D, PolyCollection) handles for the plot line and + HDI patch. """ if plot_hdi_kwargs is None: @@ -85,13 +93,20 @@ def get_hdi_to_df( x: xr.DataArray, hdi_prob: float = 0.94, ) -> pd.DataFrame: - """ - Utility function to calculate and recover HDI intervals. + """Calculate and recover HDI intervals. + + Parameters + ---------- + x : xr.DataArray + Xarray data array. + hdi_prob : float, optional + The size of the HDI. Default is 0.94. - :param x: - Xarray data array - :param hdi_prob: - The size of the HDI, default is 0.94 + Returns + ------- + pd.DataFrame + DataFrame containing the HDI intervals with 'lower' and 'higher' + columns. """ hdi_result = az.hdi(x, hdi_prob=hdi_prob) diff --git a/causalpy/pymc_models.py b/causalpy/pymc_models.py index 75dc14a7..e4f82624 100644 --- a/causalpy/pymc_models.py +++ b/causalpy/pymc_models.py @@ -13,7 +13,7 @@ # limitations under the License. """Custom PyMC models for causal inference""" -from typing import Any, Dict, Optional +from typing import Any, Dict import arviz as az import numpy as np @@ -91,7 +91,7 @@ class PyMCModel(pm.Model): Inference data... """ - default_priors = {} + default_priors: Dict[str, Prior] = {} def priors_from_data(self, X, y) -> Dict[str, Any]: """ @@ -169,12 +169,19 @@ def priors_from_data(self, X, y) -> Dict[str, Any]: def __init__( self, - sample_kwargs: Optional[Dict[str, Any]] = None, + sample_kwargs: Dict[str, Any] | None = None, priors: dict[str, Any] | None = None, - ): + ) -> None: """ - :param sample_kwargs: A dictionary of kwargs that get unpacked and passed to the - :func:`pymc.sample` function. Defaults to an empty dictionary. + Parameters + ---------- + sample_kwargs : dict, optional + Dictionary of kwargs that get unpacked and passed to the + :func:`pymc.sample` function. Defaults to an empty dictionary + if None. + priors : dict, optional + Dictionary of priors for the model. Defaults to None, in which + case default priors are used. """ super().__init__() self.idata = None @@ -182,8 +189,9 @@ def __init__( self.priors = {**self.default_priors, **(priors or {})} - def build_model(self, X, y, coords) -> None: - """Build the model, must be implemented by subclass.""" + def build_model( + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None + ) -> None: raise NotImplementedError( "This method must be implemented by a subclass" ) # pragma: no cover @@ -220,9 +228,26 @@ def _data_setter(self, X: xr.DataArray) -> None: coords={"obs_ind": obs_coords}, ) - def fit(self, X, y, coords: Optional[Dict[str, Any]] = None) -> None: - """Draw samples from posterior, prior predictive, and posterior predictive - distributions, placing them in the model's idata attribute. + def fit( + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None = None + ) -> az.InferenceData: + """Draw samples from posterior, prior predictive, and posterior + predictive distributions. + + Parameters + ---------- + X : xr.DataArray + Input features as an xarray DataArray. + y : xr.DataArray + Target variable as an xarray DataArray. + coords : dict, optional + Dictionary with coordinate names for named dimensions. + Defaults to None. + + Returns + ------- + az.InferenceData + InferenceData object containing the samples. """ # Ensure random_seed is used in sample_prior_predictive() and @@ -236,6 +261,8 @@ def fit(self, X, y, coords: Optional[Dict[str, Any]] = None) -> None: self.build_model(X, y, coords) with self: self.idata = pm.sample(**self.sample_kwargs) + if self.idata is None: + raise RuntimeError("pm.sample() returned None") self.idata.extend(pm.sample_prior_predictive(random_seed=random_seed)) self.idata.extend( pm.sample_posterior_predictive( @@ -244,7 +271,7 @@ def fit(self, X, y, coords: Optional[Dict[str, Any]] = None) -> None: ) return self.idata - def predict(self, X: xr.DataArray): + def predict(self, X: xr.DataArray) -> az.InferenceData: """ Predict data given input data `X` @@ -345,10 +372,25 @@ def calculate_impact( impact = y_true - y_pred["posterior_predictive"]["mu"] return impact.transpose(..., "obs_ind") - def calculate_cumulative_impact(self, impact): + def calculate_cumulative_impact(self, impact: xr.DataArray) -> xr.DataArray: return impact.cumsum(dim="obs_ind") - def print_coefficients(self, labels, round_to=None) -> None: + def print_coefficients( + self, labels: list[str], round_to: int | None = None + ) -> None: + """Print the model coefficients with their labels. + + Parameters + ---------- + labels : list of str + List of strings representing the coefficient names. + round_to : int, optional + Number of significant figures to round to. Defaults to None, + in which case 2 significant figures are used. + """ + if self.idata is None: + raise RuntimeError("Model has not been fit") + def print_row( max_label_length: int, name: str, coeff_samples: xr.DataArray, round_to: int ) -> None: @@ -446,13 +488,15 @@ class LinearRegression(PyMCModel): ), } - def build_model(self, X, y, coords): + def build_model( + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None + ) -> None: """ Defines the PyMC model """ with self: # Ensure treated_units coordinate exists for consistency - if "treated_units" not in coords: + if coords is not None and "treated_units" not in coords: coords = coords.copy() coords["treated_units"] = ["unit_0"] @@ -543,7 +587,9 @@ def priors_from_data(self, X, y) -> Dict[str, Any]: ), } - def build_model(self, X, y, coords): + def build_model( + self, X: xr.DataArray, y: xr.DataArray, coords: Dict[str, Any] | None + ) -> None: """ Defines the PyMC model """ @@ -604,20 +650,36 @@ class InstrumentalVariableRegression(PyMCModel): Inference data... """ - def build_model(self, X, Z, y, t, coords, priors): - """Specify model with treatment regression and focal regression data and priors - - :param X: A pandas dataframe used to predict our outcome y - :param Z: A pandas dataframe used to predict our treatment variable t - :param y: An array of values representing our focal outcome y - :param t: An array of values representing the treatment t of - which we're interested in estimating the causal impact - :param coords: A dictionary with the coordinate names for our - instruments and covariates - :param priors: An optional dictionary of priors for the mus and - sigmas of both regressions - :code:`priors = {"mus": [0, 0], "sigmas": [1, 1], - "eta": 2, "lkj_sd": 2}` + def build_model( # type: ignore + self, + X: np.ndarray, + Z: np.ndarray, + y: np.ndarray, + t: np.ndarray, + coords: Dict[str, Any], + priors: Dict[str, Any], + ) -> None: + """Specify model with treatment regression and focal regression + data and priors. + + Parameters + ---------- + X : np.ndarray + Array used to predict our outcome y. + Z : np.ndarray + Array used to predict our treatment variable t. + y : np.ndarray + Array of values representing our focal outcome y. + t : np.ndarray + Array representing the treatment t of which we're interested + in estimating the causal impact. + coords : dict + Dictionary with the coordinate names for our instruments and + covariates. + priors : dict + Dictionary of priors for the mus and sigmas of both + regressions. Example: ``priors = {"mus": [0, 0], + "sigmas": [1, 1], "eta": 2, "lkj_sd": 2}``. """ # --- Priors --- @@ -661,7 +723,7 @@ def build_model(self, X, Z, y, t, coords, priors): shape=(X.shape[0], 2), ) - def sample_predictive_distribution(self, ppc_sampler="jax"): + def sample_predictive_distribution(self, ppc_sampler: str | None = "jax") -> None: """Function to sample the Multivariate Normal posterior predictive Likelihood term in the IV class. This can be slow without using the JAX sampler compilation method. If using the @@ -671,32 +733,65 @@ def sample_predictive_distribution(self, ppc_sampler="jax"): random_seed = self.sample_kwargs.get("random_seed", None) if ppc_sampler == "jax": - with self: - self.idata.extend( - pm.sample_posterior_predictive( - self.idata, - random_seed=random_seed, - compile_kwargs={"mode": "JAX"}, + if self.idata is not None: + with self: + self.idata.extend( + pm.sample_posterior_predictive( + self.idata, + random_seed=random_seed, + compile_kwargs={"mode": "JAX"}, + ) ) - ) elif ppc_sampler == "pymc": - with self: - self.idata.extend(pm.sample_prior_predictive(random_seed=random_seed)) - self.idata.extend( - pm.sample_posterior_predictive( - self.idata, - random_seed=random_seed, + if self.idata is not None: + with self: + self.idata.extend( + pm.sample_prior_predictive(random_seed=random_seed) ) - ) + self.idata.extend( + pm.sample_posterior_predictive( + self.idata, + random_seed=random_seed, + ) + ) + + def fit( # type: ignore + self, + X: np.ndarray, + Z: np.ndarray, + y: np.ndarray, + t: np.ndarray, + coords: Dict[str, Any], + priors: Dict[str, Any], + ppc_sampler: str | None = None, + ) -> az.InferenceData: + """Draw samples from posterior distribution and potentially from + the prior and posterior predictive distributions. - def fit(self, X, Z, y, t, coords, priors, ppc_sampler=None): - """Draw samples from posterior distribution and potentially - from the prior and posterior predictive distributions. The - fit call can take values for the - ppc_sampler = ['jax', 'pymc', None] - We default to None, so the user can determine if they wish - to spend time sampling the posterior predictive distribution - independently. + Parameters + ---------- + X : np.ndarray + Array used to predict our outcome y. + Z : np.ndarray + Array used to predict our treatment variable t. + y : np.ndarray + Array of values representing our focal outcome y. + t : np.ndarray + Array representing the treatment variable. + coords : dict + Dictionary with coordinate names for named dimensions. + priors : dict + Dictionary of priors for the model. + ppc_sampler : str, optional + Sampler for posterior predictive distribution. Can be 'jax', + 'pymc', or None. Defaults to None, so the user can determine + if they wish to spend time sampling the posterior predictive + distribution independently. + + Returns + ------- + az.InferenceData + InferenceData object containing the samples. """ # Ensure random_seed is used in sample_prior_predictive() and @@ -749,7 +844,14 @@ class PropensityScore(PyMCModel): "b": Prior("Normal", mu=0, sigma=1, dims="coeffs"), } - def build_model(self, X, t, coords, prior=None, noncentred=True): + def build_model( # type: ignore + self, + X: np.ndarray, + t: np.ndarray, + coords: Dict[str, Any], + prior: Dict[str, Any] | None = None, + noncentred: bool = True, + ) -> None: "Defines the PyMC propensity model" with self: self.add_coords(coords) @@ -760,7 +862,14 @@ def build_model(self, X, t, coords, prior=None, noncentred=True): p = pm.Deterministic("p", pm.math.invlogit(mu)) pm.Bernoulli("t_pred", p=p, observed=t_data, dims="obs_ind") - def fit(self, X, t, coords, prior={"b": [0, 1]}, noncentred=True): + def fit( # type: ignore + self, + X: np.ndarray, + t: np.ndarray, + coords: Dict[str, Any], + prior: Dict[str, list] = {"b": [0, 1]}, + noncentred: bool = True, + ) -> az.InferenceData: """Draw samples from posterior, prior predictive, and posterior predictive distributions. We overwrite the base method because the base method assumes a variable y and we use t to indicate the treatment variable here. @@ -772,29 +881,30 @@ def fit(self, X, t, coords, prior={"b": [0, 1]}, noncentred=True): self.build_model(X, t, coords, prior, noncentred) with self: self.idata = pm.sample(**self.sample_kwargs) - self.idata.extend(pm.sample_prior_predictive(random_seed=random_seed)) - self.idata.extend( - pm.sample_posterior_predictive( - self.idata, progressbar=False, random_seed=random_seed + if self.idata is not None: + self.idata.extend(pm.sample_prior_predictive(random_seed=random_seed)) + self.idata.extend( + pm.sample_posterior_predictive( + self.idata, progressbar=False, random_seed=random_seed + ) ) - ) return self.idata def fit_outcome_model( self, - X_outcome, - y, - coords, - priors={ + X_outcome: pd.DataFrame, + y: pd.Series, + coords: Dict[str, Any], + priors: Dict[str, Any] = { "b_outcome": [0, 1], "sigma": 1, "beta_ps": [0, 1], }, - noncentred=True, - normal_outcome=True, - spline_component=False, - winsorize_boundary=0.0, - ): + noncentred: bool = True, + normal_outcome: bool = True, + spline_component: bool = False, + winsorize_boundary: float = 0.0, + ) -> tuple[az.InferenceData, pm.Model]: """ Fit a Bayesian outcome model using covariates and previously estimated propensity scores. diff --git a/causalpy/skl_models.py b/causalpy/skl_models.py index 5aaca205..5100ec02 100644 --- a/causalpy/skl_models.py +++ b/causalpy/skl_models.py @@ -26,16 +26,29 @@ class ScikitLearnAdaptor: """Base class for scikit-learn models that can be used for causal inference.""" - def calculate_impact(self, y_true, y_pred): + coef_: np.ndarray + + def calculate_impact(self, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray: """Calculate the causal impact of the intervention.""" return y_true - y_pred - def calculate_cumulative_impact(self, impact): + def calculate_cumulative_impact(self, impact: np.ndarray) -> np.ndarray: """Calculate the cumulative impact intervention.""" return np.cumsum(impact) - def print_coefficients(self, labels, round_to=None) -> None: - """Print the coefficients of the model with the corresponding labels.""" + def print_coefficients( + self, labels: list[str], round_to: int | None = None + ) -> None: + """Print the coefficients of the model with the corresponding labels. + + Parameters + ---------- + labels : list of str + List of strings representing the coefficient names. + round_to : int, optional + Number of significant figures to round to. Defaults to None, + in which case 2 significant figures are used. + """ print("Model coefficients:") coef_ = self.get_coeffs() # Determine the width of the longest label @@ -45,10 +58,12 @@ def print_coefficients(self, labels, round_to=None) -> None: # Left-align the name formatted_name = f"{name:<{max_label_length}}" # Right-align the value with width 10 - formatted_val = f"{round_num(val, round_to):>10}" + formatted_val = ( + f"{round_num(val, round_to if round_to is not None else 2):>10}" + ) print(f" {formatted_name}\t{formatted_val}") - def get_coeffs(self): + def get_coeffs(self) -> np.ndarray: """Get the coefficients of the model as a numpy array.""" return np.squeeze(self.coef_) @@ -57,11 +72,11 @@ class WeightedProportion(ScikitLearnAdaptor, LinearModel, RegressorMixin): """Weighted proportion model for causal inference. Used for synthetic control methods for example""" - def loss(self, W, X, y): + def loss(self, W: np.ndarray, X: np.ndarray, y: np.ndarray) -> float: """Compute root mean squared loss with data X, weights W, and predictor y""" return np.sqrt(np.mean((y - np.dot(X, W.T)) ** 2)) - def fit(self, X, y): + def fit(self, X: np.ndarray, y: np.ndarray) -> "WeightedProportion": """Fit model on data X with predictor y""" w_start = [1 / X.shape[1]] * X.shape[1] coef_ = fmin_slsqp( @@ -75,7 +90,7 @@ def fit(self, X, y): self.mse = self.loss(W=self.coef_, X=X, y=y) return self - def predict(self, X): + def predict(self, X: np.ndarray) -> np.ndarray: """Predict results for data X""" return np.dot(X, self.coef_.T) @@ -89,7 +104,9 @@ def create_causalpy_compatible_class( return estimator -def _add_mixin_methods(model_instance, mixin_class): +def _add_mixin_methods( + model_instance: RegressorMixin, mixin_class: type +) -> RegressorMixin: """Utility function to bind mixin methods to an existing model instance.""" for attr_name in dir(mixin_class): attr = getattr(mixin_class, attr_name) diff --git a/causalpy/utils.py b/causalpy/utils.py index 5b7c601b..4d1a60c8 100644 --- a/causalpy/utils.py +++ b/causalpy/utils.py @@ -34,22 +34,27 @@ def _series_has_2_levels(series: pd.Series) -> bool: return len(pd.Categorical(series).categories) == 2 -def round_num(n, round_to): - """ - Return a string representing a number with `round_to` significant figures. +def round_num(n: float, round_to: int | None) -> str: + """Return a string representing a number with significant figures. Parameters ---------- n : float - number to round - round_to : int - number of significant figures + Number to round. + round_to : int, optional + Number of significant figures. If None, defaults to 2. + + Returns + ------- + str + String representation of the number with specified significant + figures. """ sig_figs = _format_sig_figs(n, round_to) return f"{n:.{sig_figs}g}" -def _format_sig_figs(value, default=None): +def _format_sig_figs(value: float, default: int | None = None) -> int: """Get a default number of significant figures. Gives the integer part or `default`, whichever is bigger. @@ -68,8 +73,27 @@ def _format_sig_figs(value, default=None): return max(int(np.log10(np.abs(value))) + 1, default) -def convert_to_string(x: Union[float, xr.DataArray], round_to: int = 2) -> str: - """Utility function which takes in numeric inputs and returns a string.""" +def convert_to_string(x: Union[float, xr.DataArray], round_to: int | None = 2) -> str: + """Convert numeric inputs to a formatted string representation. + + Parameters + ---------- + x : float or xr.DataArray + The numeric value or xarray DataArray to convert. + round_to : int, optional + Number of significant figures to round to. Defaults to 2. + + Returns + ------- + str + Formatted string representation. For floats, returns rounded + decimal. For DataArrays, returns mean with 94% credible interval. + + Raises + ------ + ValueError + If `x` is neither a float nor an xarray DataArray. + """ if isinstance(x, float): # In the case of a float, we return the number rounded to 2 decimal places return f"{x:.2f}" diff --git a/docs/source/_static/interrogate_badge.svg b/docs/source/_static/interrogate_badge.svg index 26433625..4704ef6c 100644 --- a/docs/source/_static/interrogate_badge.svg +++ b/docs/source/_static/interrogate_badge.svg @@ -1,5 +1,5 @@ - interrogate: 95.9% + interrogate: 95.5% @@ -12,8 +12,8 @@ interrogate interrogate - 95.9% - 95.9% + 95.5% + 95.5% diff --git a/pyproject.toml b/pyproject.toml index 9dc0e453..c212f395 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,15 @@ dependencies = [ # # Similar to `dependencies` above, these must be valid existing projects. [project.optional-dependencies] -dev = ["pathlib", "pre-commit", "twine", "interrogate", "codespell", "nbformat", "nbconvert"] +dev = [ + "pathlib", + "pre-commit", + "twine", + "interrogate", + "codespell", + "nbformat", + "nbconvert", +] docs = [ "ipykernel", "daft-pgm", @@ -71,7 +79,7 @@ docs = [ "sphinx-design", "sphinx-togglebutton", ] -lint = ["interrogate", "pre-commit", "ruff"] +lint = ["interrogate", "pre-commit", "ruff", "mypy"] test = ["pytest", "pytest-cov", "codespell", "nbformat", "nbconvert"] [project.urls] @@ -129,10 +137,7 @@ ignore-words = "./docs/source/.codespell/codespell-whitelist.txt" skip = "*.ipynb,*.csv,pyproject.toml,docs/source/.codespell/codespell-whitelist.txt" [tool.coverage.run] -omit = [ - "*/conftest.py", - "*/tests/conftest.py", -] +omit = ["*/conftest.py", "*/tests/conftest.py"] [tool.coverage.report] exclude_lines = [ @@ -147,3 +152,22 @@ exclude_lines = [ "class .*\\bProtocol\\):", "@(abc\\.)?abstractmethod", ] + +[tool.mypy] +files = "causalpy/*.py" +exclude = "build|dist|docs|notebooks|tests|setup.py" + +[tool.mypy-matplotlib] +ignore_missing_imports = true + +[tool.mypy-pymc] +ignore_missing_imports = true + +[tool.mypy-seaborn] +ignore_missing_imports = true + +[tool.mypy-sklearn] +ignore_missing_imports = true + +[tool.mypy-scipy] +ignore_missing_imports = true