-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy path_estimation.py
More file actions
491 lines (419 loc) · 17.6 KB
/
_estimation.py
File metadata and controls
491 lines (419 loc) · 17.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
import warnings
import numpy as np
from joblib import Parallel, delayed
from scipy.optimize import minimize_scalar
from sklearn.base import clone
from sklearn.metrics import log_loss, root_mean_squared_error
from sklearn.model_selection import GridSearchCV, KFold, RandomizedSearchCV, cross_val_predict
from sklearn.preprocessing import LabelEncoder
from statsmodels.nonparametric.kde import KDEUnivariate
from ._checks import _check_finite_predictions, _check_is_partition
def _assure_2d_array(x):
if x.ndim == 1:
x = x.reshape(-1, 1)
elif x.ndim > 2:
raise ValueError("Only one- or two-dimensional arrays are allowed")
return x
def _get_cond_smpls(smpls, bin_var):
smpls_0 = [(np.intersect1d(np.where(bin_var == 0)[0], train), test) for train, test in smpls]
smpls_1 = [(np.intersect1d(np.where(bin_var == 1)[0], train), test) for train, test in smpls]
return smpls_0, smpls_1
def _get_cond_smpls_2d(smpls, bin_var1, bin_var2):
subset_00 = (bin_var1 == 0) & (bin_var2 == 0)
smpls_00 = [(np.intersect1d(np.where(subset_00)[0], train), test) for train, test in smpls]
subset_01 = (bin_var1 == 0) & (bin_var2 == 1)
smpls_01 = [(np.intersect1d(np.where(subset_01)[0], train), test) for train, test in smpls]
subset_10 = (bin_var1 == 1) & (bin_var2 == 0)
smpls_10 = [(np.intersect1d(np.where(subset_10)[0], train), test) for train, test in smpls]
subset_11 = (bin_var1 == 1) & (bin_var2 == 1)
smpls_11 = [(np.intersect1d(np.where(subset_11)[0], train), test) for train, test in smpls]
return smpls_00, smpls_01, smpls_10, smpls_11
def _fit(estimator, x, y, train_index, idx=None, sample_weights=None):
if sample_weights is not None:
estimator.fit(x[train_index, :], y[train_index], sample_weight=sample_weights[train_index])
else:
estimator.fit(x[train_index, :], y[train_index])
return estimator, idx
def _dml_cv_predict(
    estimator,
    x,
    y,
    smpls=None,
    n_jobs=None,
    est_params=None,
    method="predict",
    return_train_preds=False,
    return_models=False,
    sample_weights=None,
):
    """Compute cross-fitted predictions of a nuisance learner.

    For each ``(train_index, test_index)`` pair in ``smpls`` a clone of
    ``estimator`` is fitted on the training rows of ``x``/``y`` and used to
    predict on the test rows. When the splits form a partition of the
    observations and no fold-specific options are requested, the work is
    delegated to sklearn's ``cross_val_predict``; otherwise the folds are
    fitted manually in parallel via joblib.

    Parameters
    ----------
    estimator : sklearn-compatible learner (cloned per fold).
    x : ndarray of shape (n_obs, n_features).
    y : ndarray, or list of per-fold targets (fold-specific target mode).
    smpls : list of (train_index, test_index) tuples.
    n_jobs : parallelism passed to joblib / cross_val_predict.
    est_params : None, a dict of parameters shared by all folds, or a list
        with one parameter dict per fold.
    method : prediction method name; for ``"predict_proba"`` only the second
        column (class-1 probability) is kept.
    return_train_preds : also return in-sample predictions per fold.
    return_models : also return the fitted per-fold models.
    sample_weights : optional fit weights of length n_obs.

    Returns
    -------
    dict with keys ``preds``, ``targets``, ``models`` and, when requested,
    ``train_preds`` / ``train_targets``.
    """
    n_obs = x.shape[0]
    # do the test indices of smpls cover each observation exactly once?
    smpls_is_partition = _check_is_partition(smpls, n_obs)
    fold_specific_params = (est_params is not None) & (not isinstance(est_params, dict))
    fold_specific_target = isinstance(y, list)
    # any of these options rules out the fast path via cross_val_predict
    manual_cv_predict = (
        (not smpls_is_partition) | return_train_preds | fold_specific_params | fold_specific_target | return_models
    )
    res = {"models": None}
    if not manual_cv_predict:
        # prepare fit_params for cross_val_predict
        fit_params_for_cv = {"sample_weight": sample_weights} if sample_weights is not None else None
        if est_params is None:
            # if there are no parameters set we redirect to the standard method
            preds = cross_val_predict(
                clone(estimator),
                x,
                y,
                cv=smpls,
                n_jobs=n_jobs,
                method=method,
                params=fit_params_for_cv,
            )
        else:
            assert isinstance(est_params, dict)
            # if no fold-specific parameters we redirect to the standard method
            # warnings.warn("Using the same (hyper-)parameters for all folds")
            preds = cross_val_predict(
                clone(estimator).set_params(**est_params),
                x,
                y,
                cv=smpls,
                n_jobs=n_jobs,
                method=method,
                params=fit_params_for_cv,
            )
        if method == "predict_proba":
            # keep only the probability of the second (class-1) column
            res["preds"] = preds[:, 1]
        else:
            res["preds"] = preds
        res["targets"] = np.copy(y)
    else:
        if not smpls_is_partition:
            assert not fold_specific_target, "combination of fold-specific y and no cross-fitting not implemented yet"
            # assert len(smpls) == 1
        if method == "predict_proba":
            assert not fold_specific_target  # fold_specific_target only needed for PLIV.partialXZ
            # encode labels as 0..n_classes-1 so column 1 of predict_proba is class 1
            y = np.asarray(y)
            le = LabelEncoder()
            y = le.fit_transform(y)
        parallel = Parallel(n_jobs=n_jobs, verbose=0, pre_dispatch="2*n_jobs")
        if fold_specific_target:
            # expand each fold's target vector to full length, NaN outside its train set
            y_list = list()
            for idx, (train_index, _) in enumerate(smpls):
                xx = np.full(n_obs, np.nan)
                xx[train_index] = y[idx]
                y_list.append(xx)
        else:
            # just replicate the y in a list
            y_list = [y] * len(smpls)
        if est_params is None:
            fitted_models = parallel(
                delayed(_fit)(clone(estimator), x, y_list[idx], train_index, idx, sample_weights=sample_weights)
                for idx, (train_index, test_index) in enumerate(smpls)
            )
        elif isinstance(est_params, dict):
            # warnings.warn("Using the same (hyper-)parameters for all folds")
            fitted_models = parallel(
                delayed(_fit)(
                    clone(estimator).set_params(**est_params), x, y_list[idx], train_index, idx, sample_weights=sample_weights
                )
                for idx, (train_index, test_index) in enumerate(smpls)
            )
        else:
            # fold-specific hyper-parameters: one dict per fold
            assert len(est_params) == len(smpls), "provide one parameter setting per fold"
            fitted_models = parallel(
                delayed(_fit)(
                    clone(estimator).set_params(**est_params[idx]),
                    x,
                    y_list[idx],
                    train_index,
                    idx,
                    sample_weights=sample_weights,
                )
                for idx, (train_index, test_index) in enumerate(smpls)
            )
        # assemble out-of-sample predictions fold by fold
        preds = np.full(n_obs, np.nan)
        targets = np.full(n_obs, np.nan)
        train_preds = list()
        train_targets = list()
        for idx, (train_index, test_index) in enumerate(smpls):
            # _fit returns (model, idx); verify the fold ordering survived joblib
            assert idx == fitted_models[idx][1]
            pred_fun = getattr(fitted_models[idx][0], method)
            if method == "predict_proba":
                preds[test_index] = pred_fun(x[test_index, :])[:, 1]
            else:
                preds[test_index] = pred_fun(x[test_index, :])
            if fold_specific_target:
                # targets not available for fold specific target
                targets = None
            else:
                targets[test_index] = y[test_index]
            if return_train_preds:
                train_preds.append(pred_fun(x[train_index, :]))
                train_targets.append(y[train_index])
        res["preds"] = preds
        res["targets"] = targets
        if return_train_preds:
            res["train_preds"] = train_preds
            res["train_targets"] = train_targets
        if return_models:
            fold_ids = [xx[1] for xx in fitted_models]
            if not np.all(fold_ids == np.arange(len(smpls))):
                raise RuntimeError("export of fitted models failed")
            res["models"] = [xx[0] for xx in fitted_models]
    return res
