Commit a7f6c08

gmweaver and garrett.weaver authored

add support to configure anonymous in s3 clients (#2323)

# Rationale for this change

Currently, it is not possible to configure `anonymous` in the underlying `s3fs` client (it is `False` by default).

Resolves #2126

# Are these changes tested?

Yes, unit tests were added to confirm the setting is used when the property is set.

# Are there any user-facing changes?

Yes, users can now set the property `s3.anonymous` to `True` to enable anonymous access. If the property is not set, the behavior is unchanged.

---------

Co-authored-by: garrett.weaver <[email protected]>
1 parent 6c1a1b2 commit a7f6c08
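For readers who want to try the new property, here is a minimal usage sketch. Only the `s3.anonymous` key comes from this commit; the catalog name and endpoint are illustrative placeholders.

```python
# Hedged usage sketch: enable anonymous S3 access via FileIO properties.
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",  # hypothetical catalog configured elsewhere
    **{
        "s3.endpoint": "http://localhost:9000",  # placeholder endpoint
        "s3.anonymous": "true",  # parsed to a boolean and passed to the S3 client
    },
)
```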

File tree

6 files changed: +81 -3 lines changed

mkdocs/docs/configuration.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -127,6 +127,7 @@ For the FileIO there are several configuration options available:
 | s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
 | s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
 | s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
+| s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or boto's credential resolver. |
 
 <!-- markdown-link-check-enable-->
 
@@ -197,6 +198,7 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya
 | s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
 | s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
 | s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This is set to `True` by default as OSS can only be accessed with virtual hosted style address. |
+| s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or standard AWS configuration methods. |
 
 <!-- markdown-link-check-enable-->
```
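To illustrate the documented property in code, a hedged sketch of constructing a FileIO directly with it. The class and `properties` argument match the tests later in this diff; the endpoint and object path are placeholders.

```python
# Sketch: the property travels in the FileIO `properties` mapping as a string.
from pyiceberg.io.pyarrow import PyArrowFileIO

file_io = PyArrowFileIO(
    properties={
        "s3.endpoint": "http://localhost:9000",  # placeholder endpoint
        "s3.anonymous": "true",
    }
)
input_file = file_io.new_input("s3://warehouse/some-file.parquet")  # hypothetical path
```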

pyiceberg/io/__init__.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -52,6 +52,7 @@
 AWS_SESSION_TOKEN = "client.session-token"
 AWS_ROLE_ARN = "client.role-arn"
 AWS_ROLE_SESSION_NAME = "client.role-session-name"
+S3_ANONYMOUS = "s3.anonymous"
 S3_ENDPOINT = "s3.endpoint"
 S3_ACCESS_KEY_ID = "s3.access-key-id"
 S3_SECRET_ACCESS_KEY = "s3.secret-access-key"
```
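The new constant is consumed with the same guard-plus-parse pattern in both FileIO implementations (see the diffs below). A small sketch of the pattern's semantics, assuming `pyiceberg.types.strtobool` parses "true"/"false" strings as its name suggests; `anon_flag` is a hypothetical helper for illustration only.

```python
# Sketch of the guard-plus-parse pattern from the diffs below. The walrus
# guard only fires when the property is present and non-empty; an explicit
# "false" still enters the branch and strtobool parses it to False.
from typing import Optional

from pyiceberg.types import strtobool

def anon_flag(properties: dict) -> Optional[bool]:
    if s3_anonymous := properties.get("s3.anonymous"):
        return strtobool(s3_anonymous)
    return None  # unset: keep the client's default (credentialed) behavior

print(anon_flag({"s3.anonymous": "true"}))   # True
print(anon_flag({"s3.anonymous": "false"}))  # False
print(anon_flag({}))                         # None
```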

pyiceberg/io/fsspec.py

Lines changed: 5 additions & 0 deletions

```diff
@@ -65,6 +65,7 @@
     HF_ENDPOINT,
     HF_TOKEN,
     S3_ACCESS_KEY_ID,
+    S3_ANONYMOUS,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
     S3_PROXY_URI,
@@ -83,6 +84,7 @@
     OutputStream,
 )
 from pyiceberg.typedef import Properties
+from pyiceberg.types import strtobool
 from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool
 
 logger = logging.getLogger(__name__)
@@ -164,6 +166,9 @@ def _s3(properties: Properties) -> AbstractFileSystem:
     if request_timeout := properties.get(S3_REQUEST_TIMEOUT):
         config_kwargs["read_timeout"] = float(request_timeout)
 
+    if s3_anonymous := properties.get(S3_ANONYMOUS):
+        config_kwargs["anon"] = strtobool(s3_anonymous)
+
     fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)
 
     for event_name, event_function in register_events.items():
```
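For context, s3fs exposes anonymous access through its `anon` flag. A hedged standalone sketch of anonymous access in s3fs itself, independent of PyIceberg; the bucket name is a placeholder.

```python
# Rough equivalent of the behavior requested by "s3.anonymous": "true".
import s3fs

fs = s3fs.S3FileSystem(anon=True)  # no credentials are resolved or sent
# e.g. listing a public bucket (placeholder name):
# fs.ls("some-public-bucket")
```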

pyiceberg/io/pyarrow.py

Lines changed: 13 additions & 3 deletions

```diff
@@ -109,6 +109,7 @@
     HDFS_USER,
     PYARROW_USE_LARGE_TYPES_ON_READ,
     S3_ACCESS_KEY_ID,
+    S3_ANONYMOUS,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
     S3_FORCE_VIRTUAL_ADDRESSING,
@@ -179,6 +180,7 @@
     TimeType,
     UnknownType,
     UUIDType,
+    strtobool,
 )
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
@@ -450,6 +452,9 @@ def _initialize_oss_fs(self) -> FileSystem:
         if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
             client_kwargs["session_name"] = session_name
 
+        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
+            client_kwargs["anonymous"] = strtobool(s3_anonymous)
+
         return S3FileSystem(**client_kwargs)
 
     def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
@@ -501,6 +506,9 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
         ):
             client_kwargs["retry_strategy"] = retry_instance
 
+        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
+            client_kwargs["anonymous"] = strtobool(s3_anonymous)
+
         return S3FileSystem(**client_kwargs)
 
     def _initialize_azure_fs(self) -> FileSystem:
@@ -2797,9 +2805,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
         functools.reduce(
             operator.and_,
             [
-                pc.field(partition_field_name) == unique_partition[partition_field_name]
-                if unique_partition[partition_field_name] is not None
-                else pc.field(partition_field_name).is_null()
+                (
+                    pc.field(partition_field_name) == unique_partition[partition_field_name]
+                    if unique_partition[partition_field_name] is not None
+                    else pc.field(partition_field_name).is_null()
+                )
                 for field, partition_field_name in zip(spec.fields, partition_fields)
             ],
         )
```
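On the Arrow side, `anonymous` is a first-class keyword of `pyarrow.fs.S3FileSystem`. A hedged standalone sketch of what the FileIO ends up constructing; the endpoint is a placeholder.

```python
# Rough equivalent of what PyArrowFileIO builds when "s3.anonymous" is "true".
from pyarrow import fs

s3 = fs.S3FileSystem(
    anonymous=True,  # skip credential resolution entirely
    endpoint_override="http://localhost:9000",  # placeholder endpoint
)
```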

tests/io/test_fsspec.py

Lines changed: 31 additions & 0 deletions

```diff
@@ -266,6 +266,37 @@ def test_fsspec_s3_session_properties() -> None:
     )
 
 
+def test_fsspec_s3_session_properties_with_anonymous() -> None:
+    session_properties: Properties = {
+        "s3.anonymous": "true",
+        "s3.endpoint": "http://localhost:9000",
+        "s3.access-key-id": "admin",
+        "s3.secret-access-key": "password",
+        "s3.region": "us-east-1",
+        "s3.session-token": "s3.session-token",
+        **UNIFIED_AWS_SESSION_PROPERTIES,
+    }
+
+    with mock.patch("s3fs.S3FileSystem") as mock_s3fs:
+        s3_fileio = FsspecFileIO(properties=session_properties)
+        filename = str(uuid.uuid4())
+
+        s3_fileio.new_input(location=f"s3://warehouse/{filename}")
+
+        mock_s3fs.assert_called_with(
+            client_kwargs={
+                "endpoint_url": "http://localhost:9000",
+                "aws_access_key_id": "admin",
+                "aws_secret_access_key": "password",
+                "region_name": "us-east-1",
+                "aws_session_token": "s3.session-token",
+            },
+            config_kwargs={
+                "anon": True,
+            },
+        )
+
+
 def test_fsspec_unified_session_properties() -> None:
     session_properties: Properties = {
         "s3.endpoint": "http://localhost:9000",
```

tests/io/test_pyarrow.py

Lines changed: 29 additions & 0 deletions

```diff
@@ -390,6 +390,35 @@ def test_pyarrow_s3_session_properties() -> None:
     )
 
 
+def test_pyarrow_s3_session_properties_with_anonymous() -> None:
+    session_properties: Properties = {
+        "s3.anonymous": "true",
+        "s3.endpoint": "http://localhost:9000",
+        "s3.access-key-id": "admin",
+        "s3.secret-access-key": "password",
+        "s3.region": "us-east-1",
+        "s3.session-token": "s3.session-token",
+        **UNIFIED_AWS_SESSION_PROPERTIES,
+    }
+
+    with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
+        s3_fileio = PyArrowFileIO(properties=session_properties)
+        filename = str(uuid.uuid4())
+
+        # Mock `resolve_s3_region` to prevent the location used from resolving to a different s3 region
+        mock_s3_region_resolver.side_effect = OSError("S3 bucket is not found")
+        s3_fileio.new_input(location=f"s3://warehouse/{filename}")
+
+        mock_s3fs.assert_called_with(
+            anonymous=True,
+            endpoint_override="http://localhost:9000",
+            access_key="admin",
+            secret_key="password",
+            region="us-east-1",
+            session_token="s3.session-token",
+        )
+
+
 def test_pyarrow_unified_session_properties() -> None:
     session_properties: Properties = {
         "s3.endpoint": "http://localhost:9000",
```
