
Commit 5918924

Merge branch 'apache:main' into add-between-keyword
2 parents: 763db24 + 8013545

File tree: 11 files changed (+331 / -18 lines)

.github/workflows/check-md-link.yml

Lines changed: 1 addition & 1 deletion
@@ -36,4 +36,4 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@master
-      - uses: gaurav-nelson/github-action-markdown-link-check@v1
+      - uses: tcort/github-action-markdown-link-check@v1

mkdocs/docs/api.md

Lines changed: 51 additions & 8 deletions
@@ -1004,6 +1004,33 @@ To show only data files or delete files in the current snapshot, use `table.insp

 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.

+<!-- prettier-ignore-start -->
+
+!!! note "Name Mapping"
+    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
+
+!!! note "Partitions"
+    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
+
+!!! warning "Maintenance Operations"
+    Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them.
+
+!!! warning "Check Duplicate Files"
+    The `check_duplicate_files` parameter determines whether the method validates that the specified `file_paths` do not already exist in the Iceberg table. When set to True (the default), the method performs a validation against the table's current data files to prevent accidental duplication, helping to maintain data consistency by ensuring the same file is not added multiple times. While this check is important for data integrity, it can introduce performance overhead for tables with a large number of files. Setting `check_duplicate_files=False` can improve performance but increases the risk of duplicate files, which may lead to data inconsistencies or table corruption. It is strongly recommended to keep this parameter enabled unless duplicate file handling is strictly enforced elsewhere.
+
+<!-- prettier-ignore-end -->
+
+### Usage
+
+| Parameter | Required? | Type | Description |
+| ------------------------- | --------- | -------------- | ----------------------------------------------------------------------- |
+| `file_paths` | ✔️ | List[str] | The list of full file paths to be added as data files to the table |
+| `snapshot_properties` | | Dict[str, str] | Properties to set for the new snapshot. Defaults to an empty dictionary |
+| `check_duplicate_files` | | bool | Whether to check for duplicate files. Defaults to `True` |
+
+### Example
+
+Add files to Iceberg table:
 ```python
 # Given that these parquet files have schema consistent with the Iceberg table

@@ -1019,18 +1046,34 @@ tbl.add_files(file_paths=file_paths)
 # A new snapshot is committed to the table with manifests pointing to the existing parquet files
 ```

-<!-- prettier-ignore-start -->
+Add files to Iceberg table with custom snapshot properties:
+```python
+# Assume an existing Iceberg table object `tbl`

-!!! note "Name Mapping"
-    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
+file_paths = [
+    "s3a://warehouse/default/existing-1.parquet",
+    "s3a://warehouse/default/existing-2.parquet",
+]

-!!! note "Partitions"
-    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
+# Custom snapshot properties
+snapshot_properties = {"abc": "def"}

-!!! warning "Maintenance Operations"
-    Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them.
+# Enable duplicate file checking
+check_duplicate_files = True

-<!-- prettier-ignore-end -->
+# Add the Parquet files to the Iceberg table without rewriting
+tbl.add_files(
+    file_paths=file_paths,
+    snapshot_properties=snapshot_properties,
+    check_duplicate_files=check_duplicate_files
+)
+
+# NameMapping must have been set to enable reads
+assert tbl.name_mapping() is not None
+
+# Verify that the snapshot property was set correctly
+assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
+```

 ## Schema evolution
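To make the trade-off described in the new "Check Duplicate Files" warning concrete, a rough sketch (not part of this diff; `tbl` and the file paths are placeholders) of a bulk import that opts out of the check:

```python
# Hypothetical bulk backfill where the caller already guarantees path uniqueness,
# so the duplicate-file validation is skipped for speed. Assumes `tbl` is an
# existing pyiceberg Table and the Parquet files match its schema.
file_paths = [f"s3a://warehouse/default/backfill-{i}.parquet" for i in range(1_000)]

tbl.add_files(
    file_paths=file_paths,
    check_duplicate_files=False,  # faster, but duplicate paths are no longer detected
)
```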

mkdocs/docs/configuration.md

Lines changed: 20 additions & 2 deletions
@@ -127,6 +127,7 @@ For the FileIO there are several configuration options available:
 | s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. |
 | s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
 | s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
+| s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or boto's credential resolver. |

 <!-- markdown-link-check-enable-->

@@ -197,6 +198,7 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya
 | s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
 | s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
 | s3.force-virtual-addressing | True | Whether to use virtual addressing of buckets. This is set to `True` by default as OSS can only be accessed with virtual hosted style address. |
+| s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or standard AWS configuration methods. |

 <!-- markdown-link-check-enable-->
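A minimal sketch of the new `s3.anonymous` property in use (illustrative only, not part of this diff; the catalog URI and table identifier are placeholders):

```python
# Hypothetical read from a public bucket using unsigned S3 requests, enabled by the
# new `s3.anonymous` FileIO property; the URI and table name below are placeholders.
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",
        "s3.anonymous": "true",  # skip key/secret and credential-resolver lookup
    },
)

table = catalog.load_table("examples.nyc_taxi")
arrow_table = table.scan().to_arrow()
```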

@@ -388,6 +390,7 @@ The RESTCatalog supports pluggable authentication via the `auth` configuration b

 - `noop`: No authentication (no Authorization header sent).
 - `basic`: HTTP Basic authentication.
+- `oauth2`: OAuth2 client credentials flow.
 - `custom`: Custom authentication manager (requires `auth.impl`).
 - `google`: Google Authentication support

@@ -411,9 +414,10 @@ catalog:

 | Property | Required | Description |
 |------------------|----------|-------------------------------------------------------------------------------------------------|
-| `auth.type` | Yes | The authentication type to use (`noop`, `basic`, or `custom`). |
+| `auth.type` | Yes | The authentication type to use (`noop`, `basic`, `oauth2`, or `custom`). |
 | `auth.impl` | Conditionally | The fully qualified class path for a custom AuthManager. Required if `auth.type` is `custom`. |
 | `auth.basic` | If type is `basic` | Block containing `username` and `password` for HTTP Basic authentication. |
+| `auth.oauth2` | If type is `oauth2` | Block containing OAuth2 configuration (see below). |
 | `auth.custom` | If type is `custom` | Block containing configuration for the custom AuthManager. |
 | `auth.google` | If type is `google` | Block containing `credentials_path` to a service account file (if using). Will default to using Application Default Credentials. |

@@ -436,6 +440,20 @@ auth:
     password: mypass
 ```

+OAuth2 Authentication:
+
+```yaml
+auth:
+  type: oauth2
+  oauth2:
+    client_id: my-client-id
+    client_secret: my-client-secret
+    token_url: https://auth.example.com/oauth/token
+    scope: read
+    refresh_margin: 60 # (optional) seconds before expiry to refresh
+    expires_in: 3600 # (optional) fallback if server does not provide
+```
+
 Custom Authentication:

 ```yaml
@@ -451,7 +469,7 @@ auth:

 - If `auth.type` is `custom`, you **must** specify `auth.impl` with the full class path to your custom AuthManager.
 - If `auth.type` is not `custom`, specifying `auth.impl` is not allowed.
-- The configuration block under each type (e.g., `basic`, `custom`) is passed as keyword arguments to the corresponding AuthManager.
+- The configuration block under each type (e.g., `basic`, `oauth2`, `custom`) is passed as keyword arguments to the corresponding AuthManager.

 <!-- markdown-link-check-enable-->
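Since the configuration block is passed through as keyword arguments, the `oauth2` example above maps onto the new `OAuth2AuthManager` roughly as sketched below (not part of this diff; the endpoint and credentials are placeholders):

```python
# Illustrative sketch: build the new OAuth2AuthManager via the factory, much as the
# REST catalog would from the `auth.oauth2` block; the values below are placeholders.
from pyiceberg.catalog.rest.auth import AuthManagerFactory

auth_manager = AuthManagerFactory.create(
    "oauth2",
    {
        "client_id": "my-client-id",
        "client_secret": "my-client-secret",
        "token_url": "https://auth.example.com/oauth/token",
        "scope": "read",
    },
)

# Returns "Bearer <access_token>", fetching or refreshing the token when needed.
print(auth_manager.auth_header())
```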

pyiceberg/catalog/rest/auth.py

Lines changed: 97 additions & 0 deletions
@@ -18,9 +18,13 @@
 import base64
 import importlib
 import logging
+import threading
+import time
 from abc import ABC, abstractmethod
+from functools import cached_property
 from typing import Any, Dict, List, Optional, Type

+import requests
 from requests import HTTPError, PreparedRequest, Session
 from requests.auth import AuthBase

@@ -121,6 +125,98 @@ def auth_header(self) -> str:
         return f"Bearer {self._token}"


+class OAuth2TokenProvider:
+    """Thread-safe OAuth2 token provider with token refresh support."""
+
+    client_id: str
+    client_secret: str
+    token_url: str
+    scope: Optional[str]
+    refresh_margin: int
+    expires_in: Optional[int]
+
+    _token: Optional[str]
+    _expires_at: int
+    _lock: threading.Lock
+
+    def __init__(
+        self,
+        client_id: str,
+        client_secret: str,
+        token_url: str,
+        scope: Optional[str] = None,
+        refresh_margin: int = 60,
+        expires_in: Optional[int] = None,
+    ):
+        self.client_id = client_id
+        self.client_secret = client_secret
+        self.token_url = token_url
+        self.scope = scope
+        self.refresh_margin = refresh_margin
+        self.expires_in = expires_in
+
+        self._token = None
+        self._expires_at = 0
+        self._lock = threading.Lock()
+
+    @cached_property
+    def _client_secret_header(self) -> str:
+        creds = f"{self.client_id}:{self.client_secret}"
+        creds_bytes = creds.encode("utf-8")
+        b64_creds = base64.b64encode(creds_bytes).decode("utf-8")
+        return f"Basic {b64_creds}"
+
+    def _refresh_token(self) -> None:
+        data = {"grant_type": "client_credentials"}
+        if self.scope:
+            data["scope"] = self.scope
+
+        response = requests.post(self.token_url, data=data, headers={"Authorization": self._client_secret_header})
+        response.raise_for_status()
+        result = response.json()
+
+        self._token = result["access_token"]
+        expires_in = result.get("expires_in", self.expires_in)
+        if expires_in is None:
+            raise ValueError(
+                "The expiration time of the Token must be provided by the Server in the Access Token Response in `expires_in` field, or by the PyIceberg Client."
+            )
+        self._expires_at = time.monotonic() + expires_in - self.refresh_margin
+
+    def get_token(self) -> str:
+        with self._lock:
+            if not self._token or time.monotonic() >= self._expires_at:
+                self._refresh_token()
+            if self._token is None:
+                raise ValueError("Authorization token is None after refresh")
+            return self._token
+
+
+class OAuth2AuthManager(AuthManager):
+    """Auth Manager implementation that supports OAuth2 as defined in IETF RFC6749."""
+
+    def __init__(
+        self,
+        client_id: str,
+        client_secret: str,
+        token_url: str,
+        scope: Optional[str] = None,
+        refresh_margin: int = 60,
+        expires_in: Optional[int] = None,
+    ):
+        self.token_provider = OAuth2TokenProvider(
+            client_id,
+            client_secret,
+            token_url,
+            scope,
+            refresh_margin,
+            expires_in,
+        )
+
+    def auth_header(self) -> str:
+        return f"Bearer {self.token_provider.get_token()}"
+
+
 class GoogleAuthManager(AuthManager):
     """An auth manager that is responsible for handling Google credentials."""

@@ -228,4 +324,5 @@ def create(cls, class_or_name: str, config: Dict[str, Any]) -> AuthManager:
 AuthManagerFactory.register("noop", NoopAuthManager)
 AuthManagerFactory.register("basic", BasicAuthManager)
 AuthManagerFactory.register("legacyoauth2", LegacyOAuth2AuthManager)
+AuthManagerFactory.register("oauth2", OAuth2AuthManager)
 AuthManagerFactory.register("google", GoogleAuthManager)

pyiceberg/io/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@
 AWS_SESSION_TOKEN = "client.session-token"
 AWS_ROLE_ARN = "client.role-arn"
 AWS_ROLE_SESSION_NAME = "client.role-session-name"
+S3_ANONYMOUS = "s3.anonymous"
 S3_ENDPOINT = "s3.endpoint"
 S3_ACCESS_KEY_ID = "s3.access-key-id"
 S3_SECRET_ACCESS_KEY = "s3.secret-access-key"

pyiceberg/io/fsspec.py

Lines changed: 5 additions & 0 deletions
@@ -65,6 +65,7 @@
     HF_ENDPOINT,
     HF_TOKEN,
     S3_ACCESS_KEY_ID,
+    S3_ANONYMOUS,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
     S3_PROXY_URI,
@@ -83,6 +84,7 @@
     OutputStream,
 )
 from pyiceberg.typedef import Properties
+from pyiceberg.types import strtobool
 from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool

 logger = logging.getLogger(__name__)
@@ -164,6 +166,9 @@ def _s3(properties: Properties) -> AbstractFileSystem:
     if request_timeout := properties.get(S3_REQUEST_TIMEOUT):
         config_kwargs["read_timeout"] = float(request_timeout)

+    if s3_anonymous := properties.get(S3_ANONYMOUS):
+        config_kwargs["anon"] = strtobool(s3_anonymous)
+
     fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)

     for event_name, event_function in register_events.items():

pyiceberg/io/pyarrow.py

Lines changed: 19 additions & 5 deletions
@@ -109,6 +109,7 @@
     HDFS_USER,
     PYARROW_USE_LARGE_TYPES_ON_READ,
     S3_ACCESS_KEY_ID,
+    S3_ANONYMOUS,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
     S3_FORCE_VIRTUAL_ADDRESSING,
@@ -179,6 +180,7 @@
     TimeType,
     UnknownType,
     UUIDType,
+    strtobool,
 )
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
@@ -450,6 +452,9 @@ def _initialize_oss_fs(self) -> FileSystem:
         if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
             client_kwargs["session_name"] = session_name

+        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
+            client_kwargs["anonymous"] = strtobool(s3_anonymous)
+
         return S3FileSystem(**client_kwargs)

     def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
@@ -501,6 +506,9 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
         ):
             client_kwargs["retry_strategy"] = retry_instance

+        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
+            client_kwargs["anonymous"] = strtobool(s3_anonymous)
+
         return S3FileSystem(**client_kwargs)

     def _initialize_azure_fs(self) -> FileSystem:
@@ -2445,8 +2453,12 @@ def data_file_statistics_from_parquet_metadata(

                 if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
                     scale = stats_col.iceberg_type.scale
-                    col_aggs[field_id].update_min(unscaled_to_decimal(statistics.min_raw, scale))
-                    col_aggs[field_id].update_max(unscaled_to_decimal(statistics.max_raw, scale))
+                    col_aggs[field_id].update_min(
+                        unscaled_to_decimal(statistics.min_raw, scale)
+                    ) if statistics.min_raw is not None else None
+                    col_aggs[field_id].update_max(
+                        unscaled_to_decimal(statistics.max_raw, scale)
+                    ) if statistics.max_raw is not None else None
                 else:
                     col_aggs[field_id].update_min(statistics.min)
                     col_aggs[field_id].update_max(statistics.max)
@@ -2793,9 +2805,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
             functools.reduce(
                 operator.and_,
                 [
-                    pc.field(partition_field_name) == unique_partition[partition_field_name]
-                    if unique_partition[partition_field_name] is not None
-                    else pc.field(partition_field_name).is_null()
+                    (
+                        pc.field(partition_field_name) == unique_partition[partition_field_name]
+                        if unique_partition[partition_field_name] is not None
+                        else pc.field(partition_field_name).is_null()
+                    )
                     for field, partition_field_name in zip(spec.fields, partition_fields)
                 ],
             )
