Skip to content

Commit a1cac17

Browse files
ENG-372 add ambient credentials for S3 auth (#575)
add ambient credentials for S3 auth removes default to anonymous. User should declare what they want explicitly. quick fix for kafka test (just adds a sleep)
1 parent ff0ca42 commit a1cac17

File tree

5 files changed

+286
-8
lines changed

5 files changed

+286
-8
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 1.2.3
2+
3+
* **Feature**: allow environment credentials for S3
4+
15
## 1.2.2
26

37
* **Fix**: prevent S3 path conflicts using tempfile for directory isolation

test/integration/connectors/kafka/test_kafka_cloud.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import os
33
import tempfile
4+
import time
45
from pathlib import Path
56

67
import pytest
@@ -163,6 +164,9 @@ async def test_kafka_destination_cloud(upload_file: Path, kafka_seed_topic_cloud
163164
else:
164165
uploader.run(path=upload_file, file_data=file_data)
165166

167+
# Give Kafka Cloud some time to propagate messages
168+
time.sleep(4)
169+
166170
conf = {
167171
"bootstrap.servers": os.environ["KAFKA_BOOTSTRAP_SERVER"],
168172
"sasl.username": os.environ["KAFKA_API_KEY"],
@@ -174,7 +178,8 @@ async def test_kafka_destination_cloud(upload_file: Path, kafka_seed_topic_cloud
174178
"auto.offset.reset": "earliest",
175179
}
176180

177-
all_messages = get_all_messages(conf=conf, topic=TOPIC)
181+
# Increase max_empty_messages for cloud Kafka latency
182+
all_messages = get_all_messages(conf=conf, topic=TOPIC, max_empty_messages=6)
178183
with upload_file.open("r") as upload_fs:
179184
content_to_upload = json.load(upload_fs)
180185
assert len(all_messages) == len(content_to_upload), (

test/integration/connectors/test_s3.py

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,3 +288,230 @@ async def test_s3_destination_different_relative_path_and_full_path(upload_file:
288288
assert uploaded_files[0].as_posix() == f"{destination_path.lstrip('s3://')}/folder1"
289289
finally:
290290
s3fs.rm(path=destination_path, recursive=True)
291+
292+
293+
class TestS3AmbientCredentials:
294+
"""Test suite for S3 ambient credentials functionality"""
295+
296+
def test_ambient_credentials_field_default(self):
297+
"""Test that ambient_credentials defaults to False"""
298+
access_config = S3AccessConfig()
299+
assert access_config.ambient_credentials is False
300+
301+
def test_ambient_credentials_field_explicit(self):
302+
"""Test setting ambient_credentials explicitly"""
303+
access_config = S3AccessConfig(ambient_credentials=True)
304+
assert access_config.ambient_credentials is True
305+
306+
def test_default_blocks_automatic_credentials(self):
307+
"""Test that default behavior blocks automatic credential pickup"""
308+
# No explicit credentials provided, anonymous=False (default)
309+
access_config = S3AccessConfig()
310+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=False)
311+
312+
# Should raise UserAuthError instead of silently changing behavior
313+
with pytest.raises(UserAuthError, match="No authentication method specified"):
314+
connection_config.get_access_config()
315+
316+
def test_explicit_credentials_work_normally(self):
317+
"""Test that explicit credentials work with normal authentication"""
318+
access_config = S3AccessConfig(key="test-key", secret="test-secret")
319+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=False)
320+
321+
config = connection_config.get_access_config()
322+
323+
# Should use explicit credentials
324+
assert config["anon"] is False
325+
assert config["key"] == "test-key"
326+
assert config["secret"] == "test-secret"
327+
328+
def test_explicit_anonymous_mode_respected(self):
329+
"""Test that explicit anonymous=True is respected"""
330+
access_config = S3AccessConfig()
331+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=True)
332+
333+
config = connection_config.get_access_config()
334+
335+
# Should be anonymous
336+
assert config["anon"] is True
337+
assert "key" not in config
338+
339+
def test_ambient_credentials_requires_env_var(self, monkeypatch):
340+
"""Test that ambient_credentials=True requires ALLOW_AMBIENT_CREDENTIALS_S3 env var"""
341+
# Clear the environment variable
342+
monkeypatch.delenv("ALLOW_AMBIENT_CREDENTIALS_S3", raising=False)
343+
344+
access_config = S3AccessConfig(ambient_credentials=True)
345+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=False)
346+
347+
# Should raise error when env var is not set
348+
with pytest.raises(
349+
UserAuthError, match="ALLOW_AMBIENT_CREDENTIALS_S3 environment variable is not set"
350+
):
351+
connection_config.get_access_config()
352+
353+
def test_ambient_credentials_enables_ambient_mode(self, monkeypatch):
354+
"""Test that ambient_credentials=True enables ambient credential pickup
355+
when env var is set"""
356+
# Set the environment variable
357+
monkeypatch.setenv("ALLOW_AMBIENT_CREDENTIALS_S3", "true")
358+
359+
access_config = S3AccessConfig(ambient_credentials=True)
360+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=False)
361+
362+
config = connection_config.get_access_config()
363+
364+
# Should allow ambient credentials (anon=False, no explicit credentials)
365+
assert config["anon"] is False
366+
assert "key" not in config
367+
assert "secret" not in config
368+
assert "token" not in config
369+
370+
def test_ambient_credentials_field_excluded_from_config(self):
371+
"""Test that ambient_credentials field is not passed to s3fs"""
372+
# Test with explicit credentials
373+
access_config = S3AccessConfig(
374+
key="test-key",
375+
secret="test-secret",
376+
ambient_credentials=True, # Should be excluded
377+
)
378+
connection_config = S3ConnectionConfig(access_config=access_config)
379+
380+
config = connection_config.get_access_config()
381+
382+
# ambient_credentials should not appear in final config
383+
assert "ambient_credentials" not in config
384+
assert config["key"] == "test-key"
385+
assert config["secret"] == "test-secret"
386+
387+
def test_none_values_filtered_but_falsy_values_preserved(self):
388+
"""Test that None values are filtered but other falsy values are preserved"""
389+
access_config = S3AccessConfig(
390+
key="test-key",
391+
secret=None, # Should be filtered
392+
token="", # Should be preserved (empty string)
393+
)
394+
connection_config = S3ConnectionConfig(access_config=access_config)
395+
396+
config = connection_config.get_access_config()
397+
398+
# None should be filtered, empty string should be preserved
399+
assert config["key"] == "test-key"
400+
assert "secret" not in config # None was filtered
401+
assert config["token"] == "" # Empty string preserved
402+
403+
def test_endpoint_url_preserved_with_all_auth_modes(self, monkeypatch):
404+
"""Test that endpoint_url is preserved across all authentication modes"""
405+
endpoint = "https://custom-s3.example.com"
406+
407+
# Test with explicit credentials
408+
access_config = S3AccessConfig(key="test-key", secret="test-secret")
409+
connection_config = S3ConnectionConfig(access_config=access_config, endpoint_url=endpoint)
410+
config = connection_config.get_access_config()
411+
assert config["endpoint_url"] == endpoint
412+
413+
# Test with ambient credentials
414+
monkeypatch.setenv("ALLOW_AMBIENT_CREDENTIALS_S3", "true")
415+
access_config = S3AccessConfig(ambient_credentials=True)
416+
connection_config = S3ConnectionConfig(access_config=access_config, endpoint_url=endpoint)
417+
config = connection_config.get_access_config()
418+
assert config["endpoint_url"] == endpoint
419+
420+
# Test with anonymous mode
421+
connection_config = S3ConnectionConfig(anonymous=True, endpoint_url=endpoint)
422+
config = connection_config.get_access_config()
423+
assert config["endpoint_url"] == endpoint
424+
425+
def test_authentication_error_raised(self):
426+
"""Test that authentication error is raised when automatic credentials would be used"""
427+
access_config = S3AccessConfig(ambient_credentials=False)
428+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=False)
429+
430+
# This should raise UserAuthError with helpful message
431+
with pytest.raises(UserAuthError) as exc_info:
432+
connection_config.get_access_config()
433+
434+
# Should provide clear error message
435+
error_message = str(exc_info.value)
436+
assert "No authentication method specified" in error_message
437+
assert "ambient_credentials=False" in error_message
438+
439+
def test_ambient_credentials_env_var_variations(self, monkeypatch):
440+
"""Test that only 'true' (case-insensitive) values for ALLOW_AMBIENT_CREDENTIALS_S3 work"""
441+
valid_values = ["true", "TRUE", "True", "tRuE"]
442+
443+
access_config = S3AccessConfig(ambient_credentials=True)
444+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=False)
445+
446+
for value in valid_values:
447+
monkeypatch.setenv("ALLOW_AMBIENT_CREDENTIALS_S3", value)
448+
449+
# Should not raise error
450+
config = connection_config.get_access_config()
451+
assert config["anon"] is False
452+
453+
def test_ambient_credentials_info_logged(self, caplog, monkeypatch):
454+
"""Test that info message is logged when using ambient credentials"""
455+
import logging
456+
457+
# Set the environment variable
458+
monkeypatch.setenv("ALLOW_AMBIENT_CREDENTIALS_S3", "true")
459+
460+
# Ensure we capture INFO level logs
461+
caplog.set_level(logging.INFO)
462+
463+
access_config = S3AccessConfig(ambient_credentials=True)
464+
connection_config = S3ConnectionConfig(access_config=access_config, anonymous=False)
465+
466+
# This should trigger the ambient credentials info log
467+
config = connection_config.get_access_config()
468+
469+
# Should use ambient credentials
470+
assert config["anon"] is False
471+
472+
# Should log ambient credentials info
473+
assert "Using ambient AWS credentials" in caplog.text
474+
475+
@pytest.mark.asyncio
476+
@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
477+
@requires_env("S3_INGEST_TEST_ACCESS_KEY", "S3_INGEST_TEST_SECRET_KEY")
478+
async def test_s3_destination_with_ambient_credentials(self, upload_file: Path, monkeypatch):
479+
"""Test S3 destination using ambient credentials with standard AWS env vars"""
480+
# Get test credentials and set them as standard AWS environment variables
481+
test_access_key = os.getenv("S3_INGEST_TEST_ACCESS_KEY")
482+
test_secret_key = os.getenv("S3_INGEST_TEST_SECRET_KEY")
483+
monkeypatch.setenv("AWS_ACCESS_KEY_ID", test_access_key)
484+
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", test_secret_key)
485+
486+
# Set the environment variable to allow ambient credentials
487+
monkeypatch.setenv("ALLOW_AMBIENT_CREDENTIALS_S3", "true")
488+
489+
s3_bucket = "s3://utic-ingest-test-fixtures"
490+
destination_path = f"{s3_bucket}/destination/{uuid.uuid4()}"
491+
492+
# Use ambient credentials (no explicit key/secret provided)
493+
connection_config = S3ConnectionConfig(
494+
access_config=S3AccessConfig(ambient_credentials=True),
495+
)
496+
upload_config = S3UploaderConfig(remote_url=destination_path)
497+
uploader = S3Uploader(connection_config=connection_config, upload_config=upload_config)
498+
s3fs = uploader.fs
499+
file_data = FileData(
500+
source_identifiers=SourceIdentifiers(
501+
fullpath=upload_file.name, filename=upload_file.name
502+
),
503+
connector_type=CONNECTOR_TYPE,
504+
identifier="mock file data",
505+
)
506+
try:
507+
uploader.precheck()
508+
if uploader.is_async():
509+
await uploader.run_async(path=upload_file, file_data=file_data)
510+
else:
511+
uploader.run(path=upload_file, file_data=file_data)
512+
uploaded_files = [
513+
Path(file) for file in s3fs.ls(path=destination_path) if Path(file).name != "_empty"
514+
]
515+
assert len(uploaded_files) == 1
516+
finally:
517+
s3fs.rm(path=destination_path, recursive=True)

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.2.2" # pragma: no cover
1+
__version__ = "1.2.3" # pragma: no cover

