22
33import numpy as np
44import logging
5- from typing import Any , Annotated , Dict
5+ from typing import Any , Annotated , Dict , Optional
66
77from fastapi import FastAPI , Depends , HTTPException , status
88from fastapi .responses import FileResponse
@@ -103,13 +103,17 @@ async def login_for_access_token(
103103
104104
105105async def process_file (
106- storage_control : StorageControl , blob : str , env_vars : Dict [str , Any ]
106+ storage_control : StorageControl ,
107+ blob : str ,
108+ env_vars : Dict [str , Any ],
109+ pdp_id : Optional [str ] = None ,
107110) -> Dict [str , Any ]:
108111 """Process a single file: generate URLs, transfer, and validate."""
109112 logger .debug (f">>>> Splitting { blob } to extract institution data" )
110113 signed_urls = split_csv_and_generate_signed_urls (
111114 bucket_name = get_sftp_bucket_name (env_vars ["BUCKET_ENV" ]),
112115 source_blob_name = blob ,
116+ pdp_id = pdp_id ,
113117 )
114118 logger .info (f">>>> Signed URLs, File names generated for { blob } " )
115119
@@ -197,6 +201,7 @@ async def execute_pdp_pull(
197201 sftp_source_filename : str ,
198202 current_username : Annotated [str , Depends (get_current_username )],
199203 storage_control : Annotated [StorageControl , Depends (StorageControl )],
204+ pdp_id : Optional [str ] = None ,
200205) -> Any :
201206 """Performs the PDP pull of the file."""
202207
@@ -205,7 +210,7 @@ async def execute_pdp_pull(
205210 )
206211
207212 gcs_blob = sftp_file_to_gcs_helper (storage_control , sftp_source_filename )
208- result = await process_file (storage_control , gcs_blob , env_vars )
213+ result = await process_file (storage_control , gcs_blob , env_vars , pdp_id = pdp_id )
209214
210215 # Aggregate results to return
211216 return {
0 commit comments