Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion usaspending_api/download/delta_downloads/account_balances.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pyspark.sql import functions as sf, Column, DataFrame, SparkSession
from usaspending_api.config import CONFIG

from usaspending_api.common.spark.utils import collect_concat
from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
Expand Down Expand Up @@ -28,7 +29,11 @@ class AccountBalancesMixin:

@property
def download_table(self) -> DataFrame:
return self.spark.table("rpt.account_balances_download")
# TODO: This should be reverted back after Spark downloads are migrated to EMR
# return self.spark.table("rpt.account_balances_download")
return self.spark.read.format("delta").load(
f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download"
)

def _build_dataframe(self) -> DataFrame:
return (
Expand Down
10 changes: 9 additions & 1 deletion usaspending_api/download/delta_downloads/award_financial.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pyspark.sql import functions as sf, Column, DataFrame, SparkSession
from usaspending_api.config import CONFIG

from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum
from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
Expand All @@ -23,7 +24,11 @@ class AwardFinancialMixin:

@property
def download_table(self) -> DataFrame:
return self.spark.table("rpt.award_financial_download")
# TODO: This should be reverted back after Spark downloads are migrated to EMR
# return self.spark.table("rpt.award_financial_download")
return self.spark.read.format("delta").load(
f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
)

@property
def non_zero_filters(self) -> Column:
Expand Down Expand Up @@ -152,6 +157,9 @@ def select_cols(self) -> list[Column]:
)