unstructured_ingest/processes/connectors/fsspec/s3.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import contextlib
2+
import os
23
from contextlib import contextmanager
34
from dataclasses import dataclass, field
45
from time import time
@@ -33,7 +34,7 @@
3334

3435
CONNECTOR_TYPE = "s3"
3536

36-
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html#object-key-guidelines-avoid-characters
37+
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html#object-key-guidelines-avoid-characters # noqa
3738
CHARACTERS_TO_AVOID = ["\\", "{", "^", "}", "%", "`", "]", '"', ">", "[", "~", "<", "#", "|"]
3839

3940
if TYPE_CHECKING:
@@ -56,6 +57,13 @@ class S3AccessConfig(FsspecAccessConfig):
5657
token: Optional[str] = Field(
5758
default=None, description="If not anonymous, use this security token, if specified."
5859
)
60+
ambient_credentials: bool = Field(
61+
default=False,
62+
description="Explicitly allow using ambient AWS credentials from .aws folder, "
63+
"environment variables, or IAM roles. Requires ALLOW_AMBIENT_CREDENTIALS_S3 environment "
64+
"variable to also be set to 'true' (case insensitive) for security. When False (default), "
65+
"only explicit credentials or anonymous access are allowed.",
66+
)
5967

6068

6169
class S3ConnectionConfig(FsspecConnectionConfig):
@@ -72,14 +80,48 @@ class S3ConnectionConfig(FsspecConnectionConfig):
7280
connector_type: str = Field(default=CONNECTOR_TYPE, init=False)
7381

