Merged

47 commits
445d412
[DEV-13726] Add DuckDB dependency
aguest-kc Nov 5, 2025
9a0f694
[DEV-13726] Add DuckDB download strategy
aguest-kc Nov 5, 2025
1c48b93
[DEV-13726] Update number formatting and import order
aguest-kc Nov 5, 2025
5cf9413
[DEV-13726] Add DuckDB type hints and imports
aguest-kc Nov 5, 2025
55a2ea9
[DEV-13726] Make Spark downloads work with DuckDB
aguest-kc Nov 5, 2025
b4f9340
[DEV-13726] Clean up write CSV function
aguest-kc Nov 5, 2025
51c4762
[DEV-13726] Pin DuckDB version and install extensions
aguest-kc Nov 5, 2025
1a9a320
[DEV-13726] Time the entire download process
aguest-kc Nov 5, 2025
28c5185
[DEV-13726] Remove DuckDB flag from viewset
aguest-kc Nov 5, 2025
8091b75
[DEV-13726] Manually set memory limit for DuckDB
aguest-kc Nov 6, 2025
ee83d50
[DEV-13726] Update AWS config
aguest-kc Nov 6, 2025
f8b857d
[DEV-13726] Use env vars for AWS secret
aguest-kc Nov 6, 2025
ee645b9
[DEV-13726] Update the Broker DB URL env variable name
aguest-kc Nov 12, 2025
7b05e11
[DEV-13726] Update Broker env var name
aguest-kc Nov 12, 2025
cb264a3
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 13, 2025
b4e4ec3
[DEV-13726] Add comment
aguest-kc Nov 13, 2025
37c4a5b
[DEV-13726] Update S3 auth and CSV generation
aguest-kc Nov 17, 2025
cf75a9c
[DEV-13726] Fix typo in DuckDB secret
aguest-kc Nov 17, 2025
ea2cb08
[DEV-13726] Remove test CSV file row limit
aguest-kc Nov 17, 2025
6a40043
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 18, 2025
8b01fc4
[DEV-13726] Consolidate SQL statements
aguest-kc Nov 18, 2025
39e614f
[DEV-13726] Update federal account dataframe
aguest-kc Nov 19, 2025
3b5bc75
[DEV-13726] Update federal account dataframe
aguest-kc Nov 19, 2025
24881dc
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 21, 2025
ba1ca86
[DEV-13726] Flake8 fixes and comment clean up
aguest-kc Nov 21, 2025
29c25f6
[DEV-13726] black formatting fixes
aguest-kc Nov 21, 2025
1444a70
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 1, 2025
325b000
[DEV-13726] Update SQL functions
aguest-kc Dec 1, 2025
dce5833
[DEV-13726] Add spark parameter
aguest-kc Dec 1, 2025
52960bd
[DEV-13726] Black format fix
aguest-kc Dec 1, 2025
777bce2
[DEV-13726] Revert default Spark strategy to Databricks
aguest-kc Dec 4, 2025
faa7e16
[DEV-13726] Use LocalStrategy instead of DatabricksStrategy
aguest-kc Dec 4, 2025
8b15fa4
[DEV-13726] Only use Spark for File A downloads
aguest-kc Dec 4, 2025
990e37b
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 8, 2025
6522b80
[DEV-13726] Move DuckDB setup to setup_spark_session() method
aguest-kc Dec 10, 2025
ac7e90f
[DEV-13726] Convert to UNIX line endings
aguest-kc Dec 10, 2025
8e402f3
[DEV-13726] Revert Spark table logic
aguest-kc Dec 11, 2025
d12f8a8
[DEV-13726] Replaced hardcoded values with variables
aguest-kc Dec 11, 2025
6fb3e58
[DEV-13726] Remove case that would never be reached
aguest-kc Dec 11, 2025
c0adbd1
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 11, 2025
41fab45
[DEV-13726] Explain why DuckDB version is pinned
aguest-kc Dec 12, 2025
0756469
[DEV-13726] Create a DuckDB Dockerfile
aguest-kc Dec 12, 2025
b8aea3d
[DEV-13726] Use CONFIG more
aguest-kc Dec 12, 2025
dd5c153
[DEV-13726] Log as exceptions and raise exception
aguest-kc Dec 12, 2025
a3d2a97
[DEV-13726] Add file_number in Pandas
aguest-kc Dec 12, 2025
2645a8a
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 12, 2025
454e6ba
[DEV-13726] Add `rel` definition back and flake8 fix
aguest-kc Dec 15, 2025
12 changes: 10 additions & 2 deletions Dockerfile
@@ -16,11 +16,19 @@ RUN apt update && apt install -y \
libpq-dev \
postgresql-13

##### Copy python packaged
COPY . /dockermount

RUN python3 -m pip install -r requirements/requirements.txt && \
python3 -m pip install -r requirements/requirements-server.txt && \
python3 -m pip install ansible==2.9.15 awscli==1.34.19

##### Ensure Python STDOUT gets sent to container logs
# Install DuckDB extensions

RUN mkdir -p /root/.duckdb/extensions/v1.4.1/linux_amd64 && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/delta.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/delta.duckdb_extension && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/aws.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/aws.duckdb_extension && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/httpfs.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/httpfs.duckdb_extension && \
curl http://extensions.duckdb.org/v1.4.1/linux_amd64/postgres_scanner.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v1.4.1/linux_amd64/postgres_scanner.duckdb_extension

# Ensure Python STDOUT gets sent to container logs
ENV PYTHONUNBUFFERED=1
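Note: the extensions above are staged into DuckDB 1.4.1's default extension directory so they can be loaded at runtime without network access. A minimal sketch of what that load looks like, assuming the pinned version and the default extension path (this is illustrative, not code from this PR):

import duckdb

# With the .duckdb_extension files already present under
# ~/.duckdb/extensions/v1.4.1/linux_amd64, load_extension() resolves them
# locally instead of downloading from extensions.duckdb.org.
con = duckdb.connect()
for ext in ("httpfs", "aws", "delta", "postgres_scanner"):
    con.load_extension(ext)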
1 change: 1 addition & 0 deletions requirements/requirements-app.txt
@@ -16,6 +16,7 @@ djangorestframework==3.15.*
docutils==0.20.1
drf-api-tracking==1.8.4
drf-extensions==0.7.*
duckdb==1.4.1
elasticsearch-dsl==7.4.*
elasticsearch==7.10.*
et-xmlfile==1.1.0
1,582 changes: 876 additions & 706 deletions usaspending_api/common/etl/spark.py

Large diffs are not rendered by default.

84 changes: 83 additions & 1 deletion usaspending_api/common/helpers/download_csv_strategies.py
@@ -7,6 +7,7 @@
from typing import List, Optional

from django.conf import settings
from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
from pyspark.sql import DataFrame

from usaspending_api.common.csv_helpers import count_rows_in_delimited_file
@@ -64,6 +65,7 @@ def download_to_csv(
working_dir_path: The working directory path as a string
download_zip_path: The path (as a string) to the download zip file
source_df: A pyspark DataFrame that contains the data to be downloaded
Defaults to None.

Returns:
Returns a CSVDownloadMetadata object (a dataclass containing metadata about the download)
@@ -77,7 +79,7 @@ def __init__(self, logger: logging.Logger, *args, **kwargs):
self._logger = logger

def download_to_csv(
self, source_sql, destination_path, destination_file_name, working_dir_path, download_zip_path, source_df=None
self,
source_sql,
destination_path,
destination_file_name,
working_dir_path,
download_zip_path,
source_df=None,
):
start_time = time.perf_counter()
self._logger.info(f"Downloading data to {destination_path}")
@@ -245,3 +253,77 @@ def _move_data_csv_s3_to_local(
local_csv_file_paths.append(final_path)
self._logger.info(f"Copied data files from S3 to local machine in {(time.time() - start_time):3f}s")
return local_csv_file_paths


class DuckDBToCSVStrategy(AbstractToCSVStrategy):
def __init__(self, logger: logging.Logger, spark: DuckDBSparkSession, *args, **kwargs):
super().__init__(*args, **kwargs)
self._logger = logger
self.spark = spark

def download_to_csv(
self,
source_sql: str | None,
destination_path: str,
destination_file_name: str,
working_dir_path: str,
download_zip_path: str,
source_df=None,
delimiter=",",
file_format="csv",
):
from usaspending_api.common.etl.spark import write_csv_file_duckdb

try:
if source_df is not None:
df = source_df
else:
df = self.spark.sql(source_sql)
record_count, final_csv_data_file_locations = write_csv_file_duckdb(
df=df,
download_file_name=destination_file_name,
max_records_per_file=EXCEL_ROW_LIMIT,
logger=self._logger,
delimiter=delimiter,
)
column_count = len(df.columns)
except Exception:
self._logger.exception("Exception encountered. See logs")
raise
append_files_to_zip_file(final_csv_data_file_locations, download_zip_path)
self._logger.info(f"Generated the following data csv files {final_csv_data_file_locations}")
return CSVDownloadMetadata(final_csv_data_file_locations, record_count, column_count)

def _move_data_csv_s3_to_local(
self, bucket_name, s3_file_paths, s3_bucket_path, s3_bucket_sub_path, destination_path_dir
) -> List[str]:
"""Moves files from s3 data csv location to a location on the local machine.

Args:
bucket_name: The name of the bucket in s3 where file_names and s3_path are
s3_file_paths: A list of file paths to move from s3, name should
include s3a:// and bucket name
s3_bucket_path: The bucket path, e.g. s3a:// + bucket name
s3_bucket_sub_path: The path to the s3 files in the bucket, excluding s3a:// + bucket name, e.g. temp_directory/files
destination_path_dir: The location to move those files from s3 to, must not include the
file name in the path. This path should be a directory.

Returns:
A list of the final location on the local machine that the
files were moved to from s3.
"""
start_time = time.time()
self._logger.info("Moving data files from S3 to local machine...")
local_csv_file_paths = []
for file_name in s3_file_paths:
s3_key = file_name.replace(f"{s3_bucket_path}/", "")
file_name_only = s3_key.replace(f"{s3_bucket_sub_path}/", "")
final_path = f"{destination_path_dir}/{file_name_only}"
download_s3_object(
bucket_name,
s3_key,
final_path,
)
local_csv_file_paths.append(final_path)
self._logger.info(f"Copied data files from S3 to local machine in {(time.time() - start_time):3f}s")
return local_csv_file_paths
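For reviewers, a rough usage sketch of the new strategy (the query and paths below are placeholders, not values wired up by this PR):

import logging

from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession

from usaspending_api.common.helpers.download_csv_strategies import DuckDBToCSVStrategy

spark = DuckDBSparkSession.builder.getOrCreate()
strategy = DuckDBToCSVStrategy(logger=logging.getLogger(__name__), spark=spark)

# Runs the query through DuckDB, splits the CSV at EXCEL_ROW_LIMIT rows per file,
# appends the parts to the zip, and returns a CSVDownloadMetadata object.
metadata = strategy.download_to_csv(
    source_sql="SELECT 1 AS example_column",  # placeholder query
    destination_path="/tmp/downloads",
    destination_file_name="example_download",
    working_dir_path="/tmp/downloads/working",
    download_zip_path="/tmp/downloads/example_download.zip",
)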
26 changes: 24 additions & 2 deletions usaspending_api/common/spark/jobs.py
@@ -7,8 +7,9 @@

from databricks.sdk import WorkspaceClient
from databricks.sdk.config import Config as DatabricksConfig
from databricks.sdk.service.jobs import RunLifeCycleState, BaseJob
from databricks.sdk.service.jobs import BaseJob, RunLifeCycleState
from django.core.management import call_command
from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession

from usaspending_api.common.spark.configs import LOCAL_EXTENDED_EXTRA_CONF, OPTIONAL_SPARK_HIVE_JAR, SPARK_SESSION_JARS

@@ -153,7 +154,7 @@ def _run_in_container(job_name: str, command_name: str, command_options: list[st
template_container = client.containers.get("spark-submit")
except docker.errors.NotFound:
logger.exception(
f"The 'spark-submit' container was not found. Please create this container first via the supported"
"The 'spark-submit' container was not found. Please create this container first via the supported"
" spark-submit docker compose workflow."
)
raise
@@ -205,6 +206,27 @@ def handle_start(self, job_name: str, command_name: str, command_options: list[s
return run_details


class DuckDBStrategy(_AbstractStrategy):
@property
def name(self) -> str:
return "DUCKDB"

@staticmethod
@contextmanager
def _get_spark_session() -> Generator["SparkSession", None, None]:
spark = DuckDBSparkSession.builder.getOrCreate()
yield spark
spark.stop()

def handle_start(self, job_name: str, command_name: str, command_options: list[str], **kwargs) -> None:
try:
with self._get_spark_session():
call_command(command_name, *command_options)
except Exception:
logger.exception(f"Failed on command: {command_name} {' '.join(command_options)}")
raise


class SparkJobs:
def __init__(self, strategy: _AbstractStrategy):
self._strategy = strategy
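A short sketch of how the new strategy might be exercised (the job name, command name, and options are illustrative placeholders, not this PR's actual wiring):

from usaspending_api.common.spark.jobs import DuckDBStrategy, SparkJobs

# handle_start opens a DuckDB-backed Spark session, runs the Django management
# command inside it, and stops the session afterwards.
strategy = DuckDBStrategy()
strategy.handle_start(
    job_name="example_job",          # placeholder
    command_name="example_command",  # placeholder management command
    command_options=["--flag", "value"],
)

# SparkJobs can also be constructed with this strategy in place of the
# Databricks or local ones:
jobs = SparkJobs(DuckDBStrategy())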
44 changes: 37 additions & 7 deletions usaspending_api/common/spark/utils.py
@@ -1,17 +1,47 @@
from pyspark.sql import functions as sf, Column
from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
from duckdb.experimental.spark.sql.column import Column as DuckDBSparkColumn
from pyspark.sql import Column, SparkSession

from usaspending_api.submissions.helpers import get_submission_ids_for_periods
from usaspending_api.download.delta_downloads.filters.account_filters import AccountDownloadFilters
from usaspending_api.submissions.helpers import get_submission_ids_for_periods


def collect_concat(col_name: str | Column, concat_str: str = "; ", alias: str | None = None) -> Column:
def collect_concat(
col_name: str | Column | DuckDBSparkColumn,
spark: SparkSession | DuckDBSparkSession,
concat_str: str = "; ",
alias: str | None = None,
) -> Column | DuckDBSparkColumn:
"""Aggregates columns into a string of values separated by some delimiter"""
if alias is None:
alias = col_name if isinstance(col_name, str) else str(col_name._jc)
return sf.concat_ws(concat_str, sf.sort_array(sf.collect_set(col_name))).alias(alias)

if isinstance(spark, DuckDBSparkSession):
from duckdb.experimental.spark.sql import functions as sf

if alias is None and isinstance(col_name, str):
alias = col_name
elif alias is None and not isinstance(col_name, str):
# DuckDB doesn't have a "._jc" property like PySpark does so we need a string for the alias
raise TypeError(f"`col_name` must be a string for DuckDB, but got {type(col_name)}")

# collect_set() is not implemented in DuckDB's Spark API, but the `list_distinct` SQL method should work
return sf.concat_ws(concat_str, sf.sort_array(sf.call_function("list_distinct", col_name))).alias(alias)
else:
from pyspark.sql import functions as sf

if alias is None:
alias = col_name if isinstance(col_name, str) else str(col_name._jc)

return sf.concat_ws(concat_str, sf.sort_array(sf.collect_set(col_name))).alias(alias)


def filter_submission_and_sum(
col_name: str, filters: AccountDownloadFilters, spark: SparkSession | DuckDBSparkSession
) -> Column:
if isinstance(spark, DuckDBSparkSession):
from duckdb.experimental.spark.sql import functions as sf
else:
from pyspark.sql import functions as sf

def filter_submission_and_sum(col_name: str, filters: AccountDownloadFilters) -> Column:
filter_column = (
sf.when(
sf.col("submission_id").isin(
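A quick illustration of the intended parity between the two branches of collect_concat (the data is made up, and this assumes DuckDB's experimental Spark API supports groupBy/agg as the PR uses it elsewhere):

from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession

from usaspending_api.common.spark.utils import collect_concat

spark = DuckDBSparkSession.builder.getOrCreate()
df = spark.sql(
    "SELECT * FROM (VALUES (1, 'Agency B'), (1, 'Agency A'), (1, 'Agency A')) "
    "AS t(grp, reporting_agency_name)"
)

# With a DuckDB session the aggregation goes through list_distinct instead of
# collect_set, but the intended result is the same: 'Agency A; Agency B' per group.
result = df.groupBy("grp").agg(collect_concat("reporting_agency_name", spark=spark))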
Original file line number Diff line number Diff line change
@@ -3,14 +3,22 @@
from functools import cached_property
from typing import TypeVar

from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
from duckdb.experimental.spark.sql.column import Column as DuckDBSparkColumn
from duckdb.experimental.spark.sql.dataframe import DataFrame as DuckDBSparkDataFrame
from pydantic import BaseModel
from pyspark.sql import Column, DataFrame, SparkSession

DownloadFilters = TypeVar("DownloadFilters", bound=BaseModel)


class AbstractDownload(ABC):
def __init__(self, spark: SparkSession, filters: DownloadFilters, dynamic_filters: Column):
def __init__(
self,
spark: SparkSession | DuckDBSparkSession,
filters: DownloadFilters,
dynamic_filters: Column | DuckDBSparkColumn,
):
self._spark = spark
self._filters = filters
self._dynamic_filters = dynamic_filters
@@ -21,27 +29,27 @@ def filters(self) -> DownloadFilters:
return self._filters

@property
def dynamic_filters(self) -> Column:
def dynamic_filters(self) -> Column | DuckDBSparkColumn:
return self._dynamic_filters

@property
def start_time(self) -> datetime:
return self._start_time

@property
def spark(self) -> SparkSession:
def spark(self) -> SparkSession | DuckDBSparkSession:
return self._spark

@cached_property
def file_name(self) -> str:
return self._build_file_name()

@cached_property
def dataframe(self) -> DataFrame:
def dataframe(self) -> DataFrame | DuckDBSparkDataFrame:
return self._build_dataframe()

@abstractmethod
def _build_file_name(self) -> str: ...

@abstractmethod
def _build_dataframe(self) -> DataFrame: ...
def _build_dataframe(self) -> DataFrame | DuckDBSparkDataFrame: ...
Original file line number Diff line number Diff line change
@@ -4,7 +4,9 @@
from functools import reduce
from typing import TypeVar

from pyspark.sql import functions as sf, Column, SparkSession
from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
from duckdb.experimental.spark.sql.column import Column as DuckDBColumn
from pyspark.sql import Column, SparkSession

from usaspending_api.common.exceptions import InvalidParameterException
from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
@@ -13,7 +15,6 @@
)
from usaspending_api.download.delta_downloads.filters.account_filters import AccountDownloadFilters


AccountDownload = TypeVar("AccountDownload", bound=AbstractAccountDownload)


@@ -28,13 +29,12 @@ class AccountDownloadConditionName(Enum):


class AbstractAccountDownloadFactory(ABC):

def __init__(self, spark: SparkSession, filters: AccountDownloadFilters):
def __init__(self, spark: SparkSession | DuckDBSparkSession, filters: AccountDownloadFilters):
self._spark = spark
self._filters = filters

@property
def spark(self) -> SparkSession:
def spark(self) -> SparkSession | DuckDBSparkSession:
return self._spark

@property
@@ -46,11 +46,16 @@ def supported_filter_conditions(self) -> list[AccountDownloadConditionName]:
return list(AccountDownloadConditionName)

@property
def dynamic_filters(self) -> Column:
def dynamic_filters(self) -> Column | DuckDBColumn:
if type(self._spark) is DuckDBSparkSession:
from duckdb.experimental.spark.sql import functions as sf
else:
from pyspark.sql import functions as sf

@dataclass
class Condition:
name: AccountDownloadConditionName
condition: Column
condition: Column | DuckDBColumn
apply: bool

conditions = [
8 changes: 4 additions & 4 deletions usaspending_api/download/delta_downloads/account_balances.py
@@ -70,10 +70,10 @@ def group_by_cols(self) -> list[str]:
@property
def agg_cols(self) -> list[Column]:
return [
collect_concat("reporting_agency_name"),
collect_concat("agency_identifier_name"),
collect_concat("budget_function"),
collect_concat("budget_subfunction"),
collect_concat("reporting_agency_name", spark=self.spark),
collect_concat("agency_identifier_name", spark=self.spark),
collect_concat("budget_function", spark=self.spark),
collect_concat("budget_subfunction", spark=self.spark),
sf.sum(sf.col("budget_authority_unobligated_balance_brought_forward")).alias(
"budget_authority_unobligated_balance_brought_forward"
),
14 changes: 8 additions & 6 deletions usaspending_api/download/delta_downloads/award_financial.py
@@ -129,16 +129,18 @@ def group_by_cols(self) -> list[str]:
@property
def agg_cols(self) -> dict[str, callable]:
return {
"reporting_agency_name": collect_concat,
"budget_function": collect_concat,
"budget_subfunction": collect_concat,
"reporting_agency_name": lambda col: collect_concat(col, spark=self.spark),
"budget_function": lambda col: collect_concat(col, spark=self.spark),
"budget_subfunction": lambda col: collect_concat(col, spark=self.spark),
"transaction_obligated_amount": lambda col: sf.sum(col).alias(col),
"gross_outlay_amount_FYB_to_period_end": lambda col: filter_submission_and_sum(col, self.filters),
"gross_outlay_amount_FYB_to_period_end": lambda col: filter_submission_and_sum(
col, self.filters, spark=self.spark
),
"USSGL487200_downward_adj_prior_year_prepaid_undeliv_order_oblig": lambda col: filter_submission_and_sum(
col, self.filters
col, self.filters, spark=self.spark
),
"USSGL497200_downward_adj_of_prior_year_paid_deliv_orders_oblig": lambda col: filter_submission_and_sum(
col, self.filters
col, self.filters, spark=self.spark
),
"last_modified_date": lambda col: sf.max(sf.date_format(col, "yyyy-MM-dd")).alias(col),
}