diff --git a/usaspending_api/download/delta_downloads/account_balances.py b/usaspending_api/download/delta_downloads/account_balances.py index c0193f144c..d1d50d0001 100644 --- a/usaspending_api/download/delta_downloads/account_balances.py +++ b/usaspending_api/download/delta_downloads/account_balances.py @@ -1,4 +1,5 @@ from pyspark.sql import functions as sf, Column, DataFrame, SparkSession +from usaspending_api.config import CONFIG from usaspending_api.common.spark.utils import collect_concat from usaspending_api.download.delta_downloads.abstract_downloads.account_download import ( @@ -28,7 +29,11 @@ class AccountBalancesMixin: @property def download_table(self) -> DataFrame: - return self.spark.table("rpt.account_balances_download") + # TODO: This should be reverted back after Spark downloads are migrated to EMR + # return self.spark.table("rpt.account_balances_download") + return self.spark.read.format("delta").load( + f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download" + ) def _build_dataframe(self) -> DataFrame: return ( diff --git a/usaspending_api/download/delta_downloads/award_financial.py b/usaspending_api/download/delta_downloads/award_financial.py index 6a08297569..307504623e 100644 --- a/usaspending_api/download/delta_downloads/award_financial.py +++ b/usaspending_api/download/delta_downloads/award_financial.py @@ -1,4 +1,5 @@ from pyspark.sql import functions as sf, Column, DataFrame, SparkSession +from usaspending_api.config import CONFIG from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum from usaspending_api.download.delta_downloads.abstract_downloads.account_download import ( @@ -23,7 +24,11 @@ class AwardFinancialMixin: @property def download_table(self) -> DataFrame: - return self.spark.table("rpt.award_financial_download") + # TODO: This should be reverted back after Spark downloads are migrated to EMR + # return self.spark.table("rpt.award_financial_download") + return self.spark.read.format("delta").load( + f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download" + ) @property def non_zero_filters(self) -> Column: @@ -152,6 +157,9 @@ def select_cols(self) -> list[Column]: ) def _build_dataframe(self) -> DataFrame: + # TODO: Should handle the aggregate columns via a new name instead of relying on drops. If the Delta tables are + # referenced by their location then the ability to use the table identifier is lost as it doesn't + # appear to use the metastore for the Delta tables. return ( self.download_table.filter(self.dynamic_filters) .groupBy(self.group_by_cols) diff --git a/usaspending_api/download/delta_downloads/object_class_program_activity.py b/usaspending_api/download/delta_downloads/object_class_program_activity.py index e946af736f..b2e67f3279 100644 --- a/usaspending_api/download/delta_downloads/object_class_program_activity.py +++ b/usaspending_api/download/delta_downloads/object_class_program_activity.py @@ -1,5 +1,6 @@ from pyspark.sql import Column, DataFrame, SparkSession from pyspark.sql import functions as sf +from usaspending_api.config import CONFIG from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum from usaspending_api.download.delta_downloads.abstract_downloads.account_download import ( @@ -29,9 +30,16 @@ class ObjectClassProgramActivityMixin: @property def download_table(self) -> DataFrame: - return self.spark.table("rpt.object_class_program_activity_download") + # TODO: This should be reverted back after Spark downloads are migrated to EMR + # return self.spark.table("rpt.object_class_program_activity_download") + return self.spark.read.format("delta").load( + f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download" + ) def _build_dataframe(self) -> DataFrame: + # TODO: Should handle the aggregate columns via a new name instead of relying on drops. If the Delta tables are + # referenced by their location then the ability to use the table identifier is lost as it doesn't + # appear to use the metastore for the Delta tables. return ( self.download_table.filter( sf.col("submission_id").isin( @@ -221,7 +229,7 @@ def group_by_cols(self) -> list[str]: @property def agg_cols(self) -> dict[str, callable]: return { - "last_modified_date": lambda col: sf.max(col).alias(col), + "last_modified_date": lambda col: sf.max(col).alias(f"max_{col}"), } @property @@ -230,7 +238,7 @@ def select_cols(self) -> list[Column]: sf.col(col) for col in query_paths["object_class_program_activity"]["treasury_account"].keys() if not col.startswith("last_modified_date") - ] + [sf.col("last_modified_date")] + ] + [sf.col("max_last_modified_date").alias("last_modified_date")] class ObjectClassProgramActivityDownloadFactory(AbstractAccountDownloadFactory): diff --git a/usaspending_api/download/delta_models/award_financial_download.py b/usaspending_api/download/delta_models/award_financial_download.py index 55802be714..70d4b5b9d0 100644 --- a/usaspending_api/download/delta_models/award_financial_download.py +++ b/usaspending_api/download/delta_models/award_financial_download.py @@ -1,126 +1,122 @@ -AWARD_FINANCIAL_DOWNLOAD_COLUMNS = { - "financial_accounts_by_awards_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"}, - "submission_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"}, - "federal_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "treasury_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "federal_account_symbol": {"delta": "STRING", "postgres": "TEXT"}, - "federal_account_name": {"delta": "STRING", "postgres": "TEXT"}, - "agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"}, - "allocation_transfer_agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"}, - "program_activity_code": {"delta": "STRING", "postgres": "TEXT"}, - "program_activity_name": {"delta": "STRING", "postgres": "TEXT"}, - "object_class_code": {"delta": "STRING", "postgres": "TEXT"}, - "object_class_name": {"delta": "STRING", "postgres": "TEXT"}, - "direct_or_reimbursable_funding_source": {"delta": "STRING", "postgres": "TEXT"}, - "disaster_emergency_fund_code": {"delta": "STRING", "postgres": "TEXT"}, - "disaster_emergency_fund_name": {"delta": "STRING", "postgres": "TEXT"}, - "award_unique_key": {"delta": "STRING", "postgres": "TEXT"}, - "award_id_piid": {"delta": "STRING", "postgres": "TEXT"}, - "parent_award_id_piid": {"delta": "STRING", "postgres": "TEXT"}, - "award_id_fain": {"delta": "STRING", "postgres": "TEXT"}, - "award_id_uri": {"delta": "STRING", "postgres": "TEXT"}, - "award_base_action_date": {"delta": "DATE", "postgres": "DATE"}, - "award_latest_action_date": {"delta": "DATE", "postgres": "DATE"}, - "period_of_performance_start_date": {"delta": "DATE", "postgres": "DATE"}, - "period_of_performance_current_end_date": {"delta": "DATE", "postgres": "DATE"}, - "ordering_period_end_date": {"delta": "DATE", "postgres": "DATE"}, - "idv_type_code": {"delta": "STRING", "postgres": "TEXT"}, - "idv_type": {"delta": "STRING", "postgres": "TEXT"}, - "prime_award_base_transaction_description": {"delta": "STRING", "postgres": "TEXT"}, - "awarding_agency_code": {"delta": "STRING", "postgres": "TEXT"}, - "awarding_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "awarding_subagency_code": {"delta": "STRING", "postgres": "TEXT"}, - "awarding_subagency_name": {"delta": "STRING", "postgres": "TEXT"}, - "awarding_office_code": {"delta": "STRING", "postgres": "TEXT"}, - "awarding_office_name": {"delta": "STRING", "postgres": "TEXT"}, - "funding_agency_code": {"delta": "STRING", "postgres": "TEXT"}, - "funding_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "funding_sub_agency_code": {"delta": "STRING", "postgres": "TEXT"}, - "funding_sub_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "funding_office_code": {"delta": "STRING", "postgres": "TEXT"}, - "funding_office_name": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_uei": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_duns": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_name": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_name_raw": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_parent_uei": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_parent_duns": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_parent_name": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_parent_name_raw": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_country": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_state": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_county": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_city": {"delta": "STRING", "postgres": "TEXT"}, - "primary_place_of_performance_country": {"delta": "STRING", "postgres": "TEXT"}, - "primary_place_of_performance_state": {"delta": "STRING", "postgres": "TEXT"}, - "primary_place_of_performance_county": {"delta": "STRING", "postgres": "TEXT"}, - "primary_place_of_performance_zip_code": {"delta": "STRING", "postgres": "TEXT"}, - "cfda_number": {"delta": "STRING", "postgres": "TEXT"}, - "cfda_title": {"delta": "STRING", "postgres": "TEXT"}, - "product_or_service_code": {"delta": "STRING", "postgres": "TEXT"}, - "product_or_service_code_description": {"delta": "STRING", "postgres": "TEXT"}, - "naics_code": {"delta": "STRING", "postgres": "TEXT"}, - "naics_description": {"delta": "STRING", "postgres": "TEXT"}, - "national_interest_action_code": {"delta": "STRING", "postgres": "TEXT"}, - "national_interest_action": {"delta": "STRING", "postgres": "TEXT"}, - "reporting_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "submission_period": {"delta": "STRING", "postgres": "TEXT"}, - "allocation_transfer_agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"}, - "agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"}, - "beginning_period_of_availability": {"delta": "DATE", "postgres": "DATE"}, - "ending_period_of_availability": {"delta": "DATE", "postgres": "DATE"}, - "availability_type_code": {"delta": "STRING", "postgres": "TEXT"}, - "main_account_code": {"delta": "STRING", "postgres": "TEXT"}, - "sub_account_code": {"delta": "STRING", "postgres": "TEXT"}, - "treasury_account_symbol": {"delta": "STRING", "postgres": "TEXT"}, - "treasury_account_name": {"delta": "STRING", "postgres": "TEXT"}, - "funding_toptier_agency_id": {"delta": "INTEGER", "postgres": "INTEGER"}, - "federal_account_id": {"delta": "INTEGER", "postgres": "INTEGER"}, - "budget_function": {"delta": "STRING", "postgres": "TEXT"}, - "budget_function_code": {"delta": "STRING", "postgres": "TEXT"}, - "budget_subfunction": {"delta": "STRING", "postgres": "TEXT"}, - "budget_subfunction_code": {"delta": "STRING", "postgres": "TEXT"}, - "transaction_obligated_amount": {"delta": "NUMERIC(23,2)", "postgres": "NUMERIC(23,2)"}, - "gross_outlay_amount_fyb_to_period_end": {"delta": "NUMERIC(23,2)", "postgres": "NUMERIC(23,2)"}, - "ussgl487200_downward_adj_prior_year_prepaid_undeliv_order_oblig": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "ussgl497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "award_base_action_date_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"}, - "award_latest_action_date_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"}, - "award_type_code": {"delta": "STRING", "postgres": "TEXT"}, - "award_type": {"delta": "STRING", "postgres": "TEXT"}, - "prime_award_summary_recipient_cd_original": {"delta": "STRING", "postgres": "TEXT"}, - "prime_award_summary_recipient_cd_current": {"delta": "STRING", "postgres": "TEXT"}, - "recipient_zip_code": {"delta": "STRING", "postgres": "TEXT"}, - "prime_award_summary_place_of_performance_cd_original": {"delta": "STRING", "postgres": "TEXT"}, - "prime_award_summary_place_of_performance_cd_current": {"delta": "STRING", "postgres": "TEXT"}, - "usaspending_permalink": {"delta": "STRING", "postgres": "TEXT"}, - "last_modified_date": {"delta": "DATE", "postgres": "DATE"}, - "reporting_fiscal_period": {"delta": "INTEGER", "postgres": "INTEGER"}, - "reporting_fiscal_quarter": {"delta": "INTEGER", "postgres": "INTEGER"}, - "reporting_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"}, - "quarter_format_flag": {"delta": "BOOLEAN", "postgres": "BOOLEAN"}, -} +from pyspark.sql.types import ( + BooleanType, + DateType, + DecimalType, + IntegerType, + StringType, + StructField, + StructType, +) -AWARD_FINANCIAL_DOWNLOAD_DELTA_COLUMNS = {k: v["delta"] for k, v in AWARD_FINANCIAL_DOWNLOAD_COLUMNS.items()} -AWARD_FINANCIAL_DOWNLOAD_POSTGRES_COLUMNS = {k: v["postgres"] for k, v in AWARD_FINANCIAL_DOWNLOAD_COLUMNS.items()} +award_financial_schema = StructType( + [ + StructField("financial_accounts_by_awards_id", IntegerType(), False), + StructField("submission_id", IntegerType(), False), + StructField("federal_owning_agency_name", StringType()), + StructField("treasury_owning_agency_name", StringType()), + StructField("federal_account_symbol", StringType()), + StructField("federal_account_name", StringType()), + StructField("agency_identifier_name", StringType()), + StructField("allocation_transfer_agency_identifier_name", StringType()), + StructField("program_activity_code", StringType()), + StructField("program_activity_name", StringType()), + StructField("object_class_code", StringType()), + StructField("object_class_name", StringType()), + StructField("direct_or_reimbursable_funding_source", StringType()), + StructField("disaster_emergency_fund_code", StringType()), + StructField("disaster_emergency_fund_name", StringType()), + StructField("award_unique_key", StringType()), + StructField("award_id_piid", StringType()), + StructField("parent_award_id_piid", StringType()), + StructField("award_id_fain", StringType()), + StructField("award_id_uri", StringType()), + StructField("award_base_action_date", DateType()), + StructField("award_latest_action_date", DateType()), + StructField("period_of_performance_start_date", DateType()), + StructField("period_of_performance_current_end_date", DateType()), + StructField("ordering_period_end_date", DateType()), + StructField("idv_type_code", StringType()), + StructField("idv_type", StringType()), + StructField("prime_award_base_transaction_description", StringType()), + StructField("awarding_agency_code", StringType()), + StructField("awarding_agency_name", StringType()), + StructField("awarding_subagency_code", StringType()), + StructField("awarding_subagency_name", StringType()), + StructField("awarding_office_code", StringType()), + StructField("awarding_office_name", StringType()), + StructField("funding_agency_code", StringType()), + StructField("funding_agency_name", StringType()), + StructField("funding_sub_agency_code", StringType()), + StructField("funding_sub_agency_name", StringType()), + StructField("funding_office_code", StringType()), + StructField("funding_office_name", StringType()), + StructField("recipient_uei", StringType()), + StructField("recipient_duns", StringType()), + StructField("recipient_name", StringType()), + StructField("recipient_name_raw", StringType()), + StructField("recipient_parent_uei", StringType()), + StructField("recipient_parent_duns", StringType()), + StructField("recipient_parent_name", StringType()), + StructField("recipient_parent_name_raw", StringType()), + StructField("recipient_country", StringType()), + StructField("recipient_state", StringType()), + StructField("recipient_county", StringType()), + StructField("recipient_city", StringType()), + StructField("primary_place_of_performance_country", StringType()), + StructField("primary_place_of_performance_state", StringType()), + StructField("primary_place_of_performance_county", StringType()), + StructField("primary_place_of_performance_zip_code", StringType()), + StructField("cfda_number", StringType()), + StructField("cfda_title", StringType()), + StructField("product_or_service_code", StringType()), + StructField("product_or_service_code_description", StringType()), + StructField("naics_code", StringType()), + StructField("naics_description", StringType()), + StructField("national_interest_action_code", StringType()), + StructField("national_interest_action", StringType()), + StructField("reporting_agency_name", StringType()), + StructField("submission_period", StringType()), + StructField("allocation_transfer_agency_identifier_code", StringType()), + StructField("agency_identifier_code", StringType()), + StructField("beginning_period_of_availability", DateType()), + StructField("ending_period_of_availability", DateType()), + StructField("availability_type_code", StringType()), + StructField("main_account_code", StringType()), + StructField("sub_account_code", StringType()), + StructField("treasury_account_symbol", StringType()), + StructField("treasury_account_name", StringType()), + StructField("funding_toptier_agency_id", IntegerType()), + StructField("federal_account_id", IntegerType()), + StructField("budget_function", StringType()), + StructField("budget_function_code", StringType()), + StructField("budget_subfunction", StringType()), + StructField("budget_subfunction_code", StringType()), + StructField("transaction_obligated_amount", DecimalType(23, 2)), + StructField("gross_outlay_amount_fyb_to_period_end", DecimalType(23, 2)), + StructField("ussgl487200_downward_adj_prior_year_prepaid_undeliv_order_oblig", DecimalType(23, 2)), + StructField("ussgl497200_downward_adj_of_prior_year_paid_deliv_orders_oblig", DecimalType(23, 2)), + StructField("award_base_action_date_fiscal_year", IntegerType()), + StructField("award_latest_action_date_fiscal_year", IntegerType()), + StructField("award_type_code", StringType()), + StructField("award_type", StringType()), + StructField("prime_award_summary_recipient_cd_original", StringType()), + StructField("prime_award_summary_recipient_cd_current", StringType()), + StructField("recipient_zip_code", StringType()), + StructField("prime_award_summary_place_of_performance_cd_original", StringType()), + StructField("prime_award_summary_place_of_performance_cd_current", StringType()), + StructField("usaspending_permalink", StringType()), + StructField("last_modified_date", DateType()), + StructField("reporting_fiscal_period", IntegerType()), + StructField("reporting_fiscal_quarter", IntegerType()), + StructField("reporting_fiscal_year", IntegerType()), + StructField("quarter_format_flag", BooleanType()), + ] +) -award_financial_download_create_sql_string = rf""" - CREATE OR REPLACE TABLE {{DESTINATION_TABLE}} ( - {", ".join([f'{key} {val}' for key, val in AWARD_FINANCIAL_DOWNLOAD_DELTA_COLUMNS.items()])} - ) - USING DELTA - LOCATION 's3a://{{SPARK_S3_BUCKET}}/{{DELTA_LAKE_S3_PATH}}/{{DESTINATION_DATABASE}}/{{DESTINATION_TABLE}}' - """ award_financial_download_load_sql_string = rf""" INSERT OVERWRITE {{DESTINATION_DATABASE}}.{{DESTINATION_TABLE}} ( - {",".join(list(AWARD_FINANCIAL_DOWNLOAD_COLUMNS))} + {",".join(list([field.name for field in award_financial_schema]))} ) SELECT financial_accounts_by_awards.financial_accounts_by_awards_id, diff --git a/usaspending_api/download/delta_models/object_class_program_activity_download.py b/usaspending_api/download/delta_models/object_class_program_activity_download.py index 83368d5fb2..d078e6eb35 100644 --- a/usaspending_api/download/delta_models/object_class_program_activity_download.py +++ b/usaspending_api/download/delta_models/object_class_program_activity_download.py @@ -1,217 +1,104 @@ -OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_COLUMNS = { - "financial_accounts_by_program_activity_object_class_id": { - "delta": "INTEGER NOT NULL", - "postgres": "INTEGER NOT NULL", - }, - "submission_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"}, - "owning_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "federal_account_symbol": {"delta": "STRING", "postgres": "TEXT"}, - "federal_account_name": {"delta": "STRING", "postgres": "TEXT"}, - "agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"}, - "allocation_transfer_agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"}, - "program_activity_code": {"delta": "STRING", "postgres": "TEXT"}, - "program_activity_name": {"delta": "STRING", "postgres": "TEXT"}, - "object_class_code": {"delta": "STRING", "postgres": "TEXT"}, - "object_class_name": {"delta": "STRING", "postgres": "TEXT"}, - "direct_or_reimbursable_funding_source": {"delta": "STRING", "postgres": "TEXT"}, - "disaster_emergency_fund_code": {"delta": "STRING", "postgres": "TEXT"}, - "disaster_emergency_fund_name": {"delta": "STRING", "postgres": "TEXT"}, - "funding_toptier_agency_id": {"delta": "INTEGER", "postgres": "INTEGER"}, - "federal_account_id": {"delta": "INTEGER", "postgres": "INTEGER"}, - "budget_function": {"delta": "STRING", "postgres": "TEXT"}, - "budget_function_code": {"delta": "STRING", "postgres": "TEXT"}, - "budget_subfunction": {"delta": "STRING", "postgres": "TEXT"}, - "budget_subfunction_code": {"delta": "STRING", "postgres": "TEXT"}, - "reporting_agency_name": {"delta": "STRING", "postgres": "TEXT"}, - "reporting_fiscal_period": {"delta": "INTEGER", "postgres": "INTEGER"}, - "reporting_fiscal_quarter": {"delta": "INTEGER", "postgres": "INTEGER"}, - "reporting_fiscal_year": {"delta": "INTEGER", "postgres": "INTEGER"}, - "quarter_format_flag": {"delta": "BOOLEAN", "postgres": "BOOLEAN"}, - "submission_period": {"delta": "STRING", "postgres": "TEXT"}, - "USSGL480100_undelivered_orders_obligations_unpaid_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL480100_undelivered_orders_obligations_unpaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL483100_undelivered_orders_obligations_transferred_unpaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL488100_upward_adj_prior_year_undeliv_orders_oblig_unpaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL490100_delivered_orders_obligations_unpaid_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL490100_delivered_orders_obligations_unpaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL493100_delivered_orders_obligations_transferred_unpaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL498100_upward_adj_of_prior_year_deliv_orders_oblig_unpaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL480200_undelivered_orders_obligations_prepaid_advanced_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL480200_undelivered_orders_obligations_prepaid_advanced": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL483200_undeliv_orders_oblig_transferred_prepaid_advanced": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL488200_upward_adj_prior_year_undeliv_orders_oblig_prepaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL490200_delivered_orders_obligations_paid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL490800_authority_outlayed_not_yet_disbursed_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL490800_authority_outlayed_not_yet_disbursed": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL498200_upward_adj_of_prior_year_deliv_orders_oblig_paid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "obligations_undelivered_orders_unpaid_total_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "obligations_undelivered_orders_unpaid_total": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "obligations_delivered_orders_unpaid_total": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "obligations_delivered_orders_unpaid_total_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "gross_outlays_undelivered_orders_prepaid_total": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "gross_outlays_undelivered_orders_prepaid_total_fyb": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "gross_outlays_delivered_orders_paid_total_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "gross_outlays_delivered_orders_paid_total": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "gross_outlay_amount_FYB": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "gross_outlay_amount_FYB_to_period_end": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "obligations_incurred": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL487100_downward_adj_prior_year_unpaid_undeliv_orders_oblig": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL497100_downward_adj_prior_year_unpaid_deliv_orders_oblig": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL487200_downward_adj_prior_year_prepaid_undeliv_order_oblig": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "deobligations_or_recoveries_or_refunds_from_prior_year": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "last_modified_date": {"delta": "DATE", "postgres": "DATE"}, - "data_source": {"delta": "STRING", "postgres": "TEXT"}, - "program_activity_id": {"delta": "INTEGER", "postgres": "INTEGER"}, - "object_class_id": {"delta": "INTEGER", "postgres": "INTEGER"}, - "prior_year_adjustment": {"delta": "STRING", "postgres": "TEXT"}, - "ussgl480110_rein_undel_ord_cpe": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "ussgl490110_rein_deliv_ord_cpe": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "drv_obligations_incurred_by_program_object_class": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "drv_obligations_undelivered_orders_unpaid": { - "delta": "NUMERIC(23,2)", - "postgres": "NUMERIC(23,2)", - }, - "reporting_period_start": {"delta": "DATE", "postgres": "DATE"}, - "reporting_period_end": {"delta": "DATE", "postgres": "DATE"}, - "certified_date": {"delta": "DATE", "postgres": "DATE"}, - "create_date": {"delta": "DATE", "postgres": "DATE"}, - "update_date": {"delta": "DATE", "postgres": "DATE"}, - "treasury_account_id": {"delta": "INTEGER", "postgres": "INTEGER"}, - "allocation_transfer_agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"}, - "agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"}, - "beginning_period_of_availability": {"delta": "STRING", "postgres": "TEXT"}, - "ending_period_of_availability": {"delta": "STRING", "postgres": "TEXT"}, - "availability_type_code": {"delta": "STRING", "postgres": "TEXT"}, - "main_account_code": {"delta": "STRING", "postgres": "TEXT"}, - "sub_account_code": {"delta": "STRING", "postgres": "TEXT"}, - "treasury_account_symbol": {"delta": "STRING", "postgres": "TEXT"}, - "treasury_account_name": {"delta": "STRING", "postgres": "TEXT"}, -} +from pyspark.sql.types import ( + BooleanType, + DateType, + DecimalType, + IntegerType, + StringType, + StructField, + StructType, +) -OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_DELTA_COLUMNS = { - k: v["delta"] for k, v in OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_COLUMNS.items() -} -OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_POSTGRES_COLUMNS = { - k: v["postgres"] for k, v in OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_COLUMNS.items() -} +object_class_program_activity_schema = StructType( + [ + StructField("financial_accounts_by_program_activity_object_class_id", IntegerType(), False), + StructField("submission_id", IntegerType(), False), + StructField("owning_agency_name", StringType()), + StructField("federal_account_symbol", StringType()), + StructField("federal_account_name", StringType()), + StructField("agency_identifier_name", StringType()), + StructField("allocation_transfer_agency_identifier_name", StringType()), + StructField("program_activity_code", StringType()), + StructField("program_activity_name", StringType()), + StructField("object_class_code", StringType()), + StructField("object_class_name", StringType()), + StructField("direct_or_reimbursable_funding_source", StringType()), + StructField("disaster_emergency_fund_code", StringType()), + StructField("disaster_emergency_fund_name", StringType()), + StructField("funding_toptier_agency_id", IntegerType()), + StructField("federal_account_id", IntegerType()), + StructField("budget_function", StringType()), + StructField("budget_function_code", StringType()), + StructField("budget_subfunction", StringType()), + StructField("budget_subfunction_code", StringType()), + StructField("reporting_agency_name", StringType()), + StructField("reporting_fiscal_period", IntegerType()), + StructField("reporting_fiscal_quarter", IntegerType()), + StructField("reporting_fiscal_year", IntegerType()), + StructField("quarter_format_flag", BooleanType()), + StructField("submission_period", StringType()), + StructField("USSGL480100_undelivered_orders_obligations_unpaid_FYB", DecimalType(23, 2)), + StructField("USSGL480100_undelivered_orders_obligations_unpaid", DecimalType(23, 2)), + StructField("USSGL483100_undelivered_orders_obligations_transferred_unpaid", DecimalType(23, 2)), + StructField("USSGL488100_upward_adj_prior_year_undeliv_orders_oblig_unpaid", DecimalType(23, 2)), + StructField("USSGL490100_delivered_orders_obligations_unpaid_FYB", DecimalType(23, 2)), + StructField("USSGL490100_delivered_orders_obligations_unpaid", DecimalType(23, 2)), + StructField("USSGL493100_delivered_orders_obligations_transferred_unpaid", DecimalType(23, 2)), + StructField("USSGL498100_upward_adj_of_prior_year_deliv_orders_oblig_unpaid", DecimalType(23, 2)), + StructField("USSGL480200_undelivered_orders_obligations_prepaid_advanced_FYB", DecimalType(23, 2)), + StructField("USSGL480200_undelivered_orders_obligations_prepaid_advanced", DecimalType(23, 2)), + StructField("USSGL483200_undeliv_orders_oblig_transferred_prepaid_advanced", DecimalType(23, 2)), + StructField("USSGL488200_upward_adj_prior_year_undeliv_orders_oblig_prepaid", DecimalType(23, 2)), + StructField("USSGL490200_delivered_orders_obligations_paid", DecimalType(23, 2)), + StructField("USSGL490800_authority_outlayed_not_yet_disbursed_FYB", DecimalType(23, 2)), + StructField("USSGL490800_authority_outlayed_not_yet_disbursed", DecimalType(23, 2)), + StructField("USSGL498200_upward_adj_of_prior_year_deliv_orders_oblig_paid", DecimalType(23, 2)), + StructField("obligations_undelivered_orders_unpaid_total_FYB", DecimalType(23, 2)), + StructField("obligations_undelivered_orders_unpaid_total", DecimalType(23, 2)), + StructField("obligations_delivered_orders_unpaid_total", DecimalType(23, 2)), + StructField("obligations_delivered_orders_unpaid_total_FYB", DecimalType(23, 2)), + StructField("gross_outlays_undelivered_orders_prepaid_total", DecimalType(23, 2)), + StructField("gross_outlays_undelivered_orders_prepaid_total_fyb", DecimalType(23, 2)), + StructField("gross_outlays_delivered_orders_paid_total_FYB", DecimalType(23, 2)), + StructField("gross_outlays_delivered_orders_paid_total", DecimalType(23, 2)), + StructField("gross_outlay_amount_FYB", DecimalType(23, 2)), + StructField("gross_outlay_amount_FYB_to_period_end", DecimalType(23, 2)), + StructField("obligations_incurred", DecimalType(23, 2)), + StructField("USSGL487100_downward_adj_prior_year_unpaid_undeliv_orders_oblig", DecimalType(23, 2)), + StructField("USSGL497100_downward_adj_prior_year_unpaid_deliv_orders_oblig", DecimalType(23, 2)), + StructField("USSGL487200_downward_adj_prior_year_prepaid_undeliv_order_oblig", DecimalType(23, 2)), + StructField("USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig", DecimalType(23, 2)), + StructField("deobligations_or_recoveries_or_refunds_from_prior_year", DecimalType(23, 2)), + StructField("last_modified_date", DateType()), + StructField("data_source", StringType()), + StructField("program_activity_id", IntegerType()), + StructField("object_class_id", IntegerType()), + StructField("prior_year_adjustment", StringType()), + StructField("ussgl480110_rein_undel_ord_cpe", DecimalType(23, 2)), + StructField("ussgl490110_rein_deliv_ord_cpe", DecimalType(23, 2)), + StructField("drv_obligations_incurred_by_program_object_class", DecimalType(23, 2)), + StructField("drv_obligations_undelivered_orders_unpaid", DecimalType(23, 2)), + StructField("reporting_period_start", DateType()), + StructField("reporting_period_end", DateType()), + StructField("certified_date", DateType()), + StructField("create_date", DateType()), + StructField("update_date", DateType()), + StructField("treasury_account_id", IntegerType()), + StructField("allocation_transfer_agency_identifier_code", StringType()), + StructField("agency_identifier_code", StringType()), + StructField("beginning_period_of_availability", StringType()), + StructField("ending_period_of_availability", StringType()), + StructField("availability_type_code", StringType()), + StructField("main_account_code", StringType()), + StructField("sub_account_code", StringType()), + StructField("treasury_account_symbol", StringType()), + StructField("treasury_account_name", StringType()), + ] +) -object_class_program_activity_download_create_sql_string = rf""" - CREATE OR REPLACE TABLE {{DESTINATION_TABLE}} ( - {", ".join([f"{key} {val}" for key, val in OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_DELTA_COLUMNS.items()])} - ) - USING DELTA - LOCATION 's3a://{{SPARK_S3_BUCKET}}/{{DELTA_LAKE_S3_PATH}}/{{DESTINATION_DATABASE}}/{{DESTINATION_TABLE}}' - """ object_class_program_activity_download_load_sql_string = rf""" INSERT OVERWRITE {{DESTINATION_DATABASE}}.{{DESTINATION_TABLE}} ( - {",".join(list(OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_COLUMNS))} + {",".join(list([field.name for field in object_class_program_activity_schema]))} ) SELECT fabpaoc.financial_accounts_by_program_activity_object_class_id, diff --git a/usaspending_api/download/tests/integration/test_account_download_factories.py b/usaspending_api/download/tests/integration/test_account_download_factories.py index 21d8487eae..dca0ef621c 100644 --- a/usaspending_api/download/tests/integration/test_account_download_factories.py +++ b/usaspending_api/download/tests/integration/test_account_download_factories.py @@ -1,9 +1,12 @@ +from decimal import Decimal from unittest.mock import patch +import numpy as np import pandas as pd import pytest from django.core.management import call_command from model_bakery import baker +from usaspending_api.config import CONFIG from usaspending_api.common.etl.spark import create_ref_temp_views from usaspending_api.download.delta_downloads.account_balances import AccountBalancesDownloadFactory @@ -13,51 +16,35 @@ ObjectClassProgramActivityDownloadFactory, ) from usaspending_api.download.delta_models.account_balances_download import account_balances_schema -from usaspending_api.download.v2.download_column_historical_lookups import query_paths +from usaspending_api.download.delta_models.award_financial_download import award_financial_schema +from usaspending_api.download.delta_models.object_class_program_activity_download import ( + object_class_program_activity_schema, +) @pytest.fixture(scope="function") -def award_financial_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db): +def award_financial_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch): call_command( "create_delta_table", "--destination-table=award_financial_download", f"--spark-s3-bucket={s3_unittest_data_bucket}", ) - columns = list( - set( - [ - col - for col in query_paths["award_financial"]["federal_account"].keys() - if col != "owning_agency_name" and not col.startswith("last_modified_date") - ] - + [ - col - for col in query_paths["award_financial"]["treasury_account"].keys() - if col != "owning_agency_name" and not col.startswith("last_modified_date") - ] - + [ - "federal_owning_agency_name", - "treasury_owning_agency_name", - "last_modified_date", - "reporting_fiscal_year", - "reporting_fiscal_quarter", - "reporting_fiscal_period", - "quarter_format_flag", - "submission_id", - "federal_account_id", - "funding_toptier_agency_id", - "budget_function_code", - "budget_subfunction_code", - ] - ) + monkeypatch.setattr( + f"usaspending_api.download.delta_downloads.award_financial.AwardFinancialMixin.download_table", + spark.read.format("delta").load( + f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download" + ), ) + column_placeholders = {field.name: [None] * 5 for field in award_financial_schema} test_data_df = pd.DataFrame( data={ + **column_placeholders, + "financial_accounts_by_awards_id": [1, 2, 3, 4, 5], "reporting_fiscal_year": [2018, 2018, 2018, 2018, 2019], "quarter_format_flag": [True, True, False, True, True], "reporting_fiscal_quarter": [1, 2, None, 4, 2], "reporting_fiscal_period": [None, None, 5, None, None], - "transaction_obligated_amount": [100, 100, 100, 100, 100], + "transaction_obligated_amount": [Decimal(100).quantize(Decimal(".01"))] * 5, "submission_id": [1, 2, 3, 4, 5], "federal_owning_agency_name": ["test1", "test2", "test2", "test2", "test3"], "reporting_agency_name": ["A", "B", "C", "D", "E"], @@ -65,32 +52,44 @@ def award_financial_table(spark, s3_unittest_data_bucket, hive_unittest_metastor "budget_function_code": ["A100", "B100", "C100", "D100", "E100"], "budget_subfunction": ["A", "B", "C", "D", "E"], "budget_subfunction_code": ["A200", "B200", "C200", "D200", "E200"], - "gross_outlay_amount_FYB_to_period_end": [100, 100, 100, 100, 100], + "gross_outlay_amount_fyb_to_period_end": [Decimal(100).quantize(Decimal(".01"))] * 5, "funding_toptier_agency_id": [1, 2, 2, 2, 3], "federal_account_id": [1, 2, 2, 2, 3], "disaster_emergency_fund_code": ["L", "L", "L", "L", "L"], - }, - columns=columns, - ).fillna("dummy_text") + } + ) + # Some data manipulation for matching the Spark schema + test_data_df[["reporting_fiscal_quarter", "reporting_fiscal_period"]] = test_data_df[ + ["reporting_fiscal_quarter", "reporting_fiscal_period"] + ].astype(pd.Int8Dtype()) + test_data_df = test_data_df.replace([np.nan], [None]) + ( - spark.createDataFrame(test_data_df) + spark.createDataFrame(test_data_df, schema=award_financial_schema) .write.format("delta") .mode("overwrite") - .option("overwriteSchema", "true") .saveAsTable("rpt.award_financial_download") ) yield @pytest.fixture(scope="function") -def account_balances_download_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db): +def account_balances_download_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch): call_command( "create_delta_table", "--destination-table=account_balances_download", f"--spark-s3-bucket={s3_unittest_data_bucket}", ) + monkeypatch.setattr( + f"usaspending_api.download.delta_downloads.account_balances.AccountBalancesMixin.download_table", + spark.read.format("delta").load( + f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download" + ), + ) + column_placeholders = {field.name: [None] * 5 for field in account_balances_schema} test_data_df = pd.DataFrame( data={ + **column_placeholders, "reporting_fiscal_year": [2018, 2018, 2018, 2018, 2019], "quarter_format_flag": [True, True, False, True, True], "reporting_fiscal_quarter": [1, 2, None, 4, 2], @@ -102,72 +101,45 @@ def account_balances_download_table(spark, s3_unittest_data_bucket, hive_unittes "budget_function_code": ["A100", "B100", "C100", "D100", "E100"], "budget_subfunction": ["A", "B", "C", "D", "E"], "budget_subfunction_code": ["A200", "B200", "C200", "D200", "E200"], - "gross_outlay_amount": [100, 100, 100, 100, 100], + "gross_outlay_amount": [Decimal(100).quantize(Decimal(".01"))] * 5, "funding_toptier_agency_id": [1, 2, 2, 2, 3], "federal_account_id": [1, 2, 2, 2, 3], - }, - columns=[field.name for field in account_balances_schema], - ).fillna("dummy_text") + } + ) + # Some data manipulation for matching the Spark schema + test_data_df[["reporting_fiscal_quarter", "reporting_fiscal_period"]] = test_data_df[ + ["reporting_fiscal_quarter", "reporting_fiscal_period"] + ].astype(pd.Int8Dtype()) + test_data_df = test_data_df.replace([np.nan], [None]) + ( - spark.createDataFrame(test_data_df) + spark.createDataFrame(test_data_df, schema=account_balances_schema) .write.format("delta") .mode("overwrite") - .option("overwriteSchema", "true") .saveAsTable("rpt.account_balances_download") ) yield @pytest.fixture(scope="function") -def object_class_by_program_activity_download_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db): +def object_class_by_program_activity_download_table( + spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch +): call_command( "create_delta_table", "--destination-table=object_class_program_activity_download", f"--spark-s3-bucket={s3_unittest_data_bucket}", ) - columns = list( - set( - [ - col - for col in query_paths["object_class_program_activity"]["federal_account"].keys() - if not col.startswith("last_modified_date") - ] - + [ - col - for col in query_paths["object_class_program_activity"]["treasury_account"].keys() - if not col.startswith("last_modified_date") - ] - + [ - "last_modified_date", - "reporting_fiscal_year", - "reporting_fiscal_quarter", - "reporting_fiscal_period", - "quarter_format_flag", - "submission_id", - "federal_account_id", - "funding_toptier_agency_id", - "budget_function_code", - "budget_subfunction_code", - "data_source", - "financial_accounts_by_program_activity_object_class_id", - "update_date", - "object_class_id", - "drv_obligations_incurred_by_program_object_class", - "prior_year_adjustment", - "drv_obligations_undelivered_orders_unpaid", - "USSGL490110_rein_deliv_ord_CPE", - "program_activity_id", - "USSGL480110_rein_undel_ord_CPE", - "treasury_account_id", - "create_date", - "reporting_period_end", - "reporting_period_start", - "certified_date", - ] - ) + monkeypatch.setattr( + f"usaspending_api.download.delta_downloads.object_class_program_activity.ObjectClassProgramActivityMixin.download_table", + spark.read.format("delta").load( + f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download" + ), ) + column_placeholders = {field.name: [None] * 5 for field in object_class_program_activity_schema} test_data_df = pd.DataFrame( data={ + **column_placeholders, "financial_accounts_by_program_activity_object_class_id": [1, 2, 3, 4, 5], "submission_id": [1, 2, 3, 4, 5], "owning_agency_name": ["test1", "test2", "test2", "test2", "test3"], @@ -192,18 +164,22 @@ def object_class_by_program_activity_download_table(spark, s3_unittest_data_buck "BudgetSubFunction5", ], "budget_subfunction_code": ["SF01", "SF02", "SF03", "SF04", "SF05"], - "gross_outlay_amount_FYB_to_period_end": [100, 100, 100, 100, 100], + "gross_outlay_amount_FYB_to_period_end": [Decimal(100).quantize(Decimal(".01"))] * 5, "funding_toptier_agency_id": [1, 2, 2, 2, 3], "federal_account_id": [1, 2, 2, 2, 3], "disaster_emergency_fund_code": ["L", "L", "L", "L", "L"], }, - columns=columns, - ).fillna("dummy_text") + ) + # Some data manipulation for matching the Spark schema + test_data_df[["reporting_fiscal_quarter", "reporting_fiscal_period"]] = test_data_df[ + ["reporting_fiscal_quarter", "reporting_fiscal_period"] + ].astype(pd.Int8Dtype()) + test_data_df = test_data_df.replace([np.nan], [None]) + ( - spark.createDataFrame(test_data_df) + spark.createDataFrame(test_data_df, schema=object_class_program_activity_schema) .write.format("delta") .mode("overwrite") - .option("overwriteSchema", "true") .saveAsTable("rpt.object_class_program_activity_download") ) yield diff --git a/usaspending_api/download/tests/integration/test_download_accounts.py b/usaspending_api/download/tests/integration/test_download_accounts.py index 40413593c8..f287538cfa 100644 --- a/usaspending_api/download/tests/integration/test_download_accounts.py +++ b/usaspending_api/download/tests/integration/test_download_accounts.py @@ -10,6 +10,7 @@ from django.core.management import call_command from model_bakery import baker from rest_framework import status +from usaspending_api.config import CONFIG from usaspending_api.accounts.models import FederalAccount, TreasuryAppropriationAccount from usaspending_api.awards.models import FinancialAccountsByAwards @@ -21,6 +22,46 @@ from usaspending_api.search.models import TransactionSearch +@pytest.fixture +def create_download_delta_tables(spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch): + call_command( + "create_delta_table", + f"--spark-s3-bucket={s3_unittest_data_bucket}", + f"--destination-table=award_financial_download", + ) + monkeypatch.setattr( + f"usaspending_api.download.delta_downloads.award_financial.AwardFinancialMixin.download_table", + spark.read.format("delta").load( + f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download" + ), + ) + + call_command( + "create_delta_table", + f"--spark-s3-bucket={s3_unittest_data_bucket}", + f"--destination-table=object_class_program_activity_download", + ) + monkeypatch.setattr( + f"usaspending_api.download.delta_downloads.object_class_program_activity.ObjectClassProgramActivityMixin.download_table", + spark.read.format("delta").load( + f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download" + ), + ) + + call_command( + "create_delta_table", + f"--spark-s3-bucket={s3_unittest_data_bucket}", + f"--destination-table=account_balances_download", + ) + monkeypatch.setattr( + f"usaspending_api.download.delta_downloads.account_balances.AccountBalancesMixin.download_table", + spark.read.format("delta").load( + f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download" + ), + ) + yield + + @pytest.fixture def download_test_data(db): # Populate job status lookup table @@ -415,25 +456,7 @@ def test_empty_array_filter_fail(client, download_test_data): @pytest.mark.django_db(databases=[settings.DOWNLOAD_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) -def test_file_c_spark_download(client, download_test_data, spark, s3_unittest_data_bucket, hive_unittest_metastore_db): - download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string()) - - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=award_financial_download", - ) - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=object_class_program_activity_download", - ) - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=account_balances_download", - ) - +def test_file_c_spark_download(client, download_test_data, create_download_delta_tables): resp = client.post( "/api/v2/download/accounts/", content_type="application/json", @@ -456,25 +479,7 @@ def test_file_c_spark_download(client, download_test_data, spark, s3_unittest_da @pytest.mark.django_db(databases=[settings.DOWNLOAD_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) -def test_file_c_spark_download_columns(client, download_test_data, s3_unittest_data_bucket, hive_unittest_metastore_db): - download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string()) - - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=award_financial_download", - ) - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=object_class_program_activity_download", - ) - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=account_balances_download", - ) - +def test_file_c_spark_download_columns(client, download_test_data, create_download_delta_tables): resp = client.post( "/api/v2/download/accounts/", content_type="application/json", @@ -505,28 +510,7 @@ def test_file_c_spark_download_columns(client, download_test_data, s3_unittest_d @pytest.mark.django_db(databases=[settings.DOWNLOAD_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) -def test_file_c_spark_download_unknown_columns( - client, download_test_data, s3_unittest_data_bucket, hive_unittest_metastore_db -): - - download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string()) - - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=award_financial_download", - ) - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=object_class_program_activity_download", - ) - call_command( - "create_delta_table", - f"--spark-s3-bucket={s3_unittest_data_bucket}", - f"--destination-table=account_balances_download", - ) - +def test_file_c_spark_download_unknown_columns(client, download_test_data): resp = client.post( "/api/v2/download/accounts/", content_type="application/json", diff --git a/usaspending_api/etl/management/commands/load_query_to_delta.py b/usaspending_api/etl/management/commands/load_query_to_delta.py index 95e6795ca5..9291bd5257 100644 --- a/usaspending_api/etl/management/commands/load_query_to_delta.py +++ b/usaspending_api/etl/management/commands/load_query_to_delta.py @@ -21,14 +21,12 @@ ) from usaspending_api.disaster.models import CovidFABASpending from usaspending_api.download.delta_models.award_financial_download import ( - AWARD_FINANCIAL_DOWNLOAD_POSTGRES_COLUMNS, - award_financial_download_create_sql_string, award_financial_download_load_sql_string, + award_financial_schema, ) from usaspending_api.download.delta_models.object_class_program_activity_download import ( - OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_POSTGRES_COLUMNS, - object_class_program_activity_download_create_sql_string, object_class_program_activity_download_load_sql_string, + object_class_program_activity_schema, ) from usaspending_api.download.delta_models.account_balances_download import ( load_account_balances, @@ -362,10 +360,10 @@ "partition_column": "financial_accounts_by_awards_id", "partition_column_type": "numeric", "is_partition_column_unique": False, - "delta_table_create_sql": award_financial_download_create_sql_string, - "source_schema": AWARD_FINANCIAL_DOWNLOAD_POSTGRES_COLUMNS, + "delta_table_create_sql": award_financial_schema, + "source_schema": None, "custom_schema": None, - "column_names": list(AWARD_FINANCIAL_DOWNLOAD_POSTGRES_COLUMNS), + "column_names": list(), "postgres_seq_name": None, "tsvectors": None, "postgres_partition_spec": None, @@ -383,10 +381,10 @@ "partition_column": "financial_accounts_by_program_activity_object_class_id", "partition_column_type": "numeric", "is_partition_column_unique": False, - "delta_table_create_sql": object_class_program_activity_download_create_sql_string, - "source_schema": OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_POSTGRES_COLUMNS, + "delta_table_create_sql": object_class_program_activity_schema, + "source_schema": None, "custom_schema": None, - "column_names": list(OBJECT_CLASS_PROGRAM_ACTIVITY_DOWNLOAD_POSTGRES_COLUMNS), + "column_names": list(), "postgres_seq_name": None, "tsvectors": None, "postgres_partition_spec": None,