Skip to content
Merged
7 changes: 6 additions & 1 deletion usaspending_api/download/delta_downloads/account_balances.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pyspark.sql import functions as sf, Column, DataFrame, SparkSession
from usaspending_api.config import CONFIG

from usaspending_api.common.spark.utils import collect_concat
from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
Expand Down Expand Up @@ -28,7 +29,11 @@ class AccountBalancesMixin:

@property
def download_table(self) -> DataFrame:
return self.spark.table("rpt.account_balances_download")
# TODO: This should be reverted back after Spark downloads are migrated to EMR
# return self.spark.table("rpt.account_balances_download")
return self.spark.read.format("delta").load(
f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download"
)

def _build_dataframe(self) -> DataFrame:
return (
Expand Down
10 changes: 9 additions & 1 deletion usaspending_api/download/delta_downloads/award_financial.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pyspark.sql import functions as sf, Column, DataFrame, SparkSession
from usaspending_api.config import CONFIG

from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum
from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
Expand All @@ -23,7 +24,11 @@ class AwardFinancialMixin:

@property
def download_table(self) -> DataFrame:
return self.spark.table("rpt.award_financial_download")
# TODO: This should be reverted back after Spark downloads are migrated to EMR
# return self.spark.table("rpt.award_financial_download")
return self.spark.read.format("delta").load(
f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
)

@property
def non_zero_filters(self) -> Column:
Expand Down Expand Up @@ -152,6 +157,9 @@ def select_cols(self) -> list[Column]:
)

