diff --git a/setup.cfg b/setup.cfg index 9b4258c628..3fc11249c1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [flake8] select=C,E,F,W,B,B950 -ignore=E501,W503,E203,F541 +ignore=E501,W503,E203,F541,E704 exclude=.venv,build,usaspending_api.egg-info,usaspending_api/*/migrations/* max-line-length=120 \ No newline at end of file diff --git a/usaspending_api/common/helpers/download_csv_strategies.py b/usaspending_api/common/helpers/download_csv_strategies.py index 0e8b44fab5..abedbbb896 100644 --- a/usaspending_api/common/helpers/download_csv_strategies.py +++ b/usaspending_api/common/helpers/download_csv_strategies.py @@ -115,7 +115,7 @@ def download_to_csv( raise e finally: Path(temp_file_path).unlink() - return CSVDownloadMetadata([destination_path], row_count) + return CSVDownloadMetadata([str(destination_path)], row_count) class SparkToCSVStrategy(AbstractToCSVStrategy): diff --git a/usaspending_api/download/delta_models/account_download.py b/usaspending_api/download/delta_models/account_download.py index 1381a7b399..df61836d11 100644 --- a/usaspending_api/download/delta_models/account_download.py +++ b/usaspending_api/download/delta_models/account_download.py @@ -1,10 +1,12 @@ ACCOUNT_DOWNLOAD_COLUMNS = { "financial_accounts_by_awards_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"}, "submission_id": {"delta": "INTEGER NOT NULL", "postgres": "INTEGER NOT NULL"}, - "owning_agency_name": {"delta": "STRING", "postgres": "TEXT"}, + "federal_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"}, + "treasury_owning_agency_name": {"delta": "STRING", "postgres": "TEXT"}, "federal_account_symbol": {"delta": "STRING", "postgres": "TEXT"}, "federal_account_name": {"delta": "STRING", "postgres": "TEXT"}, "agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"}, + "allocation_transfer_agency_identifier_name": {"delta": "STRING", "postgres": "TEXT"}, "program_activity_code": {"delta": "STRING", "postgres": "TEXT"}, "program_activity_name": {"delta": "STRING", "postgres": "TEXT"}, "object_class_code": {"delta": "STRING", "postgres": "TEXT"}, @@ -63,6 +65,15 @@ "national_interest_action": {"delta": "STRING", "postgres": "TEXT"}, "reporting_agency_name": {"delta": "STRING", "postgres": "TEXT"}, "submission_period": {"delta": "STRING", "postgres": "TEXT"}, + "allocation_transfer_agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"}, + "agency_identifier_code": {"delta": "STRING", "postgres": "TEXT"}, + "beginning_period_of_availability": {"delta": "DATE", "postgres": "DATE"}, + "ending_period_of_availability": {"delta": "DATE", "postgres": "DATE"}, + "availability_type_code": {"delta": "STRING", "postgres": "TEXT"}, + "main_account_code": {"delta": "STRING", "postgres": "TEXT"}, + "sub_account_code": {"delta": "STRING", "postgres": "TEXT"}, + "treasury_account_symbol": {"delta": "STRING", "postgres": "TEXT"}, + "treasury_account_name": {"delta": "STRING", "postgres": "TEXT"}, "funding_toptier_agency_id": {"delta": "INTEGER", "postgres": "INTEGER"}, "federal_account_id": {"delta": "INTEGER", "postgres": "INTEGER"}, "budget_function": {"delta": "STRING", "postgres": "TEXT"}, @@ -114,10 +125,12 @@ SELECT financial_accounts_by_awards.financial_accounts_by_awards_id, financial_accounts_by_awards.submission_id, - toptier_agency.name AS owning_agency_name, + federal_toptier_agency.name AS federal_owning_agency_name, + treasury_toptier_agency.name AS treasury_owning_agency_name, federal_account.federal_account_code AS federal_account_symbol, federal_account.account_title AS federal_account_name, cgac_aid.agency_name AS agency_identifier_name, + cgac_ata.agency_name AS allocation_transfer_agency_identifier_name, ref_program_activity.program_activity_code, ref_program_activity.program_activity_name, object_class.object_class AS object_class_code, @@ -200,6 +213,15 @@ ) ) END AS submission_period, + treasury_appropriation_account.allocation_transfer_agency_id AS allocation_transfer_agency_identifier_code, + treasury_appropriation_account.agency_id AS agency_identifier_code, + treasury_appropriation_account.beginning_period_of_availability AS beginning_period_of_availability, + treasury_appropriation_account.ending_period_of_availability AS ending_period_of_availability, + treasury_appropriation_account.availability_type_code AS availability_type_code, + treasury_appropriation_account.main_account_code AS main_account_code, + treasury_appropriation_account.sub_account_code AS sub_account_code, + treasury_appropriation_account.tas_rendering_label AS treasury_account_symbol, + treasury_appropriation_account.account_title AS treasury_account_name, treasury_appropriation_account.funding_toptier_agency_id AS funding_toptier_agency_id, treasury_appropriation_account.federal_account_id AS federal_account_id, treasury_appropriation_account.budget_function_title AS budget_function, @@ -308,56 +330,30 @@ submission_attributes.reporting_fiscal_quarter, submission_attributes.reporting_fiscal_year, submission_attributes.quarter_format_flag - FROM raw.financial_accounts_by_awards - INNER JOIN global_temp.submission_attributes AS submission_attributes - ON ( - financial_accounts_by_awards.submission_id - = submission_attributes.submission_id - ) - LEFT OUTER JOIN global_temp.treasury_appropriation_account - ON ( - financial_accounts_by_awards.treasury_account_id - = treasury_appropriation_account.treasury_account_identifier - ) - LEFT OUTER JOIN award_search - ON ( - financial_accounts_by_awards.award_id = award_search.award_id - ) - LEFT OUTER JOIN transaction_search - ON ( - award_search.latest_transaction_search_id - = transaction_search.transaction_id - ) - LEFT OUTER JOIN global_temp.ref_program_activity - ON ( - financial_accounts_by_awards.program_activity_id - = ref_program_activity.id - ) - LEFT OUTER JOIN global_temp.object_class - ON ( - financial_accounts_by_awards.object_class_id = object_class.id - ) - LEFT OUTER JOIN global_temp.disaster_emergency_fund_code - ON ( - financial_accounts_by_awards.disaster_emergency_fund_code - = disaster_emergency_fund_code.code - ) - LEFT OUTER JOIN global_temp.federal_account - ON ( - treasury_appropriation_account.federal_account_id = federal_account.id - ) - LEFT OUTER JOIN global_temp.toptier_agency - ON ( - federal_account.parent_toptier_agency_id - = toptier_agency.toptier_agency_id - ) - LEFT OUTER JOIN global_temp.cgac AS cgac_aid - ON ( - treasury_appropriation_account.agency_id = cgac_aid.cgac_code - ) - LEFT OUTER JOIN global_temp.cgac AS cgac_ata - ON ( - treasury_appropriation_account.allocation_transfer_agency_id - = cgac_ata.cgac_code - ); + FROM + raw.financial_accounts_by_awards + INNER JOIN global_temp.submission_attributes + ON (financial_accounts_by_awards.submission_id = submission_attributes.submission_id) + LEFT OUTER JOIN global_temp.treasury_appropriation_account + ON (financial_accounts_by_awards.treasury_account_id = treasury_appropriation_account.treasury_account_identifier) + LEFT OUTER JOIN award_search + ON (financial_accounts_by_awards.award_id = award_search.award_id) + LEFT OUTER JOIN transaction_search + ON (award_search.latest_transaction_search_id = transaction_search.transaction_id) + LEFT OUTER JOIN global_temp.ref_program_activity + ON (financial_accounts_by_awards.program_activity_id = ref_program_activity.id) + LEFT OUTER JOIN global_temp.object_class + ON (financial_accounts_by_awards.object_class_id = object_class.id) + LEFT OUTER JOIN global_temp.disaster_emergency_fund_code + ON (financial_accounts_by_awards.disaster_emergency_fund_code = disaster_emergency_fund_code.code) + LEFT OUTER JOIN global_temp.federal_account + ON (treasury_appropriation_account.federal_account_id = federal_account.id) + LEFT OUTER JOIN global_temp.toptier_agency as federal_toptier_agency + ON (federal_account.parent_toptier_agency_id = federal_toptier_agency.toptier_agency_id) + LEFT OUTER JOIN global_temp.toptier_agency as treasury_toptier_agency + ON (treasury_appropriation_account.funding_toptier_agency_id = treasury_toptier_agency.toptier_agency_id) + LEFT OUTER JOIN global_temp.cgac AS cgac_aid + ON (treasury_appropriation_account.agency_id = cgac_aid.cgac_code) + LEFT OUTER JOIN global_temp.cgac AS cgac_ata + ON (treasury_appropriation_account.allocation_transfer_agency_id = cgac_ata.cgac_code); """ diff --git a/usaspending_api/download/management/commands/delta_downloads/award_financial/__init__.py b/usaspending_api/download/management/commands/delta_downloads/award_financial/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/usaspending_api/download/management/commands/delta_downloads/award_financial/builders.py b/usaspending_api/download/management/commands/delta_downloads/award_financial/builders.py deleted file mode 100644 index fcf871a86a..0000000000 --- a/usaspending_api/download/management/commands/delta_downloads/award_financial/builders.py +++ /dev/null @@ -1,134 +0,0 @@ -from dataclasses import dataclass -from functools import reduce -from typing import Any - -from pyspark.sql import DataFrame, SparkSession -from pyspark.sql import functions as sf, Column - -from usaspending_api.download.management.commands.delta_downloads.award_financial.filters import AccountDownloadFilter -from usaspending_api.download.management.commands.delta_downloads.award_financial.columns import ( - federal_account_groupby_cols, - federal_account_select_cols, -) -from usaspending_api.submissions.helpers import get_submission_ids_for_periods - - -class AccountDownloadDataFrameBuilder: - - def __init__( - self, - spark: SparkSession, - account_download_filter: AccountDownloadFilter, - table_name: str = "rpt.account_download", - ): - self.reporting_fiscal_year = account_download_filter.fy - self.reporting_fiscal_quarter = account_download_filter.quarter or account_download_filter.period // 3 - self.reporting_fiscal_period = account_download_filter.period or account_download_filter.quarter * 3 - self.agency = account_download_filter.agency - self.federal_account_id = account_download_filter.federal_account - self.budget_function = account_download_filter.budget_function - self.budget_subfunction = account_download_filter.budget_subfunction - self.def_codes = account_download_filter.def_codes - self.df: str = spark.table(table_name) - self.groupby_cols: list[str] = federal_account_groupby_cols - self.select_cols: list[str] = federal_account_select_cols - - def filter_to_latest_submissions_for_agencies(self, col_name: str, otherwise: Any = None) -> Column: - """Filter to the latest submission regardless of whether the agency submitted on a monthly or quarterly basis""" - return ( - sf.when( - sf.col("submission_id").isin( - get_submission_ids_for_periods( - self.reporting_fiscal_year, self.reporting_fiscal_quarter, self.reporting_fiscal_period - ) - ), - sf.col(col_name), - ) - .otherwise(otherwise) - .alias(col_name) - ) - - @property - def combined_filters(self) -> Column: - - @dataclass - class Condition: - name: str - condition: Column - apply: bool - - conditions = [ - Condition(name="year", condition=sf.col("reporting_fiscal_year") == self.reporting_fiscal_year, apply=True), - Condition( - name="quarter or month", - condition=( - (sf.col("reporting_fiscal_period") <= self.reporting_fiscal_period) & ~sf.col("quarter_format_flag") - ) - | ( - (sf.col("reporting_fiscal_quarter") <= self.reporting_fiscal_quarter) - & sf.col("quarter_format_flag") - ), - apply=True, - ), - Condition( - name="agency", condition=sf.col("funding_toptier_agency_id") == self.agency, apply=bool(self.agency) - ), - Condition( - name="federal account", - condition=sf.col("federal_account_id") == self.federal_account_id, - apply=bool(self.federal_account_id), - ), - Condition( - name="budget function", - condition=sf.col("budget_function_code") == self.budget_function, - apply=bool(self.budget_function), - ), - Condition( - name="budget subfunction", - condition=sf.col("budget_subfunction_code") == self.budget_subfunction, - apply=bool(self.budget_subfunction), - ), - Condition( - name="def_codes", - condition=sf.col("disaster_emergency_fund_code").isin(self.def_codes), - apply=bool(self.def_codes), - ), - ] - return reduce( - lambda x, y: x & y, - [condition.condition for condition in conditions if condition.apply], - ) - - @staticmethod - def collect_concat(col_name: str, concat_str: str = "; ") -> Column: - return sf.concat_ws(concat_str, sf.sort_array(sf.collect_set(col_name))).alias(col_name) - - @property - def source_df(self) -> DataFrame: - return ( - self.df.filter(self.combined_filters) - .groupBy(self.groupby_cols) - .agg( - *[ - self.collect_concat(col) - for col in ["reporting_agency_name", "budget_function", "budget_subfunction"] - ], - sf.sum("transaction_obligated_amount").alias("transaction_obligated_amount"), - *[ - sf.sum(self.filter_to_latest_submissions_for_agencies(col)).alias(col) - for col in [ - "gross_outlay_amount_FYB_to_period_end", - "USSGL487200_downward_adj_prior_year_prepaid_undeliv_order_oblig", - "USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig", - ] - ], - sf.max(sf.col("last_modified_date")).alias("last_modified_date"), - ) - .filter( - (sf.col("gross_outlay_amount_FYB_to_period_end") != 0) - | (sf.col("USSGL487200_downward_adj_prior_year_prepaid_undeliv_order_oblig") != 0) - | (sf.col("USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig") != 0) - | (sf.col("transaction_obligated_amount") != 0) - ) - .select(self.select_cols) - ) diff --git a/usaspending_api/download/management/commands/delta_downloads/award_financial/columns.py b/usaspending_api/download/management/commands/delta_downloads/award_financial/columns.py deleted file mode 100644 index b36e956503..0000000000 --- a/usaspending_api/download/management/commands/delta_downloads/award_financial/columns.py +++ /dev/null @@ -1,80 +0,0 @@ -from usaspending_api.download.v2.download_column_historical_lookups import query_paths - -federal_account_groupby_cols = [ - "owning_agency_name", - "federal_account_symbol", - "federal_account_name", - "agency_identifier_name", - "program_activity_code", - "program_activity_name", - "object_class_code", - "object_class_name", - "direct_or_reimbursable_funding_source", - "disaster_emergency_fund_code", - "disaster_emergency_fund_name", - "award_unique_key", - "award_id_piid", - "parent_award_id_piid", - "award_id_fain", - "award_id_uri", - "award_base_action_date", - "award_latest_action_date", - "period_of_performance_start_date", - "period_of_performance_current_end_date", - "ordering_period_end_date", - "idv_type_code", - "idv_type", - "prime_award_base_transaction_description", - "awarding_agency_code", - "awarding_agency_name", - "awarding_subagency_code", - "awarding_subagency_name", - "awarding_office_code", - "awarding_office_name", - "funding_agency_code", - "funding_agency_name", - "funding_sub_agency_code", - "funding_sub_agency_name", - "funding_office_code", - "funding_office_name", - "recipient_uei", - "recipient_duns", - "recipient_name", - "recipient_name_raw", - "recipient_parent_uei", - "recipient_parent_duns", - "recipient_parent_name", - "recipient_parent_name_raw", - "recipient_country", - "recipient_state", - "recipient_county", - "recipient_city", - "primary_place_of_performance_country", - "primary_place_of_performance_state", - "primary_place_of_performance_county", - "primary_place_of_performance_zip_code", - "cfda_number", - "cfda_title", - "product_or_service_code", - "product_or_service_code_description", - "naics_code", - "naics_description", - "national_interest_action_code", - "national_interest_action", - "submission_period", - "award_type_code", - "award_type", - "recipient_zip_code", - "award_base_action_date_fiscal_year", - "award_latest_action_date_fiscal_year", - "usaspending_permalink", - "prime_award_summary_recipient_cd_original", - "prime_award_summary_recipient_cd_current", - "prime_award_summary_place_of_performance_cd_original", - "prime_award_summary_place_of_performance_cd_current", -] - -federal_account_select_cols = [ - col if not col.startswith("last_modified_date") else "last_modified_date" - for col in query_paths["award_financial"]["federal_account"].keys() -] diff --git a/usaspending_api/download/management/commands/delta_downloads/builders.py b/usaspending_api/download/management/commands/delta_downloads/builders.py new file mode 100644 index 0000000000..c2c084a135 --- /dev/null +++ b/usaspending_api/download/management/commands/delta_downloads/builders.py @@ -0,0 +1,191 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from functools import reduce +from typing import Any + +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql import functions as sf, Column + +from usaspending_api.download.management.commands.delta_downloads.filters import AccountDownloadFilter +from usaspending_api.download.v2.download_column_historical_lookups import query_paths +from usaspending_api.submissions.helpers import get_submission_ids_for_periods + + +class AbstractAccountDownloadDataFrameBuilder(ABC): + + def __init__( + self, + spark: SparkSession, + account_download_filter: AccountDownloadFilter, + table_name: str = "rpt.account_download", + ): + self.reporting_fiscal_year = account_download_filter.fy + self.submission_types = account_download_filter.submission_types + self.reporting_fiscal_quarter = account_download_filter.quarter or account_download_filter.period // 3 + self.reporting_fiscal_period = account_download_filter.period or account_download_filter.quarter * 3 + self.agency = account_download_filter.agency + self.federal_account_id = account_download_filter.federal_account + self.budget_function = account_download_filter.budget_function + self.budget_subfunction = account_download_filter.budget_subfunction + self.def_codes = account_download_filter.def_codes + self.df: DataFrame = spark.table(table_name) + + @property + def dynamic_filters(self) -> Column: + + @dataclass + class Condition: + name: str + condition: Column + apply: bool + + conditions = [ + Condition(name="year", condition=sf.col("reporting_fiscal_year") == self.reporting_fiscal_year, apply=True), + Condition( + name="quarter or month", + condition=( + (sf.col("reporting_fiscal_period") <= self.reporting_fiscal_period) & ~sf.col("quarter_format_flag") + ) + | ( + (sf.col("reporting_fiscal_quarter") <= self.reporting_fiscal_quarter) + & sf.col("quarter_format_flag") + ), + apply=True, + ), + Condition( + name="agency", condition=sf.col("funding_toptier_agency_id") == self.agency, apply=bool(self.agency) + ), + Condition( + name="federal account", + condition=sf.col("federal_account_id") == self.federal_account_id, + apply=bool(self.federal_account_id), + ), + Condition( + name="budget function", + condition=sf.col("budget_function_code") == self.budget_function, + apply=bool(self.budget_function), + ), + Condition( + name="budget subfunction", + condition=sf.col("budget_subfunction_code") == self.budget_subfunction, + apply=bool(self.budget_subfunction), + ), + Condition( + name="def_codes", + condition=sf.col("disaster_emergency_fund_code").isin(self.def_codes), + apply=bool(self.def_codes), + ), + ] + return reduce( + lambda x, y: x & y, + [condition.condition for condition in conditions if condition.apply], + ) + + @property + def non_zero_filters(self) -> Column: + return ( + (sf.col("gross_outlay_amount_FYB_to_period_end") != 0) + | (sf.col("USSGL487200_downward_adj_prior_year_prepaid_undeliv_order_oblig") != 0) + | (sf.col("USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig") != 0) + | (sf.col("transaction_obligated_amount") != 0) + ) + + @staticmethod + def collect_concat(col_name: str, concat_str: str = "; ") -> Column: + return sf.concat_ws(concat_str, sf.sort_array(sf.collect_set(col_name))).alias(col_name) + + @property + @abstractmethod + def award_financial(self) -> DataFrame: ... + + @property + def source_dfs(self) -> list[DataFrame]: + return [getattr(self, submission_type) for submission_type in self.submission_types] + + +class FederalAccountDownloadDataFrameBuilder(AbstractAccountDownloadDataFrameBuilder): + + def __init__( + self, + spark: SparkSession, + account_download_filter: AccountDownloadFilter, + table_name: str = "rpt.account_download", + ): + super().__init__(spark, account_download_filter, table_name) + self.agg_cols = { + "reporting_agency_name": self.collect_concat, + "budget_function": self.collect_concat, + "budget_subfunction": self.collect_concat, + "transaction_obligated_amount": lambda col: sf.sum(col).alias(col), + "gross_outlay_amount_FYB_to_period_end": self.filter_and_sum, + "USSGL487200_downward_adj_prior_year_prepaid_undeliv_order_oblig": self.filter_and_sum, + "USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": self.filter_and_sum, + "last_modified_date": lambda col: sf.max(col).alias(col), + } + self.select_cols = ( + [sf.col("federal_owning_agency_name").alias("owning_agency_name")] + + [ + col + for col in query_paths["award_financial"]["federal_account"].keys() + if col != "owning_agency_name" and not col.startswith("last_modified_date") + ] + + ["last_modified_date"] + ) + filter_cols = [ + "submission_id", + "federal_account_id", + "funding_toptier_agency_id", + "budget_function_code", + "budget_subfunction_code", + "reporting_fiscal_period", + "reporting_fiscal_quarter", + "reporting_fiscal_year", + "quarter_format_flag", + ] + self.groupby_cols = [col for col in self.df.columns if col not in list(self.agg_cols) + filter_cols] + + def filter_to_latest_submissions_for_agencies(self, col_name: str, otherwise: Any = None) -> Column: + """Filter to the latest submission regardless of whether the agency submitted on a monthly or quarterly basis""" + return ( + sf.when( + sf.col("submission_id").isin( + get_submission_ids_for_periods( + self.reporting_fiscal_year, self.reporting_fiscal_quarter, self.reporting_fiscal_period + ) + ), + sf.col(col_name), + ) + .otherwise(otherwise) + .alias(col_name) + ) + + def filter_and_sum(self, col_name: str) -> Column: + return sf.sum(self.filter_to_latest_submissions_for_agencies(col_name)).alias(col_name) + + @property + def award_financial(self) -> DataFrame: + return ( + self.df.filter(self.dynamic_filters) + .groupBy(self.groupby_cols) + .agg(*[agg_func(col) for col, agg_func in self.agg_cols.items()]) + # drop original agg columns from the dataframe to avoid ambiguous column names + .drop(*[sf.col(f"account_download.{col}") for col in self.agg_cols]) + .filter(self.non_zero_filters) + .select(self.select_cols) + ) + + +class TreasuryAccountDownloadDataFrameBuilder(AbstractAccountDownloadDataFrameBuilder): + + @property + def award_financial(self) -> DataFrame: + select_cols = ( + [sf.col("treasury_owning_agency_name").alias("owning_agency_name")] + + [ + col + for col in query_paths["award_financial"]["treasury_account"].keys() + if col != "owning_agency_name" and not col.startswith("last_modified_date") + ] + + ["last_modified_date"] + ) + return self.df.filter(self.dynamic_filters & self.non_zero_filters).select(select_cols) diff --git a/usaspending_api/download/management/commands/delta_downloads/award_financial/filters.py b/usaspending_api/download/management/commands/delta_downloads/filters.py similarity index 95% rename from usaspending_api/download/management/commands/delta_downloads/award_financial/filters.py rename to usaspending_api/download/management/commands/delta_downloads/filters.py index dbe21264e8..79f60af5cf 100644 --- a/usaspending_api/download/management/commands/delta_downloads/award_financial/filters.py +++ b/usaspending_api/download/management/commands/delta_downloads/filters.py @@ -1,5 +1,5 @@ import warnings -from typing import Any +from typing import Any, Literal from pydantic import BaseModel, root_validator, validator from pydantic.fields import ModelField @@ -11,6 +11,7 @@ class AccountDownloadFilter(BaseModel): fy: int + submission_types: list[Literal["account_balances", "object_class_program_activity", "award_financial"]] period: int | None = None quarter: int | None = None agency: int | None = None diff --git a/usaspending_api/download/management/commands/generate_spark_download.py b/usaspending_api/download/management/commands/generate_spark_download.py index 8b5d5e43e5..ebac2990d6 100644 --- a/usaspending_api/download/management/commands/generate_spark_download.py +++ b/usaspending_api/download/management/commands/generate_spark_download.py @@ -3,7 +3,7 @@ import os import traceback from pathlib import Path -from typing import Optional, Dict, Tuple, Type, List, Union +from typing import Optional, Union from django.conf import settings from django.core.management.base import BaseCommand @@ -17,29 +17,21 @@ from usaspending_api.common.helpers.spark_helpers import ( configure_spark_session, get_active_spark_session, - get_jdbc_connection_properties, - get_usas_jdbc_url, ) from usaspending_api.common.spark.configs import DEFAULT_EXTRA_CONF -from usaspending_api.download.filestreaming.download_generation import build_data_file_name -from usaspending_api.download.filestreaming.download_source import DownloadSource -from usaspending_api.download.lookups import JOB_STATUS_DICT, FILE_FORMATS, VALUE_MAPPINGS -from usaspending_api.download.management.commands.delta_downloads.award_financial.builders import ( - AccountDownloadDataFrameBuilder, +from usaspending_api.download.lookups import JOB_STATUS_DICT, FILE_FORMATS +from usaspending_api.download.management.commands.delta_downloads.builders import ( + FederalAccountDownloadDataFrameBuilder, + TreasuryAccountDownloadDataFrameBuilder, ) -from usaspending_api.download.management.commands.delta_downloads.award_financial.filters import AccountDownloadFilter +from usaspending_api.download.management.commands.delta_downloads.filters import AccountDownloadFilter from usaspending_api.download.models import DownloadJob -from usaspending_api.download.v2.request_validations import AccountDownloadValidator, DownloadValidatorBase logger = logging.getLogger(__name__) -DOWNLOAD_SPEC = { - "award_financial": { - "federal_account": { - "df_builder": AccountDownloadDataFrameBuilder, - "validator_type": AccountDownloadValidator, - } - } +dataframe_builders = { + "federal_account": FederalAccountDownloadDataFrameBuilder, + "treasury_account": TreasuryAccountDownloadDataFrameBuilder, } @@ -47,96 +39,52 @@ class Command(BaseCommand): help = "Generate a download zip file based on the provided type and level." - download_job_id: int download_job: DownloadJob - download_level: str - download_query: str - download_source: DownloadSource - download_spec: Dict - download_type: str - download_validator_type: Type[DownloadValidatorBase] - file_format_spec: Dict file_prefix: str - jdbc_properties: Dict + jdbc_properties: dict jdbc_url: str should_cleanup: bool spark: SparkSession working_dir_path: Path def add_arguments(self, parser): - parser.add_argument("--download-type", type=str, required=True, choices=list(DOWNLOAD_SPEC)) - parser.add_argument( - "--download-level", - type=str, - required=True, - choices=set( - download_level - for download_level_list in [DOWNLOAD_SPEC[key] for key in DOWNLOAD_SPEC] - for download_level in download_level_list - ), - ) parser.add_argument("--download-job-id", type=int, required=True) parser.add_argument("--file-format", type=str, required=False, choices=list(FILE_FORMATS), default="csv") parser.add_argument("--file-prefix", type=str, required=False, default="") parser.add_argument("--skip-local-cleanup", action="store_true") def handle(self, *args, **options): - self.spark = get_active_spark_session() - spark_created_by_command = False - if not self.spark: - spark_created_by_command = True - self.spark = configure_spark_session(**DEFAULT_EXTRA_CONF, spark_context=self.spark) - - # Resolve Parameters - self.download_type = options["download_type"] - self.download_level = options["download_level"] - self.download_job_id = options["download_job_id"] + self.spark, spark_created_by_command = self.setup_spark_session() self.file_prefix = options["file_prefix"] self.should_cleanup = not options["skip_local_cleanup"] - - if self.download_level not in DOWNLOAD_SPEC[self.download_type].keys(): - raise ValueError( - f'Provided download level of "{self.download_level}" is not supported ' - f'for download type of "{self.download_type}".' - ) - - download_spec = DOWNLOAD_SPEC[self.download_type][self.download_level] - self.file_format_spec = FILE_FORMATS[options["file_format"]] - self.df_builder = download_spec["df_builder"] - self.download_validator_type = download_spec["validator_type"] - self.jdbc_properties = get_jdbc_connection_properties() - self.jdbc_url = get_usas_jdbc_url() - + self.download_job = self.get_download_job(options["download_job_id"]) self.working_dir_path = Path(settings.CSV_LOCAL_PATH) if not self.working_dir_path.exists(): self.working_dir_path.mkdir() - create_ref_temp_views(self.spark) - - self.download_job, self.download_source = self.get_download_job() self.process_download() - if spark_created_by_command: self.spark.stop() + @staticmethod + def setup_spark_session() -> tuple[SparkSession, bool]: + spark = get_active_spark_session() + spark_created_by_command = False + if not spark: + spark_created_by_command = True + spark = configure_spark_session(**DEFAULT_EXTRA_CONF, spark_context=spark) + return spark, spark_created_by_command + @cached_property def download_name(self) -> str: return self.download_job.file_name.replace(".zip", "") - def get_download_job(self) -> Tuple[DownloadJob, DownloadSource]: - download_job = DownloadJob.objects.get(download_job_id=self.download_job_id) + @staticmethod + def get_download_job(download_job_id) -> DownloadJob: + download_job = DownloadJob.objects.get(download_job_id=download_job_id) if download_job.job_status_id != JOB_STATUS_DICT["ready"]: - raise InvalidParameterException(f"Download Job {self.download_job_id} is not ready.") - json_request = json.loads(download_job.json_request) - download_source = DownloadSource( - VALUE_MAPPINGS[self.download_type]["table_name"], - self.download_level, - self.download_type, - json_request.get("agency", "all"), - ) - download_source.file_name = build_data_file_name(download_source, download_job, piid=None, assistance_id=None) - - return download_job, download_source + raise InvalidParameterException(f"Download Job {download_job_id} is not ready.") + return download_job def process_download(self): self.start_download() @@ -145,20 +93,27 @@ def process_download(self): spark_to_csv_strategy = SparkToCSVStrategy(logger) zip_file_path = self.working_dir_path / f"{self.download_name}.zip" download_request = json.loads(self.download_job.json_request) + df_builder = dataframe_builders[download_request["account_level"]] account_download_filter = AccountDownloadFilter(**download_request["filters"]) - source_df = self.df_builder(spark=self.spark, account_download_filter=account_download_filter).source_df - csv_metadata = spark_to_csv_strategy.download_to_csv( - source_sql=None, - destination_path=self.working_dir_path / self.download_name, - destination_file_name=self.download_name, - working_dir_path=self.working_dir_path, - download_zip_path=zip_file_path, - source_df=source_df, - ) - files_to_cleanup.extend(csv_metadata.filepaths) + source_dfs = df_builder(spark=self.spark, account_download_filter=account_download_filter).source_dfs + csvs_metadata = [ + spark_to_csv_strategy.download_to_csv( + source_sql=None, + destination_path=self.working_dir_path / self.download_name, + destination_file_name=self.download_name, + working_dir_path=self.working_dir_path, + download_zip_path=zip_file_path, + source_df=source_df, + ) + for source_df in source_dfs + ] + for csv_metadata in csvs_metadata: + files_to_cleanup.extend(csv_metadata.filepaths) self.download_job.file_size = os.stat(zip_file_path).st_size - self.download_job.number_of_rows = csv_metadata.number_of_rows - self.download_job.number_of_columns = csv_metadata.number_of_columns + self.download_job.number_of_rows = sum([csv_metadata.number_of_rows for csv_metadata in csvs_metadata]) + self.download_job.number_of_columns = sum( + [csv_metadata.number_of_columns for csv_metadata in csvs_metadata] + ) upload_download_file_to_s3(zip_file_path) except InvalidParameterException as e: exc_msg = "InvalidParameterException was raised while attempting to process the DownloadJob" @@ -193,7 +148,7 @@ def finish_download(self) -> None: self.download_job.save() logger.info(f"Finished processing DownloadJob {self.download_job.download_job_id}") - def cleanup(self, path_list: List[Union[Path, str]]) -> None: + def cleanup(self, path_list: list[Union[Path, str]]) -> None: for path in path_list: if isinstance(path, str): path = Path(path) diff --git a/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py b/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py index c8953bc390..a1b56181c0 100644 --- a/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py +++ b/usaspending_api/download/tests/integration/test_account_download_dataframe_builder.py @@ -4,14 +4,12 @@ import pytest from django.core.management import call_command from model_bakery import baker -from usaspending_api.download.management.commands.delta_downloads.award_financial.columns import ( - federal_account_select_cols, - federal_account_groupby_cols, +from usaspending_api.download.management.commands.delta_downloads.builders import ( + FederalAccountDownloadDataFrameBuilder, + TreasuryAccountDownloadDataFrameBuilder, ) -from usaspending_api.download.management.commands.delta_downloads.award_financial.builders import ( - AccountDownloadDataFrameBuilder, -) -from usaspending_api.download.management.commands.delta_downloads.award_financial.filters import AccountDownloadFilter +from usaspending_api.download.management.commands.delta_downloads.filters import AccountDownloadFilter +from usaspending_api.download.v2.download_column_historical_lookups import query_paths @pytest.fixture(scope="function") @@ -21,17 +19,34 @@ def account_download_table(spark, s3_unittest_data_bucket, hive_unittest_metasto f"--destination-table=account_download", f"--spark-s3-bucket={s3_unittest_data_bucket}", ) - columns = list(set(federal_account_select_cols + federal_account_groupby_cols)) + [ - "reporting_fiscal_year", - "reporting_fiscal_quarter", - "reporting_fiscal_period", - "quarter_format_flag", - "submission_id", - "federal_account_id", - "funding_toptier_agency_id", - "budget_function_code", - "budget_subfunction_code", - ] + columns = list( + set( + [ + col + for col in query_paths["award_financial"]["federal_account"].keys() + if col != "owning_agency_name" and not col.startswith("last_modified_date") + ] + + [ + col + for col in query_paths["award_financial"]["treasury_account"].keys() + if col != "owning_agency_name" and not col.startswith("last_modified_date") + ] + + [ + "federal_owning_agency_name", + "treasury_owning_agency_name", + "last_modified_date", + "reporting_fiscal_year", + "reporting_fiscal_quarter", + "reporting_fiscal_period", + "quarter_format_flag", + "submission_id", + "federal_account_id", + "funding_toptier_agency_id", + "budget_function_code", + "budget_subfunction_code", + ] + ) + ) test_data_df = pd.DataFrame( data={ "reporting_fiscal_year": [2018, 2018, 2018, 2018, 2019], @@ -40,7 +55,7 @@ def account_download_table(spark, s3_unittest_data_bucket, hive_unittest_metasto "reporting_fiscal_period": [None, None, 5, None, None], "transaction_obligated_amount": [100, 100, 100, 100, 100], "submission_id": [1, 2, 3, 4, 5], - "owning_agency_name": ["test1", "test2", "test2", "test2", "test3"], + "federal_owning_agency_name": ["test1", "test2", "test2", "test2", "test3"], "reporting_agency_name": ["A", "B", "C", "D", "E"], "budget_function": ["A", "B", "C", "D", "E"], "budget_subfunction": ["A", "B", "C", "D", "E"], @@ -74,58 +89,90 @@ def federal_account_models(db): baker.make("accounts.FederalAccount", pk=3, agency_identifier="345", main_account_code="0333") -@patch( - "usaspending_api.download.management.commands.delta_downloads.award_financial.builders.get_submission_ids_for_periods" -) -def test_account_download_dataframe_builder(mock_get_submission_ids_for_periods, spark, account_download_table): +@patch("usaspending_api.download.management.commands.delta_downloads.builders.get_submission_ids_for_periods") +def test_federal_account_download_dataframe_builder(mock_get_submission_ids_for_periods, spark, account_download_table): mock_get_submission_ids_for_periods.return_value = [1, 2, 4, 5] account_download_filter = AccountDownloadFilter( fy=2018, + submission_types=["award_financial"], quarter=4, ) - builder = AccountDownloadDataFrameBuilder(spark, account_download_filter, "rpt.account_download") - result = builder.source_df + builder = FederalAccountDownloadDataFrameBuilder(spark, account_download_filter, "rpt.account_download") + result = builder.source_dfs[0] + result_df = result.toPandas() for col in ["reporting_agency_name", "budget_function", "budget_subfunction"]: - assert sorted(result.toPandas()[col].to_list()) == ["A", "B; C; D"] - assert sorted(result.toPandas().transaction_obligated_amount.to_list()) == [100, 300] - assert sorted(result.toPandas().gross_outlay_amount_FYB_to_period_end.to_list()) == [100, 200] + assert sorted(result_df[col].to_list()) == ["A", "B; C; D"] + assert sorted(result_df.transaction_obligated_amount.to_list()) == [100, 300] + assert sorted(result_df.gross_outlay_amount_FYB_to_period_end.to_list()) == [100, 200] -@patch( - "usaspending_api.download.management.commands.delta_downloads.award_financial.builders.get_submission_ids_for_periods" -) -def test_filter_by_agency(mock_get_submission_ids_for_periods, spark, account_download_table, agency_models): +@patch("usaspending_api.download.management.commands.delta_downloads.builders.get_submission_ids_for_periods") +def test_filter_federal_by_agency(mock_get_submission_ids_for_periods, spark, account_download_table, agency_models): mock_get_submission_ids_for_periods.return_value = [1, 2, 4, 5] account_download_filter = AccountDownloadFilter( fy=2018, + submission_types=["award_financial"], quarter=4, agency=2, ) - builder = AccountDownloadDataFrameBuilder(spark, account_download_filter) - result = builder.source_df + builder = FederalAccountDownloadDataFrameBuilder(spark, account_download_filter) + result = builder.source_dfs[0] + result_df = result.toPandas() for col in ["reporting_agency_name", "budget_function", "budget_subfunction"]: - assert sorted(result.toPandas()[col].to_list()) == ["B; C; D"] - assert sorted(result.toPandas().transaction_obligated_amount.to_list()) == [300] - assert sorted(result.toPandas().gross_outlay_amount_FYB_to_period_end.to_list()) == [200] + assert sorted(result_df[col].to_list()) == ["B; C; D"] + assert result_df.transaction_obligated_amount.to_list() == [300] + assert result_df.gross_outlay_amount_FYB_to_period_end.to_list() == [200] -@patch( - "usaspending_api.download.management.commands.delta_downloads.award_financial.builders.get_submission_ids_for_periods" -) -def test_filter_by_federal_account_id( +@patch("usaspending_api.download.management.commands.delta_downloads.builders.get_submission_ids_for_periods") +def test_filter_federal_by_federal_account_id( mock_get_submission_ids_for_periods, spark, account_download_table, federal_account_models ): mock_get_submission_ids_for_periods.return_value = [1, 2, 4, 5] account_download_filter = AccountDownloadFilter( fy=2018, + submission_types=["award_financial"], quarter=4, federal_account=1, ) - builder = AccountDownloadDataFrameBuilder(spark, account_download_filter) - result = builder.source_df + builder = FederalAccountDownloadDataFrameBuilder(spark, account_download_filter) + result = builder.source_dfs[0] + result_df = result.toPandas() + for col in ["reporting_agency_name", "budget_function", "budget_subfunction"]: + assert sorted(result_df[col].to_list()) == ["A"] + assert sorted(result_df.transaction_obligated_amount.to_list()) == [100] + assert sorted(result_df.gross_outlay_amount_FYB_to_period_end.to_list()) == [100] + + +def test_treasury_account_download_dataframe_builder(spark, account_download_table): + account_download_filter = AccountDownloadFilter( + fy=2018, + submission_types=["award_financial"], + quarter=4, + ) + builder = TreasuryAccountDownloadDataFrameBuilder(spark, account_download_filter) + result = builder.source_dfs[0] + result_df = result.toPandas() + for col in ["reporting_agency_name", "budget_function", "budget_subfunction"]: + assert sorted(result_df[col].to_list()) == ["A", "B", "C", "D"] + assert result_df.transaction_obligated_amount.to_list() == [100] * 4 + assert result_df.gross_outlay_amount_FYB_to_period_end.to_list() == [100] * 4 + + +def test_filter_treasury_by_agency(spark, account_download_table, agency_models): + + account_download_filter = AccountDownloadFilter( + fy=2018, + submission_types=["award_financial"], + quarter=4, + agency=2, + ) + builder = TreasuryAccountDownloadDataFrameBuilder(spark, account_download_filter) + result = builder.source_dfs[0] + result_df = result.toPandas() for col in ["reporting_agency_name", "budget_function", "budget_subfunction"]: - assert sorted(result.toPandas()[col].to_list()) == ["A"] - assert sorted(result.toPandas().transaction_obligated_amount.to_list()) == [100] - assert sorted(result.toPandas().gross_outlay_amount_FYB_to_period_end.to_list()) == [100] + assert sorted(result_df[col].to_list()) == ["B", "C", "D"] + assert result_df.transaction_obligated_amount.to_list() == [100] * 3 + assert result_df.gross_outlay_amount_FYB_to_period_end.to_list() == [100] * 3 diff --git a/usaspending_api/download/tests/integration/test_account_download_filter.py b/usaspending_api/download/tests/integration/test_account_download_filter.py index 73eafc9c58..6f992c7142 100644 --- a/usaspending_api/download/tests/integration/test_account_download_filter.py +++ b/usaspending_api/download/tests/integration/test_account_download_filter.py @@ -2,7 +2,7 @@ from model_bakery import baker from usaspending_api.common.exceptions import InvalidParameterException -from usaspending_api.download.management.commands.delta_downloads.award_financial.filters import AccountDownloadFilter +from usaspending_api.download.management.commands.delta_downloads.filters import AccountDownloadFilter @pytest.fixture @@ -20,7 +20,13 @@ def federal_account_models(db): def test_account_download_filter_cast_to_int(agency_models, federal_account_models): - test_data = {"fy": "2018", "quarter": "4", "agency": "2", "federal_account": "3"} + test_data = { + "fy": "2018", + "submission_types": ["award_financial"], + "quarter": "4", + "agency": "2", + "federal_account": "3", + } result = AccountDownloadFilter(**test_data) assert result.fy == 2018 assert result.quarter == 4 @@ -31,6 +37,7 @@ def test_account_download_filter_cast_to_int(agency_models, federal_account_mode def test_account_download_handle_all(agency_models, federal_account_models): test_data = { "fy": "2018", + "submission_types": ["award_financial"], "quarter": "4", "agency": "all", "federal_account": "all", @@ -47,7 +54,7 @@ def test_account_download_handle_all(agency_models, federal_account_models): def test_account_download_both_period_quarter(agency_models, federal_account_models): - test_data = {"fy": "2018", "period": "12", "quarter": "4"} + test_data = {"fy": "2018", "submission_types": ["award_financial"], "period": "12", "quarter": "4"} with pytest.warns() as warnings: result = AccountDownloadFilter(**test_data) assert result.fy == 2018 @@ -58,13 +65,13 @@ def test_account_download_both_period_quarter(agency_models, federal_account_mod def test_account_download_none_period_quarter(agency_models, federal_account_models): - test_data = {"fy": "2018"} + test_data = {"fy": "2018", "submission_types": ["award_financial"]} with pytest.raises(InvalidParameterException, match="Must define period or quarter."): AccountDownloadFilter(**test_data) def test_account_download_no_agency(agency_models, federal_account_models): - test_data = {"fy": "2018", "period": 2, "agency": 3} + test_data = {"fy": "2018", "submission_types": ["award_financial"], "period": 2, "agency": 3} result = AccountDownloadFilter(**test_data) assert result.agency == 3 test_data = {"fy": "2018", "period": 2, "agency": 4} @@ -73,9 +80,9 @@ def test_account_download_no_agency(agency_models, federal_account_models): def test_account_download_no_federal_account(agency_models, federal_account_models): - test_data = {"fy": "2018", "period": 2, "federal_account": 3} + test_data = {"fy": "2018", "submission_types": ["award_financial"], "period": 2, "federal_account": 3} result = AccountDownloadFilter(**test_data) assert result.federal_account == 3 - test_data = {"fy": "2018", "period": 2, "federal_account": 4} + test_data = {"fy": "2018", "submission_types": ["award_financial"], "period": 2, "federal_account": 4} with pytest.raises(InvalidParameterException, match="Federal Account with that ID does not exist"): result = AccountDownloadFilter(**test_data)