Add support for Pyarrow filesystem specific properties #2251

Open · wants to merge 21 commits into base: main

Changes from 1 commit
27 changes: 27 additions & 0 deletions mkdocs/docs/configuration.md
@@ -105,6 +105,33 @@ You can also set the FileIO explicitly:

For the FileIO there are several configuration options available:

### PyArrow FileSystem Extra Properties

When using `PyArrowFileIO`, any property with a filesystem-specific prefix that is not explicitly handled by PyIceberg is passed through to the underlying PyArrow filesystem implementation.

Property keys follow the format:

```txt
{fs_scheme}.{parameter_name}
```

- `{fs_scheme}` is the filesystem scheme (e.g., `s3`, `hdfs`, `gcs`).
- `{parameter_name}` must match a parameter name accepted by the corresponding PyArrow filesystem constructor.
- Property values must use the type expected by the underlying filesystem (e.g., string, integer, boolean).

Below are examples of supported prefixes and how the properties are passed through:

<!-- markdown-link-check-disable -->

| Property Prefix | FileSystem | Example | Description |
|-----------------|------------------------------------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------|
| `s3.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `s3.load_frequency=900` | Passed as `load_frequency=900` to S3FileSystem |
| `hdfs.` | [HadoopFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html) | `hdfs.replication=3` | Passed as `replication=3` to HadoopFileSystem |
| `gcs.` | [GcsFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.GcsFileSystem.html) | `gcs.project_id=test` | Passed as `project_id='test'` to GcsFileSystem |
| `adls.` | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html) | `adls.blob_cache_size=1024` | Passed as `blob_cache_size=1024` to AzureFileSystem |
| `oss.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `oss.connect_timeout=30.0` | Passed as `connect_timeout=30.0` to S3FileSystem |
| `file.` | [LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html) | `file.use_mmap=true` | Passed as `use_mmap=True` to LocalFileSystem |
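
To make the pass-through concrete, here is a minimal sketch (the catalog name, URI, and endpoint values are illustrative placeholders, not part of this change): `s3.endpoint` and `s3.connect-timeout` are properties PyIceberg already maps, while `s3.load_frequency` is not explicitly handled and is forwarded unchanged to `S3FileSystem`.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",  # placeholder REST catalog endpoint
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",  # known property, mapped to endpoint_override
        "s3.connect-timeout": "30.0",  # known property, mapped to connect_timeout
        "s3.load_frequency": 900,  # not handled by PyIceberg, passed through as load_frequency=900
    },
)
```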

### S3

<!-- markdown-link-check-disable -->
282 changes: 215 additions & 67 deletions pyiceberg/io/pyarrow.py
@@ -423,29 +423,67 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
def _initialize_oss_fs(self) -> FileSystem:
from pyarrow.fs import S3FileSystem

client_kwargs: Dict[str, Any] = {
"endpoint_override": self.properties.get(S3_ENDPOINT),
"access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
"secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
"session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
"region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
"force_virtual_addressing": property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True),
# Mapping from PyIceberg properties to S3FileSystem parameter names
property_mapping = {
S3_ENDPOINT: "endpoint_override",
S3_ACCESS_KEY_ID: "access_key",
AWS_ACCESS_KEY_ID: "access_key",
S3_SECRET_ACCESS_KEY: "secret_key",
AWS_SECRET_ACCESS_KEY: "secret_key",
S3_SESSION_TOKEN: "session_token",
AWS_SESSION_TOKEN: "session_token",
S3_REGION: "region",
AWS_REGION: "region",
S3_PROXY_URI: "proxy_options",
S3_CONNECT_TIMEOUT: "connect_timeout",
S3_REQUEST_TIMEOUT: "request_timeout",
S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing",
}

if proxy_uri := self.properties.get(S3_PROXY_URI):
client_kwargs["proxy_options"] = proxy_uri
# Properties that need special handling
special_properties = {
S3_CONNECT_TIMEOUT,
S3_REQUEST_TIMEOUT,
S3_FORCE_VIRTUAL_ADDRESSING,
S3_ROLE_SESSION_NAME,
S3_RESOLVE_REGION,
AWS_ROLE_SESSION_NAME,
}

if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
client_kwargs["connect_timeout"] = float(connect_timeout)
client_kwargs: Dict[str, Any] = {}

if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
client_kwargs["request_timeout"] = float(request_timeout)
for prop_name, prop_value in self.properties.items():
if prop_value is None:
continue

# Skip properties that need special handling
if prop_name in special_properties:
continue

if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
client_kwargs["role_arn"] = role_arn
# Map known property names to S3FileSystem parameter names
if prop_name in property_mapping:
param_name = property_mapping[prop_name]
client_kwargs[param_name] = prop_value

if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
client_kwargs["session_name"] = session_name
# Pass through any other s3.* properties to S3FileSystem
elif prop_name.startswith("s3."):
param_name = prop_name.split(".", 1)[1]
client_kwargs[param_name] = prop_value

# Handle properties that need first value resolution
if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties:
client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID)

if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties:
client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY)

if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties:
client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN)

if S3_REGION in self.properties or AWS_REGION in self.properties:
client_kwargs["region"] = get_first_property_value(self.properties, S3_REGION, AWS_REGION)

client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True)

return S3FileSystem(**client_kwargs)

@@ -467,32 +505,79 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
else:
bucket_region = provided_region

client_kwargs: Dict[str, Any] = {
"endpoint_override": self.properties.get(S3_ENDPOINT),
"access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
"secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
"session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
"region": bucket_region,
# Mapping from PyIceberg properties to S3FileSystem parameter names
property_mapping = {
S3_ENDPOINT: "endpoint_override",
S3_ACCESS_KEY_ID: "access_key",
AWS_ACCESS_KEY_ID: "access_key",
S3_SECRET_ACCESS_KEY: "secret_key",
AWS_SECRET_ACCESS_KEY: "secret_key",
S3_SESSION_TOKEN: "session_token",
AWS_SESSION_TOKEN: "session_token",
S3_PROXY_URI: "proxy_options",
S3_CONNECT_TIMEOUT: "connect_timeout",
S3_REQUEST_TIMEOUT: "request_timeout",
S3_ROLE_ARN: "role_arn",
AWS_ROLE_ARN: "role_arn",
S3_ROLE_SESSION_NAME: "session_name",
AWS_ROLE_SESSION_NAME: "session_name",
S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing",
S3_RETRY_STRATEGY_IMPL: "retry_strategy",
}

# Properties that need special handling and should not be passed directly
special_properties = {
S3_RESOLVE_REGION,
S3_REGION,
AWS_REGION,
S3_RETRY_STRATEGY_IMPL,
S3_CONNECT_TIMEOUT,
S3_REQUEST_TIMEOUT,
}

if proxy_uri := self.properties.get(S3_PROXY_URI):
client_kwargs["proxy_options"] = proxy_uri
client_kwargs: Dict[str, Any] = {}

client_kwargs["region"] = bucket_region
for prop_name, prop_value in self.properties.items():
if prop_value is None:
continue

# Skip properties that need special handling
if prop_name in special_properties:
continue

if prop_name in property_mapping:
param_name = property_mapping[prop_name]
client_kwargs[param_name] = prop_value

# Pass through any other s3.* properties that might be used by S3FileSystem
elif prop_name.startswith("s3."):
param_name = prop_name.split(".", 1)[1]
client_kwargs[param_name] = prop_value

# Handle properties that need first value resolution
if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties:
client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID)

if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties:
client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY)

if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties:
client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN)

if S3_ROLE_ARN in self.properties or AWS_ROLE_ARN in self.properties:
client_kwargs["role_arn"] = get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN)

if S3_ROLE_SESSION_NAME in self.properties or AWS_ROLE_SESSION_NAME in self.properties:
client_kwargs["session_name"] = get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME)

if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
client_kwargs["connect_timeout"] = float(connect_timeout)

if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
client_kwargs["request_timeout"] = float(request_timeout)

if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
client_kwargs["role_arn"] = role_arn

if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
client_kwargs["session_name"] = session_name

if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)

# Handle retry strategy special case
if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
retry_instance := _import_retry_strategy(retry_strategy_impl)
):
@@ -512,59 +597,111 @@ def _initialize_azure_fs(self) -> FileSystem:

from pyarrow.fs import AzureFileSystem

client_kwargs: Dict[str, str] = {}

if account_name := self.properties.get(ADLS_ACCOUNT_NAME):
client_kwargs["account_name"] = account_name

if account_key := self.properties.get(ADLS_ACCOUNT_KEY):
client_kwargs["account_key"] = account_key

if blob_storage_authority := self.properties.get(ADLS_BLOB_STORAGE_AUTHORITY):
client_kwargs["blob_storage_authority"] = blob_storage_authority
# Mapping from PyIceberg properties to AzureFileSystem parameter names
property_mapping = {
ADLS_ACCOUNT_NAME: "account_name",
ADLS_ACCOUNT_KEY: "account_key",
ADLS_BLOB_STORAGE_AUTHORITY: "blob_storage_authority",
ADLS_DFS_STORAGE_AUTHORITY: "dfs_storage_authority",
ADLS_BLOB_STORAGE_SCHEME: "blob_storage_scheme",
ADLS_DFS_STORAGE_SCHEME: "dfs_storage_scheme",
ADLS_SAS_TOKEN: "sas_token",
}

if dfs_storage_authority := self.properties.get(ADLS_DFS_STORAGE_AUTHORITY):
client_kwargs["dfs_storage_authority"] = dfs_storage_authority
client_kwargs: Dict[str, Any] = {}

if blob_storage_scheme := self.properties.get(ADLS_BLOB_STORAGE_SCHEME):
client_kwargs["blob_storage_scheme"] = blob_storage_scheme
for prop_name, prop_value in self.properties.items():
if prop_value is None:
continue

if dfs_storage_scheme := self.properties.get(ADLS_DFS_STORAGE_SCHEME):
client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme
# Map known property names to AzureFileSystem parameter names
if prop_name in property_mapping:
param_name = property_mapping[prop_name]
client_kwargs[param_name] = prop_value

if sas_token := self.properties.get(ADLS_SAS_TOKEN):
client_kwargs["sas_token"] = sas_token
# Pass through any other adls.* properties that might be used by AzureFileSystem
elif prop_name.startswith("adls."):
param_name = prop_name.split(".", 1)[1]
client_kwargs[param_name] = prop_value

return AzureFileSystem(**client_kwargs)

def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
from pyarrow.fs import HadoopFileSystem

hdfs_kwargs: Dict[str, Any] = {}
if netloc:
return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
if host := self.properties.get(HDFS_HOST):
hdfs_kwargs["host"] = host
if port := self.properties.get(HDFS_PORT):
# port should be an integer type
hdfs_kwargs["port"] = int(port)
if user := self.properties.get(HDFS_USER):
hdfs_kwargs["user"] = user
if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
hdfs_kwargs["kerb_ticket"] = kerb_ticket

# Mapping from PyIceberg properties to HadoopFileSystem parameter names
property_mapping = {
HDFS_HOST: "host",
HDFS_PORT: "port",
HDFS_USER: "user",
HDFS_KERB_TICKET: "kerb_ticket",
}

hdfs_kwargs: Dict[str, Any] = {}

for prop_name, prop_value in self.properties.items():
if prop_value is None:
continue

# Map known property names to HadoopFileSystem parameter names
if prop_name in property_mapping:
param_name = property_mapping[prop_name]

if param_name == "port":
hdfs_kwargs[param_name] = int(prop_value)
else:
hdfs_kwargs[param_name] = prop_value

# Pass through any other hdfs.* properties that might be used by HadoopFileSystem
elif prop_name.startswith("hdfs."):
param_name = prop_name.split(".", 1)[1]
hdfs_kwargs[param_name] = prop_value

return HadoopFileSystem(**hdfs_kwargs)

def _initialize_gcs_fs(self) -> FileSystem:
from pyarrow.fs import GcsFileSystem

# Mapping from PyIceberg properties to GcsFileSystem parameter names
property_mapping = {
GCS_TOKEN: "access_token",
GCS_TOKEN_EXPIRES_AT_MS: "credential_token_expiration",
GCS_DEFAULT_LOCATION: "default_bucket_location",
GCS_SERVICE_HOST: "endpoint_override",
}

# Properties that need special handling
special_properties = {
GCS_TOKEN_EXPIRES_AT_MS,
GCS_SERVICE_HOST,
}

gcs_kwargs: Dict[str, Any] = {}
if access_token := self.properties.get(GCS_TOKEN):
gcs_kwargs["access_token"] = access_token

for prop_name, prop_value in self.properties.items():
if prop_value is None:
continue

# Skip properties that need special handling
if prop_name in special_properties:
continue

# Map known property names to GcsFileSystem parameter names
if prop_name in property_mapping:
param_name = property_mapping[prop_name]
gcs_kwargs[param_name] = prop_value

# Pass through any other gcs.* properties that might be used by GcsFileSystem
elif prop_name.startswith("gcs."):
param_name = prop_name.split(".", 1)[1]
gcs_kwargs[param_name] = prop_value

if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
gcs_kwargs["default_bucket_location"] = bucket_location

if endpoint := self.properties.get(GCS_SERVICE_HOST):
url_parts = urlparse(endpoint)
gcs_kwargs["scheme"] = url_parts.scheme
@@ -573,7 +710,18 @@ def _initialize_gcs_fs(self) -> FileSystem:
return GcsFileSystem(**gcs_kwargs)

def _initialize_local_fs(self) -> FileSystem:
return PyArrowLocalFileSystem()
local_kwargs: Dict[str, Any] = {}

for prop_name, prop_value in self.properties.items():
if prop_value is None:
continue

# Pass through any file.* properties that might be used by PyArrowLocalFileSystem
if prop_name.startswith("file."):
param_name = prop_name.split(".", 1)[1]
local_kwargs[param_name] = prop_value

return PyArrowLocalFileSystem(**local_kwargs)

def new_input(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to read bytes from the file at the given location.