Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from delta.tables import DeltaTable
from usaspending_api.download.helpers.delta_models_helpers import fy_quarter_period
from pyspark.sql import DataFrame, functions as sf, SparkSession
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import (
BooleanType,
DateType,
DecimalType,
IntegerType,
LongType,
StringType,
StructField,
StructType,
TimestampType,
LongType,
)

from usaspending_api.download.helpers.delta_models_helpers import fy_quarter_period

account_balances_schema = StructType(
[
StructField("funding_toptier_agency_id", IntegerType()),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from delta.tables import DeltaTable
from pyspark.sql import SparkSession, functions as sf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.functions import expr
from pyspark.sql.types import (
BooleanType,
DateType,
DecimalType,
IntegerType,
LongType,
StringType,
StructField,
StructType,
LongType,
)

from usaspending_api.download.helpers.delta_models_helpers import fy_quarter_period
from usaspending_api.download.helpers.download_annotation_functions import AWARD_URL

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from delta.tables import DeltaTable
from pyspark.sql import SparkSession, functions as sf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import (
BooleanType,
DateType,
DecimalType,
IntegerType,
LongType,
StringType,
StructField,
StructType,
LongType,
)
from usaspending_api.download.helpers.delta_models_helpers import fy_quarter_period

from usaspending_api.download.helpers.delta_models_helpers import fy_quarter_period

object_class_program_activity_schema = StructType(
[
Expand Down
5 changes: 2 additions & 3 deletions usaspending_api/etl/management/commands/create_delta_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from pyspark.sql.types import StructType

from usaspending_api.awards.delta_models.award_id_lookup import AWARD_ID_LOOKUP_SCHEMA
from usaspending_api.common.spark.configs import DEFAULT_EXTRA_CONF
from usaspending_api.config import CONFIG
from usaspending_api.common.helpers.spark_helpers import (
configure_spark_session,
get_active_spark_session,
)
from usaspending_api.common.spark.configs import DEFAULT_EXTRA_CONF
from usaspending_api.config import CONFIG
from usaspending_api.etl.management.commands.archive_table_in_delta import TABLE_SPEC as ARCHIVE_TABLE_SPEC
from usaspending_api.etl.management.commands.load_query_to_delta import TABLE_SPEC as LOAD_QUERY_TABLE_SPEC
from usaspending_api.etl.management.commands.load_table_to_delta import TABLE_SPEC as LOAD_TABLE_TABLE_SPEC
Expand All @@ -33,7 +33,6 @@


class Command(BaseCommand):

help = """
This command creates an empty Delta Table based on the provided --destination-table argument.
"""
Expand Down
45 changes: 22 additions & 23 deletions usaspending_api/etl/management/commands/load_query_to_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@
covid_faba_spending_load_sql_strings,
)
from usaspending_api.disaster.models import CovidFABASpending
from usaspending_api.download.delta_models.account_balances_download import (
account_balances_schema,
load_account_balances,
load_account_balances_incremental,
)
from usaspending_api.download.delta_models.award_financial_download import (
award_financial_schema,
load_award_financial,
load_award_financial_incremental,
award_financial_schema,
)
from usaspending_api.download.delta_models.object_class_program_activity_download import (
object_class_program_activity_schema,
load_object_class_program_activity,
load_object_class_program_activity_incremental,
)
from usaspending_api.download.delta_models.account_balances_download import (
load_account_balances,
load_account_balances_incremental,
account_balances_schema,
object_class_program_activity_schema,
)
from usaspending_api.download.delta_models.transaction_download import transaction_download_schema
from usaspending_api.recipient.delta_models import (
Expand Down Expand Up @@ -84,7 +84,6 @@
transaction_search_create_sql_string,
)


# Base URL for award pages: a HOST containing "localhost" is used as-is,
# any other HOST gets an explicit "https://" scheme prepended.
if "localhost" in HOST:
    AWARD_URL = f"{HOST}/award/"
else:
    AWARD_URL = f"https://{HOST}/award/"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
Expand All @@ -105,14 +104,14 @@
"is_partition_column_unique": True,
"delta_table_create_sql": award_search_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": AWARD_SEARCH_POSTGRES_COLUMNS,
"custom_schema": "recipient_hash STRING, federal_accounts STRING, cfdas ARRAY<STRING>,"
" tas_components ARRAY<STRING>",
"column_names": list(AWARD_SEARCH_COLUMNS),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"award_search_gold": {
"model": AwardSearch,
Expand All @@ -129,14 +128,14 @@
"is_partition_column_unique": True,
"delta_table_create_sql": award_search_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": AWARD_SEARCH_POSTGRES_GOLD_COLUMNS,
"custom_schema": "recipient_hash STRING, federal_accounts STRING, cfdas ARRAY<STRING>,"
" tas_components ARRAY<STRING>",
"column_names": list(AWARD_SEARCH_POSTGRES_GOLD_COLUMNS),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"recipient_lookup": {
"model": RecipientLookup,
Expand All @@ -153,13 +152,13 @@
"is_partition_column_unique": True,
"delta_table_create_sql": rpt_recipient_lookup_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": RECIPIENT_LOOKUP_POSTGRES_COLUMNS,
"custom_schema": "recipient_hash STRING",
"column_names": list(RPT_RECIPIENT_LOOKUP_DELTA_COLUMNS),
"postgres_seq_name": "recipient_lookup_id_seq",
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"recipient_profile": {
"model": RecipientProfile,
Expand All @@ -176,13 +175,13 @@
"is_partition_column_unique": False,
"delta_table_create_sql": recipient_profile_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": RECIPIENT_PROFILE_POSTGRES_COLUMNS,
"custom_schema": "recipient_hash STRING",
"column_names": list(RPT_RECIPIENT_PROFILE_DELTA_COLUMNS),
"postgres_seq_name": "recipient_profile_id_seq",
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"summary_state_view": {
"model": SummaryStateView,
Expand All @@ -199,13 +198,13 @@
"is_partition_column_unique": True,
"delta_table_create_sql": summary_state_view_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": SUMMARY_STATE_VIEW_POSTGRES_COLUMNS,
"custom_schema": "duh STRING",
"column_names": list(SUMMARY_STATE_VIEW_COLUMNS),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"sam_recipient": {
"model": None,
Expand All @@ -222,13 +221,13 @@
"is_partition_column_unique": True,
"delta_table_create_sql": sam_recipient_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": SAM_RECIPIENT_POSTGRES_COLUMNS,
"custom_schema": None,
"column_names": list(SAM_RECIPIENT_COLUMNS),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"transaction_search": {
"model": TransactionSearch,
Expand All @@ -245,13 +244,13 @@
"is_partition_column_unique": True,
"delta_table_create_sql": transaction_search_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": TRANSACTION_SEARCH_POSTGRES_COLUMNS,
"custom_schema": "recipient_hash STRING, federal_accounts STRING, parent_recipient_hash STRING",
"column_names": list(TRANSACTION_SEARCH_POSTGRES_COLUMNS),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"transaction_search_gold": {
"model": TransactionSearch,
Expand All @@ -268,7 +267,6 @@
"is_partition_column_unique": True,
"delta_table_create_sql": transaction_search_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": TRANSACTION_SEARCH_POSTGRES_GOLD_COLUMNS,
"custom_schema": "recipient_hash STRING, federal_accounts STRING, parent_recipient_hash STRING",
"column_names": list(TRANSACTION_SEARCH_POSTGRES_GOLD_COLUMNS),
Expand All @@ -282,6 +280,7 @@
{"table_suffix": "_fabs", "partitioning_clause": "FOR VALUES IN (FALSE)"},
],
},
"delta_table_create_partitions": None,
},
"transaction_current_cd_lookup": {
"model": None,
Expand All @@ -298,13 +297,13 @@
"is_partition_column_unique": True,
"delta_table_create_sql": transaction_current_cd_lookup_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": TRANSACTION_CURRENT_CD_LOOKUP_COLUMNS,
"custom_schema": "",
"column_names": list(TRANSACTION_CURRENT_CD_LOOKUP_COLUMNS),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"subaward_search": {
"model": SubawardSearch,
Expand All @@ -321,13 +320,13 @@
"is_partition_column_unique": True,
"delta_table_create_sql": subaward_search_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": SUBAWARD_SEARCH_POSTGRES_COLUMNS,
"custom_schema": "treasury_account_identifiers ARRAY<INTEGER>",
"column_names": list(SUBAWARD_SEARCH_COLUMNS),
"postgres_seq_name": None,
"tsvectors": SUBAWARD_SEARCH_POSTGRES_VECTORS,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"covid_faba_spending": {
"model": CovidFABASpending,
Expand All @@ -344,13 +343,13 @@
"is_partition_column_unique": False,
"delta_table_create_sql": covid_faba_spending_create_sql_string,
"delta_table_create_options": None,
"delta_table_create_partitions": None,
"source_schema": COVID_FABA_SPENDING_POSTGRES_COLUMNS,
"custom_schema": None,
"column_names": list(COVID_FABA_SPENDING_DELTA_COLUMNS),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": None,
},
"account_balances_download": {
"model": None,
Expand All @@ -367,13 +366,13 @@
"is_partition_column_unique": False,
"delta_table_create_sql": account_balances_schema,
"delta_table_create_options": {"delta.enableChangeDataFeed": True},
"delta_table_create_partitions": None,
"source_schema": None,
"custom_schema": None,
"column_names": list(),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
},
"award_financial_download": {
"model": None,
Expand All @@ -390,13 +389,13 @@
"is_partition_column_unique": False,
"delta_table_create_sql": award_financial_schema,
"delta_table_create_options": {"delta.enableChangeDataFeed": True},
"delta_table_create_partitions": None,
"source_schema": None,
"custom_schema": None,
"column_names": list(),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
},
"object_class_program_activity_download": {
"model": None,
Expand All @@ -413,13 +412,13 @@
"is_partition_column_unique": False,
"delta_table_create_sql": object_class_program_activity_schema,
"delta_table_create_options": {"delta.enableChangeDataFeed": True},
"delta_table_create_partitions": None,
"source_schema": None,
"custom_schema": None,
"column_names": list(),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
},
"transaction_download": {
"model": None,
Expand All @@ -436,13 +435,13 @@
"is_partition_column_unique": False,
"delta_table_create_sql": transaction_download_schema,
"delta_table_create_options": {"delta.enableChangeDataFeed": True},
"delta_table_create_partitions": ["awarding_agency_code", "is_fpds", "action_date_fiscal_year"],
"source_schema": None,
"custom_schema": None,
"column_names": list(),
"postgres_seq_name": None,
"tsvectors": None,
"postgres_partition_spec": None,
"delta_table_create_partitions": ["awarding_agency_code", "is_fpds", "action_date_fiscal_year"],
},
}

Expand Down