def _build_dataframe(self) -> DataFrame:
# TODO: Should handle the aggregate columns via a new name instead of relying on drops. If the Delta tables are
# referenced by their location then the ability to use the table identifier is lost as it doesn't
# appear to use the metastore for the Delta tables.
return (
self.download_table.filter(self.dynamic_filters)
.groupBy(self.group_by_cols)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as sf
from usaspending_api.config import CONFIG

from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum
from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
Expand Down Expand Up @@ -29,9 +30,16 @@ class ObjectClassProgramActivityMixin:

@property
def download_table(self) -> DataFrame:
return self.spark.table("rpt.object_class_program_activity_download")
# TODO: This should be reverted back after Spark downloads are migrated to EMR
# return self.spark.table("rpt.object_class_program_activity_download")
return self.spark.read.format("delta").load(
f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download"
)

def _build_dataframe(self) -> DataFrame:
# TODO: Should handle the aggregate columns via a new name instead of relying on drops. If the Delta tables are
# referenced by their location then the ability to use the table identifier is lost as it doesn't
# appear to use the metastore for the Delta tables.
return (
self.download_table.filter(
sf.col("submission_id").isin(
Expand Down Expand Up @@ -221,7 +229,7 @@ def group_by_cols(self) -> list[str]:
@property
def agg_cols(self) -> dict[str, callable]:
return {
"last_modified_date": lambda col: sf.max(col).alias(col),
"last_modified_date": lambda col: sf.max(col).alias(f"max_{col}"),
}

@property
Expand All @@ -230,7 +238,7 @@ def select_cols(self) -> list[Column]:
sf.col(col)
for col in query_paths["object_class_program_activity"]["treasury_account"].keys()
if not col.startswith("last_modified_date")
] + [sf.col("last_modified_date")]
] + [sf.col("max_last_modified_date").alias("last_modified_date")]


class ObjectClassProgramActivityDownloadFactory(AbstractAccountDownloadFactory):
Expand Down
232 changes: 114 additions & 118 deletions usaspending_api/download/delta_models/award_financial_download.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,122 @@
AWARD_FINANCIAL_DOWNLOAD_COLUMNS = {
"financial_accounts_by_awards_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"},
"submission_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"},
"federal_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"treasury_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"federal_account_symbol": {"delta": "STRING", "postgres": "TEXT"},
"federal_account_name": {"delta": "STRING", "postgres": "TEXT"},
"agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"},
"allocation_transfer_agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"},
"program_activity_code": {"delta": "STRING", "postgres": "TEXT"},
"program_activity_name": {"delta": "STRING", "postgres": "TEXT"},
"object_class_code": {"delta": "STRING", "postgres": "TEXT"},
"object_class_name": {"delta": "STRING", "postgres": "TEXT"},
"direct_or_reimbursable_funding_source": {"delta": "STRING", "postgres": "TEXT"},
"disaster_emergency_fund_code": {"delta": "STRING", "postgres": "TEXT"},
"disaster_emergency_fund_name": {"delta": "STRING", "postgres": "TEXT"},
"award_unique_key": {"delta": "STRING", "postgres": "TEXT"},
"award_id_piid": {"delta": "STRING", "postgres": "TEXT"},
"parent_award_id_piid": {"delta": "STRING", "postgres": "TEXT"},
"award_id_fain": {"delta": "STRING", "postgres": "TEXT"},
"award_id_uri": {"delta": "STRING", "postgres": "TEXT"},
"award_base_action_date": {"delta": "DATE", "postgres": "DATE"},
"award_latest_action_date": {"delta": "DATE", "postgres": "DATE"},
"period_of_performance_start_date": {"delta": "DATE", "postgres": "DATE"},
"period_of_performance_current_end_date": {"delta": "DATE", "postgres": "DATE"},
"ordering_period_end_date": {"delta": "DATE", "postgres": "DATE"},
"idv_type_code": {"delta": "STRING", "postgres": "TEXT"},
"idv_type": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_base_transaction_description": {"delta": "STRING", "postgres": "TEXT"},
"awarding_agency_code": {"delta": "STRING", "postgres": "TEXT"},
"awarding_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"awarding_subagency_code": {"delta": "STRING", "postgres": "TEXT"},
"awarding_subagency_name": {"delta": "STRING", "postgres": "TEXT"},
"awarding_office_code": {"delta": "STRING", "postgres": "TEXT"},
"awarding_office_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_agency_code": {"delta": "STRING", "postgres": "TEXT"},
"funding_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_sub_agency_code": {"delta": "STRING", "postgres": "TEXT"},
"funding_sub_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_office_code": {"delta": "STRING", "postgres": "TEXT"},
"funding_office_name": {"delta": "STRING", "postgres": "TEXT"},
"recipient_uei": {"delta": "STRING", "postgres": "TEXT"},
"recipient_duns": {"delta": "STRING", "postgres": "TEXT"},
"recipient_name": {"delta": "STRING", "postgres": "TEXT"},
"recipient_name_raw": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_uei": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_duns": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_name": {"delta": "STRING", "postgres": "TEXT"},
"recipient_parent_name_raw": {"delta": "STRING", "postgres": "TEXT"},
"recipient_country": {"delta": "STRING", "postgres": "TEXT"},
"recipient_state": {"delta": "STRING", "postgres": "TEXT"},
"recipient_county": {"delta": "STRING", "postgres": "TEXT"},
"recipient_city": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_country": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_state": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_county": {"delta": "STRING", "postgres": "TEXT"},
"primary_place_of_performance_zip_code": {"delta": "STRING", "postgres": "TEXT"},
"cfda_number": {"delta": "STRING", "postgres": "TEXT"},
"cfda_title": {"delta": "STRING", "postgres": "TEXT"},
"product_or_service_code": {"delta": "STRING", "postgres": "TEXT"},
"product_or_service_code_description": {"delta": "STRING", "postgres": "TEXT"},
"naics_code": {"delta": "STRING", "postgres": "TEXT"},
"naics_description": {"delta": "STRING", "postgres": "TEXT"},
"national_interest_action_code": {"delta": "STRING", "postgres": "TEXT"},
"national_interest_action": {"delta": "STRING", "postgres": "TEXT"},
"reporting_agency_name": {"delta": "STRING", "postgres": "TEXT"},
"submission_period": {"delta": "STRING", "postgres": "TEXT"},
"allocation_transfer_agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"},
"agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"},
"beginning_period_of_availability": {"delta": "DATE", "postgres": "DATE"},
"ending_period_of_availability": {"delta": "DATE", "postgres": "DATE"},
"availability_type_code": {"delta": "STRING", "postgres": "TEXT"},
"main_account_code": {"delta": "STRING", "postgres": "TEXT"},
"sub_account_code": {"delta": "STRING", "postgres": "TEXT"},
"treasury_account_symbol": {"delta": "STRING", "postgres": "TEXT"},
"treasury_account_name": {"delta": "STRING", "postgres": "TEXT"},
"funding_toptier_agency_id": {"delta": "INTEGER", "postgres": "INTEGER"},
"federal_account_id": {"delta": "INTEGER", "postgres": "INTEGER"},
"budget_function": {"delta": "STRING", "postgres": "TEXT"},
"budget_function_code": {"delta": "STRING", "postgres": "TEXT"},
"budget_subfunction": {"delta": "STRING", "postgres": "TEXT"},
"budget_subfunction_code": {"delta": "STRING", "postgres": "TEXT"},
"transaction_obligated_amount": {"delta": "NUMERIC(23,2)", "postgres": "NUMERIC(23,2)"},
"gross_outlay_amount_fyb_to_period_end": {"delta": "NUMERIC(23,2)", "postgres": "NUMERIC(23,2)"},
"ussgl487200_downward_adj_prior_year_prepaid_undeliv_order_oblig": {
"delta": "NUMERIC(23,2)",
"postgres": "NUMERIC(23,2)",
},
"ussgl497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": {
"delta": "NUMERIC(23,2)",
"postgres": "NUMERIC(23,2)",
},
"award_base_action_date_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"},
"award_latest_action_date_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"},
"award_type_code": {"delta": "STRING", "postgres": "TEXT"},
"award_type": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_recipient_cd_original": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_recipient_cd_current": {"delta": "STRING", "postgres": "TEXT"},
"recipient_zip_code": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_place_of_performance_cd_original": {"delta": "STRING", "postgres": "TEXT"},
"prime_award_summary_place_of_performance_cd_current": {"delta": "STRING", "postgres": "TEXT"},
"usaspending_permalink": {"delta": "STRING", "postgres": "TEXT"},
"last_modified_date": {"delta": "DATE", "postgres": "DATE"},
"reporting_fiscal_period": {"delta": "INTEGER", "postgres": "INTEGER"},
"reporting_fiscal_quarter": {"delta": "INTEGER", "postgres": "INTEGER"},
"reporting_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"},
"quarter_format_flag": {"delta": "BOOLEAN", "postgres": "BOOLEAN"},
}
from pyspark.sql.types import (
BooleanType,
DateType,
DecimalType,
IntegerType,
StringType,
StructField,
StructType,
)

