Skip to content

Commit f965370

Browse files
committed
refactor oss
1 parent 9d44d80 commit f965370

File tree

2 files changed

+39
-49
lines changed

2 files changed

+39
-49
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,14 @@
190190
TimeType,
191191
UnknownType,
192192
UUIDType,
193+
strtobool,
193194
)
194195
from pyiceberg.utils.concurrent import ExecutorFactory
195196
from pyiceberg.utils.config import Config
196197
from pyiceberg.utils.datetime import millis_to_datetime
197198
from pyiceberg.utils.decimal import unscaled_to_decimal
198199
from pyiceberg.utils.deprecated import deprecation_message
199-
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
200+
from pyiceberg.utils.properties import get_first_property_value, properties_with_prefix, property_as_bool, property_as_int
200201
from pyiceberg.utils.singleton import Singleton
201202
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
202203

@@ -463,43 +464,41 @@ def _process_basic_properties(
463464
def _initialize_oss_fs(self) -> FileSystem:
464465
from pyarrow.fs import S3FileSystem
465466

466-
# Mapping from PyIceberg properties to S3FileSystem parameter names
467-
property_mapping = {
468-
S3_ENDPOINT: "endpoint_override",
469-
S3_PROXY_URI: "proxy_options",
470-
S3_CONNECT_TIMEOUT: "connect_timeout",
471-
S3_REQUEST_TIMEOUT: "request_timeout",
472-
}
473-
474-
# Properties that need special handling
475-
special_properties = {
476-
S3_ACCESS_KEY_ID,
477-
S3_SECRET_ACCESS_KEY,
478-
S3_SESSION_TOKEN,
479-
S3_CONNECT_TIMEOUT,
480-
S3_REQUEST_TIMEOUT,
481-
S3_FORCE_VIRTUAL_ADDRESSING,
482-
S3_ROLE_SESSION_NAME,
483-
S3_RESOLVE_REGION,
484-
S3_REGION,
485-
}
486-
487-
client_kwargs = self._process_basic_properties(property_mapping, special_properties, "s3")
488-
489-
if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties:
490-
client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID)
491-
492-
if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties:
493-
client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY)
494-
495-
if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties:
496-
client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN)
497-
498-
if S3_REGION in self.properties or AWS_REGION in self.properties:
499-
client_kwargs["region"] = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
500-
501-
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True)
467+
client_kwargs = {}
468+
if endpoint := get_first_property_value(self.properties, S3_ENDPOINT, "oss.endpoint_override"):
469+
client_kwargs["endpoint_override"] = endpoint
470+
if access_key := get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "oss.access_key"):
471+
client_kwargs["access_key"] = access_key
472+
if secret_key := get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "oss.secret_key"):
473+
client_kwargs["secret_key"] = secret_key
474+
if session_token := get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "oss.session_token"):
475+
client_kwargs["session_token"] = session_token
476+
if region := get_first_property_value(self.properties, S3_REGION, AWS_REGION, "oss.region"):
477+
client_kwargs["region"] = region
478+
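# Check for force_virtual_addressing in order of preference, defaulting to True if no value is found (note: the truthiness test below also treats a falsy value, such as a boolean False, as not found)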
479+
if force_virtual_addressing := get_first_property_value(
480+
self.properties, S3_FORCE_VIRTUAL_ADDRESSING, "oss.force_virtual_addressing"
481+
):
482+
if isinstance(force_virtual_addressing, str): # S3_FORCE_VIRTUAL_ADDRESSING's value can be a string
483+
force_virtual_addressing = strtobool(force_virtual_addressing)
484+
client_kwargs["force_virtual_addressing"] = force_virtual_addressing
485+
else:
486+
client_kwargs["force_virtual_addressing"] = True
487+
if proxy_uri := get_first_property_value(self.properties, S3_PROXY_URI, "oss.proxy_options"):
488+
client_kwargs["proxy_options"] = proxy_uri
489+
if connect_timeout := get_first_property_value(self.properties, S3_CONNECT_TIMEOUT, "oss.connect_timeout"):
490+
client_kwargs["connect_timeout"] = float(connect_timeout)
491+
if request_timeout := get_first_property_value(self.properties, S3_REQUEST_TIMEOUT, "oss.request_timeout"):
492+
client_kwargs["request_timeout"] = float(request_timeout)
493+
if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN, "oss.role_arn"):
494+
client_kwargs["role_arn"] = role_arn
495+
if session_name := get_first_property_value(
496+
self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "oss.session_name"
497+
):
498+
client_kwargs["session_name"] = session_name
502499

500+
oss_properties = properties_with_prefix(self.properties, prefix="oss.")
501+
client_kwargs = {**oss_properties, **client_kwargs}
503502
return S3FileSystem(**client_kwargs)
504503

505504
def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:

pyiceberg/utils/properties.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ def properties_with_prefix(
9191
prefix: str,
9292
) -> Properties:
9393
"""
94-
Returns subset of provided map with keys matching the provided prefix. Matching is
95-
case-sensitive and the matching prefix is removed from the keys in returned map.
94+
Return subset of provided map with keys matching the provided prefix. Matching is case-sensitive and the matching prefix is removed from the keys in returned map.
9695
9796
Args:
9897
properties: input map
@@ -104,11 +103,7 @@ def properties_with_prefix(
104103
if not properties:
105104
return {}
106105

107-
return {
108-
key[len(prefix):]: value
109-
for key, value in properties.items()
110-
if key.startswith(prefix)
111-
}
106+
return {key[len(prefix) :]: value for key, value in properties.items() if key.startswith(prefix)}
112107

113108

114109
def filter_properties(
@@ -131,8 +126,4 @@ def filter_properties(
131126
if key_predicate is None:
132127
raise ValueError("Invalid key predicate: None")
133128

134-
return {
135-
key: value
136-
for key, value in properties.items()
137-
if key_predicate(key)
138-
}
129+
return {key: value for key, value in properties.items() if key_predicate(key)}

0 commit comments

Comments
 (0)