Skip to content

Commit 1788015

Browse files
authored
Merge pull request #4 from datakind/sftp-integration
fix sftp errors
2 parents 66c6332 + fe377e3 commit 1788015

File tree

5 files changed

+253
-104
lines changed

5 files changed

+253
-104
lines changed

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ dependencies = [
2424
"requests>=2.0.0",
2525
"types-requests",
2626
"types-paramiko",
27-
"pandas"
27+
"pandas",
28+
"collection",
29+
"six",
30+
"types-six"
2831
]
2932

3033
[project.urls]

src/worker/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dotenv import load_dotenv
55

66
# Defaults to unit-test values.
7+
78
env_vars = {
89
"ENV": "LOCAL",
910
"SECRET_KEY": "",

src/worker/main.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Main file for the SST Worker."""
22

3+
import numpy as np
34
import logging
45
from typing import Any, Annotated
56
from fastapi import FastAPI, Depends, HTTPException, status, Security
@@ -13,7 +14,7 @@
1314
split_csv_and_generate_signed_urls,
1415
fetch_institution_ids,
1516
)
16-
from .config import sftp_vars, env_vars, startup_env_vars
17+
from .config import sftp_vars, env_vars, startup_env_vars, gcs_vars
1718
from .authn import (
1819
Token,
1920
get_current_username,
@@ -51,8 +52,11 @@ class PdpPullResponse(BaseModel):
5152
"""Fields for the PDP pull response."""
5253

5354
sftp_files: list[dict]
54-
pdp_inst_generated: list[str]
55-
pdp_inst_not_found: list[str]
55+
pdp_inst_generated: list[Any]
56+
pdp_inst_not_found: list[Any]
57+
58+
class Config:
59+
json_encoders = {np.int64: lambda v: int(v)}
5660

5761

5862
@app.on_event("startup")
@@ -134,7 +138,6 @@ def sftp_helper(storage_control: StorageControl, sftp_source_filenames: list) ->
134138
logger.info(
135139
f"Successfully processed '{sftp_source_filename}' as '{dest_filename}'."
136140
)
137-
return all_blobs
138141
except Exception as e:
139142
logger.error(
140143
f"Error processing '{sftp_source_filename}': {e}", exc_info=True
@@ -143,7 +146,7 @@ def sftp_helper(storage_control: StorageControl, sftp_source_filenames: list) ->
143146

144147

145148
@app.post("/execute-pdp-pull", response_model=PdpPullResponse)
146-
def execute_pdp_pull(
149+
async def execute_pdp_pull(
147150
req: PdpPullRequest,
148151
current_username: Annotated[str, Depends(get_current_username)],
149152
storage_control: Annotated[StorageControl, Depends(StorageControl)],
@@ -155,20 +158,27 @@ def execute_pdp_pull(
155158
sftp_vars["SFTP_HOST"], 22, sftp_vars["SFTP_USER"], sftp_vars["SFTP_PASSWORD"]
156159
)
157160
all_blobs = sftp_helper(storage_control, files)
161+
print(f"It's all processed {all_blobs}")
158162
valid_pdp_ids = []
159163
invalid_ids = []
160164

161165
for blobs in all_blobs:
166+
logging.debug(f"Processing {blobs}")
167+
print(f"Processing {blobs}")
162168
signed_urls = split_csv_and_generate_signed_urls(
163-
bucket_name=get_sftp_bucket_name(env_vars["ENV"]), source_blob_name=blobs
169+
bucket_name=get_sftp_bucket_name(env_vars["ENV"]),
170+
source_blob_name=blobs,
171+
storage_account_file=gcs_vars["GCP_SERVICE_ACCOUNT_KEY_PATH"],
164172
)
173+
logging.info(f"Signed URls generated {signed_urls}")
165174

166175
temp_valid_pdp_ids, temp_invalid_ids = fetch_institution_ids(
167176
pdp_ids=list(signed_urls.keys()),
168177
backend_api_key=next(
169178
key for key in api_key_enduser_tuple if key is not None
170179
),
171180
)
181+
172182
valid_pdp_ids.append(temp_valid_pdp_ids)
173183
invalid_ids.append(temp_invalid_ids)
174184

0 commit comments

Comments
 (0)