33from __future__ import annotations
44
55from dataclasses import dataclass
6+ from datetime import datetime , timedelta , timezone
67from typing import BinaryIO , Dict , Protocol
78
89try : # pragma: no cover - optional dependency
910 import boto3
1011except Exception : # pragma: no cover - dependency may be absent in tests
1112 boto3 = None # type: ignore[assignment]
1213
14+ try : # pragma: no cover - optional dependency
15+ from botocore .signers import CloudFrontSigner
16+ except Exception : # pragma: no cover - dependency may be absent in tests
17+ CloudFrontSigner = None # type: ignore[assignment]
18+
1319
1420@dataclass (frozen = True )
1521class StoredObject :
@@ -19,6 +25,19 @@ class StoredObject:
1925 content_type : str | None = None
2026
2127
28+ @dataclass (frozen = True )
29+ class CloudFrontSigningConfig :
30+ """Configuration describing how to sign CloudFront URLs."""
31+
32+ key_pair_id : str
33+ private_key : str
34+ url_ttl_seconds : int = 3600
35+
36+ def __post_init__ (self ) -> None : # pragma: no cover - trivial validation
37+ if self .url_ttl_seconds <= 0 :
38+ raise ValueError ("CloudFront signed URL TTL must be positive" )
39+
40+
2241class StorageClient (Protocol ):
2342 """Minimal protocol for uploading file-like objects."""
2443
@@ -71,12 +90,16 @@ def __init__(
7190 * ,
7291 client : "boto3.client" | None = None ,
7392 base_url : str | None = None ,
93+ cloudfront_signer : "CloudFrontSigner" | None = None ,
94+ signed_url_ttl_seconds : int | None = None ,
7495 ) -> None :
7596 if boto3 is None : # pragma: no cover - only triggered when boto3 missing
7697 raise RuntimeError ("boto3 is required for S3 uploads but is not installed" )
7798 self ._bucket = bucket
7899 self ._client = client or boto3 .client ("s3" )
79100 self ._base_url = base_url .rstrip ("/" ) if base_url else None
101+ self ._cloudfront_signer = cloudfront_signer
102+ self ._signed_url_ttl_seconds = signed_url_ttl_seconds
80103
81104 def upload_fileobj (
82105 self ,
@@ -94,7 +117,14 @@ def upload_fileobj(
94117
95118 def _build_url (self , key : str ) -> str :
96119 if self ._base_url :
97- return f"{ self ._base_url } /{ key } "
120+ url = f"{ self ._base_url } /{ key } "
121+ if self ._cloudfront_signer :
122+ expires_in = self ._signed_url_ttl_seconds or 3600
123+ expiration = datetime .now (timezone .utc ) + timedelta (seconds = expires_in )
124+ return self ._cloudfront_signer .generate_presigned_url (
125+ url , date_less_than = expiration
126+ )
127+ return url
98128 region = getattr (self ._client .meta , "region_name" , "us-east-1" )
99129 return f"https://{ self ._bucket } .s3.{ region } .amazonaws.com/{ key } "
100130
@@ -103,12 +133,26 @@ def create_storage(
103133 * ,
104134 bucket : str | None ,
105135 base_url : str | None = None ,
136+ cloudfront_signing : CloudFrontSigningConfig | None = None ,
106137) -> StorageClient :
107138 """Instantiate the preferred storage backend."""
108139
109140 if bucket :
110141 try :
111- return S3Storage (bucket = bucket , base_url = base_url )
142+ cloudfront_signer = None
143+ signed_url_ttl_seconds = None
144+ if cloudfront_signing and base_url :
145+ cloudfront_signer = _create_cloudfront_signer (
146+ key_pair_id = cloudfront_signing .key_pair_id ,
147+ private_key_pem = cloudfront_signing .private_key ,
148+ )
149+ signed_url_ttl_seconds = cloudfront_signing .url_ttl_seconds
150+ return S3Storage (
151+ bucket = bucket ,
152+ base_url = base_url ,
153+ cloudfront_signer = cloudfront_signer ,
154+ signed_url_ttl_seconds = signed_url_ttl_seconds ,
155+ )
112156 except RuntimeError :
113157 # Fall back to in-memory storage when boto3 is unavailable.
114158 return InMemoryStorage (bucket = bucket , base_url = base_url )
@@ -120,5 +164,32 @@ def create_storage(
120164 "StorageClient" ,
121165 "InMemoryStorage" ,
122166 "S3Storage" ,
167+ "CloudFrontSigningConfig" ,
123168 "create_storage" ,
124169]
170+
171+
172+ def _create_cloudfront_signer (
173+ * , key_pair_id : str , private_key_pem : str
174+ ) -> "CloudFrontSigner" | None :
175+ """Instantiate a CloudFront signer from the provided credentials."""
176+
177+ if CloudFrontSigner is None : # pragma: no cover - dependency may be absent
178+ raise RuntimeError ("botocore is required for CloudFront signing but is not installed" )
179+
180+ try : # pragma: no cover - dependency may be absent in tests
181+ from cryptography .hazmat .primitives import hashes , serialization
182+ from cryptography .hazmat .primitives .asymmetric import padding
183+ except Exception as exc : # pragma: no cover - dependency may be absent
184+ raise RuntimeError (
185+ "cryptography is required for CloudFront signing but is not installed"
186+ ) from exc
187+
188+ private_key = serialization .load_pem_private_key (
189+ private_key_pem .encode ("utf-8" ), password = None
190+ )
191+
192+ def _rsa_signer (message : bytes ) -> bytes :
193+ return private_key .sign (message , padding .PKCS1v15 (), hashes .SHA1 ())
194+
195+ return CloudFrontSigner (key_pair_id , _rsa_signer )
0 commit comments