diff --git a/usaspending_api/common/etl/spark.py b/usaspending_api/common/etl/spark.py index 85067c1c5c..134707bd9c 100644 --- a/usaspending_api/common/etl/spark.py +++ b/usaspending_api/common/etl/spark.py @@ -16,7 +16,7 @@ from pyspark.sql.functions import col, concat, concat_ws, expr, lit, regexp_replace, to_date, transform, when from pyspark.sql.types import ArrayType, DecimalType, StringType, StructType -from usaspending_api.accounts.models import FederalAccount, TreasuryAppropriationAccount +from usaspending_api.accounts.models import AppropriationAccountBalances, FederalAccount, TreasuryAppropriationAccount from usaspending_api.common.helpers.spark_helpers import ( get_broker_jdbc_url, get_jdbc_connection_properties, @@ -51,6 +51,7 @@ MAX_PARTITIONS = CONFIG.SPARK_MAX_PARTITIONS _USAS_RDS_REF_TABLES = [ Agency, + AppropriationAccountBalances, Cfda, CGAC, CityCountyStateCode, diff --git a/usaspending_api/download/management/commands/delta_downloads/builders.py b/usaspending_api/download/management/commands/delta_downloads/builders.py index c2c084a135..9a2db1ed68 100644 --- a/usaspending_api/download/management/commands/delta_downloads/builders.py +++ b/usaspending_api/download/management/commands/delta_downloads/builders.py @@ -17,8 +17,9 @@ def __init__( self, spark: SparkSession, account_download_filter: AccountDownloadFilter, - table_name: str = "rpt.account_download", + award_financial_table: str = "rpt.account_download", ): + # Resolve Filters self.reporting_fiscal_year = account_download_filter.fy self.submission_types = account_download_filter.submission_types self.reporting_fiscal_quarter = account_download_filter.quarter or account_download_filter.period // 3 @@ -28,7 +29,16 @@ def __init__( self.budget_function = account_download_filter.budget_function self.budget_subfunction = account_download_filter.budget_subfunction self.def_codes = account_download_filter.def_codes - self.df: DataFrame = spark.table(table_name) + + # Base Dataframes + self._award_financial_df: DataFrame = spark.table(award_financial_table) + self.aab = spark.table("global_temp.appropriation_account_balances") + self.sa = spark.table("global_temp.submission_attributes") + self.taa = spark.table("global_temp.treasury_appropriation_account") + self.cgac_aid = spark.table("global_temp.cgac") + self.cgac_ata = spark.table("global_temp.cgac") + self.fa = spark.table("global_temp.federal_account") + self.ta = spark.table("global_temp.toptier_agency") @property def dynamic_filters(self) -> Column: @@ -91,8 +101,60 @@ def non_zero_filters(self) -> Column: ) @staticmethod - def collect_concat(col_name: str, concat_str: str = "; ") -> Column: - return sf.concat_ws(concat_str, sf.sort_array(sf.collect_set(col_name))).alias(col_name) + def collect_concat(col_name: str, concat_str: str = "; ", alias: str | None = None) -> Column: + if alias is None: + alias = col_name + return sf.concat_ws(concat_str, sf.sort_array(sf.collect_set(col_name))).alias(alias) + + @property + def fy_quarter_period(self) -> Column: + return sf.when( + sf.col("quarter_format_flag"), + sf.concat(sf.lit("FY"), sf.col("reporting_fiscal_year"), sf.lit("Q"), sf.col("reporting_fiscal_quarter")), + ).otherwise( + sf.concat( + sf.lit("FY"), + sf.col("reporting_fiscal_year"), + sf.lit("P"), + sf.lpad(sf.col("reporting_fiscal_period"), 2, "0"), + ) + ) + + @property + def _account_balances_df(self) -> DataFrame: + return ( + self.aab.join(self.sa, on="submission_id", how="inner") + .join(self.taa, on="treasury_account_identifier", how="leftouter") + .join( + self.cgac_aid.withColumnRenamed("agency_name", "agency_identifier_name"), + on=(self.taa.agency_id == self.cgac_aid.cgac_code), + how="leftouter", + ) + .join( + self.cgac_ata.withColumnRenamed("agency_name", "allocation_transfer_agency_identifier_name"), + on=(self.taa.allocation_transfer_agency_id == self.cgac_ata.cgac_code), + how="leftouter", + ) + .join(self.fa, on=self.taa.federal_account_id == self.fa.id, how="leftouter") + .join(self.ta, on=self.fa.parent_toptier_agency_id == self.ta.toptier_agency_id, how="leftouter") + .filter( + sf.col("submission_id").isin( + get_submission_ids_for_periods( + self.reporting_fiscal_year, self.reporting_fiscal_quarter, self.reporting_fiscal_period + ) + ) + ) + .filter(self.dynamic_filters) + .withColumn("submission_period", self.fy_quarter_period) + ) + + @property + @abstractmethod + def account_balances(self) -> DataFrame: ... + + @property + @abstractmethod + def object_class_program_activity(self) -> DataFrame: ... @property @abstractmethod @@ -105,14 +167,9 @@ def source_dfs(self) -> list[DataFrame]: class FederalAccountDownloadDataFrameBuilder(AbstractAccountDownloadDataFrameBuilder): - def __init__( - self, - spark: SparkSession, - account_download_filter: AccountDownloadFilter, - table_name: str = "rpt.account_download", - ): - super().__init__(spark, account_download_filter, table_name) - self.agg_cols = { + @property + def award_financial_agg_cols(self) -> dict[str, Column]: + return { "reporting_agency_name": self.collect_concat, "budget_function": self.collect_concat, "budget_subfunction": self.collect_concat, @@ -122,7 +179,10 @@ def __init__( "USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": self.filter_and_sum, "last_modified_date": lambda col: sf.max(col).alias(col), } - self.select_cols = ( + + @property + def award_financial_select_cols(self) -> list[Any]: + return ( [sf.col("federal_owning_agency_name").alias("owning_agency_name")] + [ col @@ -131,7 +191,10 @@ def __init__( ] + ["last_modified_date"] ) - filter_cols = [ + + @property + def award_financial_filter_cols(self) -> list[str]: + return [ "submission_id", "federal_account_id", "funding_toptier_agency_id", @@ -142,7 +205,14 @@ def __init__( "reporting_fiscal_year", "quarter_format_flag", ] - self.groupby_cols = [col for col in self.df.columns if col not in list(self.agg_cols) + filter_cols] + + @property + def award_financial_groupby_cols(self) -> list[Any]: + return [ + col + for col in self._award_financial_df.columns + if col not in list(self.award_financial_agg_cols) + self.award_financial_filter_cols + ] def filter_to_latest_submissions_for_agencies(self, col_name: str, otherwise: Any = None) -> Column: """Filter to the latest submission regardless of whether the agency submitted on a monthly or quarterly basis""" @@ -162,21 +232,219 @@ def filter_to_latest_submissions_for_agencies(self, col_name: str, otherwise: An def filter_and_sum(self, col_name: str) -> Column: return sf.sum(self.filter_to_latest_submissions_for_agencies(col_name)).alias(col_name) + @property + def account_balances_agg_cols(self) -> list[Column]: + return [ + self.collect_concat(self.sa.reporting_agency_name, alias="reporting_agency_name"), + self.collect_concat("agency_identifier_name"), + self.collect_concat("budget_function_title", alias="budget_function"), + self.collect_concat("budget_subfunction_title", alias="budget_subfunction"), + sf.sum(sf.col("budget_authority_unobligated_balance_brought_forward_fyb")).alias( + "budget_authority_unobligated_balance_brought_forward" + ), + sf.sum(sf.col("adjustments_to_unobligated_balance_brought_forward_cpe")).alias( + "adjustments_to_unobligated_balance_brought_forward_cpe" + ), + sf.sum(sf.col("budget_authority_appropriated_amount_cpe")).alias("budget_authority_appropriated_amount"), + sf.sum(sf.col("borrowing_authority_amount_total_cpe")).alias("borrowing_authority_amount"), + sf.sum(sf.col("contract_authority_amount_total_cpe")).alias("contract_authority_amount"), + sf.sum(sf.col("spending_authority_from_offsetting_collections_amount_cpe")).alias( + "spending_authority_from_offsetting_collections_amount" + ), + sf.sum(sf.col("other_budgetary_resources_amount_cpe")).alias("total_other_budgetary_resources_amount"), + sf.sum(sf.col("total_budgetary_resources_amount_cpe")).alias("total_budgetary_resources"), + sf.sum(sf.col("obligations_incurred_total_by_tas_cpe")).alias("obligations_incurred"), + sf.sum(sf.col("deobligations_recoveries_refunds_by_tas_cpe")).alias( + "deobligations_or_recoveries_or_refunds_from_prior_year" + ), + sf.sum(sf.col("unobligated_balance_cpe")).alias("unobligated_balance"), + sf.sum( + sf.when( + ( + ( + sf.col("quarter_format_flag") + & (sf.col("reporting_fiscal_quarter") == self.reporting_fiscal_quarter) + ) + | ( + ~sf.col("quarter_format_flag") + & (sf.col("reporting_fiscal_period") == self.reporting_fiscal_period) + ) + ) + & (sf.col("reporting_fiscal_year") == self.reporting_fiscal_year), + sf.col("gross_outlay_amount_by_tas_cpe"), + ).otherwise(0) + ).alias("gross_outlay_amount"), + sf.sum(sf.col("status_of_budgetary_resources_total_cpe")).alias("status_of_budgetary_resources_total"), + sf.max("published_date").alias("last_modified_date"), + ] + + @property + def account_balances_select_cols(self) -> list[Column]: + return [ + sf.col("name").alias("owning_agency_name"), + sf.col("reporting_agency_name"), + sf.col("submission_period"), + sf.col("federal_account_code").alias("federal_account_symbol"), + sf.col("account_title").alias("federal_account_name"), + sf.col("agency_identifier_name"), + sf.col("budget_function"), + sf.col("budget_subfunction"), + sf.col("budget_authority_unobligated_balance_brought_forward"), + sf.col("adjustments_to_unobligated_balance_brought_forward_cpe"), + sf.col("budget_authority_appropriated_amount"), + sf.col("borrowing_authority_amount"), + sf.col("contract_authority_amount"), + sf.col("spending_authority_from_offsetting_collections_amount"), + sf.col("total_other_budgetary_resources_amount"), + sf.col("total_budgetary_resources"), + sf.col("obligations_incurred"), + sf.col("deobligations_or_recoveries_or_refunds_from_prior_year"), + sf.col("unobligated_balance"), + sf.col("gross_outlay_amount"), + sf.col("status_of_budgetary_resources_total"), + sf.col("last_modified_date"), + ] + + @property + def account_balances(self) -> DataFrame: + return ( + self._account_balances_df.groupby( + "federal_account_code", "name", "federal_account.account_title", "submission_period" + ) + .agg(*self.account_balances_agg_cols) + .select(*self.account_balances_select_cols) + ) + + @property + def object_class_program_activity(self) -> DataFrame: + raise NotImplementedError + @property def award_financial(self) -> DataFrame: return ( - self.df.filter(self.dynamic_filters) - .groupBy(self.groupby_cols) - .agg(*[agg_func(col) for col, agg_func in self.agg_cols.items()]) + self._award_financial_df.filter(self.dynamic_filters) + .groupBy(self.award_financial_groupby_cols) + .agg(*[agg_func(col) for col, agg_func in self.award_financial_agg_cols.items()]) # drop original agg columns from the dataframe to avoid ambiguous column names - .drop(*[sf.col(f"account_download.{col}") for col in self.agg_cols]) + .drop(*[sf.col(f"account_download.{col}") for col in self.award_financial_agg_cols]) .filter(self.non_zero_filters) - .select(self.select_cols) + .select(self.award_financial_select_cols) ) class TreasuryAccountDownloadDataFrameBuilder(AbstractAccountDownloadDataFrameBuilder): + @property + def account_balances_groupby_cols(self) -> list[Column]: + return [ + self.aab.data_source, + self.aab.appropriation_account_balances_id, + self.aab.budget_authority_unobligated_balance_brought_forward_fyb, + self.aab.adjustments_to_unobligated_balance_brought_forward_cpe, + self.aab.budget_authority_appropriated_amount_cpe, + self.aab.borrowing_authority_amount_total_cpe, + self.aab.contract_authority_amount_total_cpe, + self.aab.spending_authority_from_offsetting_collections_amount_cpe, + self.aab.other_budgetary_resources_amount_cpe, + self.aab.total_budgetary_resources_amount_cpe, + self.aab.gross_outlay_amount_by_tas_cpe, + self.aab.deobligations_recoveries_refunds_by_tas_cpe, + self.aab.unobligated_balance_cpe, + self.aab.status_of_budgetary_resources_total_cpe, + self.aab.obligations_incurred_total_by_tas_cpe, + self.aab.drv_appropriation_availability_period_start_date, + self.aab.drv_appropriation_availability_period_end_date, + self.aab.drv_appropriation_account_expired_status, + self.aab.drv_obligations_unpaid_amount, + self.aab.drv_other_obligated_amount, + self.aab.reporting_period_start, + self.aab.reporting_period_end, + self.aab.last_modified_date, + self.aab.certified_date, + self.aab.create_date, + self.aab.update_date, + self.aab.final_of_fy, + self.aab.submission_id, + self.aab.treasury_account_identifier, + self.ta.name, + self.sa.reporting_agency_name, + self.taa.allocation_transfer_agency_id, + self.taa.agency_id, + self.taa.beginning_period_of_availability, + self.taa.ending_period_of_availability, + self.taa.availability_type_code, + self.taa.main_account_code, + self.taa.sub_account_code, + self.taa.tas_rendering_label, + self.taa.account_title, + self.taa.budget_function_title, + self.taa.budget_subfunction_title, + self.fa.federal_account_code, + self.fa.account_title, + sf.col("agency_identifier_name"), + sf.col("allocation_transfer_agency_identifier_name"), + sf.col("submission_period"), + ] + + @property + def account_balances_agg_cols(self) -> list[Column]: + return [sf.max("published_date").alias("max_last_modified_date")] + + @property + def account_balances_select_cols(self) -> list[Column]: + return [ + self.ta.name.alias("owning_agency_name"), + self.sa.reporting_agency_name, + sf.col("submission_period"), + self.taa.allocation_transfer_agency_id.alias("allocation_transfer_agency_identifier_code"), + self.taa.agency_id.alias("agency_identifier_code"), + self.taa.beginning_period_of_availability, + self.taa.ending_period_of_availability, + self.taa.availability_type_code, + self.taa.main_account_code, + self.taa.sub_account_code, + self.taa.tas_rendering_label.alias("treasury_account_symbol"), + self.taa.account_title.alias("treasury_account_name"), + sf.col("agency_identifier_name"), + sf.col("allocation_transfer_agency_identifier_name"), + self.taa.budget_function_title.alias("budget_function"), + self.taa.budget_subfunction_title.alias("budget_subfunction"), + self.fa.federal_account_code.alias("federal_account_symbol"), + self.fa.account_title.alias("federal_account_name"), + self.aab.budget_authority_unobligated_balance_brought_forward_fyb.alias( + "budget_authority_unobligated_balance_brought_forward" + ), + self.aab.adjustments_to_unobligated_balance_brought_forward_cpe, + self.aab.budget_authority_appropriated_amount_cpe.alias("budget_authority_appropriated_amount"), + self.aab.borrowing_authority_amount_total_cpe.alias("borrowing_authority_amount"), + self.aab.contract_authority_amount_total_cpe.alias("contract_authority_amount"), + self.aab.spending_authority_from_offsetting_collections_amount_cpe.alias( + "spending_authority_from_offsetting_collections_amount" + ), + self.aab.other_budgetary_resources_amount_cpe.alias("total_other_budgetary_resources_amount"), + self.aab.total_budgetary_resources_amount_cpe.alias("total_budgetary_resources"), + self.aab.obligations_incurred_total_by_tas_cpe.alias("obligations_incurred"), + self.aab.deobligations_recoveries_refunds_by_tas_cpe.alias( + "deobligations_or_recoveries_or_refunds_from_prior_year" + ), + self.aab.unobligated_balance_cpe.alias("unobligated_balance"), + self.aab.gross_outlay_amount_by_tas_cpe.alias("gross_outlay_amount"), + self.aab.status_of_budgetary_resources_total_cpe.alias("status_of_budgetary_resources_total"), + sf.col("max_last_modified_date").alias("last_modified_date"), + ] + + @property + def account_balances(self) -> DataFrame: + return ( + self._account_balances_df.groupby(*self.account_balances_groupby_cols) + .agg(*self.account_balances_agg_cols) + .select(*self.account_balances_select_cols) + ) + + @property + def object_class_program_activity(self) -> DataFrame: + raise NotImplementedError + @property def award_financial(self) -> DataFrame: select_cols = ( @@ -188,4 +456,4 @@ def award_financial(self) -> DataFrame: ] + ["last_modified_date"] ) - return self.df.filter(self.dynamic_filters & self.non_zero_filters).select(select_cols) + return self._award_financial_df.filter(self.dynamic_filters & self.non_zero_filters).select(select_cols) diff --git a/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py b/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py index a1b56181c0..a79e4f459b 100644 --- a/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py +++ b/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py @@ -1,9 +1,12 @@ +from datetime import datetime from unittest.mock import patch import pandas as pd import pytest from django.core.management import call_command from model_bakery import baker + +from usaspending_api.common.etl.spark import create_ref_temp_views from usaspending_api.download.management.commands.delta_downloads.builders import ( FederalAccountDownloadDataFrameBuilder, TreasuryAccountDownloadDataFrameBuilder, @@ -90,7 +93,10 @@ def federal_account_models(db): @patch("usaspending_api.download.management.commands.delta_downloads.builders.get_submission_ids_for_periods") -def test_federal_account_download_dataframe_builder(mock_get_submission_ids_for_periods, spark, account_download_table): +def test_federal_account_download_dataframe_builder( + mock_get_submission_ids_for_periods, spark, account_download_table, agency_models +): + create_ref_temp_views(spark) mock_get_submission_ids_for_periods.return_value = [1, 2, 4, 5] account_download_filter = AccountDownloadFilter( fy=2018, @@ -108,6 +114,7 @@ def test_federal_account_download_dataframe_builder(mock_get_submission_ids_for_ @patch("usaspending_api.download.management.commands.delta_downloads.builders.get_submission_ids_for_periods") def test_filter_federal_by_agency(mock_get_submission_ids_for_periods, spark, account_download_table, agency_models): + create_ref_temp_views(spark) mock_get_submission_ids_for_periods.return_value = [1, 2, 4, 5] account_download_filter = AccountDownloadFilter( @@ -127,8 +134,9 @@ def test_filter_federal_by_agency(mock_get_submission_ids_for_periods, spark, ac @patch("usaspending_api.download.management.commands.delta_downloads.builders.get_submission_ids_for_periods") def test_filter_federal_by_federal_account_id( - mock_get_submission_ids_for_periods, spark, account_download_table, federal_account_models + mock_get_submission_ids_for_periods, spark, account_download_table, federal_account_models, agency_models ): + create_ref_temp_views(spark) mock_get_submission_ids_for_periods.return_value = [1, 2, 4, 5] account_download_filter = AccountDownloadFilter( @@ -146,7 +154,8 @@ def test_filter_federal_by_federal_account_id( assert sorted(result_df.gross_outlay_amount_FYB_to_period_end.to_list()) == [100] -def test_treasury_account_download_dataframe_builder(spark, account_download_table): +def test_treasury_account_download_dataframe_builder(spark, account_download_table, agency_models): + create_ref_temp_views(spark) account_download_filter = AccountDownloadFilter( fy=2018, submission_types=["award_financial"], @@ -162,7 +171,7 @@ def test_treasury_account_download_dataframe_builder(spark, account_download_tab def test_filter_treasury_by_agency(spark, account_download_table, agency_models): - + create_ref_temp_views(spark) account_download_filter = AccountDownloadFilter( fy=2018, submission_types=["award_financial"], @@ -176,3 +185,68 @@ def test_filter_treasury_by_agency(spark, account_download_table, agency_models) assert sorted(result_df[col].to_list()) == ["B", "C", "D"] assert result_df.transaction_obligated_amount.to_list() == [100] * 3 assert result_df.gross_outlay_amount_FYB_to_period_end.to_list() == [100] * 3 + + +@pytest.mark.django_db(transaction=True) +@patch("usaspending_api.download.management.commands.delta_downloads.builders.get_submission_ids_for_periods") +def test_account_balances(mock_get_submission_ids_for_periods, spark, account_download_table, agency_models): + baker.make("references.CGAC", cgac_code="1").save() + baker.make("references.CGAC", cgac_code="2").save() + baker.make("references.CGAC", cgac_code="3").save() + baker.make("references.CGAC", cgac_code="4").save() + baker.make("references.ToptierAgency", toptier_agency_id=1, create_date=datetime.now()).save() + baker.make("references.ToptierAgency", toptier_agency_id=2, create_date=datetime.now()).save() + baker.make("accounts.FederalAccount", id=1, parent_toptier_agency_id=1).save() + baker.make("accounts.FederalAccount", id=2, parent_toptier_agency_id=2).save() + baker.make( + "submissions.SubmissionAttributes", + submission_id=1, + reporting_fiscal_year=2018, + reporting_fiscal_quarter=4, + quarter_format_flag=True, + ).save() + baker.make( + "submissions.SubmissionAttributes", + submission_id=2, + reporting_fiscal_year=2018, + reporting_fiscal_quarter=4, + quarter_format_flag=True, + ).save() + baker.make( + "submissions.SubmissionAttributes", + submission_id=3, + reporting_fiscal_year=2019, + reporting_fiscal_quarter=4, + quarter_format_flag=True, + ).save() + baker.make( + "accounts.TreasuryAppropriationAccount", + treasury_account_identifier=1, + agency_id="1", + allocation_transfer_agency_id="2", + federal_account_id=1, + ).save() + baker.make( + "accounts.TreasuryAppropriationAccount", + treasury_account_identifier=2, + agency_id="3", + allocation_transfer_agency_id="4", + federal_account_id=2, + ).save() + baker.make("accounts.AppropriationAccountBalances", submission_id=1, treasury_account_identifier_id=1).save() + baker.make("accounts.AppropriationAccountBalances", submission_id=2, treasury_account_identifier_id=2).save() + baker.make("accounts.AppropriationAccountBalances", submission_id=3, treasury_account_identifier_id=2).save() + + mock_get_submission_ids_for_periods.return_value = [1, 2, 3] + + create_ref_temp_views(spark) + + account_download_filter = AccountDownloadFilter( + fy=2018, + submission_types=["account_balances"], + quarter=4, + ) + ta_builder = TreasuryAccountDownloadDataFrameBuilder(spark, account_download_filter) + assert ta_builder.account_balances.count() == 2 + fa_builder = FederalAccountDownloadDataFrameBuilder(spark, account_download_filter) + assert fa_builder.account_balances.count() == 2