Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ae408ab
[DEV-12639] add file c spark functionality
loreleitrimberger Jun 25, 2025
c6be152
Merge branch 'qat' into ftr/dev-12639-update-custom-download
loreleitrimberger Jul 24, 2025
1062b18
[DEV-12639] add spark job for downloading file c accounts in spark
loreleitrimberger Jul 28, 2025
7b7f904
[DEV-12639] update to account for other submission types
loreleitrimberger Jul 29, 2025
4d7050e
[DEV-12639] add test, clean up
loreleitrimberger Jul 30, 2025
0daaa4a
[DEV-12639] pre-commit fix
loreleitrimberger Jul 30, 2025
ee3c1e4
[DEV-12639] Update spark test, add ability to include object_class_pr…
loreleitrimberger Aug 4, 2025
a0ae19c
Merge branch 'qat' into ftr/dev-12639-update-custom-download
loreleitrimberger Aug 4, 2025
654c4b8
[DEV-12639] format
loreleitrimberger Aug 4, 2025
4558e83
[DEV-12639] format
loreleitrimberger Aug 4, 2025
ee82139
Merge branch 'ftr/dev-12639-update-custom-download' of https://github…
loreleitrimberger Aug 4, 2025
4900b48
Revert "[DEV-12639] format"
loreleitrimberger Aug 4, 2025
f07a3e5
Revert "[DEV-12639] format"
loreleitrimberger Aug 4, 2025
e9ffd2d
[DEV-12639] cleanup
loreleitrimberger Aug 5, 2025
19c7017
[DEV-12639] format
loreleitrimberger Aug 5, 2025
9a49a2f
[DEV-12639] format
loreleitrimberger Aug 5, 2025
6e10e5b
Revert "[DEV-12639] format"
loreleitrimberger Aug 5, 2025
7bbaceb
[DEV-12639] Fix files updated from black
loreleitrimberger Aug 5, 2025
c13a2a1
[DEV-12639] fix builder.py
loreleitrimberger Aug 5, 2025
e5c86d5
[DEV-12639] remove duplicate method
loreleitrimberger Aug 5, 2025
427786b
[DEV-12639] sandbox changes
loreleitrimberger Aug 5, 2025
9bb8ddb
[DEV-12639] revert builders.py
loreleitrimberger Aug 7, 2025
918752b
Merge branch 'qat' into ftr/dev-12639-update-custom-download
sethstoudenmier Aug 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions usaspending_api/common/experimental_api_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
EXPERIMENTAL_API_HEADER = "HTTP_X_EXPERIMENTAL_API"
ELASTICSEARCH_HEADER_VALUE = "elasticsearch"

# Header name as exposed by ``request.headers``. Django normalizes an incoming
# ``X-Download-API`` header to the META key ``HTTP_X_DOWNLOAD_API``; the ``HTTP``
# prefix exists only in ``request.META`` and must NOT be part of the name looked
# up through ``request.headers`` (which is why the previous
# ``"HTTP-X-DOWNLOAD-API"`` value never matched a real client header).
DOWNLOAD_API_HEADER = "X-Download-API"
DOWNLOAD_HEADER_VALUE = "download"

def is_experimental_elasticsearch_api(request: Request) -> bool:
"""
Expand All @@ -29,6 +31,12 @@ def is_experimental_elasticsearch_api(request: Request) -> bool:
return request.META.get(EXPERIMENTAL_API_HEADER) == ELASTICSEARCH_HEADER_VALUE


def is_experimental_download_api(request: Request) -> bool:
    """Return True when the request opts into the experimental download flow.

    Reads the download flag from ``request.headers`` (the case-insensitive
    mapping, not ``request.META``) and compares it to the expected value.
    """
    header_value = request.headers.get(DOWNLOAD_API_HEADER)
    return header_value == DOWNLOAD_HEADER_VALUE
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the same format as the experimental_elasticsearch_api wasn't coming through for me (using META.get and _ instead of -), but doing it this way did

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Django does some handling of the request headers (see https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.HttpRequest.META).

In short, the ES experimental header would normally be supplied to the API as X-Experimental-API and Django would convert it under request.META to be HTTP_X_EXPERIMENTAL_API.

With that said, you should also be good to use request.headers in this case, but I would recommend changing the expected value to not include "HTTP", as that is something Django prepends only for "request.META".


def mirror_request_to_elasticsearch(request: Union[HttpRequest, Request]):
"""Duplicate request and send-again against this server, with the ES header attached to mirror
non-elasticsearch load against elasticsearch for load testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,29 @@ def test_empty_array_filter_fail(client, download_test_data):
assert (
"Field 'filters|def_codes' value '[]' is below min '1' items" in resp.json()["detail"]
), "Incorrect error message"

