Skip to content

Commit 829b7dc

Browse files
geruh, kevinjqliu, Fokko, smaheshwar-pltr
authored
Add support for write.metadata.path (#1642)
Adding support for writing metadata to a custom path set via the `write.metadata.path` property.

Since the Python library consolidates the table operation classes in both the table and catalog classes, I had to surface the metadata file location handling to the base `Catalog` class to avoid circular dependencies. This way we are also able to centralize the metadata location handling for table metadata and snapshots.

Relates to #1492

---------

Co-authored-by: Kevin Liu <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: smaheshwar-pltr <[email protected]>
1 parent 8014b6c commit 829b7dc

File tree

9 files changed

+158
-30
lines changed

9 files changed

+158
-30
lines changed

mkdocs/docs/configuration.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ Iceberg tables support table properties to configure table behavior.
6868
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
6969
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
7070
| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
71+
| `write.metadata.path` | String pointing to location | `{metadata.location}/metadata` | Sets the location under which metadata is written. |
7172

7273
### Table behavior options
7374

@@ -203,12 +204,16 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya
203204

204205
## Location Providers
205206

206-
Apache Iceberg uses the concept of a `LocationProvider` to manage file paths for a table's data. In PyIceberg, the
207-
`LocationProvider` module is designed to be pluggable, allowing customization for specific use cases. The
207+
Apache Iceberg uses the concept of a `LocationProvider` to manage file paths for a table's data files. In PyIceberg, the
208+
`LocationProvider` module is designed to be pluggable, allowing customization for specific use cases; it additionally determines metadata file locations. The
208209
`LocationProvider` for a table can be specified through table properties.
209210

210-
PyIceberg defaults to the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider), which generates
211-
file paths that are optimized for object storage.
211+
Both data file and metadata file locations can be customized by configuring the table properties [`write.data.path` and `write.metadata.path`](#write-options), respectively.
212+
213+
For more granular control, you can override the `LocationProvider`'s `new_data_location` and `new_metadata_location` methods to define custom logic for generating file paths. See [`Loading a Custom Location Provider`](configuration.md#loading-a-custom-location-provider).
214+
215+
PyIceberg defaults to the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider), which generates data file
216+
paths that are optimized for object storage.
212217

213218
### Simple Location Provider
214219

pyiceberg/catalog/__init__.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
Table,
5858
TableProperties,
5959
)
60+
from pyiceberg.table.locations import load_location_provider
6061
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
6162
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6263
from pyiceberg.table.update import (
@@ -857,7 +858,8 @@ def _create_staged_table(
857858
database_name, table_name = self.identifier_to_database_and_table(identifier)
858859

859860
location = self._resolve_table_location(location, database_name, table_name)
860-
metadata_location = self._get_metadata_location(location=location)
861+
provider = load_location_provider(location, properties)
862+
metadata_location = provider.new_table_metadata_file_location()
861863
metadata = new_table_metadata(
862864
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
863865
)
@@ -888,7 +890,8 @@ def _update_and_stage_table(
888890
)
889891

890892
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
891-
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
893+
provider = load_location_provider(updated_metadata.location, updated_metadata.properties)
894+
new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version)
892895

893896
return StagedTable(
894897
identifier=table_identifier,
@@ -945,13 +948,6 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) -
945948
def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
946949
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
947950

948-
@staticmethod
949-
def _get_metadata_location(location: str, new_version: int = 0) -> str:
950-
if new_version < 0:
951-
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
952-
version_str = f"{new_version:05d}"
953-
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"
954-
955951
@staticmethod
956952
def _parse_metadata_version(metadata_location: str) -> int:
957953
"""Parse the version from the metadata location.

pyiceberg/catalog/dynamodb.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from pyiceberg.schema import Schema
5555
from pyiceberg.serializers import FromInputFile
5656
from pyiceberg.table import CommitTableResponse, Table
57+
from pyiceberg.table.locations import load_location_provider
5758
from pyiceberg.table.metadata import new_table_metadata
5859
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
5960
from pyiceberg.table.update import (
@@ -173,7 +174,9 @@ def create_table(
173174
database_name, table_name = self.identifier_to_database_and_table(identifier)
174175

175176
location = self._resolve_table_location(location, database_name, table_name)
176-
metadata_location = self._get_metadata_location(location=location)
177+
provider = load_location_provider(table_location=location, table_properties=properties)
178+
metadata_location = provider.new_table_metadata_file_location()
179+
177180
metadata = new_table_metadata(
178181
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
179182
)

pyiceberg/catalog/sql.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
from pyiceberg.schema import Schema
6363
from pyiceberg.serializers import FromInputFile
6464
from pyiceberg.table import CommitTableResponse, Table
65+
from pyiceberg.table.locations import load_location_provider
6566
from pyiceberg.table.metadata import new_table_metadata
6667
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6768
from pyiceberg.table.update import (
@@ -207,7 +208,8 @@ def create_table(
207208

208209
namespace = Catalog.namespace_to_string(namespace_identifier)
209210
location = self._resolve_table_location(location, namespace, table_name)
210-
metadata_location = self._get_metadata_location(location=location)
211+
location_provider = load_location_provider(table_location=location, table_properties=properties)
212+
metadata_location = location_provider.new_table_metadata_file_location()
211213
metadata = new_table_metadata(
212214
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
213215
)

pyiceberg/table/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
)
8080
from pyiceberg.schema import Schema
8181
from pyiceberg.table.inspect import InspectTable
82+
from pyiceberg.table.locations import LocationProvider, load_location_provider
8283
from pyiceberg.table.metadata import (
8384
INITIAL_SEQUENCE_NUMBER,
8485
TableMetadata,
@@ -208,6 +209,7 @@ class TableProperties:
208209
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
209210

210211
WRITE_DATA_PATH = "write.data.path"
212+
WRITE_METADATA_PATH = "write.metadata.path"
211213

212214
DELETE_MODE = "write.delete.mode"
213215
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
@@ -1008,6 +1010,10 @@ def location(self) -> str:
10081010
"""Return the table's base location."""
10091011
return self.metadata.location
10101012

1013+
def location_provider(self) -> LocationProvider:
1014+
"""Return the table's location provider."""
1015+
return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)
1016+
10111017
@property
10121018
def last_sequence_number(self) -> int:
10131019
return self.metadata.last_sequence_number

