diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 40cfc0b8c9..59e89dfc18 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -127,6 +127,7 @@ For the FileIO there are several configuration options available:
 | s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
 | s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
 | s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
+| s3.anonymous | False | Configure whether to use an anonymous connection. If False (the default), uses the key/secret if configured, otherwise falls back to boto's credential resolver. |
@@ -197,6 +198,7 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya
 | s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
 | s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
 | s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This is set to `True` by default as OSS can only be accessed with virtual hosted style address. |
+| s3.anonymous | False | Configure whether to use an anonymous connection. If False (the default), uses the key/secret if configured, otherwise falls back to standard AWS configuration methods. |
diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index f89de18f12..91ca6ee443 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -52,6 +52,7 @@
 AWS_SESSION_TOKEN = "client.session-token"
 AWS_ROLE_ARN = "client.role-arn"
 AWS_ROLE_SESSION_NAME = "client.role-session-name"
+S3_ANONYMOUS = "s3.anonymous"
 S3_ENDPOINT = "s3.endpoint"
 S3_ACCESS_KEY_ID = "s3.access-key-id"
 S3_SECRET_ACCESS_KEY = "s3.secret-access-key"
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index d075765ed1..c1f95b719f 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -65,6 +65,7 @@
     HF_ENDPOINT,
     HF_TOKEN,
     S3_ACCESS_KEY_ID,
+    S3_ANONYMOUS,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
     S3_PROXY_URI,
@@ -83,6 +84,7 @@
     OutputStream,
 )
 from pyiceberg.typedef import Properties
+from pyiceberg.types import strtobool
 from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool

 logger = logging.getLogger(__name__)
@@ -164,6 +166,9 @@ def _s3(properties: Properties) -> AbstractFileSystem:
     if request_timeout := properties.get(S3_REQUEST_TIMEOUT):
         config_kwargs["read_timeout"] = float(request_timeout)

+    if s3_anonymous := properties.get(S3_ANONYMOUS):
+        config_kwargs["anon"] = strtobool(s3_anonymous)
+
     fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)

     for event_name, event_function in register_events.items():
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index c756487c32..7076d7da76 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -109,6 +109,7 @@
     HDFS_USER,
     PYARROW_USE_LARGE_TYPES_ON_READ,
     S3_ACCESS_KEY_ID,
+    S3_ANONYMOUS,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
     S3_FORCE_VIRTUAL_ADDRESSING,
@@ -179,6 +180,7 @@
     TimeType,
     UnknownType,
     UUIDType,
+    strtobool,
 )
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
@@ -450,6 +452,9 @@ def _initialize_oss_fs(self) -> FileSystem:
         if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
             client_kwargs["session_name"] = session_name

+        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
+            client_kwargs["anonymous"] = strtobool(s3_anonymous)
+
         return S3FileSystem(**client_kwargs)

     def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
@@ -501,6 +506,9 @@
         ):
             client_kwargs["retry_strategy"] = retry_instance

+        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
+            client_kwargs["anonymous"] = strtobool(s3_anonymous)
+
         return S3FileSystem(**client_kwargs)

     def _initialize_azure_fs(self) -> FileSystem:
@@ -2793,9 +2801,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
             functools.reduce(
                 operator.and_,
                 [
-                    pc.field(partition_field_name) == unique_partition[partition_field_name]
-                    if unique_partition[partition_field_name] is not None
-                    else pc.field(partition_field_name).is_null()
+                    (
+                        pc.field(partition_field_name) == unique_partition[partition_field_name]
+                        if unique_partition[partition_field_name] is not None
+                        else pc.field(partition_field_name).is_null()
+                    )
                     for field, partition_field_name in zip(spec.fields, partition_fields)
                 ],
             )
diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py
index 854172727d..11b168dc16 100644
--- a/tests/io/test_fsspec.py
+++ b/tests/io/test_fsspec.py
@@ -266,6 +266,37 @@ def test_fsspec_s3_session_properties() -> None:
     )

+
+def test_fsspec_s3_session_properties_with_anonymous() -> None:
+    session_properties: Properties = {
+        "s3.anonymous": "true",
+        "s3.endpoint": "http://localhost:9000",
+        "s3.access-key-id": "admin",
+        "s3.secret-access-key": "password",
+        "s3.region": "us-east-1",
+        "s3.session-token": "s3.session-token",
+        **UNIFIED_AWS_SESSION_PROPERTIES,
+    }
+
+    with mock.patch("s3fs.S3FileSystem") as mock_s3fs:
+        s3_fileio = FsspecFileIO(properties=session_properties)
+        filename = str(uuid.uuid4())
+
+        s3_fileio.new_input(location=f"s3://warehouse/{filename}")
+
+        mock_s3fs.assert_called_with(
+            client_kwargs={
+                "endpoint_url": "http://localhost:9000",
+                "aws_access_key_id": "admin",
+                "aws_secret_access_key": "password",
+                "region_name": "us-east-1",
+                "aws_session_token": "s3.session-token",
+            },
+            config_kwargs={
+                "anon": True,
+            },
+        )
+
+
 def test_fsspec_unified_session_properties() -> None:
     session_properties: Properties = {
         "s3.endpoint": "http://localhost:9000",
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index f5c3082edc..420ff6f2c9 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -390,6 +390,35 @@ def test_pyarrow_s3_session_properties() -> None:
     )

+
+def test_pyarrow_s3_session_properties_with_anonymous() -> None:
+    session_properties: Properties = {
+        "s3.anonymous": "true",
+        "s3.endpoint": "http://localhost:9000",
+        "s3.access-key-id": "admin",
+        "s3.secret-access-key": "password",
+        "s3.region": "us-east-1",
+        "s3.session-token": "s3.session-token",
+        **UNIFIED_AWS_SESSION_PROPERTIES,
+    }
+
+    with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
+        s3_fileio = PyArrowFileIO(properties=session_properties)
+        filename = str(uuid.uuid4())
+
+        # Mock `resolve_s3_region` to prevent the location used from resolving to a different s3 region
+        mock_s3_region_resolver.side_effect = OSError("S3 bucket is not found")
+        s3_fileio.new_input(location=f"s3://warehouse/{filename}")
+
+        mock_s3fs.assert_called_with(
+            anonymous=True,
+            endpoint_override="http://localhost:9000",
+            access_key="admin",
+            secret_key="password",
+            region="us-east-1",
+            session_token="s3.session-token",
+        )
+
+
 def test_pyarrow_unified_session_properties() -> None:
     session_properties: Properties = {
         "s3.endpoint": "http://localhost:9000",
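For context, a minimal usage sketch of the new property, assuming a publicly readable bucket; the bucket path, object key, and region below are illustrative, not taken from this patch:

```python
# Illustrative sketch: reading a public S3 object anonymously via the new
# `s3.anonymous` property. The bucket, key, and region are hypothetical.
from pyiceberg.io.pyarrow import PyArrowFileIO

# Property values are strings; "true" is parsed with strtobool, as in the patch.
io = PyArrowFileIO(properties={"s3.anonymous": "true", "s3.region": "us-east-1"})
input_file = io.new_input("s3://example-public-bucket/path/to/data.parquet")
with input_file.open() as f:
    header = f.read(4)  # e.g. b"PAR1" for a Parquet file; no credentials needed
```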