def _double_dml_cv_predict(
    estimator,
    estimator_name,
    x,
    y,
    smpls=None,
    smpls_inner=None,
    n_jobs=None,
    est_params=None,
    method="predict",
    sample_weights=None,
):
    """Nested (double) cross-fitting of a nuisance learner.

    For every outer split in ``smpls`` an inner cross-fitting is run on the
    matching entry of ``smpls_inner`` via :func:`_dml_cv_predict`; all inner
    models then predict on the outer split's test indices and their
    predictions are averaged.

    Returns a dict with the averaged outer ``preds``, per-outer-split
    ``preds_inner`` / ``targets_inner``, the collected inner ``models`` and
    ``targets`` (a copy of ``y``).
    """
    res = {}
    res["preds"] = np.zeros(y.shape, dtype=float)
    res["preds_inner"] = []
    res["targets_inner"] = []
    res["models"] = []
    for smpls_single_split, smpls_double_split in zip(smpls, smpls_inner):
        # inner cross-fitting on this outer split's training structure
        res_inner = _dml_cv_predict(
            estimator,
            x,
            y,
            smpls=smpls_double_split,
            n_jobs=n_jobs,
            est_params=est_params,
            method=method,
            return_models=True,
            sample_weights=sample_weights,
        )
        _check_finite_predictions(res_inner["preds"], estimator, estimator_name, smpls_double_split)
        res["preds_inner"].append(res_inner["preds"])
        res["targets_inner"].append(res_inner["targets"])
        for model in res_inner["models"]:
            res["models"].append(model)
            # accumulate each inner model's prediction on the outer test indices
            if method == "predict_proba":
                res["preds"][smpls_single_split[1]] += model.predict_proba(x[smpls_single_split[1]])[:, 1]
            else:
                res["preds"][smpls_single_split[1]] += model.predict(x[smpls_single_split[1]])
    # NOTE(review): the accumulated predictions are divided by len(smpls); this
    # only averages correctly if each inner resampling contains len(smpls)
    # folds/models per outer split - confirm against the sampling scheme.
    res["preds"] /= len(smpls)
    res["targets"] = np.copy(y)
    return res
