Merged
Changes from all commits
47 commits
445d412
[DEV-13726] Add DuckDB dependency
aguest-kc Nov 5, 2025
9a0f694
[DEV-13726] Add DuckDB download strategy
aguest-kc Nov 5, 2025
1c48b93
[DEV-13726] Update number formatting and import order
aguest-kc Nov 5, 2025
5cf9413
[DEV-13726] Add DuckDB type hints and imports
aguest-kc Nov 5, 2025
55a2ea9
[DEV-13726] Make Spark downloads work with DuckDB
aguest-kc Nov 5, 2025
b4f9340
[DEV-13726] Clean up write CSV function
aguest-kc Nov 5, 2025
51c4762
[DEV-13726] Pin DuckDB version and install extensions
aguest-kc Nov 5, 2025
1a9a320
[DEV-13726] Time the entire download process
aguest-kc Nov 5, 2025
28c5185
[DEV-13726] Remove DuckDB flag from viewset
aguest-kc Nov 5, 2025
8091b75
[DEV-13726] Manually set memory limit for DuckDB
aguest-kc Nov 6, 2025
ee83d50
[DEV-13726] Update AWS config
aguest-kc Nov 6, 2025
f8b857d
[DEV-13726] Use env vars for AWS secret
aguest-kc Nov 6, 2025
ee645b9
[DEV-13726] Update the Broker DB URL env variable name
aguest-kc Nov 12, 2025
7b05e11
[DEV-13726] Update Broker env var name
aguest-kc Nov 12, 2025
cb264a3
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 13, 2025
b4e4ec3
[DEV-13726] Add comment
aguest-kc Nov 13, 2025
37c4a5b
[DEV-13726] Update S3 auth and CSV generation
aguest-kc Nov 17, 2025
cf75a9c
[DEV-13726] Fix typo in DuckDB secret
aguest-kc Nov 17, 2025
ea2cb08
[DEV-13726] Remove test CSV file row limit
aguest-kc Nov 17, 2025
6a40043
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 18, 2025
8b01fc4
[DEV-13726] Consolidate SQL statements
aguest-kc Nov 18, 2025
39e614f
[DEV-13726] Update federal account dataframe
aguest-kc Nov 19, 2025
3b5bc75
[DEV-13726] Update federal account dataframe
aguest-kc Nov 19, 2025
24881dc
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 21, 2025
ba1ca86
[DEV-13726] Flake8 fixes and comment clean up
aguest-kc Nov 21, 2025
29c25f6
[DEV-13726] black formatting fixes
aguest-kc Nov 21, 2025
1444a70
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 1, 2025
325b000
[DEV-13726] Update SQL functions
aguest-kc Dec 1, 2025
dce5833
[DEV-13726] Add spark parameter
aguest-kc Dec 1, 2025
52960bd
[DEV-13726] Black format fix
aguest-kc Dec 1, 2025
777bce2
[DEV-13726] Revert default Spark strategy to Databricks
aguest-kc Dec 4, 2025
faa7e16
[DEV-13726] Use LocalStrategy instead of DatabricksStrategy
aguest-kc Dec 4, 2025
8b15fa4
[DEV-13726] Only use Spark for File A downloads
aguest-kc Dec 4, 2025
990e37b
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 8, 2025
6522b80
[DEV-13726] Move DuckDB setup to setup_spark_session() method
aguest-kc Dec 10, 2025
ac7e90f
[DEV-13726] Convert to UNIX line endings
aguest-kc Dec 10, 2025
8e402f3
[DEV-13726] Revert Spark table logic
aguest-kc Dec 11, 2025
d12f8a8
[DEV-13726] Replaced hardcoded values with variables
aguest-kc Dec 11, 2025
6fb3e58
[DEV-13726] Remove case that would never be reached
aguest-kc Dec 11, 2025
c0adbd1
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 11, 2025
41fab45
[DEV-13726] Explain why DuckDB version is pinned
aguest-kc Dec 12, 2025
0756469
[DEV-13726] Create a DuckDB Dockerfile
aguest-kc Dec 12, 2025
b8aea3d
[DEV-13726] Use CONFIG more
aguest-kc Dec 12, 2025
dd5c153
[DEV-13726] Log as exceptions and raise exception
aguest-kc Dec 12, 2025
a3d2a97
[DEV-13726] Add file_number in Pandas
aguest-kc Dec 12, 2025
2645a8a
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 12, 2025
454e6ba
[DEV-13726] Add `rel` definition back and flake8 fix
aguest-kc Dec 15, 2025
4 changes: 2 additions & 2 deletions Dockerfile
@@ -16,11 +16,11 @@ RUN apt update && apt install -y \
libpq-dev \
postgresql-13

##### Copy python packaged
COPY . /dockermount

RUN python3 -m pip install -r requirements/requirements.txt && \
python3 -m pip install -r requirements/requirements-server.txt && \
python3 -m pip install ansible==2.9.15 awscli==1.34.19

##### Ensure Python STDOUT gets sent to container logs
# Ensure Python STDOUT gets sent to container logs
ENV PYTHONUNBUFFERED=1
10 changes: 10 additions & 0 deletions Dockerfile.duckdb
@@ -0,0 +1,10 @@
# Dockerfile for downloads using DuckDB

FROM usaspending-backend:latest

# Install DuckDB extensions
RUN mkdir -p /root/.duckdb/extensions/v1.4.1/linux_amd64 && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/delta.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/delta.duckdb_extension && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/aws.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/aws.duckdb_extension && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/httpfs.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/httpfs.duckdb_extension && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/postgres_scanner.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/postgres_scanner.duckdb_extension
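
The new Dockerfile.duckdb stages the four extensions under the exact path DuckDB searches for version 1.4.1 on linux_amd64, so containers can `LOAD` them without reaching out to extensions.duckdb.org at runtime. A hedged way to sanity-check an image built from it is a short script like the one below; the check is illustrative and is not part of this PR.

```python
# Sanity check: confirm the pinned DuckDB version matches the pre-installed
# extension directory and that every staged extension loads offline.
import duckdb

assert duckdb.__version__ == "1.4.1", "extension directory was built for v1.4.1"

con = duckdb.connect()
for ext in ("delta", "aws", "httpfs", "postgres_scanner"):
    # Loads from /root/.duckdb/extensions/v1.4.1/linux_amd64/<ext>.duckdb_extension
    # (spark.py loads the last one under its "postgres" alias).
    con.load_extension(ext)

# List what actually loaded
print(con.sql("SELECT extension_name FROM duckdb_extensions() WHERE loaded").fetchall())
```
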
1 change: 1 addition & 0 deletions requirements/requirements-app.txt
@@ -16,6 +16,7 @@ djangorestframework==3.15.*
docutils==0.20.1
drf-api-tracking==1.8.4
drf-extensions==0.7.*
duckdb==1.4.1 # Pinned because DuckDB extensions have to be manually installed for each specific version
elasticsearch-dsl==7.4.*
elasticsearch==7.10.*
et-xmlfile==1.1.0
201 changes: 181 additions & 20 deletions usaspending_api/common/etl/spark.py
@@ -7,11 +7,16 @@

import logging
import math
import os
import shutil
import time
from collections import namedtuple
from itertools import chain
from typing import List

import duckdb
from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
from duckdb.experimental.spark.sql.dataframe import DataFrame as DuckDBDataFrame
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, concat, concat_ws, expr, lit, regexp_replace, to_date, transform, when
from pyspark.sql.types import ArrayType, DecimalType, StringType, StructType
@@ -48,6 +53,7 @@
ZipsGrouped,
)
from usaspending_api.reporting.models import ReportingAgencyMissingTas, ReportingAgencyOverview
from usaspending_api.settings import CSV_LOCAL_PATH, IS_LOCAL, USASPENDING_AWS_REGION
from usaspending_api.submissions.models import DABSSubmissionWindowSchedule, SubmissionAttributes

MAX_PARTITIONS = CONFIG.SPARK_MAX_PARTITIONS
@@ -555,31 +561,123 @@ def _generate_global_view_sql_strings(tables: List[str], jdbc_url: str) -> List[
return sql_strings


def create_ref_temp_views(spark: SparkSession, create_broker_views: bool = False):
def create_ref_temp_views(spark: SparkSession | DuckDBSparkSession, create_broker_views: bool = False):
"""Create global temporary Spark reference views that sit atop remote PostgreSQL RDS tables
Setting create_broker_views to True will create views for all tables listed in _BROKER_REF_TABLES
Note: They will all be listed under global_temp.{table_name}

Args:
spark (SparkSession | DuckDBSparkSession): Spark session
create_broker_views (bool): Whether to create the temporary views for the Broker tables
Default: False
"""

# Create USAS temp views
rds_ref_tables = build_ref_table_name_list()
rds_sql_strings = _generate_global_view_sql_strings(
tables=rds_ref_tables,
jdbc_url=get_usas_jdbc_url(),
)
logger.info(f"Creating the following tables under the global_temp database: {rds_ref_tables}")
for sql_statement in rds_sql_strings:
spark.sql(sql_statement)

# Create Broker temp views
if create_broker_views:
broker_sql_strings = _generate_global_view_sql_strings(
tables=_BROKER_REF_TABLES,
jdbc_url=get_broker_jdbc_url(),
)
logger.info(f"Creating the following Broker tables under the global_temp database: {_BROKER_REF_TABLES}")
for sql_statement in broker_sql_strings:
spark.sql(sql_statement)

match isinstance(spark, DuckDBSparkSession):
case True:
logger.info("Creating ref temp views using DuckDB")

if IS_LOCAL:
spark.sql(
f"""
CREATE OR REPLACE SECRET (
TYPE s3,
PROVIDER config,
KEY_ID '{CONFIG.AWS_ACCESS_KEY}',
SECRET '{CONFIG.AWS_SECRET_KEY}',
ENDPOINT '{CONFIG.AWS_S3_ENDPOINT}',
URL_STYLE 'path',
USE_SSL 'false'
);
"""
)
else:
# DuckDB will prepend the HTTP or HTTPS so we need to strip it from the AWS endpoint URL
endpoint_url = CONFIG.AWS_S3_ENDPOINT.replace("http://", "").replace("https://", "")
spark.sql(
f"""
CREATE OR REPLACE SECRET (
TYPE s3,
REGION '{USASPENDING_AWS_REGION}',
ENDPOINT '{endpoint_url}',
PROVIDER 'credential_chain'
);
"""
)

_download_delta_tables = [
{"schema": "rpt", "table_name": "account_balances_download"},
{"schema": "rpt", "table_name": "object_class_program_activity_download"},
]

# The DuckDB Delta extension is needed to interact with DeltaLake tables
spark.sql("LOAD delta; CREATE SCHEMA IF NOT EXISTS rpt;")
for table in _download_delta_tables:
s3_path = (
f"s3://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/{table['schema']}/{table['table_name']}"
)
try:
spark.sql(
f"""
CREATE OR REPLACE TABLE {table["schema"]}.{table["table_name"]} AS
SELECT * FROM delta_scan('{s3_path}');
"""
)
logger.info(f"Successfully created table {table['schema']}.{table['table_name']}")
except duckdb.IOException:
logger.exception(f"Failed to create table {table['table_name']}")
raise RuntimeError(f"Failed to create table {table['table_name']}")

# The DuckDB Postgres extension is needed to connect to the USAS Postgres DB
spark.sql("LOAD postgres; CREATE SCHEMA IF NOT EXISTS global_temp;")
spark.sql(f"ATTACH '{CONFIG.DATABASE_URL}' AS usas (TYPE postgres, READ_ONLY);")

for table in rds_ref_tables:
try:
spark.sql(f"CREATE OR REPLACE VIEW global_temp.{table} AS SELECT * FROM usas.public.{table};")
except duckdb.CatalogException:
logger.exception(f"Failed to create view {table} for {table}")
raise RuntimeError(f"Failed to create view {table} for {table}")

if create_broker_views:
spark.sql(
f"""
ATTACH '{CONFIG.BROKER_DB}' AS broker (TYPE postgres, READ_ONLY);
"""
)
logger.info(
f"Creating the following Broker tables under the global_temp database: {_BROKER_REF_TABLES}"
)
for table in _BROKER_REF_TABLES:
try:
spark.sql(f"CREATE OR REPLACE VIEW global_temp.{table} AS SELECT * FROM broker.public.{table};")
except duckdb.CatalogException:
logger.exception(f"Failed to create view {table} for {table}")
raise RuntimeError(f"Failed to create view {table} for {table}")
case False:
logger.info("Creating ref temp views using Spark")

rds_sql_strings = _generate_global_view_sql_strings(
tables=rds_ref_tables,
jdbc_url=get_usas_jdbc_url(),
)

for sql_statement in rds_sql_strings:
spark.sql(sql_statement)

if create_broker_views:
broker_sql_strings = _generate_global_view_sql_strings(
tables=_BROKER_REF_TABLES,
jdbc_url=get_broker_jdbc_url(),
)
logger.info(
f"Creating the following Broker tables under the global_temp database: {_BROKER_REF_TABLES}"
)
for sql_statement in broker_sql_strings:
spark.sql(sql_statement)

logger.info("Created the reference views in the global_temp database")

@@ -595,9 +693,10 @@ def write_csv_file(
) -> int:
"""Write DataFrame data to CSV file parts.
Args:
spark: passed-in active SparkSession
df: the DataFrame wrapping the data source to be dumped to CSV.
parts_dir: Path to dir that will contain the outputted parts files from partitions
spark: Passed-in active SparkSession
df: The DataFrame wrapping the data source to be dumped to CSV.
parts_dir: Path to dir that will contain the outputted parts files from partitions
num_partitions: Indicates the number of partitions to use when writing the Dataframe
overwrite: Whether to replace the file CSV files if they already exist by that name
max_records_per_file: Suggestion to Spark of how many records to put in each written CSV file part,
if it will end up writing multiple files.
@@ -635,6 +734,68 @@
return df_record_count


def write_csv_file_duckdb(
df: DuckDBDataFrame,
download_file_name: str,
temp_csv_directory_path: str = CSV_LOCAL_PATH,
max_records_per_file: int = EXCEL_ROW_LIMIT,
logger: logging.Logger | None = None,
delimiter: str = ",",
) -> tuple[int, list[str] | list]:
"""Write DataFrame data to CSV file parts.
Args:
df: The DataFrame wrapping the data source to be dumped to CSV.
download_file_name: Name of the download being generated.
temp_csv_directory_path: Directory that will contain the individual CSV files before zipping.
Defaults to CSV_LOCAL_PATH
max_records_per_file: Max number of records to put in each written CSV file.
Defaults to EXCEL_ROW_LIMIT
logger: Logging instance to use.
Defaults to None
delimiter: Character used to separate columns in the CSV
Defaults to ","
Returns:
record count of the DataFrame that was used to populate the CSV file(s)
list of full path(s) to the temp CSV file(s)
"""
start = time.time()
_pandas_df = df.toPandas()
_pandas_df["file_number"] = (_pandas_df.index // max_records_per_file) + 1
df_record_count = len(_pandas_df)
rel = duckdb.from_df(_pandas_df)

full_file_paths = []

logger.info(f"Writing source data DataFrame to csv files for file {download_file_name}")
rel.to_csv(
file_name=f"{temp_csv_directory_path}{download_file_name}",
sep=delimiter,
escapechar='"',
header=True,
partition_by=["file_number"],
write_partition_columns=False, # Don't include the columns that are used for partitioning in the CSV
overwrite=True,
)

# Move and rename the CSV files to match the expected format
_partition_dirs = [
f"{temp_csv_directory_path}{download_file_name}/{d}"
for d in os.listdir(f"{temp_csv_directory_path}{download_file_name}")
]
for dir in _partition_dirs:
_old_csv_path = f"{dir}/{os.listdir(dir)[0]}"
_new_csv_path = (
f"{temp_csv_directory_path}{download_file_name}/{download_file_name}_{dir.split('=')[1].zfill(2)}.csv"
)
shutil.move(_old_csv_path, _new_csv_path)
full_file_paths.append(_new_csv_path)
os.rmdir(dir)

logger.info(f"{temp_csv_directory_path}{download_file_name} contains {df_record_count:,} rows of data")
logger.info(f"Wrote source data DataFrame to {len(full_file_paths)} CSV files in {(time.time() - start):3f}s")
return df_record_count, full_file_paths
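
write_csv_file_duckdb() sidesteps Spark's repartition-then-merge step: it pulls the relation into pandas, derives a file_number column so no chunk exceeds max_records_per_file, has DuckDB write one Hive-style partition directory per chunk, and then flattens each part into <download_file_name>_NN.csv. The chunking and partitioned write reduce to a few lines; the sketch below uses toy data and a row limit of 3 in place of EXCEL_ROW_LIMIT.

```python
# Toy illustration of the chunk-then-partition write used by write_csv_file_duckdb.
import duckdb
import pandas as pd

max_records_per_file = 3  # stand-in for EXCEL_ROW_LIMIT
df = pd.DataFrame({"award_id": range(8), "amount": [100.0] * 8})

# Rows 0-2 -> file 1, rows 3-5 -> file 2, rows 6-7 -> file 3
df["file_number"] = (df.index // max_records_per_file) + 1

duckdb.from_df(df).to_csv(
    file_name="/tmp/example_download",  # one subdirectory per file_number value
    sep=",",
    header=True,
    partition_by=["file_number"],
    write_partition_columns=False,      # keep the helper column out of the CSV output
    overwrite=True,
)
# Produces /tmp/example_download/file_number=1/... etc., which the function above
# renames to example_download_01.csv, example_download_02.csv, example_download_03.csv.
```
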


def _merge_file_parts(fs, out_stream, conf, hadoop, partial_merged_file_path, part_file_list):
"""Read-in files in alphabetical order and append them one by one to the merged file"""

84 changes: 83 additions & 1 deletion usaspending_api/common/helpers/download_csv_strategies.py
@@ -7,6 +7,7 @@
from typing import List, Optional

from django.conf import settings
from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
from pyspark.sql import DataFrame

from usaspending_api.common.csv_helpers import count_rows_in_delimited_file
@@ -64,6 +65,7 @@ def download_to_csv(
working_dir_path: The working directory path as a string
download_zip_path: The path (as a string) to the download zip file
source_df: A pyspark DataFrame that contains the data to be downloaded
Defaults to None.

Returns:
Returns a CSVDownloadMetadata object (a dataclass containing metadata about the download)
@@ -77,7 +79,13 @@ def __init__(self, logger: logging.Logger, *args, **kwargs):
self._logger = logger

def download_to_csv(
self, source_sql, destination_path, destination_file_name, working_dir_path, download_zip_path, source_df=None
self,
source_sql,
destination_path,
destination_file_name,
working_dir_path,
download_zip_path,
source_df=None,
):
start_time = time.perf_counter()
self._logger.info(f"Downloading data to {destination_path}")
@@ -245,3 +253,77 @@ def _move_data_csv_s3_to_local(
local_csv_file_paths.append(final_path)
self._logger.info(f"Copied data files from S3 to local machine in {(time.time() - start_time):3f}s")
return local_csv_file_paths


class DuckDBToCSVStrategy(AbstractToCSVStrategy):
def __init__(self, logger: logging.Logger, spark: DuckDBSparkSession, *args, **kwargs):
super().__init__(*args, **kwargs)
self._logger = logger
self.spark = spark

def download_to_csv(
self,
source_sql: str | None,
destination_path: str,
destination_file_name: str,
working_dir_path: str,
download_zip_path: str,
source_df=None,
delimiter=",",
file_format="csv",
):
from usaspending_api.common.etl.spark import write_csv_file_duckdb

try:
if source_df is not None:
df = source_df
else:
df = self.spark.sql(source_sql)
record_count, final_csv_data_file_locations = write_csv_file_duckdb(
df=df,
download_file_name=destination_file_name,
max_records_per_file=EXCEL_ROW_LIMIT,
logger=self._logger,
delimiter=delimiter,
)
column_count = len(df.columns)
except Exception:
self._logger.exception("Exception encountered. See logs")
raise
append_files_to_zip_file(final_csv_data_file_locations, download_zip_path)
self._logger.info(f"Generated the following data csv files {final_csv_data_file_locations}")
return CSVDownloadMetadata(final_csv_data_file_locations, record_count, column_count)

def _move_data_csv_s3_to_local(
self, bucket_name, s3_file_paths, s3_bucket_path, s3_bucket_sub_path, destination_path_dir
) -> List[str]:
"""Moves files from s3 data csv location to a location on the local machine.

Args:
bucket_name: The name of the bucket in s3 where file_names and s3_path are
s3_file_paths: A list of file paths to move from s3, name should
include s3a:// and bucket name
s3_bucket_path: The bucket path, e.g. s3a:// + bucket name
s3_bucket_sub_path: The path to the s3 files in the bucket, excluding s3a:// + bucket name, e.g. temp_directory/files
destination_path_dir: The location to move those files from s3 to, must not include the
file name in the path. This path should be a directory.

Returns:
A list of the final location on the local machine that the
files were moved to from s3.
"""
start_time = time.time()
self._logger.info("Moving data files from S3 to local machine...")
local_csv_file_paths = []
for file_name in s3_file_paths:
s3_key = file_name.replace(f"{s3_bucket_path}/", "")
file_name_only = s3_key.replace(f"{s3_bucket_sub_path}/", "")
final_path = f"{destination_path_dir}/{file_name_only}"
download_s3_object(
bucket_name,
s3_key,
final_path,
)
local_csv_file_paths.append(final_path)
self._logger.info(f"Copied data files from S3 to local machine in {(time.time() - start_time):3f}s")
return local_csv_file_paths
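
For orientation, here is a hedged sketch of how DuckDBToCSVStrategy might be driven end to end: build DuckDB's PySpark-compatible session, create the reference views, then hand the strategy a SQL string. The paths, file names, and query are placeholders, the builder call assumes DuckDB's experimental Spark API mirrors PySpark's, and the session configuration done elsewhere in this PR (memory limit, secrets) is omitted.

```python
# Hypothetical driver for DuckDBToCSVStrategy; paths, file names, and the query
# are placeholders rather than values taken from this PR.
import logging

from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession

from usaspending_api.common.etl.spark import create_ref_temp_views
from usaspending_api.common.helpers.download_csv_strategies import DuckDBToCSVStrategy

logger = logging.getLogger(__name__)

spark = DuckDBSparkSession.builder.getOrCreate()  # assumes a PySpark-style builder
create_ref_temp_views(spark)  # rpt.* Delta copies + global_temp.* Postgres views

strategy = DuckDBToCSVStrategy(logger=logger, spark=spark)
metadata = strategy.download_to_csv(
    source_sql="SELECT * FROM rpt.object_class_program_activity_download LIMIT 1000",
    destination_path="/tmp/downloads",
    destination_file_name="FY2025_File_B",
    working_dir_path="/tmp/downloads",
    download_zip_path="/tmp/downloads/FY2025_File_B.zip",
)
# metadata is a CSVDownloadMetadata holding the CSV part paths, row count, and column count.
```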