Skip to content

Commit f1a29bb

Browse files
authored
Merge pull request #414 from PolicyEngine/move-imputations-to-microimpute
Move QRF implementation to microimpute package
2 parents 1eb16c9 + 4b73891 commit f1a29bb

File tree

8 files changed

+143
-109
lines changed

8 files changed

+143
-109
lines changed

changelog_entry.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
- bump: patch
2+
changes:
3+
fixed:
4+
- Moved QRF implementation to microimpute package to avoid code duplication

policyengine_us_data/datasets/cps/cps.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from policyengine_us_data.utils.uprating import (
1313
create_policyengine_uprating_factors_table,
1414
)
15-
from policyengine_us_data.utils import QRF
15+
from microimpute.models.qrf import QRF
1616
import logging
1717

1818

@@ -177,19 +177,23 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame):
177177

178178
qrf = QRF()
179179
logging.info("Training imputation model for rent and real estate taxes.")
180-
qrf.fit(train_df[PREDICTORS], train_df[IMPUTATIONS])
180+
fitted_model = qrf.fit(
181+
X_train=train_df,
182+
predictors=PREDICTORS,
183+
imputed_variables=IMPUTATIONS,
184+
)
181185
logging.info("Imputing rent and real estate taxes.")
182-
imputed_values = qrf.predict(inference_df[PREDICTORS])
186+
imputed_values = fitted_model.predict(X_test=inference_df)
183187
logging.info("Imputation complete.")
184188
cps["rent"] = np.zeros_like(cps["age"])
185-
cps["rent"][mask] = imputed_values["rent"]
189+
cps["rent"][mask] = imputed_values[0.5]["rent"]
186190
# Assume zero housing assistance, so pre-subsidy rent equals rent
187191
cps["pre_subsidy_rent"] = cps["rent"]
188192
cps["housing_assistance"] = np.zeros_like(
189193
cps["spm_unit_capped_housing_subsidy_reported"]
190194
)
191195
cps["real_estate_taxes"] = np.zeros_like(cps["age"])
192-
cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"]
196+
cps["real_estate_taxes"][mask] = imputed_values[0.5]["real_estate_taxes"]
193197

194198

195199
def add_takeup(self):

policyengine_us_data/datasets/cps/extended_cps.py

Lines changed: 95 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from policyengine_us_data.datasets.puf import *
66
import pandas as pd
77
import os
8-
from policyengine_us_data.utils import QRF
8+
from microimpute.models.qrf import QRF
99
import time
1010
import logging
11+
import gc
1112

1213
# These are sorted by magnitude.
1314
# First 15 contain 90%.
@@ -28,7 +29,7 @@
2829
"self_employment_income",
2930
"w2_wages_from_qualified_business",
3031
"unadjusted_basis_qualified_property",
31-
"business_is_sstb",
32+
"business_is_sstb", # bool
3233
"short_term_capital_gains",
3334
"qualified_dividend_income",
3435
"charitable_cash_donations",
@@ -220,25 +221,103 @@ def impute_income_variables(
220221
predictors: list[str] = None,
221222
outputs: list[str] = None,
222223
):
223-
X_train = puf_sim.calculate_dataframe(predictors)
224-
y_train = puf_sim.calculate_dataframe(outputs)
225-
X = cps_sim.calculate_dataframe(predictors)
226-
y = pd.DataFrame(columns=outputs, index=X.index)
227-
model = QRF()
228-
start = time.time()
229-
model.fit(
230-
X_train,
231-
y_train,
224+
225+
# Calculate all variables together to preserve dependencies
226+
X_train = puf_sim.calculate_dataframe(predictors + outputs)
227+
228+
# Check which outputs are actually in the result
229+
available_outputs = [col for col in outputs if col in X_train.columns]
230+
missing_outputs = [col for col in outputs if col not in X_train.columns]
231+
232+
if missing_outputs:
233+
logging.warning(
234+
f"The following {len(missing_outputs)} variables were not calculated: {missing_outputs}"
235+
)
236+
# Log the specific missing variable that's causing issues
237+
if "recapture_of_investment_credit" in missing_outputs:
238+
logging.error(
239+
"recapture_of_investment_credit is missing from PUF calculation!"
240+
)
241+
242+
logging.info(
243+
f"X_train shape: {X_train.shape}, columns: {len(X_train.columns)}"
232244
)
245+
246+
X_test = cps_sim.calculate_dataframe(predictors)
247+
233248
logging.info(
234-
f"Training imputation models from the PUF took {time.time() - start:.2f} seconds"
249+
f"Imputing {len(available_outputs)} variables using batched sequential QRF"
235250
)
236-
start = time.time()
237-
y = model.predict(X)
251+
total_start = time.time()
252+
253+
# Batch variables to avoid memory issues with sequential imputation
254+
batch_size = 10 # Number of output variables imputed per batch
255+
result = pd.DataFrame(index=X_test.index)
256+
257+
# Sample training data more aggressively upfront
258+
sample_size = min(5000, len(X_train)) # Cap the training sample at 5000 rows
259+
if len(X_train) > sample_size:
260+
logging.info(
261+
f"Sampling training data from {len(X_train)} to {sample_size} rows"
262+
)
263+
X_train_sampled = X_train.sample(n=sample_size, random_state=42)
264+
else:
265+
X_train_sampled = X_train
266+
267+
for batch_start in range(0, len(available_outputs), batch_size):
268+
batch_end = min(batch_start + batch_size, len(available_outputs))
269+
batch_vars = available_outputs[batch_start:batch_end]
270+
271+
logging.info(
272+
f"Processing batch {batch_start//batch_size + 1}: variables {batch_start+1}-{batch_end} ({batch_vars})"
273+
)
274+
275+
# Force garbage collection before each batch
276+
gc.collect()
277+
278+
# Create a fresh QRF for each batch
279+
qrf = QRF(
280+
log_level="INFO",
281+
memory_efficient=True,
282+
batch_size=10,
283+
cleanup_interval=5,
284+
)
285+
286+
# Use pre-sampled data for this batch
287+
batch_X_train = X_train_sampled[predictors + batch_vars].copy()
288+
289+
# Fit model for this batch with sequential imputation within the batch
290+
fitted_model = qrf.fit(
291+
X_train=batch_X_train,
292+
predictors=predictors,
293+
imputed_variables=batch_vars,
294+
n_jobs=1, # Single thread to reduce memory overhead
295+
)
296+
297+
# Predict for this batch
298+
batch_predictions = fitted_model.predict(X_test=X_test)
299+
300+
# Extract median predictions and add to result
301+
for var in batch_vars:
302+
result[var] = batch_predictions[0.5][var]
303+
304+
# Clean up batch objects
305+
del fitted_model
306+
del batch_predictions
307+
del batch_X_train
308+
gc.collect()
309+
310+
logging.info(f"Completed batch {batch_start//batch_size + 1}")
311+
312+
# Add zeros for missing variables
313+
for var in missing_outputs:
314+
result[var] = 0
315+
238316
logging.info(
239-
f"Predicting imputed values took {time.time() - start:.2f} seconds"
317+
f"Imputing {len(available_outputs)} variables took {time.time() - total_start:.2f} seconds total"
240318
)
241-
return y
319+
320+
return result
242321

243322

244323
class ExtendedCPS_2024(ExtendedCPS):

policyengine_us_data/datasets/puf/puf.py

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -173,23 +173,30 @@ def impute_pension_contributions_to_puf(puf_df):
173173
["employment_income", "household_weight", "pre_tax_contributions"]
174174
)
175175