@pytest.mark.django_db(databases=[settings.DOWNLOAD_DB_ALIAS, settings.DEFAULT_DB_ALIAS])
def test_file_c_spark_download(client, download_test_data):
    """An account download requesting all three submission types returns HTTP 200."""
    # Point download generation at the test database instead of a live DSN.
    download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string())

    payload = {
        "account_level": "federal_account",
        "filters": {
            "budget_function": "all",
            "agency": "all",
            "submission_types": [
                "account_balances",
                "object_class_program_activity",
                "award_financial",
            ],
            "fy": "2021",
            "period": 12,
        },
        "file_format": "csv",
    }

    resp = client.post(
        "/api/v2/download/accounts/",
        content_type="application/json",
        data=json.dumps(payload),
    )

    assert resp.status_code == status.HTTP_200_OK
42 changes: 40 additions & 2 deletions usaspending_api/download/v2/base_download_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from usaspending_api.broker.lookups import EXTERNAL_DATA_TYPE_DICT
from usaspending_api.broker.models import ExternalDataLoadDate
from usaspending_api.common.api_versioning import API_TRANSFORM_FUNCTIONS, api_transformations
from usaspending_api.common.experimental_api_flags import is_experimental_download_api
from usaspending_api.common.helpers.dict_helpers import order_nested_object
from usaspending_api.common.spark.jobs import DatabricksStrategy, LocalStrategy, SparkJobs
from usaspending_api.common.sqs.sqs_handler import get_sqs_queue
from usaspending_api.download.download_utils import create_unique_filename, log_new_download_job
from usaspending_api.download.filestreaming import download_generation
Expand All @@ -24,7 +26,7 @@
from usaspending_api.download.v2.request_validations import DownloadValidatorBase
from usaspending_api.routers.replicas import ReadReplicaRouter
from usaspending_api.submissions.models import DABSSubmissionWindowSchedule

from usaspending_api.settings import IS_LOCAL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use settings.IS_LOCAL since settings is already imported above.


@api_transformations(api_version=settings.API_VERSION, function_list=API_TRANSFORM_FUNCTIONS)
class BaseDownloadViewSet(APIView):
Expand Down Expand Up @@ -68,7 +70,34 @@ def post(
)

log_new_download_job(request, download_job)
self.process_request(download_job)
if (
is_experimental_download_api(request)
and json_request["request_type"] == "account"
and "award_financial" in json_request["download_types"]
):
# run spark download with only award_financial in download type
str_to_json_original = json.loads(download_job.json_request).copy()
str_to_json_award_financial = json.loads(download_job.json_request)
str_to_json_award_financial['download_types'] = ['award_financial']
str_to_json_award_financial['filters']['submission_types'] = ['award_financial']
download_job.json_request = json.dumps(str_to_json_award_financial)
download_job.save()
# goes to spark for File C account download
self.process_account_download_in_spark(download_job=download_job)
# remove award_financial from json request to run download with remaining types
if len(json_request["download_types"]) > 1:
str_to_json_original['filters']['submission_types'].remove('award_financial')
str_to_json_original['download_types'].remove('award_financial')
download_job.json_request = json.dumps(str_to_json_original)
download_job.save()
self.process_request(download_job)
# add award_financial back for the final response
str_to_json_original['filters']['submission_types'].append('award_financial')
str_to_json_original['download_types'].append('award_financial')
download_job.json_request = json.dumps(str_to_json_original)
download_job.save()
else:
self.process_request(download_job)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the best way to go about it, but with this approach the Spark job reads from the original download_job (with Files A and B stripped out), and then the regular download_job runs without File C.


return self.get_download_response(file_name=final_output_zip_name)

Expand All @@ -85,6 +114,15 @@ def process_request(self, download_job: DownloadJob):
queue = get_sqs_queue(queue_name=settings.BULK_DOWNLOAD_SQS_QUEUE_NAME)
queue.send_message(MessageBody=str(download_job.download_job_id))

def process_account_download_in_spark(self, download_job: DownloadJob):
    """
    Process File C (award_financial) account downloads through Spark instead of SQS
    for better performance on large result sets.

    Args:
        download_job: the DownloadJob whose ``json_request`` describes the download;
            only its primary key is passed along to the Spark job.
    """
    # Use settings.IS_LOCAL for consistency with the rest of this module rather
    # than the separately imported IS_LOCAL flag (same value, one convention).
    strategy = LocalStrategy() if settings.IS_LOCAL else DatabricksStrategy()
    spark_jobs = SparkJobs(strategy)
    spark_jobs.start(
        job_name="download_delta_table-award_search",
        command_name="generate_spark_download",
        command_options=[f"--download-job-id={download_job.download_job_id}"],
    )

def get_download_response(self, file_name: str):
"""
Generate download response which encompasses various elements to provide accurate status for state of a
Expand Down
Loading