def _dml_tune(
    y,
    x,
    train_inds,
    learner,
    param_grid,
    scoring_method,
    n_folds_tune,
    n_jobs_cv,
    search_mode,
    n_iter_randomized_search,
    fold_specific_target=False,
):
    """Tune ``learner`` hyper-parameters separately on each training set.

    Runs either a grid search or a randomized search with ``n_folds_tune``-fold
    CV on every index set in ``train_inds`` and returns the list of fitted
    search objects (one per training set).
    """
    tune_res = list()
    for i, train_index in enumerate(train_inds):
        tune_resampling = KFold(n_splits=n_folds_tune, shuffle=True)
        if search_mode == "grid_search":
            search = GridSearchCV(learner, param_grid, scoring=scoring_method, cv=tune_resampling, n_jobs=n_jobs_cv)
        else:
            assert search_mode == "randomized_search"
            search = RandomizedSearchCV(
                learner,
                param_grid,
                scoring=scoring_method,
                cv=tune_resampling,
                n_jobs=n_jobs_cv,
                n_iter=n_iter_randomized_search,
            )
        # fold-specific targets come as one vector per training set
        target = y[i] if fold_specific_target else y[train_index]
        tune_res.append(search.fit(x[train_index, :], target))
    return tune_res
def _draw_weights(method, n_rep_boot, n_obs):
if method == "Bayes":
weights = np.random.exponential(scale=1.0, size=(n_rep_boot, n_obs)) - 1.0
elif method == "normal":
weights = np.random.normal(loc=0.0, scale=1.0, size=(n_rep_boot, n_obs))
elif method == "wild":
xx = np.random.normal(loc=0.0, scale=1.0, size=(n_rep_boot, n_obs))
yy = np.random.normal(loc=0.0, scale=1.0, size=(n_rep_boot, n_obs))
weights = xx / np.sqrt(2) + (np.power(yy, 2) - 1) / 2
else:
raise ValueError("invalid boot method")
return weights
def _rmse(y_true, y_pred):
    """Root-mean-squared error restricted to the non-NaN entries of ``y_true``."""
    valid = np.logical_not(np.isnan(y_true))
    return root_mean_squared_error(y_true[valid], y_pred[valid])
