Skip to content

Commit 3d91f3b

Browse files
authored
Merge pull request #2 from datakind/revert-1-sftp-integration
Revert "feat: Added sftp ingestion functionalities"
2 parents 257d511 + c9fe795 commit 3d91f3b

File tree

6 files changed

+46
-449
lines changed

6 files changed

+46
-449
lines changed

.github/workflows/type-check.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
python-version: "3.10"
1616
- name: Get changed files
1717
id: changed-files
18-
uses: step-security/changed-files@v45
18+
uses: step-security/changed-files@45
1919
with:
2020
files: |
2121
src/**/*.py

pyproject.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ dependencies = [
2121
"strenum>=0.4.15",
2222
"tomli~=2.0; python_version<'3.11'",
2323
"jsonpickle>=4.0.1",
24-
"requests>=2.0.0",
25-
"types-requests",
26-
"types-paramiko",
27-
"pandas"
2824
]
2925

3026
[project.urls]

src/worker/authn.py

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,21 @@
22

33
from fastapi.security import (
44
OAuth2PasswordBearer,
5+
OAuth2PasswordRequestForm,
56
APIKeyHeader,
7+
APIKeyQuery,
68
)
79
from pydantic import BaseModel
810
from datetime import timedelta, datetime, timezone
911
from .config import env_vars
1012
from typing import Annotated
11-
from fastapi import Depends, HTTPException, status, Security
13+
from fastapi import Depends, HTTPException, status
1214
from jwt.exceptions import InvalidTokenError
1315

1416
oauth2_scheme = OAuth2PasswordBearer(
1517
tokenUrl="token",
1618
)
1719

18-
api_key_header = APIKeyHeader(name="X-API-KEY", scheme_name="api-key", auto_error=False)
19-
api_key_inst_header = APIKeyHeader(
20-
name="INST", scheme_name="api-inst", auto_error=False
21-
)
22-
# The following is for use by the frontend enduser only.
23-
api_key_enduser_header = APIKeyHeader(
24-
name="ENDUSER", scheme_name="api-enduser", auto_error=False
25-
)
26-
2720

2821
class Token(BaseModel):
2922
access_token: str
@@ -34,30 +27,7 @@ class TokenData(BaseModel):
3427
username: str | None = None
3528

3629

37-
def get_api_key(
38-
api_key_header: str = Security(api_key_header),
39-
api_key_inst_header: str = Security(api_key_inst_header),
40-
api_key_enduser_header: str = Security(api_key_enduser_header),
41-
) -> tuple:
42-
"""Retrieve the api key and enduser header key if present.
43-
44-
Args:
45-
api_key_header: The API key passed in the HTTP header.
46-
47-
Returns:
48-
A tuple with the api key and enduser header if present. Authentication happens elsewhere.
49-
Raises:
50-
HTTPException: If the API key is invalid or missing.
51-
"""
52-
if api_key_header:
53-
return (api_key_header, api_key_inst_header, api_key_enduser_header)
54-
raise HTTPException(
55-
status_code=status.HTTP_401_UNAUTHORIZED,
56-
detail="Invalid or missing API Key",
57-
)
58-
59-
60-
def check_creds(username: str, password: str) -> bool:
30+
def check_creds(username: str, password: str):
6131
if username == env_vars["USERNAME"] and password == env_vars["PASSWORD"]:
6232
return True
6333
raise HTTPException(
@@ -66,13 +36,13 @@ def check_creds(username: str, password: str) -> bool:
6636
)
6737

6838

