Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion usaspending_api/common/etl/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pyspark.sql.functions import col, concat, concat_ws, expr, lit, regexp_replace, to_date, transform, when
from pyspark.sql.types import ArrayType, DecimalType, StringType, StructType

from usaspending_api.accounts.models import FederalAccount, TreasuryAppropriationAccount
from usaspending_api.accounts.models import AppropriationAccountBalances, FederalAccount, TreasuryAppropriationAccount
from usaspending_api.common.helpers.spark_helpers import (
get_broker_jdbc_url,
get_jdbc_connection_properties,
Expand Down Expand Up @@ -51,6 +51,7 @@
MAX_PARTITIONS = CONFIG.SPARK_MAX_PARTITIONS
_USAS_RDS_REF_TABLES = [
Agency,
AppropriationAccountBalances,
Cfda,
CGAC,
CityCountyStateCode,
Expand Down
310 changes: 289 additions & 21 deletions usaspending_api/download/management/commands/delta_downloads/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ def __init__(
self,
spark: SparkSession,
account_download_filter: AccountDownloadFilter,
table_name: str = "rpt.account_download",
award_financial_table: str = "rpt.account_download",
):
# Resolve Filters
self.reporting_fiscal_year = account_download_filter.fy
self.submission_types = account_download_filter.submission_types
self.reporting_fiscal_quarter = account_download_filter.quarter or account_download_filter.period // 3
Expand All @@ -28,7 +29,16 @@ def __init__(
self.budget_function = account_download_filter.budget_function
self.budget_subfunction = account_download_filter.budget_subfunction
self.def_codes = account_download_filter.def_codes
self.df: DataFrame = spark.table(table_name)

# Base Dataframes
self._award_financial_df: DataFrame = spark.table(award_financial_table)
self.aab = spark.table("global_temp.appropriation_account_balances")
self.sa = spark.table("global_temp.submission_attributes")
self.taa = spark.table("global_temp.treasury_appropriation_account")
self.cgac_aid = spark.table("global_temp.cgac")
self.cgac_ata = spark.table("global_temp.cgac")
self.fa = spark.table("global_temp.federal_account")
self.ta = spark.table("global_temp.toptier_agency")

@property
def dynamic_filters(self) -> Column:
Expand Down Expand Up @@ -91,8 +101,60 @@ def non_zero_filters(self) -> Column:
)

@staticmethod
def collect_concat(col_name: str | Column, concat_str: str = "; ", alias: str | None = None) -> Column:
    """Aggregate a column's distinct values into one sorted, delimited string.

    Args:
        col_name: The column to aggregate, given either as a column name or
            as a Column expression.
        concat_str: Delimiter placed between the collected values.
        alias: Name of the resulting column. When omitted and ``col_name`` is
            a string, ``col_name`` is reused as the output name.

    Returns:
        An aggregate Column built as ``collect_set`` -> ``sort_array`` ->
        ``concat_ws``.
    """
    joined = sf.concat_ws(concat_str, sf.sort_array(sf.collect_set(col_name)))
    if alias is None:
        if not isinstance(col_name, str):
            # A Column expression is not a usable alias; keep Spark's generated name
            return joined
        alias = col_name
    return joined.alias(alias)

@property
def fy_quarter_period(self) -> Column:
    """Column expression that labels a row's submission period.

    Yields ``FY<year>Q<quarter>`` when ``quarter_format_flag`` is true and
    ``FY<year>P<period>`` otherwise, with the period left-padded with "0" to
    two characters.
    """
    quarterly_label = sf.concat(
        sf.lit("FY"),
        sf.col("reporting_fiscal_year"),
        sf.lit("Q"),
        sf.col("reporting_fiscal_quarter"),
    )
    padded_period = sf.lpad(sf.col("reporting_fiscal_period"), 2, "0")
    period_label = sf.concat(sf.lit("FY"), sf.col("reporting_fiscal_year"), sf.lit("P"), padded_period)
    return sf.when(sf.col("quarter_format_flag"), quarterly_label).otherwise(period_label)

@property
def _account_balances_df(self) -> DataFrame:
    """Joined and filtered base DataFrame for the account balances outputs.

    ``aab`` is inner-joined to ``sa`` on ``submission_id`` and then left-joined
    to ``taa``, both CGAC lookups (with ``agency_name`` renamed per role),
    ``fa`` and ``ta``. Rows are kept only for the submission ids returned by
    ``get_submission_ids_for_periods`` and must pass ``dynamic_filters``; a
    ``submission_period`` column is added last.
    """
    joined = self.aab.join(self.sa, on="submission_id", how="inner")
    joined = joined.join(self.taa, on="treasury_account_identifier", how="leftouter")

    # The CGAC table is joined twice: once for the agency identifier ...
    agency_identifier = self.cgac_aid.withColumnRenamed("agency_name", "agency_identifier_name")
    joined = joined.join(
        agency_identifier,
        on=(self.taa.agency_id == self.cgac_aid.cgac_code),
        how="leftouter",
    )

    # ... and once for the allocation transfer agency
    allocation_transfer_agency = self.cgac_ata.withColumnRenamed(
        "agency_name", "allocation_transfer_agency_identifier_name"
    )
    joined = joined.join(
        allocation_transfer_agency,
        on=(self.taa.allocation_transfer_agency_id == self.cgac_ata.cgac_code),
        how="leftouter",
    )

    joined = joined.join(self.fa, on=self.taa.federal_account_id == self.fa.id, how="leftouter")
    joined = joined.join(self.ta, on=self.fa.parent_toptier_agency_id == self.ta.toptier_agency_id, how="leftouter")

    submission_ids = get_submission_ids_for_periods(
        self.reporting_fiscal_year, self.reporting_fiscal_quarter, self.reporting_fiscal_period
    )
    in_requested_periods = joined.filter(sf.col("submission_id").isin(submission_ids))
    filtered = in_requested_periods.filter(self.dynamic_filters)
    return filtered.withColumn("submission_period", self.fy_quarter_period)

@property
@abstractmethod
def account_balances(self) -> DataFrame:
    """Account balances DataFrame; each concrete builder must provide it."""

@property
@abstractmethod
def object_class_program_activity(self) -> DataFrame:
    """Object class / program activity DataFrame; each concrete builder must provide it."""

@property
@abstractmethod
Expand All @@ -105,14 +167,9 @@ def source_dfs(self) -> list[DataFrame]:

class FederalAccountDownloadDataFrameBuilder(AbstractAccountDownloadDataFrameBuilder):

def __init__(
self,
spark: SparkSession,
account_download_filter: AccountDownloadFilter,
table_name: str = "rpt.account_download",
):
super().__init__(spark, account_download_filter, table_name)
self.agg_cols = {
@property
def award_financial_agg_cols(self) -> dict[str, Column]:
return {
"reporting_agency_name": self.collect_concat,
"budget_function": self.collect_concat,
"budget_subfunction": self.collect_concat,
Expand All @@ -122,7 +179,10 @@ def __init__(
"USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": self.filter_and_sum,
"last_modified_date": lambda col: sf.max(col).alias(col),
}
self.select_cols = (

@property
def award_financial_select_cols(self) -> list[Any]:
return (
[sf.col("federal_owning_agency_name").alias("owning_agency_name")]
+ [
col
Expand All @@ -131,7 +191,10 @@ def __init__(
]
+ ["last_modified_date"]
)
filter_cols = [

@property
def award_financial_filter_cols(self) -> list[str]:
return [
"submission_id",
"federal_account_id",
"funding_toptier_agency_id",
Expand All @@ -142,7 +205,14 @@ def __init__(
"reporting_fiscal_year",
"quarter_format_flag",
]
self.groupby_cols = [col for col in self.df.columns if col not in list(self.agg_cols) + filter_cols]

@property
def award_financial_groupby_cols(self) -> list[str]:
    """Names of the award financial columns to group by.

    These are the columns of ``self._award_financial_df`` that are neither
    aggregated (keys of ``award_financial_agg_cols``) nor listed in
    ``award_financial_filter_cols``, kept in their original order.
    """
    # Both properties build a new container on every access, so resolve them
    # once into a set instead of once per column.
    excluded = {*self.award_financial_agg_cols, *self.award_financial_filter_cols}
    return [name for name in self._award_financial_df.columns if name not in excluded]

def filter_to_latest_submissions_for_agencies(self, col_name: str, otherwise: Any = None) -> Column:
"""Filter to the latest submission regardless of whether the agency submitted on a monthly or quarterly basis"""
Expand All @@ -162,21 +232,219 @@ def filter_to_latest_submissions_for_agencies(self, col_name: str, otherwise: An
def filter_and_sum(self, col_name: str) -> Column:
    """Sum the expression from ``filter_to_latest_submissions_for_agencies(col_name)``.

    The aggregate is aliased back to ``col_name``.
    """
    latest_only = self.filter_to_latest_submissions_for_agencies(col_name)
    total = sf.sum(latest_only)
    return total.alias(col_name)

@property
def account_balances_agg_cols(self) -> list[Column]:
    """Aggregate expressions applied when grouping account balances.

    Descriptive columns are collapsed with ``collect_concat``, amount columns
    are summed and renamed, the gross outlay is summed only for rows matching
    the requested fiscal year and quarter/period, and the latest
    ``published_date`` becomes ``last_modified_date``.
    """

    def total(source: str, output: str) -> Column:
        # Plain sum of one source column under its output name
        return sf.sum(sf.col(source)).alias(output)

    descriptive = [
        self.collect_concat(self.sa.reporting_agency_name, alias="reporting_agency_name"),
        self.collect_concat("agency_identifier_name"),
        self.collect_concat("budget_function_title", alias="budget_function"),
        self.collect_concat("budget_subfunction_title", alias="budget_subfunction"),
    ]

    summed_before_outlay = (
        (
            "budget_authority_unobligated_balance_brought_forward_fyb",
            "budget_authority_unobligated_balance_brought_forward",
        ),
        (
            "adjustments_to_unobligated_balance_brought_forward_cpe",
            "adjustments_to_unobligated_balance_brought_forward_cpe",
        ),
        ("budget_authority_appropriated_amount_cpe", "budget_authority_appropriated_amount"),
        ("borrowing_authority_amount_total_cpe", "borrowing_authority_amount"),
        ("contract_authority_amount_total_cpe", "contract_authority_amount"),
        (
            "spending_authority_from_offsetting_collections_amount_cpe",
            "spending_authority_from_offsetting_collections_amount",
        ),
        ("other_budgetary_resources_amount_cpe", "total_other_budgetary_resources_amount"),
        ("total_budgetary_resources_amount_cpe", "total_budgetary_resources"),
        ("obligations_incurred_total_by_tas_cpe", "obligations_incurred"),
        (
            "deobligations_recoveries_refunds_by_tas_cpe",
            "deobligations_or_recoveries_or_refunds_from_prior_year",
        ),
        ("unobligated_balance_cpe", "unobligated_balance"),
    )

    # Quarterly submissions are matched on quarter, all others on period
    matches_quarter = sf.col("quarter_format_flag") & (
        sf.col("reporting_fiscal_quarter") == self.reporting_fiscal_quarter
    )
    matches_period = ~sf.col("quarter_format_flag") & (
        sf.col("reporting_fiscal_period") == self.reporting_fiscal_period
    )
    in_requested_period = (matches_quarter | matches_period) & (
        sf.col("reporting_fiscal_year") == self.reporting_fiscal_year
    )
    gross_outlay = sf.sum(
        sf.when(in_requested_period, sf.col("gross_outlay_amount_by_tas_cpe")).otherwise(0)
    ).alias("gross_outlay_amount")

    return [
        *descriptive,
        *[total(source, output) for source, output in summed_before_outlay],
        gross_outlay,
        total("status_of_budgetary_resources_total_cpe", "status_of_budgetary_resources_total"),
        sf.max("published_date").alias("last_modified_date"),
    ]

@property
def account_balances_select_cols(self) -> list[Column]:
    """Output columns, in order, selected from the aggregated account balances.

    Three columns are renamed on the way out (``name``,
    ``federal_account_code`` and ``account_title``); the rest keep their names.
    """
    renamed = {
        "name": "owning_agency_name",
        "federal_account_code": "federal_account_symbol",
        "account_title": "federal_account_name",
    }
    ordered_names = (
        "name",
        "reporting_agency_name",
        "submission_period",
        "federal_account_code",
        "account_title",
        "agency_identifier_name",
        "budget_function",
        "budget_subfunction",
        "budget_authority_unobligated_balance_brought_forward",
        "adjustments_to_unobligated_balance_brought_forward_cpe",
        "budget_authority_appropriated_amount",
        "borrowing_authority_amount",
        "contract_authority_amount",
        "spending_authority_from_offsetting_collections_amount",
        "total_other_budgetary_resources_amount",
        "total_budgetary_resources",
        "obligations_incurred",
        "deobligations_or_recoveries_or_refunds_from_prior_year",
        "unobligated_balance",
        "gross_outlay_amount",
        "status_of_budgetary_resources_total",
        "last_modified_date",
    )
    return [
        sf.col(name).alias(renamed[name]) if name in renamed else sf.col(name)
        for name in ordered_names
    ]

@property
def account_balances(self) -> DataFrame:
    """Account balances grouped by federal account, owning agency name and submission period.

    Groups ``_account_balances_df``, applies ``account_balances_agg_cols`` and
    projects the result through ``account_balances_select_cols``.
    """
    grouping_keys = ("federal_account_code", "name", "federal_account.account_title", "submission_period")
    grouped = self._account_balances_df.groupby(*grouping_keys)
    aggregated = grouped.agg(*self.account_balances_agg_cols)
    return aggregated.select(*self.account_balances_select_cols)

@property
def object_class_program_activity(self) -> DataFrame:
    """Not implemented for this builder; accessing it always raises NotImplementedError."""
    raise NotImplementedError()

@property
def award_financial(self) -> DataFrame:
    """Award financial rows aggregated over ``award_financial_groupby_cols``.

    Applies ``dynamic_filters``, groups, runs every aggregate in
    ``award_financial_agg_cols``, then applies ``non_zero_filters`` and
    projects ``award_financial_select_cols``.
    """
    filtered = self._award_financial_df.filter(self.dynamic_filters)
    grouped = filtered.groupBy(self.award_financial_groupby_cols)
    aggregated = grouped.agg(*[build(name) for name, build in self.award_financial_agg_cols.items()])
    # The aggregates reuse their source column names, so remove the source
    # columns to avoid ambiguous column names
    source_columns = [sf.col(f"account_download.{name}") for name in self.award_financial_agg_cols]
    deduplicated = aggregated.drop(*source_columns)
    non_zero = deduplicated.filter(self.non_zero_filters)
    return non_zero.select(self.award_financial_select_cols)


class TreasuryAccountDownloadDataFrameBuilder(AbstractAccountDownloadDataFrameBuilder):

@property
def account_balances_groupby_cols(self) -> list[Column]:
    """Grouping keys for the treasury-account-level account balances.

    The keys are, in order: the listed ``aab`` columns, the ``ta`` name, the
    ``sa`` reporting agency name, the listed ``taa`` and ``fa`` columns, and
    the three derived columns referenced by name.
    """
    aab_names = (
        "data_source",
        "appropriation_account_balances_id",
        "budget_authority_unobligated_balance_brought_forward_fyb",
        "adjustments_to_unobligated_balance_brought_forward_cpe",
        "budget_authority_appropriated_amount_cpe",
        "borrowing_authority_amount_total_cpe",
        "contract_authority_amount_total_cpe",
        "spending_authority_from_offsetting_collections_amount_cpe",
        "other_budgetary_resources_amount_cpe",
        "total_budgetary_resources_amount_cpe",
        "gross_outlay_amount_by_tas_cpe",
        "deobligations_recoveries_refunds_by_tas_cpe",
        "unobligated_balance_cpe",
        "status_of_budgetary_resources_total_cpe",
        "obligations_incurred_total_by_tas_cpe",
        "drv_appropriation_availability_period_start_date",
        "drv_appropriation_availability_period_end_date",
        "drv_appropriation_account_expired_status",
        "drv_obligations_unpaid_amount",
        "drv_other_obligated_amount",
        "reporting_period_start",
        "reporting_period_end",
        "last_modified_date",
        "certified_date",
        "create_date",
        "update_date",
        "final_of_fy",
        "submission_id",
        "treasury_account_identifier",
    )
    taa_names = (
        "allocation_transfer_agency_id",
        "agency_id",
        "beginning_period_of_availability",
        "ending_period_of_availability",
        "availability_type_code",
        "main_account_code",
        "sub_account_code",
        "tas_rendering_label",
        "account_title",
        "budget_function_title",
        "budget_subfunction_title",
    )
    fa_names = ("federal_account_code", "account_title")
    derived_names = ("agency_identifier_name", "allocation_transfer_agency_identifier_name", "submission_period")

    # getattr keeps each column bound to its source DataFrame, as attribute access does
    return [
        *[getattr(self.aab, name) for name in aab_names],
        getattr(self.ta, "name"),
        getattr(self.sa, "reporting_agency_name"),
        *[getattr(self.taa, name) for name in taa_names],
        *[getattr(self.fa, name) for name in fa_names],
        *[sf.col(name) for name in derived_names],
    ]

@property
def account_balances_agg_cols(self) -> list[Column]:
    """Single aggregate: the maximum ``published_date``, named ``max_last_modified_date``."""
    latest_published = sf.max("published_date")
    return [latest_published.alias("max_last_modified_date")]

@property
def account_balances_select_cols(self) -> list[Column]:
    """Output columns, in order, for the treasury-account-level account balances.

    Columns come from the source DataFrames (``ta``, ``sa``, ``taa``, ``fa``,
    ``aab``) or are referenced by name, and are renamed where the output name
    differs from the source name.
    """
    aab, sa, ta, taa, fa = self.aab, self.sa, self.ta, self.taa, self.fa

    submission = [
        ta.name.alias("owning_agency_name"),
        sa.reporting_agency_name,
        sf.col("submission_period"),
    ]
    treasury_account = [
        taa.allocation_transfer_agency_id.alias("allocation_transfer_agency_identifier_code"),
        taa.agency_id.alias("agency_identifier_code"),
        taa.beginning_period_of_availability,
        taa.ending_period_of_availability,
        taa.availability_type_code,
        taa.main_account_code,
        taa.sub_account_code,
        taa.tas_rendering_label.alias("treasury_account_symbol"),
        taa.account_title.alias("treasury_account_name"),
        sf.col("agency_identifier_name"),
        sf.col("allocation_transfer_agency_identifier_name"),
        taa.budget_function_title.alias("budget_function"),
        taa.budget_subfunction_title.alias("budget_subfunction"),
    ]
    federal_account = [
        fa.federal_account_code.alias("federal_account_symbol"),
        fa.account_title.alias("federal_account_name"),
    ]
    amounts = [
        aab.budget_authority_unobligated_balance_brought_forward_fyb.alias(
            "budget_authority_unobligated_balance_brought_forward"
        ),
        aab.adjustments_to_unobligated_balance_brought_forward_cpe,
        aab.budget_authority_appropriated_amount_cpe.alias("budget_authority_appropriated_amount"),
        aab.borrowing_authority_amount_total_cpe.alias("borrowing_authority_amount"),
        aab.contract_authority_amount_total_cpe.alias("contract_authority_amount"),
        aab.spending_authority_from_offsetting_collections_amount_cpe.alias(
            "spending_authority_from_offsetting_collections_amount"
        ),
        aab.other_budgetary_resources_amount_cpe.alias("total_other_budgetary_resources_amount"),
        aab.total_budgetary_resources_amount_cpe.alias("total_budgetary_resources"),
        aab.obligations_incurred_total_by_tas_cpe.alias("obligations_incurred"),
        aab.deobligations_recoveries_refunds_by_tas_cpe.alias(
            "deobligations_or_recoveries_or_refunds_from_prior_year"
        ),
        aab.unobligated_balance_cpe.alias("unobligated_balance"),
        aab.gross_outlay_amount_by_tas_cpe.alias("gross_outlay_amount"),
        aab.status_of_budgetary_resources_total_cpe.alias("status_of_budgetary_resources_total"),
    ]
    last_modified = sf.col("max_last_modified_date").alias("last_modified_date")

    return [*submission, *treasury_account, *federal_account, *amounts, last_modified]

@property
def account_balances(self) -> DataFrame:
    """Account balances grouped by ``account_balances_groupby_cols``.

    Groups ``_account_balances_df``, applies ``account_balances_agg_cols`` and
    projects the result through ``account_balances_select_cols``.
    """
    grouped = self._account_balances_df.groupby(*self.account_balances_groupby_cols)
    aggregated = grouped.agg(*self.account_balances_agg_cols)
    return aggregated.select(*self.account_balances_select_cols)

@property
def object_class_program_activity(self) -> DataFrame:
    """Not implemented for this builder; accessing it always raises NotImplementedError."""
    raise NotImplementedError()

@property
def award_financial(self) -> DataFrame:
select_cols = (
Expand All @@ -188,4 +456,4 @@ def award_financial(self) -> DataFrame:
]
+ ["last_modified_date"]
)
return self.df.filter(self.dynamic_filters & self.non_zero_filters).select(select_cols)
return self._award_financial_df.filter(self.dynamic_filters & self.non_zero_filters).select(select_cols)
Loading