Skip to content

Commit 3653cf0

Browse files
authored
Merge pull request #33 from datakind/sftp-ingestion
fix: test script
2 parents b276a49 + 2948df8 commit 3653cf0

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

src/worker/main.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import numpy as np
44
import logging
5-
from typing import Any, Annotated, Dict
5+
from typing import Any, Annotated, Dict, Optional
66

77
from fastapi import FastAPI, Depends, HTTPException, status
88
from fastapi.responses import FileResponse
@@ -103,13 +103,17 @@ async def login_for_access_token(
103103

104104

105105
async def process_file(
106-
storage_control: StorageControl, blob: str, env_vars: Dict[str, Any]
106+
storage_control: StorageControl,
107+
blob: str,
108+
env_vars: Dict[str, Any],
109+
pdp_id: Optional[str] = None,
107110
) -> Dict[str, Any]:
108111
"""Process a single file: generate URLs, transfer, and validate."""
109112
logger.debug(f">>>> Splitting {blob} to extract institution data")
110113
signed_urls = split_csv_and_generate_signed_urls(
111114
bucket_name=get_sftp_bucket_name(env_vars["BUCKET_ENV"]),
112115
source_blob_name=blob,
116+
pdp_id=pdp_id,
113117
)
114118
logger.info(f">>>> Signed URLs, File names generated for {blob}")
115119

@@ -197,6 +201,7 @@ async def execute_pdp_pull(
197201
sftp_source_filename: str,
198202
current_username: Annotated[str, Depends(get_current_username)],
199203
storage_control: Annotated[StorageControl, Depends(StorageControl)],
204+
pdp_id: Optional[str] = None,
200205
) -> Any:
201206
"""Performs the PDP pull of the file."""
202207

@@ -205,7 +210,7 @@ async def execute_pdp_pull(
205210
)
206211

207212
gcs_blob = sftp_file_to_gcs_helper(storage_control, sftp_source_filename)
208-
result = await process_file(storage_control, gcs_blob, env_vars)
213+
result = await process_file(storage_control, gcs_blob, env_vars, pdp_id=pdp_id)
209214

210215
# Aggregate results to return
211216
return {

src/worker/utilities.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from datetime import datetime, timedelta
99
import io
1010
import logging
11-
from typing import List, Dict, Any
11+
from typing import List, Dict, Any, Optional
1212
import requests
1313
import pandas as pd
1414
import re
@@ -383,7 +383,7 @@ def generate_signed_url(
383383

384384

385385
def split_csv_and_generate_signed_urls(
386-
bucket_name: str, source_blob_name: str
386+
bucket_name: str, source_blob_name: str, pdp_id: Optional[str] = None
387387
) -> Dict[str, Dict[str, str]]:
388388
"""
389389
Fetches a CSV from Google Cloud Storage, splits it by a specified column, uploads the results,
@@ -440,7 +440,11 @@ def split_csv_and_generate_signed_urls(
440440
all_data = {}
441441

442442
# Processing the DataFrame
443-
unique_inst_ids = df[institution_column].unique()
443+
# unique_inst_ids = df[institution_column].unique()
444+
unique_inst_ids = (
445+
list(df[institution_column].unique()) if pdp_id is None else [pdp_id]
446+
)
447+
444448
for inst_id in unique_inst_ids:
445449
group = df[df[institution_column] == inst_id]
446450
output = io.StringIO()

0 commit comments

Comments
 (0)