def _logloss(y_true, y_pred):
    """Log loss restricted to the non-NaN entries of ``y_true``."""
    valid = np.logical_not(np.isnan(y_true))
    return log_loss(y_true[valid], y_pred[valid])
def _predict_zero_one_propensity(learner, X):
pred_proba = learner.predict_proba(X)
if pred_proba.shape[1] == 2:
res = pred_proba[:, 1]
else:
warnings.warn("Subsample has not common support. Results are based on adjusted propensities.")
res = learner.predict(X)
return res
def _get_bracket_guess(score, coef_start, coef_bounds):
# Ensure coef_start is a scalar (handles 1-element arrays from optimizers)
if isinstance(coef_start, np.ndarray):
coef_start = coef_start.item()
max_bracket_length = coef_bounds[1] - coef_bounds[0]
b_guess = coef_bounds
delta = 0.1
s_different = False
while (not s_different) & (delta <= 1.0):
a = np.maximum(coef_start - delta * max_bracket_length / 2, coef_bounds[0])
b = np.minimum(coef_start + delta * max_bracket_length / 2, coef_bounds[1])
b_guess = (float(a), float(b))
f_a = score(b_guess[0])
f_b = score(b_guess[1])
s_different = np.sign(f_a) != np.sign(f_b)
delta += 0.1
return s_different, b_guess
def _default_kde(u, weights):
    """Weighted Gaussian KDE of ``u`` (Silverman bandwidth) evaluated at zero."""
    density = KDEUnivariate(u)
    density.fit(kernel="gau", bw="silverman", weights=weights, fft=False)
    return density.evaluate(0)
def _solve_ipw_score(ipw_score, bracket_guess):
def abs_ipw_score(theta):
return abs(ipw_score(theta))
res = minimize_scalar(abs_ipw_score, bracket=bracket_guess, method="brent")
ipw_est = res.x
return ipw_est
def _aggregate_coefs_and_ses(all_coefs, all_ses):
# already expects equally scaled variances over all repetitions
# aggregation is done over dimension 1, such that the coefs and ses have to be of shape (n_coefs, n_rep)
coefs = np.median(all_coefs, 1)
# construct the upper bounds & aggregate
critical_value = 1.96
all_upper_bounds = all_coefs + critical_value * all_ses
agg_upper_bounds = np.median(all_upper_bounds, axis=1)
# reverse to calculate the standard errors
ses = (agg_upper_bounds - coefs) / critical_value
return coefs, ses
def _var_est(psi, psi_deriv, smpls, is_cluster_data, cluster_vars=None, smpls_cluster=None, n_folds_per_cluster=None):
    """Estimate the asymptotic variance of a DML score.

    In the non-clustered case this is the sandwich form
    ``mean(psi^2) / (n * mean(psi_deriv)^2)``. With one or two cluster
    variables the score products are summed within clusters before scaling.

    Returns ``(sigma2_hat, var_scaling_factor)`` where the scaling factor is
    the number of observations (or clusters / min of cluster counts).
    """
    if not is_cluster_data:
        # psi and psi_deriv should be of shape (n_obs, ...)
        var_scaling_factor = psi.shape[0]
        J = np.mean(psi_deriv)
        gamma_hat = np.mean(np.square(psi))
    else:
        assert cluster_vars is not None
        assert smpls_cluster is not None
        assert n_folds_per_cluster is not None
        n_folds = len(smpls)
        # one cluster
        if cluster_vars.shape[1] == 1:
            first_cluster_var = cluster_vars[:, 0]
            clusters = np.unique(first_cluster_var)
            gamma_hat = 0
            j_hat = 0
            for i_fold in range(n_folds):
                test_inds = smpls[i_fold][1]
                # NOTE(review): smpls_cluster[i_fold][1] appears to hold the
                # test clusters, with the cluster values at position [0] -
                # confirm against the resampling class that builds it.
                test_cluster_inds = smpls_cluster[i_fold][1]
                I_k = test_cluster_inds[0]
                const = 1 / len(I_k)
                for cluster_value in I_k:
                    ind_cluster = first_cluster_var == cluster_value
                    # sum of all within-cluster score cross-products
                    gamma_hat += const * np.sum(np.outer(psi[ind_cluster], psi[ind_cluster]))
                j_hat += np.sum(psi_deriv[test_inds]) / len(I_k)
            var_scaling_factor = len(clusters)
            J = np.divide(j_hat, n_folds_per_cluster)
            gamma_hat = np.divide(gamma_hat, n_folds_per_cluster)
        else:
            # two-way clustering
            assert cluster_vars.shape[1] == 2
            first_cluster_var = cluster_vars[:, 0]
            second_cluster_var = cluster_vars[:, 1]
            gamma_hat = 0
            j_hat = 0
            for i_fold in range(n_folds):
                test_inds = smpls[i_fold][1]
                test_cluster_inds = smpls_cluster[i_fold][1]
                I_k = test_cluster_inds[0]
                J_l = test_cluster_inds[1]
                const = np.divide(min(len(I_k), len(J_l)), (np.square(len(I_k) * len(J_l))))
                # cross-products within rows sharing the first cluster value
                for cluster_value in I_k:
                    ind_cluster = (first_cluster_var == cluster_value) & np.isin(second_cluster_var, J_l)
                    gamma_hat += const * np.sum(np.outer(psi[ind_cluster], psi[ind_cluster]))
                # ... and within rows sharing the second cluster value
                for cluster_value in J_l:
                    ind_cluster = (second_cluster_var == cluster_value) & np.isin(first_cluster_var, I_k)
                    gamma_hat += const * np.sum(np.outer(psi[ind_cluster], psi[ind_cluster]))
                j_hat += np.sum(psi_deriv[test_inds]) / (len(I_k) * len(J_l))
            n_first_clusters = len(np.unique(first_cluster_var))
            n_second_clusters = len(np.unique(second_cluster_var))
            # effective sample size is the smaller cluster dimension
            var_scaling_factor = min(n_first_clusters, n_second_clusters)
            J = np.divide(j_hat, np.square(n_folds_per_cluster))
            gamma_hat = np.divide(gamma_hat, np.square(n_folds_per_cluster))
    # sandwich: gamma_hat / (var_scaling_factor * J^2)
    scaling = np.divide(1.0, np.multiply(var_scaling_factor, np.square(J)))
    sigma2_hat = np.multiply(scaling, gamma_hat)
    return sigma2_hat, var_scaling_factor
