|
2 | 2 |
|
3 | 3 | import logging |
4 | 4 | from typing import Any, Annotated |
5 | | -from fastapi import FastAPI, Depends, HTTPException, status |
| 5 | +from fastapi import FastAPI, Depends, HTTPException, status, Security |
6 | 6 | from fastapi.responses import FileResponse |
7 | 7 |
|
8 | 8 | from pydantic import BaseModel |
9 | | -from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer |
| 9 | +from fastapi.security import OAuth2PasswordRequestForm |
10 | 10 | from .utilities import ( |
11 | 11 | get_sftp_bucket_name, |
12 | 12 | StorageControl, |
| 13 | + split_csv_and_generate_signed_urls, |
| 14 | + fetch_institution_ids, |
13 | 15 | ) |
14 | 16 | from .config import sftp_vars, env_vars, startup_env_vars |
15 | | -from .authn import Token, get_current_username, check_creds, create_access_token |
16 | | -from datetime import timedelta, datetime, timezone |
| 17 | +from .authn import ( |
| 18 | + Token, |
| 19 | + get_current_username, |
| 20 | + check_creds, |
| 21 | + create_access_token, |
| 22 | + get_api_key, |
| 23 | +) |
| 24 | +from datetime import timedelta |
| 25 | +import os |
17 | 26 |
|
18 | 27 | # Set the logging |
19 | 28 | logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s") |
@@ -41,8 +50,9 @@ class PdpPullRequest(BaseModel): |
class PdpPullResponse(BaseModel):
    """Fields for the PDP pull response."""

    # Raw SFTP listing entries as returned by list_sftp_files
    # (presumably dicts with at least a "path" key — confirm against StorageControl).
    sftp_files: list[dict]
    # PDP ids that resolved to a known institution.
    pdp_inst_generated: list[str]
    # PDP ids that could not be matched to an institution.
    pdp_inst_not_found: list[str]
46 | 56 |
|
47 | 57 |
|
48 | 58 | @app.on_event("startup") |
@@ -84,30 +94,86 @@ async def login_for_access_token( |
84 | 94 | return Token(access_token=access_token, token_type="bearer") |
85 | 95 |
|
86 | 96 |
|
def sftp_helper(storage_control: StorageControl, sftp_source_filenames: list) -> list:
    """
    Copy each listed file from the SFTP server into the environment's GCS bucket.

    The destination object name is the base name of the source path.

    Args:
        storage_control (StorageControl): Provides `copy_from_sftp_to_gcs`.
        sftp_source_filenames (list): SFTP listing entries; each entry is a
            dict with a "path" key locating the file on the server.

    Returns:
        list: Destination blob names for every file that copied successfully.
    """
    logger.info(f"Starting sftp_helper for {len(sftp_source_filenames)} file(s).")
    all_blobs = []
    for entry in sftp_source_filenames:
        sftp_source_filename = entry["path"]
        logger.debug(f"Processing source file: {sftp_source_filename}")

        # GCS object name is just the file's base name.
        dest_filename = os.path.basename(sftp_source_filename)
        logger.debug(f"Destination filename will be: {dest_filename}")

        try:
            storage_control.copy_from_sftp_to_gcs(
                sftp_vars["SFTP_HOST"],
                # Use the configured port instead of hard-coding 22.
                sftp_vars["SFTP_PORT"],
                sftp_vars["SFTP_USER"],
                sftp_vars["SFTP_PASSWORD"],
                sftp_source_filename,
                get_sftp_bucket_name(env_vars["ENV"]),
                dest_filename,
            )
            all_blobs.append(dest_filename)
            logger.info(
                f"Successfully processed '{sftp_source_filename}' as '{dest_filename}'."
            )
        except Exception as e:
            # Best-effort: log the failure and continue with the remaining
            # files rather than aborting the whole pull.
            logger.error(
                f"Error processing '{sftp_source_filename}': {e}", exc_info=True
            )
    return all_blobs
99 | 143 |
|
100 | 144 |
|
@app.post("/execute-pdp-pull", response_model=PdpPullResponse)
def execute_pdp_pull(
    req: PdpPullRequest,
    current_username: Annotated[str, Depends(get_current_username)],
    storage_control: Annotated[StorageControl, Depends(StorageControl)],
    # NOTE(review): get_api_key appears to yield a tuple of optional keys
    # (it is iterated below), not a str — annotation fixed accordingly.
    api_key_enduser_tuple: tuple = Security(get_api_key),
) -> Any:
    """Performs the PDP pull of the file.

    Lists files on the SFTP server, copies them into the environment's GCS
    bucket, splits each copied CSV into per-institution signed URLs, and
    resolves which PDP ids map to known institutions.
    """
    storage_control.create_bucket_if_not_exists(get_sftp_bucket_name(env_vars["ENV"]))
    files = storage_control.list_sftp_files(
        sftp_vars["SFTP_HOST"],
        # Configured port, not hard-coded 22.
        sftp_vars["SFTP_PORT"],
        sftp_vars["SFTP_USER"],
        sftp_vars["SFTP_PASSWORD"],
    )
    all_blobs = sftp_helper(storage_control, files)

    # Loop-invariant: first non-None key from the security dependency.
    # Hoisted out of the per-blob loop.
    backend_api_key = next(key for key in api_key_enduser_tuple if key is not None)

    valid_pdp_ids: list[str] = []
    invalid_ids: list[str] = []
    for blob_name in all_blobs:
        signed_urls = split_csv_and_generate_signed_urls(
            bucket_name=get_sftp_bucket_name(env_vars["ENV"]),
            source_blob_name=blob_name,
        )
        temp_valid_pdp_ids, temp_invalid_ids = fetch_institution_ids(
            pdp_ids=list(signed_urls.keys()),
            backend_api_key=backend_api_key,
        )
        # extend, not append: PdpPullResponse declares flat list[str] fields,
        # so accumulating sub-lists would fail response validation.
        valid_pdp_ids.extend(temp_valid_pdp_ids)
        invalid_ids.extend(temp_invalid_ids)

    return {
        "sftp_files": files,
        "pdp_inst_generated": valid_pdp_ids,
        "pdp_inst_not_found": invalid_ids,
    }
0 commit comments