def _build_dataframe(self) -> DataFrame:
# TODO: Should handle the aggregate columns via a new name instead of relying on drops. If the Delta tables are
# referenced by their location then the ability to use the table identifier is lost as it doesn't
# appear to use the metastore for the Delta tables.
return (
self.download_table.filter(self.dynamic_filters)
.groupBy(self.group_by_cols)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as sf
from usaspending_api.config import CONFIG

from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum
from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
Expand Down Expand Up @@ -29,9 +30,16 @@ class ObjectClassProgramActivityMixin:

@property
def download_table(self) -> DataFrame:
return self.spark.table("rpt.object_class_program_activity_download")
# TODO: This should be reverted back after Spark downloads are migrated to EMR
# return self.spark.table("rpt.object_class_program_activity_download")
return self.spark.read.format("delta").load(
f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download"
)

def _build_dataframe(self) -> DataFrame:
# TODO: Should handle the aggregate columns via a new name instead of relying on drops. If the Delta tables are
# referenced by their location then the ability to use the table identifier is lost as it doesn't
# appear to use the metastore for the Delta tables.
return (
self.download_table.filter(
sf.col("submission_id").isin(
Expand Down Expand Up @@ -221,7 +229,7 @@ def group_by_cols(self) -> list[str]:
@property
def agg_cols(self) -> dict[str, callable]:
return {
"last_modified_date": lambda col: sf.max(col).alias(col),
"last_modified_date": lambda col: sf.max(col).alias(f"max_{col}"),
}

@property
Expand All @@ -230,7 +238,7 @@ def select_cols(self) -> list[Column]:
sf.col(col)
for col in query_paths["object_class_program_activity"]["treasury_account"].keys()
if not col.startswith("last_modified_date")
] + [sf.col("last_modified_date")]
] + [sf.col("max_last_modified_date").alias("last_modified_date")]


class ObjectClassProgramActivityDownloadFactory(AbstractAccountDownloadFactory):
Expand Down
232 changes: 114 additions & 118 deletions usaspending_api/download/delta_models/award_financial_download.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,122 @@
AWARD_FINANCIAL_DOWNLOAD_COLUMNS = {
"financial_accounts_by_awards_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"},
"submission_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"},
"federal_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"treasury_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"federal_account_symbol": {"delta": "STRING", "postgres": "TEXT"},
"federal_account_name": {"delta": "STRING", "postgres": "TEXT"},
"agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"},
"allocation_transfer_agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"},
"program_activity_code": {"delta": "STRING", "postgres": "TEXT"},
"program_activity_name": {"delta": "STRING", "postgres": "TEXT"},
"object_class_code": {"delta": "STRING", "postgres": "TEXT"},
"object_class_name": {"delta": "STRING", "postgres": "TEXT"},
"direct_or_reimbursable_funding_source": {"delta": "STRING", "postgres": "TEXT"},
"disaster_emergency_fund_code": {"delta": "STRING", "postgres": "TEXT"},
"disaster_emergency_fund_name": {"delta": "STRING", "postgres": "TEXT"},
"award_unique_key": {"delta": "STRING", "postgres": "TEXT"},
"award_id_piid": {"delta": "STRING", "postgres": "TEXT"},
"parent_award_id_piid": {"delta": "STRING", "postgres": "TEXT"},
"award_id_fain": {"delta": "STRING", "postgres": "TEXT"},
"award_id_uri": {"delta": "STRING", "postgres": "TEXT"},
"award_base_action_date": {"delta": "DATE", "postgres": "DATE"},
"award_latest_action_date": {"delta": "DATE", "postgres": "DATE"},
"period_of_performance_start_date": {"delta": "DATE", "postgres": "DATE"},
"period_of_performance_current_end_date": {"delta": "DATE", "postgres": "DATE"},
"ordering_period_end_date": {"delta": "DATE", "postgres": "DATE"},
"idv_type_code": {"delta": "STRING", "postgres": "TEXT"},
"idv_type": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_base_transaction_description": {"delta": "STRING", "postgres": "TEXT"},
"awarding_agency_code": {"delta": "STRING", "postgres": "TEXT"},
"awarding_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"awarding_subagency_code": {"delta": "STRING", "postgres": "TEXT"},
"awarding_subagency_name": {"delta": "STRING", "postgres": "TEXT"},
"awarding_office_code": {"delta": "STRING", "postgres": "TEXT"},
"awarding_office_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_agency_code": {"delta": "STRING", "postgres": "TEXT"},
"funding_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_sub_agency_code": {"delta": "STRING", "postgres": "TEXT"},
"funding_sub_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_office_code": {"delta": "STRING", "postgres": "TEXT"},
"funding_office_name": {"delta": "STRING", "postgres": "TEXT"},
"recipient_uei": {"delta": "STRING", "postgres": "TEXT"},
"recipient_duns": {"delta": "STRING", "postgres": "TEXT"},
"recipient_name": {"delta": "STRING", "postgres": "TEXT"},
"recipient_name_raw": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_uei": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_duns": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_name": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_name_raw": {"delta": "STRING", "postgres": "TEXT"},
"recipient_country": {"delta": "STRING", "postgres": "TEXT"},
"recipient_state": {"delta": "STRING", "postgres": "TEXT"},
"recipient_county": {"delta": "STRING", "postgres": "TEXT"},
"recipient_city": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_country": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_state": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_county": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_zip_code": {"delta": "STRING", "postgres": "TEXT"},
"cfda_number": {"delta": "STRING", "postgres": "TEXT"},
"cfda_title": {"delta": "STRING", "postgres": "TEXT"},
"product_or_service_code": {"delta": "STRING", "postgres": "TEXT"},
"product_or_service_code_description": {"delta": "STRING", "postgres": "TEXT"},
"naics_code": {"delta": "STRING", "postgres": "TEXT"},
"naics_description": {"delta": "STRING", "postgres": "TEXT"},
"national_interest_action_code": {"delta": "STRING", "postgres": "TEXT"},
"national_interest_action": {"delta": "STRING", "postgres": "TEXT"},
"reporting_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"submission_period": {"delta": "STRING", "postgres": "TEXT"},
"allocation_transfer_agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"},
"agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"},
"beginning_period_of_availability": {"delta": "DATE", "postgres": "DATE"},
"ending_period_of_availability": {"delta": "DATE", "postgres": "DATE"},
"availability_type_code": {"delta": "STRING", "postgres": "TEXT"},
"main_account_code": {"delta": "STRING", "postgres": "TEXT"},
"sub_account_code": {"delta": "STRING", "postgres": "TEXT"},
"treasury_account_symbol": {"delta": "STRING", "postgres": "TEXT"},
"treasury_account_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_toptier_agency_id": {"delta": "INTEGER", "postgres": "INTEGER"},
"federal_account_id": {"delta": "INTEGER", "postgres": "INTEGER"},
"budget_function": {"delta": "STRING", "postgres": "TEXT"},
"budget_function_code": {"delta": "STRING", "postgres": "TEXT"},
"budget_subfunction": {"delta": "STRING", "postgres": "TEXT"},
"budget_subfunction_code": {"delta": "STRING", "postgres": "TEXT"},
"transaction_obligated_amount": {"delta": "NUMERIC(23,2)", "postgres": "NUMERIC(23,2)"},
"gross_outlay_amount_fyb_to_period_end": {"delta": "NUMERIC(23,2)", "postgres": "NUMERIC(23,2)"},
"ussgl487200_downward_adj_prior_year_prepaid_undeliv_order_oblig": {
"delta": "NUMERIC(23,2)",
"postgres": "NUMERIC(23,2)",
},
"ussgl497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": {
"delta": "NUMERIC(23,2)",
"postgres": "NUMERIC(23,2)",
},
"award_base_action_date_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"},
"award_latest_action_date_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"},
"award_type_code": {"delta": "STRING", "postgres": "TEXT"},
"award_type": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_recipient_cd_original": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_recipient_cd_current": {"delta": "STRING", "postgres": "TEXT"},
"recipient_zip_code": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_place_of_performance_cd_original": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_place_of_performance_cd_current": {"delta": "STRING", "postgres": "TEXT"},
"usaspending_permalink": {"delta": "STRING", "postgres": "TEXT"},
"last_modified_date": {"delta": "DATE", "postgres": "DATE"},
"reporting_fiscal_period": {"delta": "INTEGER", "postgres": "INTEGER"},
"reporting_fiscal_quarter": {"delta": "INTEGER", "postgres": "INTEGER"},
"reporting_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"},
"quarter_format_flag": {"delta": "BOOLEAN", "postgres": "BOOLEAN"},
}
from pyspark.sql.types import (
BooleanType,
DateType,
DecimalType,
IntegerType,
StringType,
StructField,
StructType,
)