7482
def get_access_config(self) -> dict[str, Any]:
75-
access_configs: dict[str, Any] = {"anon": self.anonymous}
83+
access_config = self.access_config.get_secret_value()
84+
has_explicit_credentials = bool(
85+
access_config.key or access_config.secret or access_config.token
86+
)
87+
88+
access_configs: dict[str, Any]
89+
90+
if has_explicit_credentials:
91+
access_configs = {"anon": False}
92+
# Avoid injecting None by filtering out k,v pairs where the value is None
93+
access_configs.update(
94+
{
95+
k: v
96+
for k, v in access_config.model_dump().items()
97+
if v is not None and k != "ambient_credentials"
98+
}
99+
)
100+
elif access_config.ambient_credentials:
101+
if os.getenv("ALLOW_AMBIENT_CREDENTIALS_S3", "").lower() == "true":
102+
logger.info(
103+
"Using ambient AWS credentials (environment variables, .aws folder, IAM roles)"
104+
)
105+
access_configs = {"anon": False}
106+
# Don't pass explicit credentials, let s3fs/boto3 auto-detect
107+
else:
108+
# Field allows but environment doesn't - raise error for security
109+
raise UserAuthError(
110+
"Ambient credentials requested (ambient_credentials=True) but "
111+
"ALLOW_AMBIENT_CREDENTIALS_S3 environment variable is not set to 'true'. "
112+
)
113+
elif self.anonymous:
114+
access_configs = {"anon": True}
115+
else:
116+
# User set anonymous=False but provided no credentials and no ambient permission
117+
raise UserAuthError(
118+
"No authentication method specified. anonymous=False but no explicit credentials "
119+
"provided and ambient_credentials=False."
120+
)
121+
76122
if self.endpoint_url:
77123
access_configs["endpoint_url"] = self.endpoint_url
78124

79-
# Avoid injecting None by filtering out k,v pairs where the value is None
80-
access_configs.update(
81-
{k: v for k, v in self.access_config.get_secret_value().model_dump().items() if v}
82-
)
83125
return access_configs
84126

85127
@requires_dependencies(["s3fs", "fsspec"], extras="s3")

0 commit comments

Comments
 (0)