pyiceberg/table/locations.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
import importlib
1818
import logging
19+
import uuid
1920
from abc import ABC, abstractmethod
2021
from typing import Optional
2122

@@ -29,7 +30,7 @@
2930

3031

3132
class LocationProvider(ABC):
32-
"""A base class for location providers, that provide data file locations for a table's write tasks.
33+
"""A base class for location providers, that provide file locations for a table's write tasks.
3334
3435
Args:
3536
table_location (str): The table's base storage location.
@@ -40,6 +41,7 @@ class LocationProvider(ABC):
4041
table_properties: Properties
4142

4243
data_path: str
44+
metadata_path: str
4345

4446
def __init__(self, table_location: str, table_properties: Properties):
4547
self.table_location = table_location
@@ -52,6 +54,11 @@ def __init__(self, table_location: str, table_properties: Properties):
5254
else:
5355
self.data_path = f"{self.table_location.rstrip('/')}/data"
5456

57+
if path := table_properties.get(TableProperties.WRITE_METADATA_PATH):
58+
self.metadata_path = path.rstrip("/")
59+
else:
60+
self.metadata_path = f"{self.table_location.rstrip('/')}/metadata"
61+
5562
@abstractmethod
5663
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
5764
"""Return a fully-qualified data file location for the given filename.
@@ -64,6 +71,35 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti
6471
str: A fully-qualified location URI for the data file.
6572
"""
6673

74+
def new_table_metadata_file_location(self, new_version: int = 0) -> str:
75+
"""Return a fully-qualified metadata file location for a new table version.
76+
77+
Args:
78+
new_version (int): Version number of the metadata file.
79+
80+
Returns:
81+
str: fully-qualified URI for the new table metadata file.
82+
83+
Raises:
84+
ValueError: If the version is negative.
85+
"""
86+
if new_version < 0:
87+
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
88+
89+
file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
90+
return self.new_metadata_location(file_name)
91+
92+
def new_metadata_location(self, metadata_file_name: str) -> str:
93+
"""Return a fully-qualified metadata file location for the given filename.
94+
95+
Args:
96+
metadata_file_name (str): Name of the metadata file.
97+
98+
Returns:
99+
str: A fully-qualified location URI for the metadata file.
100+
"""
101+
return f"{self.metadata_path}/{metadata_file_name}"
102+
67103

68104
class SimpleLocationProvider(LocationProvider):
69105
def __init__(self, table_location: str, table_properties: Properties):

pyiceberg/table/update/snapshot.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,14 @@
8484
from pyiceberg.table import Transaction
8585

8686

87-
def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
88-
return f"{location}/metadata/{commit_uuid}-m{num}.avro"
87+
def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
88+
return f"{commit_uuid}-m{num}.avro"
8989

9090