176-
from policyengine_us_data.utils import QRF
176+
from microimpute.models.qrf import QRF
177177

178-
pension_contributions = QRF()
178+
qrf = QRF()
179179

180-
pension_contributions.fit(
181-
cps_df[["employment_income"]],
182-
cps_df[["pre_tax_contributions"]],
183-
)
184-
return pension_contributions.predict(
185-
X=puf_df[["employment_income"]],
180+
# Combine predictors and target into a single DataFrame for models.QRF
181+
cps_train = cps_df[["employment_income", "pre_tax_contributions"]]
182+
183+
fitted_model = qrf.fit(
184+
X_train=cps_train,
185+
predictors=["employment_income"],
186+
imputed_variables=["pre_tax_contributions"],
186187
)
187188

189+
# Predict using the fitted model
190+
predictions = fitted_model.predict(X_test=puf_df[["employment_income"]])
191+
192+
# Return the median (0.5 quantile) predictions
193+
return predictions[0.5]["pre_tax_contributions"]
194+
188195

189196
def impute_missing_demographics(
190197
puf: pd.DataFrame, demographics: pd.DataFrame
191198
) -> pd.DataFrame:
192-
from policyengine_us_data.utils import QRF
199+
from microimpute.models.qrf import QRF
193200

194201
puf_with_demographics = (
195202
puf[puf.RECID.isin(demographics.RECID)]
@@ -217,19 +224,30 @@ def impute_missing_demographics(
217224
"XTOT",
218225
]
219226

220-
demographics_from_puf = QRF()
227+
qrf = QRF()
221228

222-
demographics_from_puf.fit(
223-
puf_with_demographics[NON_DEMOGRAPHIC_VARIABLES],
224-
puf_with_demographics[DEMOGRAPHIC_VARIABLES],
229+
# Prepare training data with predictors and variables to impute
230+
train_data = puf_with_demographics[
231+
NON_DEMOGRAPHIC_VARIABLES + DEMOGRAPHIC_VARIABLES
232+
]
233+
234+
fitted_model = qrf.fit(
235+
X_train=train_data,
236+
predictors=NON_DEMOGRAPHIC_VARIABLES,
237+
imputed_variables=DEMOGRAPHIC_VARIABLES,
225238
)
226239

227240
puf_without_demographics = puf[
228241
~puf.RECID.isin(puf_with_demographics.RECID)
229242
].reset_index()
230-
predicted_demographics = demographics_from_puf.predict(
231-
X=puf_without_demographics,
243+
244+
# Predict demographics
245+
predictions = fitted_model.predict(
246+
X_test=puf_without_demographics[NON_DEMOGRAPHIC_VARIABLES]
232247
)
248+
249+
# Get median predictions
250+
predicted_demographics = predictions[0.5]
233251
puf_with_imputed_demographics = pd.concat(
234252
[puf_without_demographics, predicted_demographics], axis=1
235253
)

policyengine_us_data/datasets/sipp/sipp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from microdf import MicroDataFrame
33
import numpy as np
44
from policyengine_us import Microsimulation
5-
from microimpute.models import QRF
5+
from microimpute.models.qrf import QRF
66
from policyengine_us_data.storage import STORAGE_FOLDER
77
import pickle
88
from huggingface_hub import hf_hub_download

policyengine_us_data/utils/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from .soi import *
22
from .uprating import *
33
from .loss import *
4-
from .qrf import *
54
from .l0 import *
65
from .seed import *

policyengine_us_data/utils/qrf.py

Lines changed: 0 additions & 70 deletions
This file was deleted.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ dependencies = [
2929
"tqdm>=4.60.0",
3030
"microdf_python>=1.0.0",
3131
"setuptools>=60",
32-
"microimpute>=1.0.1",
32+
"microimpute>=1.1.4",
3333
"pip-system-certs>=3.0",
3434
"google-cloud-storage>=2.0.0",
3535
"google-auth>=2.0.0",

0 commit comments

Comments
 (0)