@@ -5,7 +5,7 @@
 def test_local_spark_jobs_strategy(spark, s3_unittest_data_bucket, hive_unittest_metastore_db):
     expected_table_name = "award_search"
     delta_table_spec = TABLE_SPEC[expected_table_name]
-    expected_db_name = delta_table_spec["destination_database"]
+    expected_db_name = delta_table_spec.destination_database

     spark_jobs = SparkJobs(LocalStrategy())
     spark_jobs.start(
usaspending_api/etl/management/commands/archive_table_in_delta.py (27 changes: 15 additions & 12 deletions)
@@ -13,18 +13,21 @@
     get_usas_jdbc_url,
 )
 from usaspending_api.download.delta_models.download_job import download_job_create_sql_string
+from usaspending_api.etl.table_specs import ArchiveTableSpec

 logger = logging.getLogger(__name__)

 TABLE_SPEC = {
-    "download_job": {
-        "destination_database": "arc",
-        "destination_table": "download_job",
-        "archive_date_field": "update_date",
-        "source_table": "download_job",
-        "source_database": "public",
-        "delta_table_create_sql": download_job_create_sql_string,
-    }
+    "download_job": ArchiveTableSpec(
+        **{
+            "destination_database": "arc",
+            "destination_table": "download_job",
+            "archive_date_field": "update_date",
+            "source_table": "download_job",
+            "source_database": "public",
+            "delta_table_create_sql": download_job_create_sql_string,
+        }
+    )
 }

@@ -86,12 +89,12 @@ def handle(self, *args, **options):
         archive_period = options["archive_period"]

         table_spec = TABLE_SPEC[destination_table]
-        destination_database = options["alt_db"] or table_spec["destination_database"]
+        destination_database = options["alt_db"] or table_spec.destination_database
         destination_table_name = options["alt_name"] or destination_table
-        source_table = table_spec["source_table"]
-        source_database = table_spec["source_database"]
+        source_table = table_spec.source_table
+        source_database = table_spec.source_database
         qualified_source_table = f"{source_database}.{source_table}"
-        archive_date_field = table_spec["archive_date_field"]
+        archive_date_field = table_spec.archive_date_field

         archive_date = datetime.now() - timedelta(days=archive_period)
         archive_date_string = archive_date.strftime("%Y-%m-%d")
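Note: ArchiveTableSpec is imported from usaspending_api.etl.table_specs, but its definition is not part of this diff. Because the call site builds it with keyword unpacking (**{...}) and the command reads the fields back as attributes, a minimal sketch of a compatible class, under the assumption that it is a plain dataclass with exactly the fields used here, would be:

# Hypothetical sketch; the real ArchiveTableSpec lives in usaspending_api.etl.table_specs
# and is not shown in this diff. Field names come from the call site above; the
# dataclass form and frozen=True are assumptions.
from dataclasses import dataclass


@dataclass(frozen=True)
class ArchiveTableSpec:
    destination_database: str
    destination_table: str
    archive_date_field: str
    source_table: str
    source_database: str
    delta_table_create_sql: str

With that shape, TABLE_SPEC["download_job"].destination_database resolves to "arc", which is what the attribute access introduced in this file (and in the test above) relies on; the keyword unpacking works because dataclasses accept their fields as keyword arguments.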
usaspending_api/etl/management/commands/create_delta_table.py (51 changes: 32 additions & 19 deletions)
@@ -10,23 +10,36 @@
 )
 from usaspending_api.common.spark.configs import DEFAULT_EXTRA_CONF
 from usaspending_api.config import CONFIG
-from usaspending_api.etl.management.commands.archive_table_in_delta import TABLE_SPEC as ARCHIVE_TABLE_SPEC
-from usaspending_api.etl.management.commands.load_query_to_delta import TABLE_SPEC as LOAD_QUERY_TABLE_SPEC
-from usaspending_api.etl.management.commands.load_table_to_delta import TABLE_SPEC as LOAD_TABLE_TABLE_SPEC
-from usaspending_api.transactions.delta_models.transaction_id_lookup import TRANSACTION_ID_LOOKUP_SCHEMA
+from usaspending_api.etl.management.commands.archive_table_in_delta import (
+    TABLE_SPEC as ARCHIVE_TABLE_SPEC,
+)
+from usaspending_api.etl.management.commands.load_query_to_delta import (
+    TABLE_SPEC as LOAD_QUERY_TABLE_SPEC,
+)
+from usaspending_api.etl.management.commands.load_table_to_delta import (
+    TABLE_SPEC as LOAD_TABLE_TABLE_SPEC,
+)
+from usaspending_api.etl.table_specs import TableSpec
+from usaspending_api.transactions.delta_models.transaction_id_lookup import (
+    TRANSACTION_ID_LOOKUP_SCHEMA,
+)

 TABLE_SPEC = {
     **ARCHIVE_TABLE_SPEC,
     **LOAD_TABLE_TABLE_SPEC,
     **LOAD_QUERY_TABLE_SPEC,
-    "award_id_lookup": {
-        "destination_database": "int",
-        "delta_table_create_sql": AWARD_ID_LOOKUP_SCHEMA,
-    },
-    "transaction_id_lookup": {
-        "destination_database": "int",
-        "delta_table_create_sql": TRANSACTION_ID_LOOKUP_SCHEMA,
-    },
+    "award_id_lookup": TableSpec(
+        **{
+            "destination_database": "int",
+            "delta_table_create_sql": AWARD_ID_LOOKUP_SCHEMA,
+        }
+    ),
+    "transaction_id_lookup": TableSpec(
+        **{
+            "destination_database": "int",
+            "delta_table_create_sql": TRANSACTION_ID_LOOKUP_SCHEMA,
+        }
+    ),
 }

 logger = logging.getLogger(__name__)
@@ -78,27 +91,27 @@ def handle(self, *args, **options):
         spark_s3_bucket = options["spark_s3_bucket"]

         table_spec = TABLE_SPEC[destination_table]
-        destination_database = options["alt_db"] or table_spec["destination_database"]
+        destination_database = options["alt_db"] or table_spec.destination_database
         destination_table_name = options["alt_name"] or destination_table

         # Set the database that will be interacted with for all Delta Lake table Spark-based activity
         logger.info(f"Using Spark Database: {destination_database}")
         spark.sql(f"create database if not exists {destination_database};")
         spark.sql(f"use {destination_database};")
-        if isinstance(table_spec["delta_table_create_sql"], str):
+        if isinstance(table_spec.delta_table_create_sql, str):
             # Define Schema Using CREATE TABLE AS command
             spark.sql(
-                TABLE_SPEC[destination_table]["delta_table_create_sql"].format(
+                table_spec.delta_table_create_sql.format(
                     DESTINATION_TABLE=destination_table_name,
                     DESTINATION_DATABASE=destination_database,
                     SPARK_S3_BUCKET=spark_s3_bucket,
                     DELTA_LAKE_S3_PATH=CONFIG.DELTA_LAKE_S3_PATH,
                 )
             )
-        elif isinstance(table_spec["delta_table_create_sql"], StructType):
-            schema = table_spec["delta_table_create_sql"]
-            additional_options = table_spec.get("delta_table_create_options") or {}
-            partition_cols = table_spec.get("delta_table_create_partitions") or []
+        elif isinstance(table_spec.delta_table_create_sql, StructType):
+            schema = table_spec.delta_table_create_sql
+            additional_options = table_spec.delta_table_create_options or {}
+            partition_cols = table_spec.delta_table_create_partitions or []
             df = spark.createDataFrame([], schema)

             default_options = {
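Similarly, TableSpec is imported from usaspending_api.etl.table_specs and is not defined in this diff. Judging from the attributes that handle() reads (destination_database, delta_table_create_sql, delta_table_create_options, delta_table_create_partitions) and from the "or {}" / "or []" fallbacks, a plausible sketch with the optional fields defaulting to None would be:

# Hypothetical sketch only; the real TableSpec lives in usaspending_api.etl.table_specs.
# Field names mirror the attribute access in create_delta_table.py; the defaults and
# Optional typing are assumptions based on the "or {}" / "or []" fallbacks above.
from dataclasses import dataclass
from typing import Dict, List, Optional, Union

from pyspark.sql.types import StructType


@dataclass(frozen=True)
class TableSpec:
    destination_database: str
    delta_table_create_sql: Union[str, StructType]
    delta_table_create_options: Optional[Dict[str, str]] = None
    delta_table_create_partitions: Optional[List[str]] = None

If the archive and load specs share these fields, ArchiveTableSpec could extend or compose a common TableSpec so that the merged TABLE_SPEC dict in create_delta_table.py stays homogeneous; the diff does not show which approach the branch actually takes.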