69-
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
39+
def create_access_token(data: dict, expires_delta: timedelta | None = None):
7040
to_encode = data.copy()
7141
if expires_delta:
7242
expire = datetime.now(timezone.utc) + expires_delta
7343
else:
7444
expire = datetime.now(timezone.utc) + timedelta(
75-
minutes=float(env_vars["ACCESS_TOKEN_EXPIRE_MINUTES"])
45+
minutes=env_vars["ACCESS_TOKEN_EXPIRE_MINUTES"]
7646
)
7747
to_encode.update({"exp": expire})
7848
encoded_jwt = jwt.encode(

src/worker/main.py

Lines changed: 22 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,18 @@
22

33
import logging
44
from typing import Any, Annotated
5-
from fastapi import FastAPI, Depends, HTTPException, status, Security
5+
from fastapi import FastAPI, Depends, HTTPException, status
66
from fastapi.responses import FileResponse
77

88
from pydantic import BaseModel
9-
from fastapi.security import OAuth2PasswordRequestForm
9+
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
1010
from .utilities import (
1111
get_sftp_bucket_name,
1212
StorageControl,
13-
fetch_institution_ids,
14-
split_csv_and_generate_signed_urls,
1513
)
1614
from .config import sftp_vars, env_vars, startup_env_vars
17-
from .authn import (
18-
Token,
19-
get_current_username,
20-
check_creds,
21-
create_access_token,
22-
get_api_key,
23-
)
24-
from datetime import timedelta
25-
26-
27-
import os
15+
from .authn import Token, get_current_username, check_creds, create_access_token
16+
from datetime import timedelta, datetime, timezone
2817

2918
# Set the logging
3019
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
@@ -52,9 +41,8 @@ class PdpPullRequest(BaseModel):
5241
class PdpPullResponse(BaseModel):
5342
"""Fields for the PDP pull response."""
5443

55-
sftp_files: list[dict]
56-
pdp_inst_generated: list[dict]
57-
pdp_inst_not_found: list[str]
44+
pdp_inst_generated: list[int]
45+
pdp_inst_not_found: list[int]
5846

5947

6048
@app.on_event("startup")
@@ -96,78 +84,30 @@ async def login_for_access_token(
9684
return Token(access_token=access_token, token_type="bearer")
9785

9886

99-
def sftp_helper(storage_control: StorageControl, sftp_source_filenames: list) -> list:
100-
"""
101-
For each source file in sftp_source_filenames, copies the file from the SFTP
102-
server to GCS. The destination filename is automatically generated by prefixing
103-
the base name of the source file with "processed_".
104-
105-
Args:
106-
storage_control (StorageControl): An instance with a method `copy_from_sftp_to_gcs`.
107-
sftp_source_filenames (list): A list of file paths on the SFTP server.
108-
"""
109-
num_files = len(sftp_source_filenames)
110-
logger.info(f"Starting sftp_helper for {num_files} file(s).")
111-
all_blobs = []
112-
for sftp_source_filename in sftp_source_filenames:
113-
sftp_source_filename = sftp_source_filename["path"]
114-
if (
115-
sftp_source_filename
116-
== "./receive/AO1600pdp_AO1600_AR_DEIDENTIFIED_STUDYID_20250228030226.csv"
117-
):
118-
logger.debug(f"Processing source file: {sftp_source_filename}")
119-
120-
# Extract the base filename.
121-
base_filename = os.path.basename(sftp_source_filename)
122-
dest_filename = f"{base_filename}"
123-
logger.debug(f"Destination filename will be: {dest_filename}")
124-
125-
try:
126-
storage_control.copy_from_sftp_to_gcs(
127-
sftp_vars["SFTP_HOST"],
128-
22,
129-
sftp_vars["SFTP_USER"],
130-
sftp_vars["SFTP_PASSWORD"],
131-
sftp_source_filename,
132-
get_sftp_bucket_name(env_vars["ENV"]),
133-
dest_filename,
134-
)
135-
all_blobs.append(dest_filename)
136-
logger.info(
137-
f"Successfully processed '{sftp_source_filename}' as '{dest_filename}'."
138-
)
139-
return all_blobs
140-
except Exception as e:
141-
logger.error(
142-
f"Error processing '{sftp_source_filename}': {e}", exc_info=True
143-
)
144-
return all_blobs
87+
def sftp_helper(
88+
storage_control: StorageControl, sftp_source_filename: str, dest_filename: str
89+
):
90+
storage_control.copy_from_sftp_to_gcs(
91+
sftp_vars["SFTP_HOST"],
92+
sftp_vars["SFTP_PORT"],
93+
sftp_vars["SFTP_USER"],
94+
sftp_vars["SFTP_PASSWORD"],
95+
sftp_source_filename,
96+
get_sftp_bucket_name(env_vars["ENV"]),
97+
dest_filename,
98+
)
14599

146100

147101
@app.post("/execute-pdp-pull", response_model=PdpPullResponse)
148-
async def execute_pdp_pull(
102+
def execute_pdp_pull(
149103
req: PdpPullRequest,
150104
current_username: Annotated[str, Depends(get_current_username)],
151105
storage_control: Annotated[StorageControl, Depends(StorageControl)],
152-
api_key_enduser_tuple: str = Security(get_api_key),
153106
) -> Any:
154107
"""Performs the PDP pull of the file."""
155108
storage_control.create_bucket_if_not_exists(get_sftp_bucket_name(env_vars["ENV"]))
156-
files = storage_control.list_sftp_files(
157-
sftp_vars["SFTP_HOST"], 22, sftp_vars["SFTP_USER"], sftp_vars["SFTP_PASSWORD"]
158-
)
159-
all_blobs = sftp_helper(storage_control, files)
160-
signed_urls = split_csv_and_generate_signed_urls(
161-
bucket_name=get_sftp_bucket_name(env_vars["ENV"]), source_blob_name=all_blobs[0]
162-
)
163-
164-
valid_pdp_ids, invalid_ids = fetch_institution_ids(
165-
pdp_ids=list(signed_urls.keys()),
166-
backend_api_key=next(key for key in api_key_enduser_tuple if key is not None),
167-
)
168-
109+
sftp_helper(storage_control, "sftp_file.csv", "write_out_file.csv")
169110
return {
170-
"sftp_files": files,
171-
"pdp_inst_generated": [valid_pdp_ids],
172-
"pdp_inst_not_found": invalid_ids,
111+
"pdp_inst_generated": [],
112+
"pdp_inst_not_found": [],
173113
}

0 commit comments

Comments
 (0)