diff --git a/.gitignore b/.gitignore
index a63f8215..9fb46dd6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@
 **/_build
 !population_by_state.csv
 **/*.pkl
+venv
diff --git a/changelog_entry.yaml b/changelog_entry.yaml
index e69de29b..a5b08451 100644
--- a/changelog_entry.yaml
+++ b/changelog_entry.yaml
@@ -0,0 +1,5 @@
+- bump: minor
+  changes:
+    added:
+      - scf package loading module
+      - auto loan balance imputation notebook
\ No newline at end of file
diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py
index 1ce98852..9cbc3f14 100644
--- a/policyengine_us_data/datasets/cps/cps.py
+++ b/policyengine_us_data/datasets/cps/cps.py
@@ -57,6 +57,7 @@ def generate(self):
         add_spm_variables(cps, spm_unit)
         add_household_variables(cps, household)
         add_rent(self, cps, person, household)
+        add_auto_loan_balance(self, cps)
         add_tips(self, cps)
 
         raw_data.close()
@@ -167,6 +168,156 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame):
     cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"]
 
 
+def add_auto_loan_balance(self, cps: h5py.File) -> None:
+    """Add auto loan balance variable."""
+    self.save_dataset(cps)
+    cps_data = self.load_dataset()
+
+    # Preprocess the CPS for imputation
+    lengths = {k: len(v) for k, v in cps_data.items()}
+    var_len = cps_data["person_household_id"].shape[0]
+    vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
+    agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
+
+    agg = (
+        agg_data.groupby("person_household_id")[
+            ["employment_income", "self_employment_income", "farm_income"]
+        ]
+        .sum()
+        .rename(
+            columns={
+                "employment_income": "household_employment_income",
+                "self_employment_income": "household_self_employment_income",
+                "farm_income": "household_farm_income",
+            }
+        )
+        .reset_index()
+    )
+
+    mask = cps_data["is_household_head"]
+    mask_len = mask.shape[0]
+
+    cps_data = {
+        var: data[mask] if data.shape[0] == mask_len else data
+        for var, data in cps_data.items()
+    }
+
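+    # Recode CPS race codes into the SCF's categories so that race can
+    # serve as a shared predictor in the imputation below; multi-race
+    # CPS codes collapse into the SCF "other" category (7).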
+    CPS_RACE_MAPPING = {
+        1: 1,  # White only -> WHITE
+        2: 2,  # Black only -> BLACK/AFRICAN-AMERICAN
+        3: 5,  # American Indian, Alaskan Native only -> AMERICAN INDIAN/ALASKA NATIVE
+        4: 4,  # Asian only -> ASIAN
+        5: 6,  # Hawaiian/Pacific Islander only -> NATIVE HAWAIIAN/PACIFIC ISLANDER
+        6: 7,  # White-Black -> OTHER
+        7: 7,  # White-AI -> OTHER
+        8: 7,  # White-Asian -> OTHER
+        9: 7,  # White-HP -> OTHER
+        10: 7,  # Black-AI -> OTHER
+        11: 7,  # Black-Asian -> OTHER
+        12: 7,  # Black-HP -> OTHER
+        13: 7,  # AI-Asian -> OTHER
+        14: 7,  # AI-HP -> OTHER
+        15: 7,  # Asian-HP -> OTHER
+        16: 7,  # White-Black-AI -> OTHER
+        17: 7,  # White-Black-Asian -> OTHER
+        18: 7,  # White-Black-HP -> OTHER
+        19: 7,  # White-AI-Asian -> OTHER
+        20: 7,  # White-AI-HP -> OTHER
+        21: 7,  # White-Asian-HP -> OTHER
+        22: 7,  # Black-AI-Asian -> OTHER
+        23: 7,  # White-Black-AI-Asian -> OTHER
+        24: 7,  # White-AI-Asian-HP -> OTHER
+        25: 7,  # Other 3 race comb. -> OTHER
+        26: 7,  # Other 4 or 5 race comb. -> OTHER
+    }
+
+    # Apply the mapping to recode the race values
+    cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(
+        cps_data["cps_race"]
+    )
+
+    lengths = {k: len(v) for k, v in cps_data.items()}
+    var_len = cps_data["household_id"].shape[0]
+    vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
+    receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
+
+    receiver_data = receiver_data.merge(
+        agg[
+            [
+                "person_household_id",
+                "household_employment_income",
+                "household_self_employment_income",
+                "household_farm_income",
+            ]
+        ],
+        on="person_household_id",
+        how="left",
+    )
+    receiver_data.drop("employment_income", axis=1, inplace=True)
+    receiver_data.drop("self_employment_income", axis=1, inplace=True)
+    receiver_data.drop("farm_income", axis=1, inplace=True)
+
+    receiver_data.rename(
+        columns={
+            "household_employment_income": "employment_income",
+            "household_self_employment_income": "self_employment_income",
+            "household_farm_income": "farm_income",
+        },
+        inplace=True,
+    )
+
+    # Impute auto loan balance from the SCF
+    from policyengine_us_data.datasets.scf.scf import SCF_2022
+
+    scf_dataset = SCF_2022()
+    scf_data = scf_dataset.load_dataset()
+    scf_data = pd.DataFrame({key: scf_data[key] for key in scf_data.keys()})
+
+    PREDICTORS = [
+        "age",
+        "is_female",
+        "cps_race",
+        "own_children_in_household",
+        "employment_income",
+        "self_employment_income",
+        "farm_income",
+    ]
+    IMPUTED_VARIABLES = ["auto_loan_balance"]
+    weights = ["household_weight"]
+
+    donor_data = scf_data[PREDICTORS + IMPUTED_VARIABLES + weights].copy()
+
+    donor_data = donor_data.loc[
+        np.random.choice(
+            donor_data.index,
+            size=100_000,
+            replace=True,
+            p=donor_data.household_weight / donor_data.household_weight.sum(),
+        )
+    ]
+
+    from microimpute.models.qrf import QRF
+
+    qrf_model = QRF()
+    fitted_model = qrf_model.fit(
+        X_train=donor_data,
+        predictors=PREDICTORS,
+        imputed_variables=IMPUTED_VARIABLES,
+        tune_hyperparameters=True,
+    )
+
+    imputations = fitted_model.predict(X_test=receiver_data)
+
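+    # predict() returns predictions keyed by quantile; take the median
+    # (0.5) as the point estimate for each CPS household.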
Generating it.") + self.generate() + + # Open the HDF store and return the DataFrame + with pd.HDFStore(self.file_path, mode="r") as storage: + return storage["data"] + + def generate(self): + if self._scf_download_url is None: + raise ValueError( + f"No raw SCF data URL known for year {self.time_period}." + ) + + url = self._scf_download_url + + response = requests.get(url, stream=True) + total_size_in_bytes = int( + response.headers.get("content-length", 200e6) + ) + progress_bar = tqdm( + total=total_size_in_bytes, + unit="iB", + unit_scale=True, + desc="Downloading SCF", + ) + if response.status_code == 404: + raise FileNotFoundError( + "Received a 404 response when fetching the data." + ) + with BytesIO() as file: + content_length_actual = 0 + for data in response.iter_content(int(1e6)): + progress_bar.update(len(data)) + content_length_actual += len(data) + file.write(data) + progress_bar.set_description("Downloaded SCF") + progress_bar.total = content_length_actual + progress_bar.close() + + zipfile = ZipFile(file) + with pd.HDFStore(self.file_path, mode="w") as storage: + # Find the Stata file, which should be the only .dta file in the zip + dta_files = [ + f for f in zipfile.namelist() if f.endswith(".dta") + ] + if not dta_files: + raise FileNotFoundError( + "No .dta file found in the SCF zip archive." + ) + # Usually there's only one .dta file, but we'll handle multiple just in case + for dta_file in dta_files: + with zipfile.open(dta_file) as f: + # Read the Stata file with pandas + data = pd.read_stata(f) + # Add year column + data["year"] = self.time_period + # Store in HDF file + storage["data"] = data + + @property + def _scf_download_url(self) -> str: + return SCF_URL_BY_YEAR.get(self.time_period) + + +class FedSCF_2022(FedSCF): + time_period = 2022 + label = "Federal Reserve SCF (2022)" + name = "fed_scf_2022" + file_path = STORAGE_FOLDER / "fed_scf_2022.h5" + data_format = Dataset.TABLES + + +class FedSCF_2019(FedSCF): + time_period = 2019 + label = "Federal Reserve SCF (2019)" + name = "fed_scf_2019" + file_path = STORAGE_FOLDER / "fed_scf_2019.h5" + data_format = Dataset.TABLES + + +class FedSCF_2016(FedSCF): + time_period = 2016 + label = "Federal Reserve SCF (2016)" + name = "fed_scf_2016" + file_path = STORAGE_FOLDER / "fed_scf_2016.h5" + data_format = Dataset.TABLES + + +# URLs for the SCF data by year +SCF_URL_BY_YEAR = { + 2016: "https://www.federalreserve.gov/econres/files/scfp2016s.zip", + 2019: "https://www.federalreserve.gov/econres/files/scfp2019s.zip", + 2022: "https://www.federalreserve.gov/econres/files/scfp2022s.zip", +} diff --git a/policyengine_us_data/datasets/scf/scf.py b/policyengine_us_data/datasets/scf/scf.py new file mode 100644 index 00000000..6c0b5b0c --- /dev/null +++ b/policyengine_us_data/datasets/scf/scf.py @@ -0,0 +1,422 @@ +from policyengine_core.data import Dataset +from policyengine_us_data.storage import STORAGE_FOLDER +from policyengine_us_data.datasets.scf.fed_scf import ( + FedSCF, + FedSCF_2016, + FedSCF_2019, + FedSCF_2022, +) +import pandas as pd +import numpy as np +import os +import h5py +from typing import List, Optional, Union, Type + + +class SCF(Dataset): + """Dataset containing processed Survey of Consumer Finances data.""" + + name = "scf" + label = "SCF" + raw_scf: Type[FedSCF] = None + time_period: int = None + data_format = Dataset.ARRAYS + frac: float | None = 1 + + def generate(self): + """Generates the SCF dataset for PolicyEngine US microsimulations. 
+        with BytesIO() as file:
+            content_length_actual = 0
+            for data in response.iter_content(int(1e6)):
+                progress_bar.update(len(data))
+                content_length_actual += len(data)
+                file.write(data)
+            progress_bar.set_description("Downloaded SCF")
+            progress_bar.total = content_length_actual
+            progress_bar.close()
+
+            zipfile = ZipFile(file)
+            with pd.HDFStore(self.file_path, mode="w") as storage:
+                # Find the Stata file, which should be the only .dta file in the zip
+                dta_files = [
+                    f for f in zipfile.namelist() if f.endswith(".dta")
+                ]
+                if not dta_files:
+                    raise FileNotFoundError(
+                        "No .dta file found in the SCF zip archive."
+                    )
+                # Usually there's only one .dta file, but we'll handle multiple just in case
+                for dta_file in dta_files:
+                    with zipfile.open(dta_file) as f:
+                        # Read the Stata file with pandas
+                        data = pd.read_stata(f)
+                        # Add year column
+                        data["year"] = self.time_period
+                        # Store in HDF file
+                        storage["data"] = data
+
+    @property
+    def _scf_download_url(self) -> str:
+        return SCF_URL_BY_YEAR.get(self.time_period)
+
+
+class FedSCF_2022(FedSCF):
+    time_period = 2022
+    label = "Federal Reserve SCF (2022)"
+    name = "fed_scf_2022"
+    file_path = STORAGE_FOLDER / "fed_scf_2022.h5"
+    data_format = Dataset.TABLES
+
+
+class FedSCF_2019(FedSCF):
+    time_period = 2019
+    label = "Federal Reserve SCF (2019)"
+    name = "fed_scf_2019"
+    file_path = STORAGE_FOLDER / "fed_scf_2019.h5"
+    data_format = Dataset.TABLES
+
+
+class FedSCF_2016(FedSCF):
+    time_period = 2016
+    label = "Federal Reserve SCF (2016)"
+    name = "fed_scf_2016"
+    file_path = STORAGE_FOLDER / "fed_scf_2016.h5"
+    data_format = Dataset.TABLES
+
+
+# URLs for the SCF data by year
+SCF_URL_BY_YEAR = {
+    2016: "https://www.federalreserve.gov/econres/files/scfp2016s.zip",
+    2019: "https://www.federalreserve.gov/econres/files/scfp2019s.zip",
+    2022: "https://www.federalreserve.gov/econres/files/scfp2022s.zip",
+}
diff --git a/policyengine_us_data/datasets/scf/scf.py b/policyengine_us_data/datasets/scf/scf.py
new file mode 100644
index 00000000..6c0b5b0c
--- /dev/null
+++ b/policyengine_us_data/datasets/scf/scf.py
@@ -0,0 +1,422 @@
+from policyengine_core.data import Dataset
+from policyengine_us_data.storage import STORAGE_FOLDER
+from policyengine_us_data.datasets.scf.fed_scf import (
+    FedSCF,
+    FedSCF_2016,
+    FedSCF_2019,
+    FedSCF_2022,
+)
+import pandas as pd
+import numpy as np
+import os
+import h5py
+from typing import List, Optional, Union, Type
+
+
+class SCF(Dataset):
+    """Dataset containing processed Survey of Consumer Finances data."""
+
+    name = "scf"
+    label = "SCF"
+    raw_scf: Type[FedSCF] = None
+    time_period: int = None
+    data_format = Dataset.ARRAYS
+    frac: float | None = 1
+
+    def generate(self):
+        """Generates the SCF dataset for PolicyEngine US microsimulations.
+
+        Downloads the raw SCF data and processes it for use in PolicyEngine.
+        """
+        if self.raw_scf is None:
+            raise ValueError("No raw SCF data class specified.")
+
+        # Load raw SCF data
+        raw_scf_instance = self.raw_scf(require=True)
+        raw_data = raw_scf_instance.load()
+
+        # Make sure raw_data is a DataFrame
+        if not isinstance(raw_data, pd.DataFrame):
+            raise TypeError(
+                f"Expected DataFrame but got {type(raw_data)} from {self.raw_scf.name}"
+            )
+
+        # Initialize dictionary for arrays
+        scf = {}
+
+        # Process the SCF data - convert to dictionary
+        add_variables_to_dict(scf, raw_data)
+        rename_columns_to_match_cps(scf, raw_data)
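+        # auto_loan_interest is built from the full public-use SCF file
+        # (scf{year}s.zip); the FedSCF datasets above download only the
+        # summary extract ("scfp" files), which lacks the x-variables.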
+ """ + # Simply copy all columns from the DataFrame to the dictionary as numpy arrays + for column in raw_data.columns: + if pd.api.types.is_numeric_dtype(raw_data[column]): + # Handle NaN values for numeric columns + scf[column] = raw_data[column].fillna(0).values + elif pd.api.types.is_categorical_dtype(raw_data[column]): + # Convert categorical to numbers or strings as needed + scf[column] = raw_data[column].cat.codes.values + else: + # Convert object/string columns to numpy arrays + # For simplicity, convert to string + scf[column] = raw_data[column].astype(str).values + + +def rename_columns_to_match_cps(scf: dict, raw_data: pd.DataFrame) -> None: + """Renames SCF columns to match CPS column names. + + Args: + scf (dict): The SCF data dictionary to populate. + raw_data (pd.DataFrame): The raw SCF dataframe. + """ + # Age variable - directly map + if "age" in raw_data.columns: + scf["age"] = raw_data["age"].values + + # Sex → is_female (SCF: hhsex: 1=male, 2=female) + if "hhsex" in raw_data.columns: + scf["is_female"] = (raw_data["hhsex"] == 2).values + + # Race → cps_race + # SCF has multiple race variables: race, racecl, racecl4, racecl5 + if "racecl5" in raw_data.columns: + # SCF's racecl5: 1=White, 2=Black, 3=Hispanic, 4=Asian, 7=Other + race_map = { + 1: 1, # White + 2: 2, # Black + 3: 3, # Hispanic + 4: 4, # Asian + 5: 7, # Other + } + scf["cps_race"] = ( + raw_data["racecl5"].map(race_map).fillna(6).astype(int).values + ) + # Hispanic indicator + scf["is_hispanic"] = (raw_data["racecl5"] == 3).values + + # Children in household + if "kids" in raw_data.columns: + scf["own_children_in_household"] = ( + raw_data["kids"].fillna(0).astype(int).values + ) + + # Employment & self-employment income + if "wageinc" in raw_data.columns: + scf["employment_income"] = raw_data["wageinc"].fillna(0).values + if "bussefarminc" in raw_data.columns: + scf["self_employment_income"] = ( + raw_data["bussefarminc"].fillna(0).values + ) + # Farm income - SCF bundles with business income + scf["farm_income"] = np.zeros_like(scf["self_employment_income"]) + + # Rent + if "rent" in raw_data.columns: + scf["rent"] = raw_data["rent"].fillna(0).values + + # Vehicle loan (auto loan) + if "veh_inst" in raw_data.columns: + scf["auto_loan_balance"] = raw_data["veh_inst"].fillna(0).values + + # Household weights + if "wgt" in raw_data.columns: + scf["household_weight"] = raw_data["wgt"].fillna(0).values + + # Marital status + if "married" in raw_data.columns: + # In SCF, married is a binary flag + scf["is_married"] = (raw_data["married"] == 1).values + # Create placeholders for other marital statuses + scf["is_widowed"] = np.zeros(len(raw_data), dtype=bool) + scf["is_separated"] = np.zeros(len(raw_data), dtype=bool) + + # Additional variables if available in raw_data + # Financial variables + variable_mappings = { + "intdivinc": "interest_income", + "ssretinc": "social_security_retirement", + "houses": "real_estate_value", + "mrthel": "mortgage_debt", + "edn_inst": "student_loan_debt", + "ccbal": "credit_card_debt", + } + + for scf_var, pe_var in variable_mappings.items(): + if scf_var in raw_data.columns: + scf[pe_var] = raw_data[scf_var].fillna(0).values + + +def add_auto_loan_interest(scf: dict, year: int) -> None: + """Adds auto loan interest to the summarized SCF dataset from the full SCF.""" + import requests + import zipfile + import io + import logging + from tqdm import tqdm + + logger = logging.getLogger(__name__) + + url = f"https://www.federalreserve.gov/econres/files/scf{year}s.zip" + + # Define 
+    columns = ["yy1", "y1", "x2219", "x2319", "x2419", "x7170"]
+
+    try:
+        # Download zip file
+        logger.info(f"Downloading SCF data for year {year}")
+        try:
+            response = requests.get(url, timeout=60)
+            response.raise_for_status()  # Raise an error for bad responses
+        except requests.exceptions.RequestException as e:
+            logger.error(
+                f"Network error downloading SCF data for year {year}: {str(e)}"
+            )
+            raise RuntimeError(
+                f"Failed to download SCF data for year {year}"
+            ) from e
+
+        # Process zip file
+        try:
+            logger.info("Creating zipfile from downloaded content")
+            z = zipfile.ZipFile(io.BytesIO(response.content))
+
+            # Find the .dta file in the zip
+            dta_files = [f for f in z.namelist() if f.endswith(".dta")]
+            if not dta_files:
+                logger.error(f"No Stata files found in zip for year {year}")
+                raise ValueError(
+                    f"No Stata files found in zip for year {year}"
+                )
+
+            logger.info(f"Found Stata files: {dta_files}")
+
+            # Read the Stata file
+            try:
+                logger.info(f"Reading Stata file: {dta_files[0]}")
+                with z.open(dta_files[0]) as f:
+                    df = pd.read_stata(io.BytesIO(f.read()), columns=columns)
+                logger.info(f"Read DataFrame with shape {df.shape}")
+            except Exception as e:
+                logger.error(
+                    f"Error reading Stata file for year {year}: {str(e)}"
+                )
+                raise RuntimeError(
+                    f"Failed to process Stata file for year {year}"
+                ) from e
+
+        except zipfile.BadZipFile as e:
+            logger.error(f"Bad zip file for year {year}: {str(e)}")
+            raise RuntimeError(
+                f"Downloaded zip file is corrupt for year {year}"
+            ) from e
+
+        # Process the interest data and add to final SCF dictionary
+        auto_int = df[columns].copy()
+        auto_int["x2219"] = auto_int["x2219"].replace(-1, 0)
+        auto_int["x2319"] = auto_int["x2319"].replace(-1, 0)
+        auto_int["x2419"] = auto_int["x2419"].replace(-1, 0)
+        auto_int["x7170"] = auto_int["x7170"].replace(-1, 0)
+        # Calculate total auto loan interest (sum of all auto loan interest variables)
+        auto_int["auto_loan_interest"] = auto_int[
+            ["x2219", "x2319", "x2419", "x7170"]
+        ].sum(axis=1)
+
+        # Check if we have household identifiers (y1, yy1) in both datasets
+        if (
+            "y1" in scf
+            and "yy1" in scf
+            and "y1" in auto_int.columns
+            and "yy1" in auto_int.columns
+        ):
+            logger.info(
+                "Using household identifiers (y1, yy1) to ensure correct matching"
+            )
+
+            # Create unique identifier from y1 and yy1 for each dataset
+            # In the original data
+            auto_int["household_id"] = (
+                auto_int["y1"].astype(str) + "_" + auto_int["yy1"].astype(str)
+            )
+
+            # In the SCF dictionary
+            # Convert the arrays to a temporary DataFrame for easier handling
+            temp_scf = pd.DataFrame({"y1": scf["y1"], "yy1": scf["yy1"]})
+            temp_scf["household_id"] = (
+                temp_scf["y1"].astype(str) + "_" + temp_scf["yy1"].astype(str)
+            )
+
+            # Create a mapping from household ID to auto loan interest
+            id_to_interest = dict(
+                zip(
+                    auto_int["household_id"].values,
+                    auto_int["auto_loan_interest"].values,
+                )
+            )
+
+            # Create array for auto loan interest that matches SCF order
+            interest_values = np.zeros(len(temp_scf), dtype=float)
+
+            # Fill in interest values based on household ID
+            for i, household_id in enumerate(temp_scf["household_id"]):
+                if household_id in id_to_interest:
+                    interest_values[i] = id_to_interest[household_id]
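+            # (A vectorized equivalent, should this loop become slow:
+            # temp_scf["household_id"].map(id_to_interest).fillna(0).values)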
+
+            # Add to SCF dictionary
+            scf["auto_loan_interest"] = interest_values / 100
+            logger.info(
+                f"Added auto loan interest data for year {year} with household matching"
+            )
+        else:
+            # Fallback to simple assignment if identifiers aren't present
+            logger.warning(
+                "Household identifiers not found. Using direct array assignment (may not match households correctly)"
+            )
+            scf["auto_loan_interest"] = auto_int["auto_loan_interest"].values
+            logger.info(
+                f"Added auto loan interest data for year {year} without household matching"
+            )
+
+    except Exception as e:
+        logger.error(f"Error processing year {year}: {str(e)}")
+        raise
+
+
+class SCF_2022(SCF):
+    """SCF dataset for 2022."""
+
+    name = "scf_2022"
+    label = "SCF 2022"
+    raw_scf = FedSCF_2022
+    file_path = STORAGE_FOLDER / "scf_2022.h5"
+    time_period = 2022
+    frac = 1
+
+
+class SCF_2019(SCF):
+    """SCF dataset for 2019."""
+
+    name = "scf_2019"
+    label = "SCF 2019"
+    raw_scf = FedSCF_2019
+    file_path = STORAGE_FOLDER / "scf_2019.h5"
+    time_period = 2019
+    frac = 1
+
+
+class SCF_2016(SCF):
+    """SCF dataset for 2016."""
+
+    name = "scf_2016"
+    label = "SCF 2016"
+    raw_scf = FedSCF_2016
+    file_path = STORAGE_FOLDER / "scf_2016.h5"
+    time_period = 2016
+    frac = 1
+
+
+if __name__ == "__main__":
+    # Generate all SCF datasets
+    SCF_2016().generate()
+    SCF_2019().generate()
+    SCF_2022().generate()
diff --git a/policyengine_us_data/tests/test_datasets/test_cps.py b/policyengine_us_data/tests/test_datasets/test_cps.py
index 0cf2bba1..b4bd59ed 100644
--- a/policyengine_us_data/tests/test_datasets/test_cps.py
+++ b/policyengine_us_data/tests/test_datasets/test_cps.py
@@ -27,3 +27,16 @@ def test_policyengine_cps_loads(year: int):
     sim = Microsimulation(dataset=dataset)
 
     assert not sim.calculate("household_net_income").isna().any()
+
+
+def test_cps_has_auto_loan_interest():
+    from policyengine_us_data.datasets.cps import CPS_2024
+    from policyengine_us import Microsimulation
+
+    sim = Microsimulation(dataset=CPS_2024)
+    # Ensure we impute at least $65 billion in auto loan interest.
+    # We currently target $256 billion.
+    AUTO_LOAN_INTEREST_MINIMUM = 65e9
+    assert (
+        sim.calculate("auto_loan_interest").sum() > AUTO_LOAN_INTEREST_MINIMUM
+    )