AWARD_FINANCIAL_DOWNLOAD_DELTA_COLUMNS = {k: v["delta"] for k, v in AWARD_FINANCIAL_DOWNLOAD_COLUMNS.items()}
AWARD_FINANCIAL_DOWNLOAD_POSTGRES_COLUMNS = {k: v["postgres"] for k, v in AWARD_FINANCIAL_DOWNLOAD_COLUMNS.items()}
award_financial_schema = StructType(
[
StructField("financial_accounts_by_awards_id", IntegerType(), False),
StructField("submission_id", IntegerType(), False),
StructField("federal_owning_agency_name", StringType()),
StructField("treasury_owning_agency_name", StringType()),
StructField("federal_account_symbol", StringType()),
StructField("federal_account_name", StringType()),
StructField("agency_identifier_name", StringType()),
StructField("allocation_transfer_agency_identifier_name", StringType()),
StructField("program_activity_code", StringType()),
StructField("program_activity_name", StringType()),
StructField("object_class_code", StringType()),
StructField("object_class_name", StringType()),
StructField("direct_or_reimbursable_funding_source", StringType()),
StructField("disaster_emergency_fund_code", StringType()),
StructField("disaster_emergency_fund_name", StringType()),
StructField("award_unique_key", StringType()),
StructField("award_id_piid", StringType()),
StructField("parent_award_id_piid", StringType()),
StructField("award_id_fain", StringType()),
StructField("award_id_uri", StringType()),
StructField("award_base_action_date", DateType()),
StructField("award_latest_action_date", DateType()),
StructField("period_of_performance_start_date", DateType()),
StructField("period_of_performance_current_end_date", DateType()),
StructField("ordering_period_end_date", DateType()),
StructField("idv_type_code", StringType()),
StructField("idv_type", StringType()),
StructField("prime_award_base_transaction_description", StringType()),
StructField("awarding_agency_code", StringType()),
StructField("awarding_agency_name", StringType()),
StructField("awarding_subagency_code", StringType()),
StructField("awarding_subagency_name", StringType()),
StructField("awarding_office_code", StringType()),
StructField("awarding_office_name", StringType()),
StructField("funding_agency_code", StringType()),
StructField("funding_agency_name", StringType()),
StructField("funding_sub_agency_code", StringType()),
StructField("funding_sub_agency_name", StringType()),
StructField("funding_office_code", StringType()),
StructField("funding_office_name", StringType()),
StructField("recipient_uei", StringType()),
StructField("recipient_duns", StringType()),
StructField("recipient_name", StringType()),
StructField("recipient_name_raw", StringType()),
StructField("recipient_parent_uei", StringType()),
StructField("recipient_parent_duns", StringType()),
StructField("recipient_parent_name", StringType()),
StructField("recipient_parent_name_raw", StringType()),
StructField("recipient_country", StringType()),
StructField("recipient_state", StringType()),
StructField("recipient_county", StringType()),
StructField("recipient_city", StringType()),
StructField("primary_place_of_performance_country", StringType()),
StructField("primary_place_of_performance_state", StringType()),
StructField("primary_place_of_performance_county", StringType()),
StructField("primary_place_of_performance_zip_code", StringType()),
StructField("cfda_number", StringType()),
StructField("cfda_title", StringType()),
StructField("product_or_service_code", StringType()),
StructField("product_or_service_code_description", StringType()),
StructField("naics_code", StringType()),
StructField("naics_description", StringType()),
StructField("national_interest_action_code", StringType()),
StructField("national_interest_action", StringType()),
StructField("reporting_agency_name", StringType()),
StructField("submission_period", StringType()),
StructField("allocation_transfer_agency_identifier_code", StringType()),
StructField("agency_identifier_code", StringType()),
StructField("beginning_period_of_availability", DateType()),
StructField("ending_period_of_availability", DateType()),
StructField("availability_type_code", StringType()),
StructField("main_account_code", StringType()),
StructField("sub_account_code", StringType()),
StructField("treasury_account_symbol", StringType()),
StructField("treasury_account_name", StringType()),
StructField("funding_toptier_agency_id", IntegerType()),
StructField("federal_account_id", IntegerType()),
StructField("budget_function", StringType()),
StructField("budget_function_code", StringType()),
StructField("budget_subfunction", StringType()),
StructField("budget_subfunction_code", StringType()),
StructField("transaction_obligated_amount", DecimalType(23, 2)),
StructField("gross_outlay_amount_fyb_to_period_end", DecimalType(23, 2)),
StructField("ussgl487200_downward_adj_prior_year_prepaid_undeliv_order_oblig", DecimalType(23, 2)),
StructField("ussgl497200_downward_adj_of_prior_year_paid_deliv_orders_oblig", DecimalType(23, 2)),
StructField("award_base_action_date_fiscal_year", IntegerType()),
StructField("award_latest_action_date_fiscal_year", IntegerType()),
StructField("award_type_code", StringType()),
StructField("award_type", StringType()),
StructField("prime_award_summary_recipient_cd_original", StringType()),
StructField("prime_award_summary_recipient_cd_current", StringType()),
StructField("recipient_zip_code", StringType()),
StructField("prime_award_summary_place_of_performance_cd_original", StringType()),
StructField("prime_award_summary_place_of_performance_cd_current", StringType()),
StructField("usaspending_permalink", StringType()),
StructField("last_modified_date", DateType()),
StructField("reporting_fiscal_period", IntegerType()),
StructField("reporting_fiscal_quarter", IntegerType()),
StructField("reporting_fiscal_year", IntegerType()),
StructField("quarter_format_flag", BooleanType()),
]
)

award_financial_download_create_sql_string = rf"""
CREATE OR REPLACE TABLE {{DESTINATION_TABLE}} (
{", ".join([f'{key} {val}' for key, val in AWARD_FINANCIAL_DOWNLOAD_DELTA_COLUMNS.items()])}
)
USING DELTA
LOCATION 's3a://{{SPARK_S3_BUCKET}}/{{DELTA_LAKE_S3_PATH}}/{{DESTINATION_DATABASE}}/{{DESTINATION_TABLE}}'
"""

award_financial_download_load_sql_string = rf"""
INSERT OVERWRITE {{DESTINATION_DATABASE}}.{{DESTINATION_TABLE}} (
{",".join(list(AWARD_FINANCIAL_DOWNLOAD_COLUMNS))}
{",".join(list([field.name for field in award_financial_schema]))}
)
SELECT
financial_accounts_by_awards.financial_accounts_by_awards_id,
Expand Down
Loading