AWARD_FINANCIAL_DOWNLOAD_DELTA_COLUMNS = {k: v["delta"] for k, v in AWARD_FINANCIAL_DOWNLOAD_COLUMNS.items()}
AWARD_FINANCIAL_DOWNLOAD_POSTGRES_COLUMNS = {k: v["postgres"] for k, v in AWARD_FINANCIAL_DOWNLOAD_COLUMNS.items()}
award_financial_schema = StructType(
[
StructField("financial_accounts_by_awards_id", IntegerType(), False),
StructField("submission_id", IntegerType(), False),
StructField("federal_owning_agency_name", StringType()),
StructField("treasury_owning_agency_name", StringType()),
StructField("federal_account_symbol", StringType()),
StructField("federal_account_name", StringType()),
StructField("agency_identifier_name", StringType()),
StructField("allocation_transfer_agency_identifier_name", StringType()),
StructField("program_activity_code", StringType()),
StructField("program_activity_name", StringType()),
StructField("object_class_code", StringType()),
StructField("object_class_name", StringType()),
StructField("direct_or_reimbursable_funding_source", StringType()),
StructField("disaster_emergency_fund_code", StringType()),
StructField("disaster_emergency_fund_name", StringType()),
StructField("award_unique_key", StringType()),
StructField("award_id_piid", StringType()),
StructField("parent_award_id_piid", StringType()),
StructField("award_id_fain", StringType()),
StructField("award_id_uri", StringType()),
StructField("award_base_action_date", DateType()),
StructField("award_latest_action_date", DateType()),
StructField("period_of_performance_start_date", DateType()),
StructField("period_of_performance_current_end_date", DateType()),
StructField("ordering_period_end_date", DateType()),
StructField("idv_type_code", StringType()),
StructField("idv_type", StringType()),
StructField("prime_award_base_transaction_description", StringType()),
StructField("awarding_agency_code", StringType()),
StructField("awarding_agency_name", StringType()),
StructField("awarding_subagency_code", StringType()),
StructField("awarding_subagency_name", StringType()),
StructField("awarding_office_code", StringType()),
StructField("awarding_office_name", StringType()),
StructField("funding_agency_code", StringType()),
StructField("funding_agency_name", StringType()),
StructField("funding_sub_agency_code", StringType()),
StructField("funding_sub_agency_name", StringType()),
StructField("funding_office_code", StringType()),
StructField("funding_office_name", StringType()),
StructField("recipient_uei", StringType()),
StructField("recipient_duns", StringType()),
StructField("recipient_name", StringType()),
StructField("recipient_name_raw", StringType()),
StructField("recipient_parent_uei", StringType()),
StructField("recipient_parent_duns", StringType()),
StructField("recipient_parent_name", StringType()),
StructField("recipient_parent_name_raw", StringType()),
StructField("recipient_country", StringType()),
StructField("recipient_state", StringType()),
StructField("recipient_county", StringType()),
StructField("recipient_city", StringType()),
StructField("primary_place_of_performance_country", StringType()),
StructField("primary_place_of_performance_state", StringType()),
StructField("primary_place_of_performance_county", StringType()),
StructField("primary_place_of_performance_zip_code", StringType()),
StructField("cfda_number", StringType()),
StructField("cfda_title", StringType()),
StructField("product_or_service_code", StringType()),
StructField("product_or_service_code_description", StringType()),
StructField("naics_code", StringType()),
StructField("naics_description", StringType()),
StructField("national_interest_action_code", StringType()),
StructField("national_interest_action", StringType()),
StructField("reporting_agency_name", StringType()),
StructField("submission_period", StringType()),
StructField("allocation_transfer_agency_identifier_code", StringType()),
StructField("agency_identifier_code", StringType()),
StructField("beginning_period_of_availability", DateType()),
StructField("ending_period_of_availability", DateType()),
StructField("availability_type_code", StringType()),
StructField("main_account_code", StringType()),
StructField("sub_account_code", StringType()),
StructField("treasury_account_symbol", StringType()),
StructField("treasury_account_name", StringType()),
StructField("funding_toptier_agency_id", IntegerType()),
StructField("federal_account_id", IntegerType()),
StructField("budget_function", StringType()),
StructField("budget_function_code", StringType()),
StructField("budget_subfunction", StringType()),
StructField("budget_subfunction_code", StringType()),
StructField("transaction_obligated_amount", DecimalType(23, 2)),
StructField("gross_outlay_amount_fyb_to_period_end", DecimalType(23, 2)),
StructField("ussgl487200_downward_adj_prior_year_prepaid_undeliv_order_oblig", DecimalType(23, 2)),
StructField("ussgl497200_downward_adj_of_prior_year_paid_deliv_orders_oblig", DecimalType(23, 2)),
StructField("award_base_action_date_fiscal_year", IntegerType()),
StructField("award_latest_action_date_fiscal_year", IntegerType()),
StructField("award_type_code", StringType()),
StructField("award_type", StringType()),
StructField("prime_award_summary_recipient_cd_original", StringType()),
StructField("prime_award_summary_recipient_cd_current", StringType()),
StructField("recipient_zip_code", StringType()),
StructField("prime_award_summary_place_of_performance_cd_original", StringType()),
StructField("prime_award_summary_place_of_performance_cd_current", StringType()),
StructField("usaspending_permalink", StringType()),
StructField("last_modified_date", DateType()),
StructField("reporting_fiscal_period", IntegerType()),
StructField("reporting_fiscal_quarter", IntegerType()),
StructField("reporting_fiscal_year", IntegerType()),
StructField("quarter_format_flag", BooleanType()),
]
)

award_financial_download_create_sql_string = rf"""
CREATE OR REPLACE TABLE {{DESTINATION_TABLE}} (
{", ".join([f'{key} {val}' for key, val in AWARD_FINANCIAL_DOWNLOAD_DELTA_COLUMNS.items()])}
)
USING DELTA
LOCATION 's3a://{{SPARK_S3_BUCKET}}/{{DELTA_LAKE_S3_PATH}}/{{DESTINATION_DATABASE}}/{{DESTINATION_TABLE}}'
"""

award_financial_download_load_sql_string = rf"""
INSERT OVERWRITE {{DESTINATION_DATABASE}}.{{DESTINATION_TABLE}} (
{",".join(list(AWARD_FINANCIAL_DOWNLOAD_COLUMNS))}
{",".join(list([field.name for field in award_financial_schema]))}
)
SELECT
financial_accounts_by_awards.financial_accounts_by_awards_id,
Expand Down
Loading