Skip to content

Commit e91f28b

Browse files
authored
Merge pull request #31 from datakind/sftp-ingestion
feat: added new sftp-files endpoint
2 parents b497c0d + bb58490 commit e91f28b

File tree

3 files changed

+44
-19
lines changed

3 files changed

+44
-19
lines changed

src/worker/main.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ class PdpPullRequest(BaseModel):
4949
class PdpPullResponse(BaseModel):
5050
"""Fields for the PDP pull response."""
5151

52-
sftp_files: list[dict]
5352
pdp_inst_generated: list[Any]
5453
pdp_inst_not_found: list[Any]
5554
upload_status: dict
@@ -58,6 +57,12 @@ class Config:
5857
json_encoders = {np.int64: lambda v: int(v)}
5958

6059

60+
class PdpListFiles(BaseModel):
61+
"""Fields for the PDP pull response."""
62+
63+
sftp_files: list[dict]
64+
65+
6166
@app.on_event("startup")
6267
def on_startup():
6368
print("Starting up app...")
@@ -162,8 +167,8 @@ async def process_file(
162167
}
163168

164169

165-
@app.post("/execute-pdp-pull", response_model=PdpPullResponse)
166-
async def execute_pdp_pull(
170+
@app.get("/sftp_files", response_model=PdpListFiles)
171+
def sftp_files(
167172
req: PdpPullRequest,
168173
current_username: Annotated[str, Depends(get_current_username)],
169174
storage_control: Annotated[StorageControl, Depends(StorageControl)],
@@ -181,15 +186,30 @@ async def execute_pdp_pull(
181186
remote_path="./receive",
182187
)
183188

184-
results = []
185-
for file in files:
186-
gcs_blob = sftp_file_to_gcs_helper(storage_control, file)
187-
result = await process_file(storage_control, gcs_blob, env_vars)
188-
results.append(result)
189-
190189
# Aggregate results to return
191190
return {
192191
"sftp_files": files,
192+
}
193+
194+
195+
@app.post("/execute-pdp-pull", response_model=PdpPullResponse)
196+
async def execute_pdp_pull(
197+
req: PdpPullRequest,
198+
sftp_source_filename: str,
199+
current_username: Annotated[str, Depends(get_current_username)],
200+
storage_control: Annotated[StorageControl, Depends(StorageControl)],
201+
) -> Any:
202+
"""Performs the PDP pull of the file."""
203+
204+
storage_control.create_bucket_if_not_exists(
205+
get_sftp_bucket_name(env_vars["BUCKET_ENV"])
206+
)
207+
208+
gcs_blob = sftp_file_to_gcs_helper(storage_control, sftp_source_filename)
209+
result = await process_file(storage_control, gcs_blob, env_vars)
210+
211+
# Aggregate results to return
212+
return {
193213
"pdp_inst_generated": list(result["valid_inst_ids"]),
194214
"pdp_inst_not_found": list(result["invalid_ids"]),
195215
"upload_status": dict(result["uploads"]),

src/worker/main_test.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ def test_retrieve_token(client: TestClient) -> Any:
4747
assert response.status_code == 200
4848

4949

50+
def sftp_files(client: TestClient) -> Any:
51+
"""Test GET /sftp-files endpoint"""
52+
response = client.get("/sftp-files")
53+
assert response.status_code == 200
54+
assert response.json() == {"sftp_files": {}}
55+
56+
5057
@patch("google.auth.default")
5158
def test_execute_pdp_pull(
5259
mock_auth_default: Any, client: TestClient, monkeypatch: Any
@@ -65,19 +72,17 @@ def test_execute_pdp_pull(
6572
lambda filename: f"processed_{filename}"
6673
)
6774
MOCK_STORAGE.create_bucket_if_not_exists.return_value = None
68-
MOCK_STORAGE.list_sftp_files.return_value = [
69-
{"path": "file1.csv"},
70-
{"path": "file2.csv"},
71-
]
75+
7276
# Optionally, if there's a process_file or similar function, you can mock it too.
7377
# For this test, we're focusing on the overall endpoint behavior.
7478

75-
response = client.post("/execute-pdp-pull", json={"placeholder": "val"})
79+
response = client.post(
80+
"/execute-pdp-pull?sftp_source_filename=file1.csv", json={"placeholder": "val"}
81+
)
7682

7783
# Verify the response status and content.
7884
assert response.status_code == 200
7985
assert response.json() == {
80-
"sftp_files": [{"path": "file1.csv"}, {"path": "file2.csv"}],
8186
"pdp_inst_generated": [],
8287
"pdp_inst_not_found": [],
8388
"upload_status": {},

src/worker/utilities.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def copy_from_sftp_to_gcs(
5454
if client is None:
5555
raise RuntimeError("Failed to create SFTP client.")
5656
# Open the file in binary read mode.
57-
with client.open(sftp_file, "rb") as f:
57+
with client.open(f"./receive/{sftp_file}", "rb") as f:
5858
blob.upload_from_file(f)
5959

6060
def list_sftp_files(
@@ -485,7 +485,7 @@ def split_csv_and_generate_signed_urls(
485485

486486

487487
def sftp_file_to_gcs_helper(
488-
storage_control: StorageControl, sftp_source_filename: dict
488+
storage_control: StorageControl, sftp_source_filename: str
489489
) -> str:
490490
"""
491491
For each source file in sftp_source_filenames, copies the file from the SFTP
@@ -502,7 +502,7 @@ def sftp_file_to_gcs_helper(
502502
)
503503
# for sftp_source_filename in sftp_source_filenames:
504504
# Extract the base filename and prepare the destination filename
505-
source_filename = sftp_source_filename["path"]
505+
source_filename = sftp_source_filename
506506
# Extract the base filename.
507507
base_filename = os.path.basename(source_filename)
508508
dest_filename = f"{base_filename}"
@@ -521,7 +521,7 @@ def sftp_file_to_gcs_helper(
521521
22,
522522
sftp_vars["SFTP_USER"],
523523
sftp_vars["SFTP_PASSWORD"],
524-
source_filename,
524+
dest_filename,
525525
get_sftp_bucket_name(env_vars["ENV"]),
526526
dest_filename,
527527
)

0 commit comments

Comments
 (0)