Merged
25 commits
3641623  Remove local QRF implementation and use microimpute package (MaxGhenis, Jul 30, 2025)
ead55e9  Fix utils/__init__.py to remove QRF import (MaxGhenis, Jul 30, 2025)
876a189  Add changelog entry (MaxGhenis, Jul 30, 2025)
2c96dc5  Fix QRF predict call to use correct columns (MaxGhenis, Jul 30, 2025)
e587a39  Fix imports to use microimpute.utils.qrf instead of microimpute.model… (MaxGhenis, Jul 30, 2025)
2d73684  Update microimpute to >=1.0.2 (MaxGhenis, Jul 30, 2025)
63f9b38  Fix impute_income_variables to handle variables one by one (MaxGhenis, Jul 31, 2025)
cac79ed  Format code with Black (MaxGhenis, Jul 31, 2025)
842f64e  Fix prediction indexing to use iloc instead of column name (MaxGhenis, Jul 31, 2025)
0a435b3  Trigger CI re-run (MaxGhenis, Jul 31, 2025)
b697214  Handle both DataFrame and Series return types from predict (MaxGhenis, Jul 31, 2025)
fb8f32f  Fix incorrect QRF import for wealth imputation (MaxGhenis, Jul 31, 2025)
2d992cf  Refactor all code to use microimpute.models.qrf.QRF instead of utils.qrf (MaxGhenis, Jul 31, 2025)
6335794  Format code with black (MaxGhenis, Jul 31, 2025)
74071fe  Fix KeyError for missing imputation variables (MaxGhenis, Jul 31, 2025)
b286416  Use non-sequential imputation to avoid memory issues and missing vari… (MaxGhenis, Jul 31, 2025)
f0eb281  Revert "Use non-sequential imputation to avoid memory issues and miss… (MaxGhenis, Jul 31, 2025)
2f39624  Keep sequential imputation with memory optimizations and missing vari… (MaxGhenis, Jul 31, 2025)
c0474a9  Add memory optimizations for sequential QRF imputation (MaxGhenis, Jul 31, 2025)
3e604f2  Use batched sequential imputation to reduce memory usage (MaxGhenis, Jul 31, 2025)
44a8049  Further reduce memory usage for sequential imputation (MaxGhenis, Jul 31, 2025)
af2c828  Update microimpute version (juaristi22, Aug 1, 2025)
895b921  Skip missing imputed variables (juaristi22, Aug 1, 2025)
32cc33e  Lint (juaristi22, Aug 1, 2025)
4b73891  Update microimpute version (juaristi22, Aug 1, 2025)
4 changes: 4 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,4 @@
+- bump: patch
+  changes:
+    fixed:
+      - Moved QRF implementation to microimpute package to avoid code duplication
14 changes: 9 additions & 5 deletions policyengine_us_data/datasets/cps/cps.py
@@ -12,7 +12,7 @@
 from policyengine_us_data.utils.uprating import (
     create_policyengine_uprating_factors_table,
 )
-from policyengine_us_data.utils import QRF
+from microimpute.models.qrf import QRF
 import logging
 
 
@@ -176,19 +176,23 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame):
 
         qrf = QRF()
         logging.info("Training imputation model for rent and real estate taxes.")
-        qrf.fit(train_df[PREDICTORS], train_df[IMPUTATIONS])
+        fitted_model = qrf.fit(
+            X_train=train_df,
+            predictors=PREDICTORS,
+            imputed_variables=IMPUTATIONS,
+        )
         logging.info("Imputing rent and real estate taxes.")
-        imputed_values = qrf.predict(inference_df[PREDICTORS])
+        imputed_values = fitted_model.predict(X_test=inference_df)
         logging.info("Imputation complete.")
         cps["rent"] = np.zeros_like(cps["age"])
-        cps["rent"][mask] = imputed_values["rent"]
+        cps["rent"][mask] = imputed_values[0.5]["rent"]
         # Assume zero housing assistance since
         cps["pre_subsidy_rent"] = cps["rent"]
         cps["housing_assistance"] = np.zeros_like(
             cps["spm_unit_capped_housing_subsidy_reported"]
         )
         cps["real_estate_taxes"] = np.zeros_like(cps["age"])
-        cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"]
+        cps["real_estate_taxes"][mask] = imputed_values[0.5]["real_estate_taxes"]
 
 
     def add_takeup(self):
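
For reviewers new to the microimpute API, a minimal sketch of the fit/predict pattern this file now uses. The keyword names (X_train, predictors, imputed_variables, X_test) and the quantile-keyed return are inferred from the calls in this diff, not from microimpute documentation; the toy columns and values are illustrative only.

import pandas as pd
from microimpute.models.qrf import QRF

# Toy training frame: predictors and imputation targets share one DataFrame.
train_df = pd.DataFrame(
    {
        "employment_income": [20_000.0, 45_000.0, 80_000.0, 120_000.0],
        "rent": [800.0, 1_100.0, 1_600.0, 2_400.0],
    }
)

# fit() takes the combined frame plus explicit predictor/target lists
# and returns a fitted model object.
fitted_model = QRF().fit(
    X_train=train_df,
    predictors=["employment_income"],
    imputed_variables=["rent"],
)

# predict() returns a mapping from quantile to a DataFrame of draws;
# indexing with 0.5 selects the median imputation, as this diff does.
inference_df = pd.DataFrame({"employment_income": [60_000.0]})
imputed = fitted_model.predict(X_test=inference_df)
median_rent = imputed[0.5]["rent"]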
111 changes: 95 additions & 16 deletions policyengine_us_data/datasets/cps/extended_cps.py
@@ -5,9 +5,10 @@
 from policyengine_us_data.datasets.puf import *
 import pandas as pd
 import os
-from policyengine_us_data.utils import QRF
+from microimpute.models.qrf import QRF
 import time
 import logging
+import gc
 
 # These are sorted by magnitude.
 # First 15 contain 90%.
@@ -28,7 +29,7 @@
     "self_employment_income",
     "w2_wages_from_qualified_business",
     "unadjusted_basis_qualified_property",
-    "business_is_sstb",
+    "business_is_sstb",  # bool
     "short_term_capital_gains",
     "qualified_dividend_income",
     "charitable_cash_donations",
@@ -220,25 +221,103 @@ def impute_income_variables(
     predictors: list[str] = None,
     outputs: list[str] = None,
 ):
-    X_train = puf_sim.calculate_dataframe(predictors)
-    y_train = puf_sim.calculate_dataframe(outputs)
-    X = cps_sim.calculate_dataframe(predictors)
-    y = pd.DataFrame(columns=outputs, index=X.index)
-    model = QRF()
-    start = time.time()
-    model.fit(
-        X_train,
-        y_train,
+
+    # Calculate all variables together to preserve dependencies
+    X_train = puf_sim.calculate_dataframe(predictors + outputs)
+
+    # Check which outputs are actually in the result
+    available_outputs = [col for col in outputs if col in X_train.columns]
+    missing_outputs = [col for col in outputs if col not in X_train.columns]
+
+    if missing_outputs:
+        logging.warning(
+            f"The following {len(missing_outputs)} variables were not calculated: {missing_outputs}"
+        )
+        # Log the specific missing variable that's causing issues
+        if "recapture_of_investment_credit" in missing_outputs:
+            logging.error(
+                "recapture_of_investment_credit is missing from PUF calculation!"
+            )
+
+    logging.info(
+        f"X_train shape: {X_train.shape}, columns: {len(X_train.columns)}"
     )
+
+    X_test = cps_sim.calculate_dataframe(predictors)
+
     logging.info(
-        f"Training imputation models from the PUF took {time.time() - start:.2f} seconds"
+        f"Imputing {len(available_outputs)} variables using batched sequential QRF"
     )
-    start = time.time()
-    y = model.predict(X)
+    total_start = time.time()
+
+    # Batch variables to avoid memory issues with sequential imputation
+    batch_size = 10  # Reduce to 10 variables at a time
+    result = pd.DataFrame(index=X_test.index)
+
+    # Sample training data more aggressively upfront
+    sample_size = min(5000, len(X_train))  # Reduced from 5000
+    if len(X_train) > sample_size:
+        logging.info(
+            f"Sampling training data from {len(X_train)} to {sample_size} rows"
+        )
+        X_train_sampled = X_train.sample(n=sample_size, random_state=42)
+    else:
+        X_train_sampled = X_train
+
+    for batch_start in range(0, len(available_outputs), batch_size):
+        batch_end = min(batch_start + batch_size, len(available_outputs))
+        batch_vars = available_outputs[batch_start:batch_end]
+
+        logging.info(
+            f"Processing batch {batch_start//batch_size + 1}: variables {batch_start+1}-{batch_end} ({batch_vars})"
+        )
+
+        # Force garbage collection before each batch
+        gc.collect()
+
+        # Create a fresh QRF for each batch
+        qrf = QRF(
+            log_level="INFO",
+            memory_efficient=True,
+            batch_size=10,
+            cleanup_interval=5,
+        )
+
+        # Use pre-sampled data for this batch
+        batch_X_train = X_train_sampled[predictors + batch_vars].copy()
+
+        # Fit model for this batch with sequential imputation within the batch
+        fitted_model = qrf.fit(
+            X_train=batch_X_train,
+            predictors=predictors,
+            imputed_variables=batch_vars,
+            n_jobs=1,  # Single thread to reduce memory overhead
+        )
+
+        # Predict for this batch
+        batch_predictions = fitted_model.predict(X_test=X_test)
+
+        # Extract median predictions and add to result
+        for var in batch_vars:
+            result[var] = batch_predictions[0.5][var]
+
+        # Clean up batch objects
+        del fitted_model
+        del batch_predictions
+        del batch_X_train
+        gc.collect()
+
+        logging.info(f"Completed batch {batch_start//batch_size + 1}")
+
+    # Add zeros for missing variables
+    for var in missing_outputs:
+        result[var] = 0
+
     logging.info(
-        f"Predicting imputed values took {time.time() - start:.2f} seconds"
+        f"Imputing {len(available_outputs)} variables took {time.time() - total_start:.2f} seconds total"
    )
-    return y
+
+    return result
 
 
 class ExtendedCPS_2024(ExtendedCPS):
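
Condensed, the memory-management strategy in impute_income_variables is: slice the target list into fixed-size batches, fit a fresh QRF per batch, keep only the median predictions, and free the model before the next fit. A self-contained sketch of that loop (the helper name impute_in_batches is hypothetical; the QRF keyword arguments follow the calls in the diff above):

import gc

import pandas as pd
from microimpute.models.qrf import QRF

def impute_in_batches(X_train, X_test, predictors, outputs, batch_size=10):
    """Hypothetical driver mirroring the batched loop above."""
    result = pd.DataFrame(index=X_test.index)
    for start in range(0, len(outputs), batch_size):
        batch_vars = outputs[start : start + batch_size]
        # A fresh model per batch keeps peak memory bounded by one batch.
        fitted = QRF().fit(
            X_train=X_train[predictors + batch_vars],
            predictors=predictors,
            imputed_variables=batch_vars,
        )
        preds = fitted.predict(X_test=X_test)
        for var in batch_vars:
            result[var] = preds[0.5][var]  # median draw, as above
        # Drop per-batch objects and collect before the next fit.
        del fitted, preds
        gc.collect()
    return result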
48 changes: 33 additions & 15 deletions policyengine_us_data/datasets/puf/puf.py
@@ -173,23 +173,30 @@ def impute_pension_contributions_to_puf(puf_df):
         ["employment_income", "household_weight", "pre_tax_contributions"]
     )
 
-    from policyengine_us_data.utils import QRF
+    from microimpute.models.qrf import QRF
 
-    pension_contributions = QRF()
+    qrf = QRF()
 
-    pension_contributions.fit(
-        cps_df[["employment_income"]],
-        cps_df[["pre_tax_contributions"]],
-    )
-    return pension_contributions.predict(
-        X=puf_df[["employment_income"]],
+    # Combine predictors and target into single DataFrame for models.QRF
+    cps_train = cps_df[["employment_income", "pre_tax_contributions"]]
+
+    fitted_model = qrf.fit(
+        X_train=cps_train,
+        predictors=["employment_income"],
+        imputed_variables=["pre_tax_contributions"],
     )
+
+    # Predict using the fitted model
+    predictions = fitted_model.predict(X_test=puf_df[["employment_income"]])
+
+    # Return the median (0.5 quantile) predictions
+    return predictions[0.5]["pre_tax_contributions"]
 
 
 def impute_missing_demographics(
     puf: pd.DataFrame, demographics: pd.DataFrame
 ) -> pd.DataFrame:
-    from policyengine_us_data.utils import QRF
+    from microimpute.models.qrf import QRF
 
     puf_with_demographics = (
         puf[puf.RECID.isin(demographics.RECID)]
@@ -217,19 +224,30 @@ def impute_missing_demographics(
         "XTOT",
     ]
 
-    demographics_from_puf = QRF()
+    qrf = QRF()
 
-    demographics_from_puf.fit(
-        puf_with_demographics[NON_DEMOGRAPHIC_VARIABLES],
-        puf_with_demographics[DEMOGRAPHIC_VARIABLES],
+    # Prepare training data with predictors and variables to impute
+    train_data = puf_with_demographics[
+        NON_DEMOGRAPHIC_VARIABLES + DEMOGRAPHIC_VARIABLES
+    ]
+
+    fitted_model = qrf.fit(
+        X_train=train_data,
+        predictors=NON_DEMOGRAPHIC_VARIABLES,
+        imputed_variables=DEMOGRAPHIC_VARIABLES,
     )
 
     puf_without_demographics = puf[
         ~puf.RECID.isin(puf_with_demographics.RECID)
     ].reset_index()
-    predicted_demographics = demographics_from_puf.predict(
-        X=puf_without_demographics,
+
+    # Predict demographics
+    predictions = fitted_model.predict(
+        X_test=puf_without_demographics[NON_DEMOGRAPHIC_VARIABLES]
    )
+
+    # Get median predictions
+    predicted_demographics = predictions[0.5]
     puf_with_imputed_demographics = pd.concat(
         [puf_without_demographics, predicted_demographics], axis=1
     )
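
The demographics rewrite splits the PUF into records that already carry demographics (training) and those that do not (inference), then attaches the median predictions column-wise. A toy illustration of that rejoin step (the column names here are stand-ins, not the actual PUF fields):

import pandas as pd

# Records lacking demographics, index reset so concat aligns row-by-row.
puf_without_demographics = pd.DataFrame(
    {"wages": [40_000.0, 75_000.0]}
).reset_index(drop=True)

# Median (0.5-quantile) predictions, one column per imputed variable.
predicted_demographics = pd.DataFrame(
    {"age_head": [41, 58], "family_size": [2, 3]}
)

# Column-wise concat attaches the imputed demographics to each record.
puf_with_imputed_demographics = pd.concat(
    [puf_without_demographics, predicted_demographics], axis=1
)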
2 changes: 1 addition & 1 deletion policyengine_us_data/datasets/sipp/sipp.py
@@ -2,7 +2,7 @@
 from microdf import MicroDataFrame
 import numpy as np
 from policyengine_us import Microsimulation
-from microimpute.models import QRF
+from microimpute.models.qrf import QRF
 from policyengine_us_data.storage import STORAGE_FOLDER
 import pickle
 from huggingface_hub import hf_hub_download
1 change: 0 additions & 1 deletion policyengine_us_data/utils/__init__.py
@@ -1,6 +1,5 @@
 from .soi import *
 from .uprating import *
 from .loss import *
-from .qrf import *
 from .l0 import *
 from .seed import *
70 changes: 0 additions & 70 deletions policyengine_us_data/utils/qrf.py

This file was deleted.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -29,7 +29,7 @@ dependencies = [
     "tqdm>=4.60.0",
     "microdf_python>=1.0.0",
     "setuptools>=60",
-    "microimpute>=1.0.1",
+    "microimpute>=1.1.4",
     "pip-system-certs>=3.0",
     "google-cloud-storage>=2.0.0",
     "google-auth>=2.0.0",