def _cond_targets(target, cond_sample):
cond_target = target.astype(float)
cond_target[np.invert(cond_sample)] = np.nan
return cond_target
def _set_external_predictions(external_predictions, learners, treatment, i_rep):
ext_prediction_dict = {}
for learner in learners:
if external_predictions is None:
ext_prediction_dict[learner] = None
elif learner in external_predictions[treatment].keys():
if isinstance(external_predictions[treatment][learner], np.ndarray):
ext_prediction_dict[learner] = external_predictions[treatment][learner][:, i_rep].astype(float)
else:
ext_prediction_dict[learner] = None
else:
ext_prediction_dict[learner] = None
return ext_prediction_dict
def _solve_quadratic_inequality(a: float, b: float, c: float):
"""
Solves the quadratic inequation a*x^2 + b*x + c <= 0 and returns the intervals.
Parameters
----------
a : float
Coefficient of x^2.
b : float
Coefficient of x.
c : float
Constant term.
Returns
-------
List[Tuple[float, float]]
A list of intervals where the inequation holds.
"""
# Handle special cases
if abs(a) < 1e-12: # a is effectively zero
if abs(b) < 1e-12: # constant case
return [(-np.inf, np.inf)] if c <= 0 else []
# Linear case:
else:
root = -c / b
return [(-np.inf, root)] if b > 0 else [(root, np.inf)]
# Standard case: quadratic equation
roots = np.polynomial.polynomial.polyroots([c, b, a])
real_roots = np.sort(roots[np.isreal(roots)].real)
if len(real_roots) == 0: # No real roots
if a > 0: # parabola opens upwards, no real roots
return []
else: # parabola opens downwards, always <= 0
return [(-np.inf, np.inf)]
elif len(real_roots) == 1 or np.allclose(real_roots[0], real_roots[1]): # One real root
if a > 0:
return [(real_roots[0], real_roots[0])] # parabola touches x-axis at one point
else:
return [(-np.inf, np.inf)] # parabola is always <= 0
else:
assert len(real_roots) == 2
if a > 0: # happy quadratic (parabola opens upwards)
return [(real_roots[0], real_roots[1])]
else: # sad quadratic (parabola opens downwards)
return [(-np.inf, real_roots[0]), (real_roots[1], np.inf)]