91-
def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
91+
def _new_manifest_list_file_name(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
9292
# Mimics the behavior in Java:
9393
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
94-
return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
94+
return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
9595

9696

9797
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
@@ -243,13 +243,13 @@ def _commit(self) -> UpdatesAndRequirements:
243243
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
244244

245245
summary = self._summary(self.snapshot_properties)
246-
247-
manifest_list_file_path = _generate_manifest_list_path(
248-
location=self._transaction.table_metadata.location,
246+
file_name = _new_manifest_list_file_name(
249247
snapshot_id=self._snapshot_id,
250248
attempt=0,
251249
commit_uuid=self.commit_uuid,
252250
)
251+
location_provider = self._transaction._table.location_provider()
252+
manifest_list_file_path = location_provider.new_metadata_location(file_name)
253253
with write_manifest_list(
254254
format_version=self._transaction.table_metadata.format_version,
255255
output_file=self._io.new_output(manifest_list_file_path),
@@ -295,13 +295,10 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
295295
)
296296

297297
def new_manifest_output(self) -> OutputFile:
298-
return self._io.new_output(
299-
_new_manifest_path(
300-
location=self._transaction.table_metadata.location,
301-
num=next(self._manifest_num_counter),
302-
commit_uuid=self.commit_uuid,
303-
)
304-
)
298+
location_provider = self._transaction._table.location_provider()
299+
file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
300+
file_path = location_provider.new_metadata_location(file_name)
301+
return self._io.new_output(file_path)
305302

306303
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
307304
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)

tests/catalog/test_base.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
TableAlreadyExistsError,
3636
)
3737
from pyiceberg.io import WAREHOUSE
38+
from pyiceberg.io.pyarrow import schema_to_pyarrow
3839
from pyiceberg.partitioning import PartitionField, PartitionSpec
3940
from pyiceberg.schema import Schema
4041
from pyiceberg.table import (
4142
Table,
43+
TableProperties,
4244
)
4345
from pyiceberg.table.update import (
4446
AddSchemaUpdate,
@@ -563,3 +565,60 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
563565
with pytest.raises(ValidationError) as exc_info:
564566
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
565567
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
568+
569+
570+
def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None:
571+
metadata_path = f"{catalog._warehouse_location}/custom/path"
572+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
573+
table = catalog.create_table(
574+
identifier=TEST_TABLE_IDENTIFIER,
575+
schema=TEST_TABLE_SCHEMA,
576+
partition_spec=TEST_TABLE_PARTITION_SPEC,
577+
properties={TableProperties.WRITE_METADATA_PATH: metadata_path},
578+
)
579+
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
580+
table.append(df)
581+
manifests = table.current_snapshot().manifests(table.io) # type: ignore
582+
location_provider = table.location_provider()
583+
584+
assert location_provider.new_metadata_location("").startswith(metadata_path)
585+
assert manifests[0].manifest_path.startswith(metadata_path)
586+
assert table.location() != metadata_path
587+
assert table.metadata_location.startswith(metadata_path)
588+
589+
590+
def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None:
591+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
592+
table = catalog.create_table(
593+
identifier=TEST_TABLE_IDENTIFIER,
594+
schema=TEST_TABLE_SCHEMA,
595+
partition_spec=TEST_TABLE_PARTITION_SPEC,
596+
properties=TEST_TABLE_PROPERTIES,
597+
)
598+
metadata_path = f"{table.location()}/metadata"
599+
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
600+
table.append(df)
601+
manifests = table.current_snapshot().manifests(table.io) # type: ignore
602+
location_provider = table.location_provider()
603+
604+
assert location_provider.new_metadata_location("").startswith(metadata_path)
605+
assert manifests[0].manifest_path.startswith(metadata_path)
606+
assert table.metadata_location.startswith(metadata_path)
607+
608+
609+
def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) -> None:
610+
catalog.create_namespace(TEST_TABLE_NAMESPACE)
611+
table = catalog.create_table(
612+
identifier=TEST_TABLE_IDENTIFIER,
613+
schema=TEST_TABLE_SCHEMA,
614+
partition_spec=TEST_TABLE_PARTITION_SPEC,
615+
)
616+
617+
initial_metadata_path = f"{table.location()}/metadata"
618+
assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json"
619+
620+
# update table with new path for metadata
621+
new_metadata_path = f"{table.location()}/custom/path"
622+
table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction()
623+
624+
assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json"

tests/table/test_locations.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,27 @@ def test_simple_location_provider_write_data_path() -> None:
157157
)
158158

159159
assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet"
160+
161+
162+
def test_location_provider_metadata_default_location() -> None:
163+
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
164+
165+
assert provider.new_metadata_location("manifest.avro") == "table_location/metadata/manifest.avro"
166+
167+
168+
def test_location_provider_metadata_location_with_custom_path() -> None:
169+
provider = load_location_provider(
170+
table_location="table_location",
171+
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path"},
172+
)
173+
174+
assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"
175+
176+
177+
def test_metadata_location_with_trailing_slash() -> None:
178+
provider = load_location_provider(
179+
table_location="table_location",
180+
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path/"},
181+
)
182+
183+
assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"

0 commit comments

Comments
 (0)