Skip to content

Commit 7e5cd32

Browse files
FokkoJasperHG90
andauthored
Feat/update sort order (#2552)
# Rationale for this change Fixed the tests in #1500, but kept the commits so @JasperHG90 gets all the credits for the great work! Closes #1500 Closes #1245 ## Are these changes tested? ## Are there any user-facing changes? <!-- In the case of user-facing changes, please add the changelog label. --> --------- Co-authored-by: Jasper Ginn <[email protected]>
1 parent 5ee5eea commit 7e5cd32

File tree

4 files changed

+348
-1
lines changed

4 files changed

+348
-1
lines changed

mkdocs/docs/api.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,6 +1268,24 @@ with table.update_spec() as update:
12681268
update.rename_field("bucketed_id", "sharded_id")
12691269
```
12701270

1271+
## Sort order updates
1272+
1273+
Users can update the sort order on existing tables for new data. See [sorting](https://iceberg.apache.org/spec/#sorting) for more details.
1274+
1275+
The API to use when updating a sort order is the `update_sort_order` API on the table.
1276+
1277+
Sort orders can only be updated by adding a new sort order. They cannot be deleted or modified.
1278+
1279+
### Updating a sort order on a table
1280+
1281+
To create a new sort order, you can use either the `asc` or `desc` API depending on whether you want your data sorted in ascending or descending order. Both take the name of the field, the sort order transform, and a null order that describes the order of null values when sorted.
1282+
1283+
```python
1284+
with table.update_sort_order() as update:
1285+
update.desc("event_ts", DayTransform(), NullOrder.NULLS_FIRST)
1286+
update.asc("some_field", IdentityTransform(), NullOrder.NULLS_LAST)
1287+
```
1288+
12711289
## Table properties
12721290

12731291
Set and remove properties through the `Transaction` API:

pyiceberg/table/__init__.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,12 @@
116116
update_table_metadata,
117117
)
118118
from pyiceberg.table.update.schema import UpdateSchema
119-
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
119+
from pyiceberg.table.update.snapshot import (
120+
ManageSnapshots,
121+
UpdateSnapshot,
122+
_FastAppendFiles,
123+
)
124+
from pyiceberg.table.update.sorting import UpdateSortOrder
120125
from pyiceberg.table.update.spec import UpdateSpec
121126
from pyiceberg.table.update.statistics import UpdateStatistics
122127
from pyiceberg.transforms import IdentityTransform
@@ -436,6 +441,20 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
436441
name_mapping=self.table_metadata.name_mapping(),
437442
)
438443

444+
def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder:
    """Start a sort-order update that is bound to this object.

    Args:
        case_sensitive: Whether field names are treated as case-sensitive.

    Returns:
        An UpdateSortOrder constructed with this object and the given flag.
    """
    sort_order_update = UpdateSortOrder(self, case_sensitive=case_sensitive)
    return sort_order_update
457+
439458
def update_snapshot(
440459
self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
441460
) -> UpdateSnapshot:
@@ -1298,6 +1317,14 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
12981317
name_mapping=self.name_mapping(),
12991318
)
13001319

1320+
def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder:
    """Start a sort-order update for this table.

    The update is attached to a fresh Transaction created with
    ``autocommit=True``.

    Args:
        case_sensitive: Whether field names are treated as case-sensitive.

    Returns:
        A new UpdateSortOrder.
    """
    autocommit_transaction = Transaction(self, autocommit=True)
    return UpdateSortOrder(transaction=autocommit_transaction, case_sensitive=case_sensitive)
1327+
13011328
def name_mapping(self) -> Optional[NameMapping]:
13021329
"""Return the table's field-id NameMapping."""
13031330
return self.metadata.name_mapping()

pyiceberg/table/update/sorting.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from typing import TYPE_CHECKING, Any, List, Optional, Tuple
20+
21+
from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, UNSORTED_SORT_ORDER, NullOrder, SortDirection, SortField, SortOrder
22+
from pyiceberg.table.update import (
23+
AddSortOrderUpdate,
24+
AssertDefaultSortOrderId,
25+
SetDefaultSortOrderUpdate,
26+
TableRequirement,
27+
TableUpdate,
28+
UpdatesAndRequirements,
29+
UpdateTableMetadata,
30+
)
31+
from pyiceberg.transforms import Transform
32+
33+
if TYPE_CHECKING:
34+
from pyiceberg.table import Transaction
35+
36+
37+
class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]):
    """Collect sort fields and turn them into sort-order table updates.

    Fields are appended through ``asc`` and ``desc`` in call order. When the
    change is committed, the collected fields either reuse the id of an
    existing sort order with identical fields or are registered under a new
    id, and the default sort order is pointed at the result. When no field
    was added, the unsorted sort order is selected.
    """

    _transaction: Transaction
    _last_assigned_order_id: Optional[int]
    _case_sensitive: bool
    _fields: List[SortField]

    def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None:
        """Create an empty sort-order update.

        Args:
            transaction: The transaction this update belongs to.
            case_sensitive: Whether column names are resolved case-sensitively.
        """
        super().__init__(transaction)
        self._fields = []
        self._case_sensitive = case_sensitive
        self._last_assigned_order_id = None

    def _column_name_to_id(self, column_name: str) -> int:
        """Map the column name to the column field id."""
        schema = self._transaction.table_metadata.schema()
        field = schema.find_field(name_or_id=column_name, case_sensitive=self._case_sensitive)
        return field.field_id

    def _add_sort_field(
        self,
        source_id: int,
        transform: Transform[Any, Any],
        direction: SortDirection,
        null_order: NullOrder,
    ) -> UpdateSortOrder:
        """Append a sort field to the pending sort order and return self for chaining."""
        self._fields.append(
            SortField(
                source_id=source_id,
                transform=transform,
                direction=direction,
                null_order=null_order,
            )
        )
        return self

    def _reuse_or_create_sort_order_id(self) -> int:
        """Return the id of an existing sort order with the same fields, or the next free id.

        The next free id is one above the highest id seen so far, and never
        lower than INITIAL_SORT_ORDER_ID.
        """
        next_order_id = INITIAL_SORT_ORDER_ID
        for existing in self._transaction.table_metadata.sort_orders:
            if existing.fields == self._fields:
                return existing.order_id
            next_order_id = max(next_order_id, existing.order_id + 1)
        return next_order_id

    def asc(
        self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
    ) -> UpdateSortOrder:
        """Add a sort field with ascending order.

        Args:
            source_column_name: Name of the column to sort on.
            transform: Transform applied to the column for sorting.
            null_order: Where null values are placed; defaults to NULLS_LAST.

        Returns:
            This update, so calls can be chained.
        """
        return self._add_sort_field(
            source_id=self._column_name_to_id(source_column_name),
            transform=transform,
            direction=SortDirection.ASC,
            null_order=null_order,
        )

    def desc(
        self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
    ) -> UpdateSortOrder:
        """Add a sort field with descending order.

        Args:
            source_column_name: Name of the column to sort on.
            transform: Transform applied to the column for sorting.
            null_order: Where null values are placed; defaults to NULLS_LAST.

        Returns:
            This update, so calls can be chained.
        """
        return self._add_sort_field(
            source_id=self._column_name_to_id(source_column_name),
            transform=transform,
            direction=SortDirection.DESC,
            null_order=null_order,
        )

    def _apply(self) -> SortOrder:
        """Return the sort order described by the collected fields.

        With no fields this is UNSORTED_SORT_ORDER; otherwise a SortOrder whose
        id comes from ``_reuse_or_create_sort_order_id``.
        """
        if not self._fields:
            return UNSORTED_SORT_ORDER
        return SortOrder(*self._fields, order_id=self._reuse_or_create_sort_order_id())

    def _commit(self) -> UpdatesAndRequirements:
        """Build the table updates and requirements for the pending sort order.

        Returns:
            A ``(updates, requirements)`` pair. The requirement asserts that
            the default sort order id still equals the value read here.
        """
        new_sort_order = self._apply()
        # Read the metadata once; nothing in this method changes it.
        table_metadata = self._transaction.table_metadata
        current_default_id = table_metadata.default_sort_order_id

        is_unknown_order = (
            current_default_id != new_sort_order.order_id
            and table_metadata.sort_order_by_id(new_sort_order.order_id) is None
        )

        updates: Tuple[TableUpdate, ...]
        if is_unknown_order:
            self._last_assigned_order_id = new_sort_order.order_id
            # NOTE(review): -1 presumably selects the sort order added by the
            # preceding AddSortOrderUpdate; confirm against the Iceberg REST spec.
            updates = (AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1))
        else:
            updates = (SetDefaultSortOrderUpdate(sort_order_id=new_sort_order.order_id),)

        requirements: Tuple[TableRequirement, ...] = (
            AssertDefaultSortOrderId(default_sort_order_id=current_default_id),
        )

        return updates, requirements
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
# pylint:disable=redefined-outer-name
18+
19+
import pytest
20+
21+
from pyiceberg.catalog import Catalog
22+
from pyiceberg.exceptions import NoSuchTableError
23+
from pyiceberg.schema import Schema
24+
from pyiceberg.table import Table
25+
from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder
26+
from pyiceberg.transforms import (
27+
IdentityTransform,
28+
)
29+
30+
31+
def _simple_table(catalog: Catalog, table_schema_simple: Schema, format_version: str) -> Table:
    """Create the test table from the simple schema fixture."""
    table = _create_table_with_schema(
        catalog=catalog,
        schema=table_schema_simple,
        format_version=format_version,
    )
    return table
33+
34+
35+
def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table:
    """Drop the test table if it exists, then create it again with the given schema.

    Args:
        catalog: Catalog in which the table is (re)created.
        schema: Schema of the new table.
        format_version: Value stored under the ``format-version`` table property.

    Returns:
        The newly created table.
    """
    # NOTE(review): the identifier mentions schema evolution although these
    # tests cover sort orders; confirm the name is intentional.
    identifier = "default.test_schema_evolution"
    table_properties = {"format-version": format_version}

    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        # Nothing to clean up when the table does not exist yet.
        pass

    return catalog.create_table(identifier=identifier, schema=schema, properties=table_properties)
42+
43+
44+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_map_column_name_to_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """Column names of the simple schema resolve to their field ids."""
    table = _simple_table(catalog, table_schema_simple, format_version)
    expected_field_ids = {"foo": 1, "bar": 2, "baz": 3}
    for column_name, expected_id in expected_field_ids.items():
        resolved_id = table.update_sort_order()._column_name_to_id(column_name)
        assert expected_id == resolved_id
58+
59+
60+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_update_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """Committing two sort fields makes them the table's sort order with id 1."""
    table = _simple_table(catalog, table_schema_simple, format_version)

    update = table.update_sort_order()
    update.asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST)
    update.desc("bar", IdentityTransform(), NullOrder.NULLS_LAST)
    update.commit()

    expected_sort_order = SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_LAST),
        order_id=1,
    )
    assert table.sort_order() == expected_sort_order
80+
81+
82+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_increment_existing_sort_order_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """A second, different sort order is stored under the next id."""
    table = _simple_table(catalog, table_schema_simple, format_version)

    table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
    first_expected = SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        order_id=1,
    )
    assert table.sort_order() == first_expected

    second_update = table.update_sort_order()
    second_update.asc("foo", IdentityTransform(), NullOrder.NULLS_LAST)
    second_update.desc("bar", IdentityTransform(), NullOrder.NULLS_FIRST)
    second_update.commit()

    # Three sort orders are expected: the empty one from table creation (0),
    # the first committed order (1) and the second committed order (2).
    assert len(table.sort_orders()) == 3
    second_expected = SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST),
        SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_FIRST),
        order_id=2,
    )
    assert table.sort_order() == second_expected
110+
111+
112+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_update_existing_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """Re-committing an earlier sort order reuses its id instead of adding a new one."""
    table = _simple_table(catalog, table_schema_simple, format_version)
    first_expected = SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        order_id=1,
    )

    table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
    assert table.sort_order() == first_expected

    # Switch to a different, second sort order.
    second_update = table.update_sort_order()
    second_update.asc("foo", IdentityTransform(), NullOrder.NULLS_LAST)
    second_update.desc("bar", IdentityTransform(), NullOrder.NULLS_FIRST)
    second_update.commit()

    # Go back to the first sort order.
    table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()

    # The last commit matches the first sort order, so no new one is created.
    assert len(table.sort_orders()) == 3
    assert table.sort_order() == first_expected
141+
142+
143+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_update_existing_sort_order_with_unsorted_sort_order(
    catalog: Catalog, format_version: str, table_schema_simple: Schema
) -> None:
    """Committing an update without fields switches the table back to sort order 0."""
    table = _simple_table(catalog, table_schema_simple, format_version)

    table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
    sorted_expected = SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        order_id=1,
    )
    assert table.sort_order() == sorted_expected

    # Commit an update with no fields.
    empty_update = table.update_sort_order()
    empty_update.commit()

    # Only two sort orders exist (ids 0 and 1), and order 0 is the current one.
    assert len(table.sort_orders()) == 2
    assert table.sort_order() == SortOrder(order_id=0)

0 commit comments

Comments
 (0)