diff --git a/econml/tests/test_drtester.py b/econml/tests/test_drtester.py index 3396f0fe2..e80cea590 100644 --- a/econml/tests/test_drtester.py +++ b/econml/tests/test_drtester.py @@ -12,7 +12,7 @@ class TestDRTester(unittest.TestCase): @staticmethod - def _get_data(num_treatments=1): + def _get_data(num_treatments=1, use_sample_weights=False): np.random.seed(576) N = 20000 # number of units @@ -50,10 +50,21 @@ def _get_data(num_treatments=1): train_ind = np.random.choice(N, int(train_N), replace=False) val_ind = ind[~np.isin(ind, train_ind)] + # sample weights + if use_sample_weights: + sample_weights = np.random.randint(1, 1000, size=N) + Xtrain, Dtrain, Ytrain = X[train_ind], D[train_ind], Y[train_ind] Xval, Dval, Yval = X[val_ind], D[val_ind], Y[val_ind] - return Xtrain, Dtrain, Ytrain, Xval, Dval, Yval + if use_sample_weights: + sampleweightstrain = sample_weights[train_ind] + sampleweightsval = sample_weights[val_ind] + + if use_sample_weights: + return Xtrain, Dtrain, Ytrain, Xval, Dval, Yval, sampleweightstrain, sampleweightsval + else: + return Xtrain, Dtrain, Ytrain, Xval, Dval, Yval def test_multi(self): Xtrain, Dtrain, Ytrain, Xval, Dval, Yval = self._get_data(num_treatments=2) @@ -286,3 +297,192 @@ def test_exceptions(self): autoc_res = my_dr_tester.evaluate_uplift(Xval, Xtrain, metric='toc') self.assertLess(autoc_res.pvals[0], 0.05) + + def test_multi_with_weights(self): + (Xtrain, + Dtrain, + Ytrain, + Xval, + Dval, + Yval, + sampleweightstrain, + sampleweightsval) = self._get_data(num_treatments=2, use_sample_weights=True) + + # Simple classifier and regressor for propensity, outcome, and cate + reg_t = RandomForestClassifier(random_state=0) + reg_y = GradientBoostingRegressor(random_state=0) + + cate = DML( + model_y=reg_y, + model_t=reg_t, + model_final=reg_y, + discrete_treatment=True + ).fit(Y=Ytrain, + T=Dtrain, + X=Xtrain, + sample_weight=sampleweightstrain) + + # test the DR outcome difference + my_dr_tester = DRTester( + model_regression=reg_y, + model_propensity=reg_t, + cate=cate + ).fit_nuisance( + Xval, + Dval, + Yval, + Xtrain, + Dtrain, + Ytrain, + sampleweightval=sampleweightsval, + sampleweighttrain=sampleweightstrain + ) + dr_outcomes = my_dr_tester.dr_val_ + + ates = np.average(dr_outcomes, axis=0, weights=sampleweightsval) + for k in range(dr_outcomes.shape[1]): + ate_errs = np.sqrt(((dr_outcomes[:, k] - ates[k]) ** 2).sum() / + (dr_outcomes.shape[0] * (dr_outcomes.shape[0] - 1))) + + self.assertLess(abs(ates[k] - (k + 1)), 2 * ate_errs) + + res = my_dr_tester.evaluate_all(Xval, Xtrain) + res_df = res.summary() + + for k in range(4): + if k in [0, 3]: + self.assertRaises(ValueError, res.plot_cal, k) + self.assertRaises(ValueError, res.plot_qini, k) + self.assertRaises(ValueError, res.plot_toc, k) + else: # real treatments, k = 1 or 2 + self.assertTrue(res.plot_cal(k) is not None) + self.assertTrue(res.plot_qini(k) is not None) + self.assertTrue(res.plot_toc(k) is not None) + + self.assertGreater(res_df.blp_pval.values[0], 0.1) # no heterogeneity + self.assertLess(res_df.blp_pval.values[1], 0.05) # heterogeneity + + self.assertLess(res_df.cal_r_squared.values[0], 0) # poor R2 + self.assertGreater(res_df.cal_r_squared.values[1], 0) # good R2 + + self.assertLess(res_df.qini_pval.values[1], res_df.qini_pval.values[0]) + self.assertLess(res_df.autoc_pval.values[1], res_df.autoc_pval.values[0]) + + def test_binary_with_weights(self): + (Xtrain, + Dtrain, + Ytrain, + Xval, + Dval, + Yval, + sampleweightstrain, + sampleweightsval) = 
self._get_data(num_treatments=1, use_sample_weights=True) + + # Simple classifier and regressor for propensity, outcome, and cate + reg_t = RandomForestClassifier(random_state=0) + reg_y = GradientBoostingRegressor(random_state=0) + + cate = DML( + model_y=reg_y, + model_t=reg_t, + model_final=reg_y, + discrete_treatment=True + ).fit(Y=Ytrain, + T=Dtrain, + X=Xtrain, + sample_weight=sampleweightstrain) + + # test the DR outcome difference + my_dr_tester = DRTester( + model_regression=reg_y, + model_propensity=reg_t, + cate=cate + ).fit_nuisance( + Xval, + Dval, + Yval, + Xtrain, + Dtrain, + Ytrain, + sampleweightval=sampleweightsval, + sampleweighttrain=sampleweightstrain + ) + dr_outcomes = my_dr_tester.dr_val_ + + ate = np.average(dr_outcomes, axis=0, weights=sampleweightsval) + ate_err = np.sqrt(((dr_outcomes - ate) ** 2).sum() / + (dr_outcomes.shape[0] * (dr_outcomes.shape[0] - 1))) + truth = 1 + self.assertLess(abs(ate - truth), 2 * ate_err) + + res = my_dr_tester.evaluate_all(Xval, Xtrain) + res_df = res.summary() + + for k in range(3): + if k in [0, 2]: + self.assertRaises(ValueError, res.plot_cal, k) + self.assertRaises(ValueError, res.plot_qini, k) + self.assertRaises(ValueError, res.plot_toc, k) + else: # real treatment, k = 1 + self.assertTrue(res.plot_cal(k) is not None) + self.assertTrue(res.plot_qini(k, 'ucb2') is not None) + self.assertTrue(res.plot_toc(k, 'ucb1') is not None) + + self.assertLess(res_df.blp_pval.values[0], 0.05) # heterogeneity + self.assertGreater(res_df.cal_r_squared.values[0], 0) # good R2 + self.assertLess(res_df.qini_pval.values[0], 0.05) # heterogeneity + self.assertLess(res_df.autoc_pval.values[0], 0.05) # heterogeneity + + def test_nuisance_val_fit_with_weights(self): + (Xtrain, + Dtrain, + Ytrain, + Xval, + Dval, + Yval, + sampleweightstrain, + sampleweightsval) = self._get_data(num_treatments=1, use_sample_weights=True) + + # Simple classifier and regressor for propensity, outcome, and cate + reg_t = RandomForestClassifier(random_state=0) + reg_y = GradientBoostingRegressor(random_state=0) + + cate = DML( + model_y=reg_y, + model_t=reg_t, + model_final=reg_y, + discrete_treatment=True + ).fit(Y=Ytrain, + T=Dtrain, + X=Xtrain, + sample_weight=sampleweightstrain) + + # test the DR outcome difference + my_dr_tester = DRTester( + model_regression=reg_y, + model_propensity=reg_t, + cate=cate + ).fit_nuisance(Xval, + Dval, + Yval, + sampleweightval=sampleweightsval) + + dr_outcomes = my_dr_tester.dr_val_ + + ate = np.average(dr_outcomes, axis=0, weights=sampleweightsval) + ate_err = np.sqrt(((dr_outcomes - ate) ** 2).sum() / + (dr_outcomes.shape[0] * (dr_outcomes.shape[0] - 1))) + truth = 1 + self.assertLess(abs(ate - truth), 2 * ate_err) + + # use evaluate_blp to fit on validation only + blp_res = my_dr_tester.evaluate_blp(Xval) + + self.assertLess(blp_res.pvals[0], 0.05) # heterogeneity + + for kwargs in [{}, {'Xval': Xval}]: + with self.assertRaises(Exception) as exc: + my_dr_tester.evaluate_cal(kwargs) + self.assertEqual( + str(exc.exception), "Must fit nuisance models on training sample data to use calibration test" + ) diff --git a/econml/validate/drtester.py b/econml/validate/drtester.py index c79330218..19aa27ede 100644 --- a/econml/validate/drtester.py +++ b/econml/validate/drtester.py @@ -3,16 +3,18 @@ import numpy as np import pandas as pd import scipy.stats as st +from copy import deepcopy from sklearn.model_selection import check_cv -from sklearn.model_selection import cross_val_predict, StratifiedKFold, KFold -from statsmodels.api import 
OLS +from sklearn.model_selection import StratifiedKFold, KFold +from statsmodels.api import WLS from statsmodels.tools import add_constant from econml.utilities import deprecated +from econml._cate_estimator import BaseCateEstimator from .results import CalibrationEvaluationResults, BLPEvaluationResults, UpliftEvaluationResults, EvaluationResults from .utils import calculate_dr_outcomes, calc_uplift - +from .weighted_utils import weighted_stat, weighted_se, cross_val_predict_with_weights class DRTester: """ @@ -20,6 +22,7 @@ class DRTester: Includes the best linear predictor (BLP) test as in Chernozhukov et al. (2022), the calibration test in Dwivedi et al. (2020), and the QINI coefficient as in Radcliffe (2007). + Has support for sample weights in all tests. **Best Linear Predictor (BLP)** @@ -124,7 +127,7 @@ def __init__( *, model_regression, model_propensity, - cate, + cate: BaseCateEstimator, cv: Union[int, List] = 5 ): self.model_regression = model_regression @@ -154,7 +157,7 @@ def get_cv_splitter(self, random_state: int = 123): splitter.random_state = random_state self.cv_splitter = splitter - def get_cv_splits(self, vars: np.array, T: np.array): + def get_cv_splits(self, vars: np.ndarray, T: np.ndarray): """ Generate splits for cross-validation, given a set of variables and treatment vector. @@ -181,12 +184,14 @@ def get_cv_splits(self, vars: np.array, T: np.array): def fit_nuisance( self, - Xval: np.array, - Dval: np.array, - yval: np.array, - Xtrain: np.array = None, - Dtrain: np.array = None, - ytrain: np.array = None, + Xval: np.ndarray, + Dval: np.ndarray, + yval: np.ndarray, + Xtrain: np.ndarray = None, + Dtrain: np.ndarray = None, + ytrain: np.ndarray = None, + sampleweightval: np.ndarray = None, + sampleweighttrain: np.ndarray = None ): """ Generate nuisance predictions and calculates doubly robust (DR) outcomes. @@ -212,6 +217,10 @@ def fit_nuisance( the control status be equal to 0, and all other treatments integers starting at 1. 
ytrain: vector of length n_train, optional Outcomes for the training sample + sampleweightval: vector of length n_val, optional + Sample weights for validation sample + sampleweighttrain: vector of length n_train, optional + Sample weights for training sample Returns ------- @@ -234,29 +243,59 @@ def fit_nuisance( if self.fit_on_train: # Get DR outcomes in training sample - reg_preds_train, prop_preds_train = self.fit_nuisance_cv(Xtrain, Dtrain, ytrain) - self.dr_train_ = calculate_dr_outcomes(Dtrain, ytrain, reg_preds_train, prop_preds_train) + reg_preds_train, prop_preds_train = self.fit_nuisance_cv(Xtrain, Dtrain, ytrain, sampleweighttrain) + self.dr_train_ = calculate_dr_outcomes(Dtrain, + ytrain, + reg_preds_train, + prop_preds_train, + self.treatments) + + # standardize to always have 2 dimensions + if self.dr_train_.ndim == 1: + self.dr_train_ = self.dr_train_[..., np.newaxis] # Get DR outcomes in validation sample - reg_preds_val, prop_preds_val = self.fit_nuisance_train(Xtrain, Dtrain, ytrain, Xval) - self.dr_val_ = calculate_dr_outcomes(Dval, yval, reg_preds_val, prop_preds_val) + reg_preds_val, prop_preds_val = self.fit_nuisance_train(Xtrain, + Dtrain, + ytrain, + Xval, + sampleweighttrain) + self.dr_val_ = calculate_dr_outcomes(Dval, + yval, + reg_preds_val, + prop_preds_val, + self.treatments) + + # standardize to always have 2 dimensions + if self.dr_val_.ndim == 1: + self.dr_val_ = self.dr_val_[..., np.newaxis] + else: # Get DR outcomes in validation sample - reg_preds_val, prop_preds_val = self.fit_nuisance_cv(Xval, Dval, yval) - self.dr_val_ = calculate_dr_outcomes(Dval, yval, reg_preds_val, prop_preds_val) + reg_preds_val, prop_preds_val = self.fit_nuisance_cv(Xval, Dval, yval, sampleweightval) + self.dr_val_ = calculate_dr_outcomes(Dval, + yval, + reg_preds_val, + prop_preds_val, + self.treatments) + + # standardize to always have 2 dimensions + if self.dr_val_.ndim == 1: + self.dr_val_ = self.dr_val_[..., np.newaxis] # Calculate ATE in the validation sample - self.ate_val = self.dr_val_.mean(axis=0) + self.ate_val = np.average(self.dr_val_, axis=0, weights=sampleweightval) return self def fit_nuisance_train( self, - Xtrain: np.array, - Dtrain: np.array, - ytrain: np.array, - Xval: np.array - ) -> Tuple[np.array, np.array]: + Xtrain: np.ndarray, + Dtrain: np.ndarray, + ytrain: np.ndarray, + Xval: np.ndarray, + sampleweighttrain: np.ndarray, + ) -> Tuple[np.ndarray, np.ndarray]: """ Fits nuisance models in training sample and applies to generate predictions in validation sample. @@ -272,33 +311,43 @@ def fit_nuisance_train( Outcomes for training sample Xval: (n_train x k) matrix Validation sample features used to predict both treatment status and outcomes + sampleweighttrain: array of length n_train, optional + Sample weights for training sample Returns ------- 2 (n_val x n_treatment + 1) arrays corresponding to the predicted outcomes under treatment status and predicted treatment probabilities, respectively. Both evaluated on validation set. 
""" + # set default weights if none provided + if sampleweighttrain is None: + sampleweighttrain = np.ones(Xtrain.shape[0]) # Fit propensity in treatment - model_propensity_fitted = self.model_propensity.fit(Xtrain, Dtrain) + self.model_propensity_ = deepcopy(self.model_propensity) + self.model_propensity_.fit(Xtrain, Dtrain, sample_weight=sampleweighttrain) # Predict propensity scores - prop_preds = model_propensity_fitted.predict_proba(Xval) + prop_preds = self.model_propensity_.predict_proba(Xval) # Possible treatments (need to allow more than 2) n = Xval.shape[0] reg_preds = np.zeros((n, self.n_treat + 1)) for i in range(self.n_treat + 1): model_regression_fitted = self.model_regression.fit( - Xtrain[Dtrain == self.treatments[i]], ytrain[Dtrain == self.treatments[i]]) + Xtrain[Dtrain == self.treatments[i]], + ytrain[Dtrain == self.treatments[i]], + sample_weight=sampleweighttrain[Dtrain == self.treatments[i]] + ) reg_preds[:, i] = model_regression_fitted.predict(Xval) return reg_preds, prop_preds def fit_nuisance_cv( self, - X: np.array, - D: np.array, - y: np.array - ) -> Tuple[np.array, np.array]: + X: np.ndarray, + D: np.ndarray, + y: np.ndarray, + sampleweight: np.ndarray = None + ) -> Tuple[np.ndarray, np.ndarray]: """ Generate nuisance function predictions using k-folds cross validation. @@ -312,14 +361,25 @@ def fit_nuisance_cv( starting at 1 y: array of length n Outcomes + sampleweight: array of length n, optional + Sample weights Returns ------- 2 (n x n_treatment + 1) arrays corresponding to the predicted outcomes under treatment status and predicted treatment probabilities, respectively. """ + # set default weights if none provided + if sampleweight is None: + sampleweight = np.ones(X.shape[0]) + splits = self.get_cv_splits([X], D) - prop_preds = cross_val_predict(self.model_propensity, X, D, cv=splits, method='predict_proba') + prop_preds = cross_val_predict_with_weights( + self.model_propensity, + X, D, + sample_weight=sampleweight, + cv=splits, + method="predict_proba") # Predict outcomes # T-learner logic @@ -327,16 +387,18 @@ def fit_nuisance_cv( reg_preds = np.zeros((N, self.n_treat + 1)) for k in range(self.n_treat + 1): for train, test in splits: - model_regression_fitted = self.model_regression.fit(X[train][D[train] == self.treatments[k]], - y[train][D[train] == self.treatments[k]]) + model_regression_fitted = self.model_regression.fit( + X[train][D[train] == self.treatments[k]], + y[train][D[train] == self.treatments[k]], + sample_weight=sampleweight[train][D[train] == self.treatments[k]]) reg_preds[test, k] = model_regression_fitted.predict(X[test]) return reg_preds, prop_preds def get_cate_preds( self, - Xval: np.array, - Xtrain: np.array = None + Xval: np.ndarray, + Xtrain: np.ndarray = None ): """ Generate predictions from fitted CATE model. 
@@ -360,17 +422,31 @@ def get_cate_preds( """ base = self.treatments[0] vals = [self.cate.effect(X=Xval, T0=base, T1=t) for t in self.treatments[1:]] - self.cate_preds_val_ = np.stack(vals).T + + #squeeze to avoid unnecessary dimensions with 1 value (some cases with 3 dimensions) + self.cate_preds_val_ = np.stack(vals).T.squeeze() + + # standardize to always have 2 dimensions + if self.cate_preds_val_.ndim == 1: + self.cate_preds_val_ = self.cate_preds_val_[..., np.newaxis] if Xtrain is not None: trains = [self.cate.effect(X=Xtrain, T0=base, T1=t) for t in self.treatments[1:]] - self.cate_preds_train_ = np.stack(trains).T + + #squeeze to avoid unnecessary dimensions with 1 value (some cases with 3 dimensions) + self.cate_preds_train_ = np.stack(trains).T.squeeze() + + # standardize to always have 2 dimensions + if self.cate_preds_train_.ndim == 1: + self.cate_preds_train_ = self.cate_preds_train_[..., np.newaxis] def evaluate_cal( self, - Xval: np.array = None, - Xtrain: np.array = None, - n_groups: int = 4 + Xval: np.ndarray = None, + Xtrain: np.ndarray = None, + n_groups: int = 4, + sampleweightval: np.ndarray = None, + sampleweighttrain: np.ndarray = None, ) -> CalibrationEvaluationResults: """ Implement calibration test as in [Dwivedi2020]. @@ -385,6 +461,10 @@ def evaluate_cal( implemented (with Xtrain specified) n_groups: integer, default 4 Number of quantile-based groups used to calculate calibration score. + sampleweightval: np.ndarray = None, + Sample weights for validation sample + sampleweighttrain: np.ndarray = None, + Sample weights for training sample Returns ------- @@ -399,26 +479,65 @@ def evaluate_cal( raise Exception('CATE predictions not yet calculated - must provide both Xval, Xtrain') self.get_cate_preds(Xval, Xtrain) + # Default to equal weights if none provided + if sampleweighttrain is None: + sampleweighttrain = np.ones(self.cate_preds_train_.shape[0]) + if sampleweightval is None: + sampleweightval = np.ones(self.cate_preds_val_.shape[0]) + + # Convert weights to integer values + sampleweightval = sampleweightval.astype(int) + sampleweighttrain = sampleweighttrain.astype(int) + + # Check weights are valid + assert (np.all(sampleweightval >= 1)), "Sample weights must be integer and >= 1" + assert (np.all(sampleweighttrain >= 1)), "Sample weights must be integer and >= 1" + cal_r_squared = np.zeros(self.n_treat) plot_data_dict = dict() + for k in range(self.n_treat): - cuts = np.quantile(self.cate_preds_train_[:, k], np.linspace(0, 1, n_groups + 1)) + + # Determine quantile-based cuts based on training set + cuts = weighted_stat(values=self.cate_preds_train_[:, k], + q=np.linspace(0, 1, n_groups + 1), + sample_weight=sampleweighttrain, + mode='quantile') + + # Initialize arrays probs = np.zeros(n_groups) g_cate = np.zeros(n_groups) se_g_cate = np.zeros(n_groups) gate = np.zeros(n_groups) se_gate = np.zeros(n_groups) + + w_tot = np.sum(sampleweightval) + + # Calculate group statistics for i in range(n_groups): + # Assign units in validation set to groups - ind = (self.cate_preds_val_[:, k] >= cuts[i]) & (self.cate_preds_val_[:, k] <= cuts[i + 1]) + if i < n_groups - 1: + ind = (self.cate_preds_val_[:, k] >= cuts[i]) & (self.cate_preds_val_[:, k] < cuts[i + 1]) + else: + ind = (self.cate_preds_val_[:, k] >= cuts[i]) & (self.cate_preds_val_[:, k] <= cuts[i + 1]) + + # Skip if no units in group + if not np.any(ind): + continue + + w_tot_inds = np.sum(sampleweightval[ind]) + # Proportion of validations set in group - probs[i] = np.mean(ind) + probs[i] = w_tot_inds / 
w_tot + # Group average treatment effect (GATE) -- average of DR outcomes in group - gate[i] = np.mean(self.dr_val_[ind, k]) - se_gate[i] = np.std(self.dr_val_[ind, k]) / np.sqrt(np.sum(ind)) + gate[i] = np.average(self.dr_val_[ind, k], weights=sampleweightval[ind]) + se_gate[i] = weighted_se(values=self.dr_val_[ind, k], weights=sampleweightval[ind]) + # Average of CATE predictions in group - g_cate[i] = np.mean(self.cate_preds_val_[ind, k]) - se_g_cate[i] = np.std(self.cate_preds_val_[ind, k]) / np.sqrt(np.sum(ind)) + g_cate[i] = np.average(self.cate_preds_val_[ind, k], weights=sampleweightval[ind]) + se_g_cate[i] = weighted_se(values=self.cate_preds_val_[ind, k], weights=sampleweightval[ind]) # Calculate group calibration score cal_score_g = np.sum(abs(gate - g_cate) * probs) @@ -447,8 +566,9 @@ def evaluate_cal( def evaluate_blp( self, - Xval: np.array = None, - Xtrain: np.array = None + Xval: np.ndarray = None, + Xtrain: np.ndarray = None, + sampleweightval: np.ndarray = None, ) -> BLPEvaluationResults: """ Implement the best linear predictor (BLP) test as in [Chernozhukov2022]. @@ -464,6 +584,8 @@ def evaluate_blp( Training sample features for CATE model. If specified, then CATE is fitted on training sample and applied to Xval. If specified, then Xtrain, Dtrain, Ytrain must have been specified in `fit_nuisance` method (and vice-versa) + sampleweightval: np.ndarray = None, + Sample weights for validation sample Returns ------- @@ -478,20 +600,21 @@ def evaluate_blp( raise Exception('CATE predictions not yet calculated - must provide Xval') self.get_cate_preds(Xval, Xtrain) - if self.n_treat == 1: # binary treatment - reg = OLS(self.dr_val_, add_constant(self.cate_preds_val_)).fit() - params = [reg.params[1]] - errs = [reg.bse[1]] - pvals = [reg.pvalues[1]] - else: # categorical treatment - params = [] - errs = [] - pvals = [] - for k in range(self.n_treat): # run a separate regression for each - reg = OLS(self.dr_val_[:, k], add_constant(self.cate_preds_val_[:, k])).fit(cov_type='HC1') - params.append(reg.params[1]) - errs.append(reg.bse[1]) - pvals.append(reg.pvalues[1]) + # Default to equal weights if none provided + if sampleweightval is None: + sampleweightval = np.ones(self.cate_preds_val_.shape[0]) + + params = [] + errs = [] + pvals = [] + + # run a separate regression for each treatment + for k in range(self.n_treat): + reg = WLS(self.dr_val_[:, k], add_constant(self.cate_preds_val_[:, k]), + weights=sampleweightval).fit(cov_type='HC1') + params.append(reg.params[1]) + errs.append(reg.bse[1]) + pvals.append(reg.pvalues[1]) self.blp_res = BLPEvaluationResults( params=params, @@ -504,11 +627,13 @@ def evaluate_blp( def evaluate_uplift( self, - Xval: np.array = None, - Xtrain: np.array = None, - percentiles: np.array = np.linspace(5, 95, 50), + Xval: np.ndarray = None, + Xtrain: np.ndarray = None, + percentiles: np.ndarray = np.linspace(5, 95, 50), metric: str = 'qini', - n_bootstrap: int = 1000 + n_bootstrap: int = 1000, + sampleweightval: np.ndarray = None, + sampleweighttrain: np.ndarray = None ) -> UpliftEvaluationResults: """ Calculate uplift curves and coefficients for the given model. @@ -536,6 +661,10 @@ def evaluate_uplift( Which type of uplift curve to evaluate. Must be one of ['toc', 'qini'] n_bootstrap: integer, default 1000 Number of bootstrap samples to run when calculating uniform confidence bands. 
+ sampleweightval: np.ndarray = None, + Sample weights for validation sample + sampleweighttrain: np.ndarray = None, + Sample weights for training sample Returns ------- @@ -549,36 +678,48 @@ def evaluate_uplift( raise Exception('CATE predictions not yet calculated - must provide both Xval, Xtrain') self.get_cate_preds(Xval, Xtrain) + # Default to equal weights if none provided + if sampleweightval is None: + sampleweightval = np.ones(self.cate_preds_val_.shape[0]) + if sampleweighttrain is None: + sampleweighttrain = np.ones(self.cate_preds_train_.shape[0]) + + assert np.min(sampleweightval) > 0, "Sample weights must be positive" + assert np.min(sampleweighttrain) > 0, "Sample weights must be positive" + + # cast sample weights to float + sampleweightval = sampleweightval.astype(np.float64) + sampleweighttrain = sampleweighttrain.astype(np.float64) + + # Convert weights to minimum value of 1 + sampleweightval /= sampleweightval.min() + sampleweighttrain /= sampleweighttrain.min() + + # Check weights are valid + assert (np.all(sampleweightval >= 1)), "Sample weights must be >= 1" + assert (np.all(sampleweighttrain >= 1)), "Sample weights must be >= 1" + curve_data_dict = dict() - if self.n_treat == 1: + coeffs = [] + errs = [] + + # run a separate regression for each treatment + for k in range(self.n_treat): coeff, err, curve_df = calc_uplift( - self.cate_preds_train_, - self.cate_preds_val_, - self.dr_val_, - percentiles, - metric, - n_bootstrap + cate_preds_train=self.cate_preds_train_[:, k], + cate_preds_val=self.cate_preds_val_[:, k], + dr_val=self.dr_val_[:, k], + percentiles=percentiles, + metric=metric, + n_bootstrap=n_bootstrap, + sample_weight_train=sampleweighttrain, + sample_weight_val=sampleweightval, ) - coeffs = [coeff] - errs = [err] - curve_data_dict[self.treatments[1]] = curve_df - else: - coeffs = [] - errs = [] - for k in range(self.n_treat): - coeff, err, curve_df = calc_uplift( - self.cate_preds_train_[:, k], - self.cate_preds_val_[:, k], - self.dr_val_[:, k], - percentiles, - metric, - n_bootstrap - ) - coeffs.append(coeff) - errs.append(err) - curve_data_dict[self.treatments[k + 1]] = curve_df + coeffs.append(coeff) + errs.append(err) + curve_data_dict[self.treatments[k + 1]] = curve_df - pvals = [st.norm.sf(abs(q / e)) for q, e in zip(coeffs, errs)] + pvals = [2*st.norm.sf(abs(q / e)) for q, e in zip(coeffs, errs)] self.uplift_res = UpliftEvaluationResults( params=coeffs, @@ -592,10 +733,12 @@ def evaluate_uplift( def evaluate_all( self, - Xval: np.array = None, - Xtrain: np.array = None, + Xval: np.ndarray = None, + Xtrain: np.ndarray = None, n_groups: int = 4, - n_bootstrap: int = 1000 + n_bootstrap: int = 1000, + sampleweightval: np.ndarray = None, + sampleweighttrain: np.ndarray = None, ) -> EvaluationResults: """ Combine the best linear prediction, calibration, and uplift curve methods into a single summary. @@ -612,6 +755,10 @@ def evaluate_all( Number of quantile-based groups used to calculate calibration score. n_bootstrap: integer, default 1000 Number of bootstrap samples to run when calculating uniform confidence bands for uplift curves. 
+ sampleweightval: np.ndarray = None, + Sample weights for validation sample + sampleweighttrain: np.ndarray = None, + Sample weights for training sample Returns ------- @@ -622,10 +769,40 @@ def evaluate_all( raise Exception('CATE predictions not yet calculated - must provide both Xval, Xtrain') self.get_cate_preds(Xval, Xtrain) - blp_res = self.evaluate_blp() - cal_res = self.evaluate_cal(n_groups=n_groups) - qini_res = self.evaluate_uplift(metric='qini', n_bootstrap=n_bootstrap) - toc_res = self.evaluate_uplift(metric='toc', n_bootstrap=n_bootstrap) + # Default to equal weights if none provided + if sampleweightval is None: + sampleweightval = np.ones(self.cate_preds_val_.shape[0]) + if sampleweighttrain is None: + sampleweighttrain = np.ones(self.cate_preds_train_.shape[0]) + + # Convert weights to integer values + sampleweightval = sampleweightval.astype(int) + sampleweighttrain = sampleweighttrain.astype(int) + + # Check weights are valid + assert (np.all(sampleweightval >= 1)), "Sample weights must be integer and >= 1" + assert (np.all(sampleweighttrain >= 1)), "Sample weights must be integer and >= 1" + + blp_res = self.evaluate_blp(Xtrain=Xtrain, + Xval=Xval, + sampleweightval=sampleweightval) + cal_res = self.evaluate_cal(Xtrain=Xtrain, + Xval=Xval, + sampleweightval=sampleweightval, + sampleweighttrain=sampleweighttrain, + n_groups=n_groups) + qini_res = self.evaluate_uplift(Xtrain=Xtrain, + Xval=Xval, + sampleweightval=sampleweightval, + sampleweighttrain=sampleweighttrain, + metric='qini', + n_bootstrap=n_bootstrap) + toc_res = self.evaluate_uplift(Xtrain=Xtrain, + Xval=Xval, + sampleweightval=sampleweightval, + sampleweighttrain=sampleweighttrain, + metric='toc', + n_bootstrap=n_bootstrap) self.res = EvaluationResults( blp_res=blp_res, diff --git a/econml/validate/results.py b/econml/validate/results.py index 1f05a5bbf..13bac5384 100644 --- a/econml/validate/results.py +++ b/econml/validate/results.py @@ -23,9 +23,9 @@ class CalibrationEvaluationResults: def __init__( self, - cal_r_squared: np.array, + cal_r_squared: np.ndarray, plot_data_dict: Dict[Any, pd.DataFrame], - treatments: np.array + treatments: np.ndarray ): self.cal_r_squared = cal_r_squared self.plot_data_dict = plot_data_dict @@ -105,7 +105,7 @@ def __init__( params: List[float], errs: List[float], pvals: List[float], - treatments: np.array + treatments: np.ndarray ): self.params = params self.errs = errs @@ -161,7 +161,7 @@ def __init__( params: List[float], errs: List[float], pvals: List[float], - treatments: np.array, + treatments: np.ndarray, curve_data_dict: Dict[Any, pd.DataFrame] ): self.params = params diff --git a/econml/validate/utils.py b/econml/validate/utils.py index f2f8c0756..cbedac12f 100644 --- a/econml/validate/utils.py +++ b/econml/validate/utils.py @@ -3,13 +3,15 @@ import numpy as np import pandas as pd +from .weighted_utils import weighted_stat, weighted_se def calculate_dr_outcomes( - D: np.array, - y: np.array, - reg_preds: np.array, - prop_preds: np.array -) -> np.array: + D: np.ndarray, + y: np.ndarray, + reg_preds: np.ndarray, + prop_preds: np.ndarray, + treatments: np.ndarray = None +) -> np.ndarray: """ Calculate doubly-robust (DR) outcomes using predictions from nuisance models. @@ -25,6 +27,9 @@ def calculate_dr_outcomes( Outcome predictions for each potential treatment prop_preds: (n x n_treat) matrix Propensity score predictions for each treatment + treatments: vector of length n_treat, default None + Unique treatment values. If None, will be inferred from D. 
+ Should be sorted in increasing order and include control. Returns ------- @@ -33,28 +38,35 @@ def calculate_dr_outcomes( # treat each treatment as a separate regression # here, prop_preds should be a matrix # with rows corresponding to units and columns corresponding to treatment statuses + + if treatments is None: + treatments = np.sort(np.unique(D)) + dr_vec = [] d0_mask = np.where(D == 0, 1, 0) y_dr_0 = reg_preds[:, 0] + (d0_mask / np.clip(prop_preds[:, 0], .01, np.inf)) * (y - reg_preds[:, 0]) - for k in np.sort(np.unique(D)): # pick a treatment status - if k > 0: # make sure it is not control - dk_mask = np.where(D == k, 1, 0) - y_dr_k = reg_preds[:, k] + (dk_mask / np.clip(prop_preds[:, k], .01, np.inf)) * (y - reg_preds[:, k]) - dr_k = y_dr_k - y_dr_0 # this is an n x 1 vector - dr_vec.append(dr_k) - dr = np.column_stack(dr_vec) # this is an n x n_treatment matrix - return dr + for ind, k in enumerate(treatments[1:]): # pick a treatment status + dk_mask = np.where(D == k, 1, 0) + y_dr_k = reg_preds[:, ind + 1] + \ + (dk_mask / np.clip(prop_preds[:, ind + 1], .01, np.inf)) * (y - reg_preds[:, ind + 1]) + dr_k = y_dr_k - y_dr_0 # this is an n x 1 vector + dr_vec.append(dr_k) + dr = np.column_stack(dr_vec) # this is an n x n_treatment-1 matrix + + return dr def calc_uplift( - cate_preds_train: np.array, - cate_preds_val: np.array, - dr_val: np.array, - percentiles: np.array, + cate_preds_train: np.ndarray, + cate_preds_val: np.ndarray, + dr_val: np.ndarray, + percentiles: np.ndarray, metric: str, - n_bootstrap: int = 1000 -) -> Tuple[float, float, pd.DataFrame]: + n_bootstrap: int = 1000, + sample_weight_train: np.ndarray = None, + sample_weight_val: np.ndarray = None + ) -> Tuple[float, float, pd.DataFrame]: """ Calculate uplift curve points, integral, and errors on both points and integral. @@ -76,34 +88,83 @@ def calc_uplift( String indicating whether to calculate TOC or QINI; should be one of ['toc', 'qini'] n_bootstrap: integer, default 1000 Number of bootstrap samples to run when calculating uniform confidence bands. + sample_weight_train: vector of length n_train, default None + Sample weights for the training sample. + sample_weight_val: vector of length n_val, default None + Sample weights for the validation sample. Returns ------- Uplift coefficient and associated standard error, as well as associated curve. 
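+
+    Examples
+    --------
+    A minimal illustrative sketch with synthetic inputs (the arrays below are
+    arbitrary; in the validation workflow they come from a fitted DRTester):
+
+    >>> import numpy as np
+    >>> from econml.validate.utils import calc_uplift
+    >>> rng = np.random.default_rng(0)
+    >>> cate_train = rng.normal(size=1000)
+    >>> cate_val = rng.normal(size=1000)
+    >>> dr_val = cate_val + rng.normal(size=1000)
+    >>> weights_val = rng.integers(1, 5, size=1000).astype(float)
+    >>> coeff, err, curve_df = calc_uplift(
+    ...     cate_train, cate_val, dr_val,
+    ...     percentiles=np.linspace(5, 95, 10), metric='qini',
+    ...     sample_weight_val=weights_val)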
""" - qs = np.percentile(cate_preds_train, percentiles) + # Default to equal weights if none provided + if sample_weight_train is None: + sample_weight_train = np.ones(cate_preds_train.shape) + if sample_weight_val is None: + sample_weight_val = np.ones(cate_preds_val.shape) + + # Broadcast weights if needed to match cate_preds shape + if sample_weight_train.shape != cate_preds_train.shape: + if sample_weight_train.ndim == 1: + sample_weight_train = sample_weight_train[:, np.newaxis] + sample_weight_train = np.broadcast_to(sample_weight_train, cate_preds_train.shape) + if sample_weight_val.shape != cate_preds_val.shape: + if sample_weight_val.ndim == 1: + sample_weight_val = sample_weight_val[:, np.newaxis] + sample_weight_val = np.broadcast_to(sample_weight_val, cate_preds_val.shape) + + # calculate weighted quantiles of CATE predictions in training set + qs = weighted_stat(values=cate_preds_train, + q=percentiles, + sample_weight=sample_weight_train, + mode="percentile") + + # initialize arrays toc, toc_std, group_prob = np.zeros(len(qs)), np.zeros(len(qs)), np.zeros(len(qs)) toc_psi = np.zeros((len(qs), dr_val.shape[0])) - n = len(dr_val) - ate = np.mean(dr_val) + + # total sample size (weighted). if sample weights are all 1's, this is just n + w_tot = np.sum(sample_weight_val) + + # ATE estimate in validation set. If sample weights are all 1's, this is just the average + # of the DR outcomes - E[Y(1) - Y(0)] + ate = np.average(dr_val, weights=sample_weight_val) + + # calculate point estimates and influence functions for each quantile for it in range(len(qs)): - inds = (qs[it] <= cate_preds_val) # group with larger CATE prediction than the q-th quantile - group_prob = np.sum(inds) / n # fraction of population in this group + + # group with larger CATE prediction than the q-th quantile + inds = (qs[it] <= cate_preds_val) + w_tot_inds = np.sum(sample_weight_val[inds]) + + # fraction of population in this group + group_prob = w_tot_inds / w_tot + + # calculate point estimate and influence function for tau(q) if metric == 'qini': + # tau(q) = q * E[Y(1) - Y(0) | tau(X) >= q[it]] - E[Y(1) - Y(0)] toc[it] = group_prob * ( - np.mean(dr_val[inds]) - ate) # tau(q) = q * E[Y(1) - Y(0) | tau(X) >= q[it]] - E[Y(1) - Y(0)] + np.average(dr_val[inds], weights=sample_weight_val[inds]) - ate) + # influence function for the tau(q) toc_psi[it, :] = np.squeeze( - (dr_val - ate) * (inds - group_prob) - toc[it]) # influence function for the tau(q) + (dr_val - ate) * (inds - group_prob) - toc[it]) elif metric == 'toc': - toc[it] = np.mean(dr_val[inds]) - ate # tau(q) := E[Y(1) - Y(0) | tau(X) >= q[it]] - E[Y(1) - Y(0)] + # tau(q) := E[Y(1) - Y(0) | tau(X) >= q[it]] - E[Y(1) - Y(0)] + toc[it] = np.average(dr_val[inds], weights=sample_weight_val[inds]) - ate toc_psi[it, :] = np.squeeze((dr_val - ate) * (inds / group_prob - 1) - toc[it]) else: raise ValueError(f"Unsupported metric {metric!r} - must be one of ['toc', 'qini']") - toc_std[it] = np.sqrt(np.mean(toc_psi[it] ** 2) / n) # standard error of tau(q) + # standard error of tau(q) + if sample_weight_val.ndim > 1: + toc_std[it] = weighted_se(values=toc_psi[it], weights=sample_weight_val[:, 0]) + else: + toc_std[it] = weighted_se(values=toc_psi[it], weights=sample_weight_val) - w = np.random.normal(0, 1, size=(n, n_bootstrap)) - mboot = (toc_psi / toc_std.reshape(-1, 1)) @ w / n + # calculate critical values for uniform confidence bands via multiplier bootstrap + n_size_bootstrap = sample_weight_val.shape[0] + w = np.random.normal(0, 1, size=(n_size_bootstrap, 
n_bootstrap)) + mboot = (toc_psi / toc_std.reshape(-1, 1)) @ w / w_tot max_mboot = np.max(np.abs(mboot), axis=0) uniform_critical_value = np.percentile(max_mboot, 95) @@ -111,9 +172,15 @@ def calc_uplift( min_mboot = np.min(mboot, axis=0) uniform_one_side_critical_value = np.abs(np.percentile(min_mboot, 5)) + # calculate coefficient and standard error of the coefficient coeff_psi = np.sum(toc_psi[:-1] * np.diff(percentiles).reshape(-1, 1) / 100, 0) coeff = np.sum(toc[:-1] * np.diff(percentiles) / 100) - coeff_stderr = np.sqrt(np.mean(coeff_psi ** 2) / n) + + # standard error of the coefficient + if sample_weight_val.ndim > 1: + coeff_stderr = weighted_se(values=coeff_psi, weights=sample_weight_val[:, 0]) + else: + coeff_stderr = weighted_se(values=coeff_psi, weights=sample_weight_val) curve_df = pd.DataFrame({ 'Percentage treated': 100 - percentiles, diff --git a/econml/validate/weighted_utils.py b/econml/validate/weighted_utils.py new file mode 100644 index 000000000..f640c0518 --- /dev/null +++ b/econml/validate/weighted_utils.py @@ -0,0 +1,218 @@ +import numpy as np +from sklearn.base import clone, is_classifier +from sklearn.utils import indexable +from sklearn.utils.validation import _num_samples +from sklearn.model_selection import check_cv +from joblib import Parallel, delayed + +def _weighted_quantile_1d(values, quantiles, sample_weight): + """ + Compute weighted quantiles or percentiles for 1D arrays using binary search + local interpolation. + + Parameters + ---------- + values : array-like + Input data (1-D array). + quantiles : array-like + Quantiles to compute (must be in [0, 1]). + sample_weight : array-like + Weights associated with each value (1-D array of the same length as `values`). + + Returns + ------- + array-like + Weighted quantile values. + """ + sorter = np.argsort(values) + values_sorted = values[sorter] + weights_sorted = sample_weight[sorter] + + cdf = np.cumsum(weights_sorted) - 0.5 * weights_sorted + cdf /= np.sum(weights_sorted) + + idx = np.searchsorted(cdf, quantiles, side="right") + + results = [] + for q, i in zip(quantiles, idx): + if i == 0: + results.append(values_sorted[0]) + elif i == len(values_sorted): + results.append(values_sorted[-1]) + else: + cdf_lo, cdf_hi = cdf[i-1], cdf[i] + val_lo, val_hi = values_sorted[i-1], values_sorted[i] + t = (q - cdf_lo) / (cdf_hi - cdf_lo) + results.append(val_lo + t * (val_hi - val_lo)) + return np.array(results) + +def weighted_stat(values, q, sample_weight=None, axis=None, keepdims=False, mode="quantile"): + """ + Compute weighted quantiles or percentiles along a given axis using binary search + local interpolation. + + Parameters + ---------- + values : array-like + Input data (N-D array). + q : array-like or float + Quantiles or percentiles to compute. + - If mode="quantile", q must be in [0, 1]. + - If mode="percentile", q must be in [0, 100]. + sample_weight : array-like or None, optional + Weights associated with each value. Must be broadcastable to `values`. + If None, equal weights are assumed. + axis : int or None, optional + Axis along which the computation is performed. Default is None (flatten the array). + keepdims : bool, optional + If True, the reduced axes are left in the result as dimensions with size one. + mode : {"quantile", "percentile"}, default="quantile" + Whether `q` is specified in quantiles ([0,1]) or percentiles ([0,100]). + + Returns + ------- + result : ndarray + Weighted quantile/percentile values. 
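+
+    Examples
+    --------
+    A minimal illustrative sketch (the values and weights below are arbitrary):
+
+    >>> import numpy as np
+    >>> from econml.validate.weighted_utils import weighted_stat
+    >>> values = np.array([1.0, 2.0, 3.0, 4.0])
+    >>> weights = np.array([1.0, 1.0, 1.0, 3.0])
+    >>> # weighted median and upper quartile of ``values``
+    >>> qs = weighted_stat(values, q=[0.5, 0.75], sample_weight=weights, mode="quantile")
+    >>> qs.shape
+    (2,)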
+ """ + values = np.asarray(values) + q = np.atleast_1d(q) + + if mode == "percentile": + q = q / 100.0 + elif mode != "quantile": + raise ValueError("mode must be either 'quantile' or 'percentile'") + + if sample_weight is None: + sample_weight = np.ones_like(values, dtype=float) + else: + sample_weight = np.asarray(sample_weight, dtype=float) + + # Flatten if axis=None + if axis is None: + values = values.ravel() + sample_weight = sample_weight.ravel() + return _weighted_quantile_1d(values, q, sample_weight) + + # Move axis to front, then iterate slice by slice + values = np.moveaxis(values, axis, 0) + sample_weight = np.moveaxis(sample_weight, axis, 0) + + result = [] + for v, w in zip(values, sample_weight): + result.append(_weighted_quantile_1d(v.ravel(), q, w.ravel())) + result = np.stack(result, axis=0) + + if not keepdims: + result = np.moveaxis(result, 0, axis) + else: + shape = list(values.shape) + shape[0] = 1 + result = result.reshape([len(q)] + shape) + + return result + +def weighted_se(values: np.ndarray, weights: np.ndarray) -> float: + """ + Compute the standard error of the weighted mean. + + Parameters + ---------- + values : array-like + Data values (x_i). + weights : array-like + Sample weights (w_i), must be non-negative and same shape as values. + + Returns + ------- + float + Standard error of the weighted mean. + """ + values = np.asarray(values) + weights = np.asarray(weights) + if values.shape != weights.shape: + raise ValueError("values and weights must have the same shape") + + W_sum = np.sum(weights) + if W_sum == 0: + return np.nan + + wmean = np.average(values, weights=weights) + diff = values - wmean + + num = np.sum((weights**2) * (diff**2)) + den = W_sum**2 + + return np.sqrt(num / den) + +def cross_val_predict_with_weights( + estimator, X, y, *, sample_weight=None, cv=5, method="predict", n_jobs=None, verbose=0, fit_params=None +): + """ + Cross-validation predictions with support for sample_weight. + + Works like sklearn.model_selection.cross_val_predict, but passes sample_weight to estimator.fit. + + Parameters + ---------- + estimator : estimator object implementing 'fit' + The object to use to fit the data. + X : array-like + Input features. + y : array-like + Target variable. + sample_weight : array-like of shape (n_samples,), default=None + Sample weights to pass to fit. + cv : int, cross-validation generator, or iterable, default=5 + Determines the cross-validation splitting strategy. + method : str, default='predict' + Invokes the passed method name of the estimator. + n_jobs : int, default=None + Number of jobs to run in parallel. + verbose : int, default=0 + Verbosity level. + fit_params : dict, default=None + Additional fit parameters. + + Returns + ------- + predictions : ndarray + Array of predictions aligned with the input data. 
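+
+    Examples
+    --------
+    A minimal illustrative sketch with a scikit-learn classifier (the data and
+    estimator choice are arbitrary):
+
+    >>> import numpy as np
+    >>> from sklearn.linear_model import LogisticRegression
+    >>> from econml.validate.weighted_utils import cross_val_predict_with_weights
+    >>> rng = np.random.default_rng(0)
+    >>> X = rng.normal(size=(100, 3))
+    >>> y = rng.integers(0, 2, size=100)
+    >>> w = rng.integers(1, 10, size=100).astype(float)
+    >>> probs = cross_val_predict_with_weights(
+    ...     LogisticRegression(), X, y, sample_weight=w, cv=3, method="predict_proba")
+    >>> probs.shape
+    (100, 2)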
+ """ + if fit_params is None: + fit_params = {} + + X, y = indexable(X, y) + cv = check_cv(cv, y, classifier=is_classifier(estimator)) + + # Where predictions will be stored + n_samples = _num_samples(X) + predictions = None + + parallel = Parallel(n_jobs=n_jobs, verbose=verbose) + + def _fit_and_predict(train, test): + est = clone(estimator) + fit_params_fold = fit_params.copy() + + # If sample_weight is provided, slice it for the train indices + if sample_weight is not None: + fit_params_fold = {**fit_params_fold, "sample_weight": sample_weight[train]} + + est.fit(X[train], y[train], **fit_params_fold) + pred = getattr(est, method)(X[test]) + return test, pred + + results = parallel( + delayed(_fit_and_predict)(train, test) + for train, test in cv.split(X, y) + ) + + # Allocate predictions after we know their shape + for test, pred in results: + if predictions is None: + # initialize array with right shape and dtype + if pred.ndim == 1: + predictions = np.empty(n_samples, dtype=pred.dtype) + else: + predictions = np.empty((n_samples, pred.shape[1]), dtype=pred.dtype) + predictions[test] = pred + + return predictions