Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
split_csv_and_generate_signed_urls,
fetch_institution_ids,
)
from .config import sftp_vars, env_vars, startup_env_vars, gcs_vars
from .config import sftp_vars, env_vars, startup_env_vars
from .authn import (
Token,
get_current_username,
Expand Down Expand Up @@ -166,9 +166,7 @@ async def execute_pdp_pull(
logging.debug(f"Processing {blobs}")
print(f"Processing {blobs}")
signed_urls = split_csv_and_generate_signed_urls(
bucket_name=get_sftp_bucket_name(env_vars["ENV"]),
source_blob_name=blobs,
storage_account_file=gcs_vars["GCP_SERVICE_ACCOUNT_KEY_PATH"],
bucket_name=get_sftp_bucket_name(env_vars["ENV"]), source_blob_name=blobs
)
logging.info(f"Signed URls generated {signed_urls}")

Expand Down
157 changes: 63 additions & 94 deletions src/worker/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@
from pydantic import BaseModel
import os
import stat
from datetime import datetime, timezone
from datetime import datetime, timedelta
import io
import logging
from typing import List, Dict, Any, Optional, Union
from typing import List, Dict, Any
import requests
import pandas as pd
import re
import binascii
import collections
import hashlib
from urllib.parse import quote
from google.oauth2 import service_account
import six
import google.auth
import google.auth.transport.requests as google_requests


def get_sftp_bucket_name(env_var: str) -> str:
Expand Down Expand Up @@ -253,102 +249,76 @@ def post_file_to_signed_url(file_path: str, signed_url: str) -> str:


def generate_signed_url(
service_account_file: str,
bucket_name: str,
object_name: str,
subresource: Optional[str] = None,
expiration: int = 600000,
http_method: str = "GET",
query_parameters: Optional[Dict[str, Union[int, str]]] = None,
headers: Optional[Dict[str, str]] = None,
bucket_name: str, object_name: str, expiration: int = 600000
) -> str:
escaped_object_name = quote(six.ensure_binary(object_name), safe=b"/~")
canonical_uri = f"/{escaped_object_name}"
"""
Generates a signed URL for a Cloud Storage object using V4 signing.
This function is production-ready with error handling and logging.

datetime_now = datetime.now(tz=timezone.utc)
request_timestamp = datetime_now.strftime("%Y%m%dT%H%M%SZ")
datestamp = datetime_now.strftime("%Y%m%d")
Returns:
tuple: A tuple containing the signed URL and an HTTP status code.

google_credentials = service_account.Credentials.from_service_account_file(
service_account_file
)
client_email = google_credentials.service_account_email
credential_scope = f"{datestamp}/auto/storage/goog4_request"
credential = f"{client_email}/{credential_scope}"

if headers is None:
headers = dict()
host = f"{bucket_name}.storage.googleapis.com"
headers["host"] = host

canonical_headers = ""
ordered_headers = collections.OrderedDict(sorted(headers.items()))
for k, v in ordered_headers.items():
lower_k = str(k).lower()
strip_v = str(v).lower()
canonical_headers += f"{lower_k}:{strip_v}\n"

signed_headers = ""
for k, _ in ordered_headers.items():
lower_k = str(k).lower()
signed_headers += f"{lower_k};"
signed_headers = signed_headers[:-1] # remove trailing ';'

if query_parameters is None:
query_parameters = dict()
query_parameters["X-Goog-Algorithm"] = "GOOG4-RSA-SHA256"
query_parameters["X-Goog-Credential"] = credential
query_parameters["X-Goog-Date"] = request_timestamp
query_parameters["X-Goog-Expires"] = expiration
query_parameters["X-Goog-SignedHeaders"] = signed_headers
if subresource:
query_parameters[subresource] = ""

canonical_query_string = ""
ordered_query_parameters = collections.OrderedDict(sorted(query_parameters.items()))
for k, v in ordered_query_parameters.items():
encoded_k = quote(str(k), safe="")
encoded_v = quote(str(v), safe="")
canonical_query_string += f"{encoded_k}={encoded_v}&"
canonical_query_string = canonical_query_string[:-1] # remove trailing '&'

canonical_request = "\n".join(
[
http_method,
canonical_uri,
canonical_query_string,
canonical_headers,
signed_headers,
"UNSIGNED-PAYLOAD",
]
)
Raises:
Exception: Re-raises exceptions encountered during the process.
"""
try:
# Obtain default credentials and project ID.
credentials, project_id = google.auth.default()
logging.info("Obtained default credentials.")
except Exception as e:
logging.error("Failed to get default credentials: %s", e)
raise Exception("Credential error") from e

try:
# Refresh credentials to ensure an access token is available.
request_obj = google_requests.Request()
credentials.refresh(request_obj)
logging.info("Credentials refreshed successfully.")
except Exception as e:
logging.error("Failed to refresh credentials: %s", e)
raise Exception("Credential refresh error") from e

canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest()
try:
# Create a storage client using the refreshed credentials.
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blob = bucket.get_blob(object_name)
if blob is None:
raise ValueError(
f"Blob '{object_name}' not found in bucket '{bucket_name}'."
)
logging.info("Accessed bucket and blob successfully.")
except Exception as e:
logging.error("Failed to access bucket or blob: %s", e)
raise Exception("Bucket/blob access error") from e

string_to_sign = "\n".join(
[
"GOOG4-RSA-SHA256",
request_timestamp,
credential_scope,
canonical_request_hash,
]
)
# Set expiration for the signed URL.
expires = datetime.utcnow() + timedelta(seconds=expiration) # 24 hours

# signer.sign() signs using RSA-SHA256 with PKCS1v15 padding
signature = binascii.hexlify(
google_credentials.signer.sign(string_to_sign)
).decode()
# Determine the service account email.
service_account_email = "" # Fallback value.
if hasattr(credentials, "service_account_email"):
service_account_email = credentials.service_account_email
logging.info("Using service account email: %s", service_account_email)

scheme_and_host = "{}://{}".format("https", host)
signed_url = "{}{}?{}&x-goog-signature={}".format(
scheme_and_host, canonical_uri, canonical_query_string, signature
)
try:
# Generate the signed URL.
signed_url = blob.generate_signed_url(
expiration=expires,
service_account_email=service_account_email,
access_token=credentials.token,
method="GET",
)
logging.info("Signed URL generated successfully.")
except Exception as e:
logging.error("Failed to generate signed URL: %s", e)
raise Exception("Signed URL generation error") from e

return signed_url
return str(signed_url)


def split_csv_and_generate_signed_urls(
bucket_name: str, source_blob_name: str, storage_account_file: str
bucket_name: str, source_blob_name: str
) -> Dict[str, Dict[str, str]]:
"""
Fetches a CSV from Google Cloud Storage, splits it by a specified column, uploads the results,
Expand Down Expand Up @@ -431,7 +401,6 @@ def split_csv_and_generate_signed_urls(
# Attempt to generate a signed URL for the new blob
try:
signed_url = generate_signed_url(
service_account_file=storage_account_file,
bucket_name=bucket_name,
object_name=new_blob_name,
)
Expand Down