@@ -5,7 +5,7 @@
def test_local_spark_jobs_strategy(spark, s3_unittest_data_bucket, hive_unittest_metastore_db):
expected_table_name = "award_search"
delta_table_spec = TABLE_SPEC[expected_table_name]
expected_db_name = delta_table_spec["destination_database"]
expected_db_name = delta_table_spec.destination_database

spark_jobs = SparkJobs(LocalStrategy())
spark_jobs.start(
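The one-line change above follows the TABLE_SPEC refactor made in the commands further down: entries are now spec objects rather than plain dicts, so the test reads destination_database as an attribute instead of a key. Below is a tiny self-contained illustration of that difference; FakeTableSpec is a stand-in name for this sketch, not the project's real class.

from dataclasses import dataclass


@dataclass(frozen=True)
class FakeTableSpec:  # stand-in for the project's spec classes, not the real name
    destination_database: str


spec = FakeTableSpec(destination_database="some_db")
print(spec.destination_database)   # attribute access works: prints "some_db"
# spec["destination_database"]     # would raise TypeError: object is not subscriptable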
usaspending_api/etl/management/commands/archive_table_in_delta.py (66 changes: 41 additions & 25 deletions)
@@ -1,30 +1,35 @@
import logging
import psycopg2

from datetime import datetime, timedelta
from django.core.management.base import BaseCommand

from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string
import psycopg2
from django.core.management.base import BaseCommand, CommandParser

from usaspending_api.common.etl.spark import load_delta_table
from usaspending_api.common.helpers.spark_helpers import (
configure_spark_session,
get_active_spark_session,
get_jdbc_connection_properties,
get_usas_jdbc_url,
)
from usaspending_api.download.delta_models.download_job import download_job_create_sql_string
from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string
from usaspending_api.download.delta_models.download_job import (
download_job_create_sql_string,
)
from usaspending_api.etl.table_specs import ArchiveTableSpec

logger = logging.getLogger(__name__)

TABLE_SPEC = {
"download_job": {
"destination_database": "arc",
"destination_table": "download_job",
"archive_date_field": "update_date",
"source_table": "download_job",
"source_database": "public",
"delta_table_create_sql": download_job_create_sql_string,
}
"download_job": ArchiveTableSpec(
**{
"destination_database": "arc",
"destination_table": "download_job",
"archive_date_field": "update_date",
"source_table": "download_job",
"source_database": "public",
"delta_table_create_sql": download_job_create_sql_string,
}
)
}


@@ -35,7 +40,8 @@ class Command(BaseCommand):
those records from Postgres.
"""

def add_arguments(self, parser):
@staticmethod
def add_arguments(parser: CommandParser) -> None:
parser.add_argument(
"--destination-table",
type=str,
@@ -54,7 +60,8 @@ def add_arguments(self, parser):
"--alt-db",
type=str,
required=False,
help="An alternate Delta Database (aka schema) in which to archive this table, overriding the TABLE_SPEC's destination_database",
help="An alternate Delta Database (aka schema) in which to archive this table, overriding the TABLE_SPEC's"
" destination_database",
)
parser.add_argument(
"--alt-name",
@@ -63,7 +70,7 @@ def add_arguments(self, parser):
help="An alternate Delta Table name which to archive this table, overriding the destination_table",
)

def handle(self, *args, **options):
def handle(self, *args, **options) -> None:
extra_conf = {
# Config for Delta Lake tables and SQL. Need these to keep Delta table metadata in the metastore
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
@@ -86,12 +93,12 @@ def handle(self, *args, **options):
archive_period = options["archive_period"]

table_spec = TABLE_SPEC[destination_table]
destination_database = options["alt_db"] or table_spec["destination_database"]
destination_database = options["alt_db"] or table_spec.destination_database
destination_table_name = options["alt_name"] or destination_table
source_table = table_spec["source_table"]
source_database = table_spec["source_database"]
source_table = table_spec.source_table
source_database = table_spec.source_database
qualified_source_table = f"{source_database}.{source_table}"
archive_date_field = table_spec["archive_date_field"]
archive_date_field = table_spec.archive_date_field

archive_date = datetime.now() - timedelta(days=archive_period)
archive_date_string = archive_date.strftime("%Y-%m-%d")
@@ -104,13 +111,18 @@ def handle(self, *args, **options):
# Resolve JDBC URL for Source Database
jdbc_url = get_usas_jdbc_url()
if not jdbc_url:
raise RuntimeError(f"Couldn't find JDBC url, please properly configure your CONFIG.")
raise RuntimeError(
"Couldn't find JDBC url, please properly configure your CONFIG."
)
if not jdbc_url.startswith("jdbc:postgresql://"):
raise ValueError("JDBC URL given is not in postgres JDBC URL format (e.g. jdbc:postgresql://...")
raise ValueError(
"JDBC URL given is not in postgres JDBC URL format (e.g. jdbc:postgresql://..."
)

# Retrieve data from Postgres
query_with_predicate = (
f"(SELECT * FROM {qualified_source_table} WHERE {archive_date_field} < '{archive_date_string}') AS tmp"
f"(SELECT * FROM {qualified_source_table} "
f"WHERE {archive_date_field} < '{archive_date_string}') AS tmp"
)

df = spark.read.jdbc(
@@ -122,7 +134,9 @@ def handle(self, *args, **options):
# Write data to Delta Lake in Append Mode
load_delta_table(spark, df, destination_table_name, overwrite=False)
archived_count = df.count()
logger.info(f"Archived {archived_count} records from the {qualified_source_table}")
logger.info(
f"Archived {archived_count} records from the {qualified_source_table}"
)

# Delete data from Postgres
with psycopg2.connect(dsn=get_database_dsn_string()) as connection:
@@ -132,7 +146,9 @@
)
deleted_count = cursor.rowcount

logger.info(f"Deleted {deleted_count} records from the {qualified_source_table} table")
logger.info(
f"Deleted {deleted_count} records from the {qualified_source_table} table"
)

# Shut down spark
if spark_created_by_command:
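ArchiveTableSpec, imported from usaspending_api.etl.table_specs, is not part of this diff. Based on the fields the command reads (table_spec.destination_database, table_spec.source_table, table_spec.archive_date_field, and so on) and the keys passed in the TABLE_SPEC entry, it is presumably a small frozen dataclass roughly like the sketch below. This is an illustration under that assumption, not the project's actual definition; field names are inferred from the diff and the types are guesses.

# Hypothetical sketch of ArchiveTableSpec in usaspending_api/etl/table_specs.py;
# that module is not shown in this diff, so everything here is inferred.
from dataclasses import dataclass


@dataclass(frozen=True)
class ArchiveTableSpec:
    destination_database: str    # Delta database (schema) to archive into, e.g. "arc"
    destination_table: str       # Delta table name
    archive_date_field: str      # date column compared against the archive cutoff
    source_table: str            # Postgres table to read via JDBC
    source_database: str         # Postgres schema, e.g. "public"
    delta_table_create_sql: str  # CREATE TABLE template for the Delta table

Building entries as ArchiveTableSpec(**{...}) keeps the existing dict literal readable while moving key validation to construction time: an unknown or misspelled key now fails immediately with a TypeError instead of surfacing later as a KeyError inside handle().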
usaspending_api/etl/management/commands/create_delta_table.py (57 changes: 35 additions & 22 deletions)
@@ -1,6 +1,6 @@
import logging

from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandParser
from pyspark.sql.types import StructType

from usaspending_api.awards.delta_models.award_id_lookup import AWARD_ID_LOOKUP_SCHEMA
@@ -10,23 +10,36 @@
)
from usaspending_api.common.spark.configs import DEFAULT_EXTRA_CONF
from usaspending_api.config import CONFIG
from usaspending_api.etl.management.commands.archive_table_in_delta import TABLE_SPEC as ARCHIVE_TABLE_SPEC
from usaspending_api.etl.management.commands.load_query_to_delta import TABLE_SPEC as LOAD_QUERY_TABLE_SPEC
from usaspending_api.etl.management.commands.load_table_to_delta import TABLE_SPEC as LOAD_TABLE_TABLE_SPEC
from usaspending_api.transactions.delta_models.transaction_id_lookup import TRANSACTION_ID_LOOKUP_SCHEMA
from usaspending_api.etl.management.commands.archive_table_in_delta import (
TABLE_SPEC as ARCHIVE_TABLE_SPEC,
)
from usaspending_api.etl.management.commands.load_query_to_delta import (
TABLE_SPEC as LOAD_QUERY_TABLE_SPEC,
)
from usaspending_api.etl.management.commands.load_table_to_delta import (
TABLE_SPEC as LOAD_TABLE_TABLE_SPEC,
)
from usaspending_api.etl.table_specs import TableSpec
from usaspending_api.transactions.delta_models.transaction_id_lookup import (
TRANSACTION_ID_LOOKUP_SCHEMA,
)

TABLE_SPEC = {
**ARCHIVE_TABLE_SPEC,
**LOAD_TABLE_TABLE_SPEC,
**LOAD_QUERY_TABLE_SPEC,
"award_id_lookup": {
"destination_database": "int",
"delta_table_create_sql": AWARD_ID_LOOKUP_SCHEMA,
},
"transaction_id_lookup": {
"destination_database": "int",
"delta_table_create_sql": TRANSACTION_ID_LOOKUP_SCHEMA,
},
"award_id_lookup": TableSpec(
**{
"destination_database": "int",
"delta_table_create_sql": AWARD_ID_LOOKUP_SCHEMA,
}
),
"transaction_id_lookup": TableSpec(
**{
"destination_database": "int",
"delta_table_create_sql": TRANSACTION_ID_LOOKUP_SCHEMA,
}
),
}

logger = logging.getLogger(__name__)
@@ -37,7 +50,7 @@ class Command(BaseCommand):
This command creates an empty Delta Table based on the provided --destination-table argument.
"""

def add_arguments(self, parser):
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--destination-table",
type=str,
@@ -66,7 +79,7 @@ def add_arguments(self, parser):
"name",
)

def handle(self, *args, **options):
def handle(self, *args, **options) -> None:
spark = get_active_spark_session()
spark_created_by_command = False
if not spark:
@@ -78,27 +91,27 @@ def handle(self, *args, **options):
spark_s3_bucket = options["spark_s3_bucket"]

table_spec = TABLE_SPEC[destination_table]
destination_database = options["alt_db"] or table_spec["destination_database"]
destination_database = options["alt_db"] or table_spec.destination_database
destination_table_name = options["alt_name"] or destination_table

# Set the database that will be interacted with for all Delta Lake table Spark-based activity
logger.info(f"Using Spark Database: {destination_database}")
spark.sql(f"create database if not exists {destination_database};")
spark.sql(f"use {destination_database};")
if isinstance(table_spec["delta_table_create_sql"], str):
if isinstance(table_spec.delta_table_create_sql, str):
# Define Schema Using CREATE TABLE AS command
spark.sql(
TABLE_SPEC[destination_table]["delta_table_create_sql"].format(
table_spec.delta_table_create_sql.format(
DESTINATION_TABLE=destination_table_name,
DESTINATION_DATABASE=destination_database,
SPARK_S3_BUCKET=spark_s3_bucket,
DELTA_LAKE_S3_PATH=CONFIG.DELTA_LAKE_S3_PATH,
)
)
elif isinstance(table_spec["delta_table_create_sql"], StructType):
schema = table_spec["delta_table_create_sql"]
additional_options = table_spec.get("delta_table_create_options") or {}
partition_cols = table_spec.get("delta_table_create_partitions") or []
elif isinstance(table_spec.delta_table_create_sql, StructType):
schema = table_spec.delta_table_create_sql
additional_options = table_spec.delta_table_create_options or {}
partition_cols = table_spec.delta_table_create_partitions or []
df = spark.createDataFrame([], schema)

default_options = {
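TableSpec, also imported from usaspending_api.etl.table_specs, is likewise not shown in this diff. From the usage in handle() above (delta_table_create_sql may be a SQL string or a StructType, and delta_table_create_options / delta_table_create_partitions fall back to empty defaults via "or"), it is presumably something like the sketch below. Since ARCHIVE_TABLE_SPEC entries are merged into the same TABLE_SPEC, ArchiveTableSpec presumably exposes the same attribute names or subclasses TableSpec. All types and defaults here are assumptions, not the project's actual definition.

# Hypothetical sketch only; the real TableSpec lives in usaspending_api/etl/table_specs.py,
# which this diff does not include. Fields and defaults are inferred from handle() above.
from dataclasses import dataclass
from typing import Optional, Union

from pyspark.sql.types import StructType


@dataclass(frozen=True)
class TableSpec:
    destination_database: str
    # Either a CREATE TABLE SQL template (str) or a Spark schema (StructType);
    # handle() branches on isinstance() to choose between spark.sql(...) and createDataFrame.
    delta_table_create_sql: Union[str, StructType] = ""
    delta_table_create_options: Optional[dict] = None    # extra writer options ("or {}" above)
    delta_table_create_partitions: Optional[list] = None  # partition columns ("or []" above)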