Skip to content

Commit ad07b6c

Browse files
rambleraptorkevinjqliu
authored andcommitted
Add RemovePartitionStatisticsUpdate and SetPartitionStatisticsUpdate (apache#2192)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> Closes apache#2191 # Rationale for this change apache/iceberg supports these two event types. We should do the same to match the Java implementation and allow users to alter their partition statistics files. [REST Spec Reference]([apache/iceberg@09140e5/open-api/rest-catalog-open-api.yaml#L2981-L3004](https://github.com/apache/iceberg/blob/09140e52836048b112c42c9cfe721295bd57048b/open-api/rest-catalog-open-api.yaml#L2981-L3004)) # Are these changes tested? Unit test included. # Are there any user-facing changes? - Adds `RemovePartitionStatisticsUpdate` and `SetPartitionStatisticsUpdate` actions. <!-- In the case of user-facing changes, please add the changelog label. --> --------- Co-authored-by: Kevin Liu <[email protected]>
1 parent b7cd1f5 commit ad07b6c

File tree

3 files changed

+122
-5
lines changed

3 files changed

+122
-5
lines changed

pyiceberg/table/statistics.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
from typing import Dict, List, Literal, Optional
17+
from typing import Dict, List, Literal, Optional, Union
1818

1919
from pydantic import Field
2020

@@ -48,7 +48,7 @@ class PartitionStatisticsFile(StatisticsCommonFields):
4848

4949

5050
def filter_statistics_by_snapshot_id(
51-
statistics: List[StatisticsFile],
51+
statistics: List[Union[StatisticsFile, PartitionStatisticsFile]],
5252
reject_snapshot_id: int,
53-
) -> List[StatisticsFile]:
53+
) -> List[Union[StatisticsFile, PartitionStatisticsFile]]:
5454
return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]

pyiceberg/table/update/__init__.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@
3636
SnapshotLogEntry,
3737
)
3838
from pyiceberg.table.sorting import SortOrder
39-
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
39+
from pyiceberg.table.statistics import (
40+
PartitionStatisticsFile,
41+
StatisticsFile,
42+
filter_statistics_by_snapshot_id,
43+
)
4044
from pyiceberg.typedef import (
4145
IcebergBaseModel,
4246
Properties,
@@ -198,6 +202,16 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
198202
snapshot_id: int = Field(alias="snapshot-id")
199203

200204

205+
class SetPartitionStatisticsUpdate(IcebergBaseModel):
206+
action: Literal["set-partition-statistics"] = Field(default="set-partition-statistics")
207+
partition_statistics: PartitionStatisticsFile
208+
209+
210+
class RemovePartitionStatisticsUpdate(IcebergBaseModel):
211+
action: Literal["remove-partition-statistics"] = Field(default="remove-partition-statistics")
212+
snapshot_id: int = Field(alias="snapshot-id")
213+
214+
201215
TableUpdate = Annotated[
202216
Union[
203217
AssignUUIDUpdate,
@@ -217,6 +231,8 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
217231
RemovePropertiesUpdate,
218232
SetStatisticsUpdate,
219233
RemoveStatisticsUpdate,
234+
SetPartitionStatisticsUpdate,
235+
RemovePartitionStatisticsUpdate,
220236
],
221237
Field(discriminator="action"),
222238
]
@@ -582,6 +598,29 @@ def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _Ta
582598
return base_metadata.model_copy(update={"statistics": statistics})
583599

584600

601+
@_apply_table_update.register(SetPartitionStatisticsUpdate)
602+
def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
603+
partition_statistics = filter_statistics_by_snapshot_id(
604+
base_metadata.partition_statistics, update.partition_statistics.snapshot_id
605+
)
606+
context.add_update(update)
607+
608+
return base_metadata.model_copy(update={"partition_statistics": partition_statistics + [update.partition_statistics]})
609+
610+
611+
@_apply_table_update.register(RemovePartitionStatisticsUpdate)
612+
def _(
613+
update: RemovePartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext
614+
) -> TableMetadata:
615+
if not any(part_stat.snapshot_id == update.snapshot_id for part_stat in base_metadata.partition_statistics):
616+
raise ValueError(f"Partition Statistics with snapshot id {update.snapshot_id} does not exist")
617+
618+
statistics = filter_statistics_by_snapshot_id(base_metadata.partition_statistics, update.snapshot_id)
619+
context.add_update(update)
620+
621+
return base_metadata.model_copy(update={"partition_statistics": statistics})
622+
623+
585624
def update_table_metadata(
586625
base_metadata: TableMetadata,
587626
updates: Tuple[TableUpdate, ...],

tests/table/test_init.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
SortField,
6565
SortOrder,
6666
)
67-
from pyiceberg.table.statistics import BlobMetadata, StatisticsFile
67+
from pyiceberg.table.statistics import BlobMetadata, PartitionStatisticsFile, StatisticsFile
6868
from pyiceberg.table.update import (
6969
AddSnapshotUpdate,
7070
AddSortOrderUpdate,
@@ -76,11 +76,13 @@
7676
AssertLastAssignedPartitionId,
7777
AssertRefSnapshotId,
7878
AssertTableUUID,
79+
RemovePartitionStatisticsUpdate,
7980
RemovePropertiesUpdate,
8081
RemoveSnapshotRefUpdate,
8182
RemoveSnapshotsUpdate,
8283
RemoveStatisticsUpdate,
8384
SetDefaultSortOrderUpdate,
85+
SetPartitionStatisticsUpdate,
8486
SetPropertiesUpdate,
8587
SetSnapshotRefUpdate,
8688
SetStatisticsUpdate,
@@ -1359,3 +1361,79 @@ def test_remove_statistics_update(table_v2_with_statistics: Table) -> None:
13591361
table_v2_with_statistics.metadata,
13601362
(RemoveStatisticsUpdate(snapshot_id=123456789),),
13611363
)
1364+
1365+
1366+
def test_set_partition_statistics_update(table_v2_with_statistics: Table) -> None:
1367+
snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id
1368+
1369+
partition_statistics_file = PartitionStatisticsFile(
1370+
snapshot_id=snapshot_id,
1371+
statistics_path="s3://bucket/warehouse/stats.puffin",
1372+
file_size_in_bytes=124,
1373+
)
1374+
1375+
update = SetPartitionStatisticsUpdate(
1376+
partition_statistics=partition_statistics_file,
1377+
)
1378+
1379+
new_metadata = update_table_metadata(
1380+
table_v2_with_statistics.metadata,
1381+
(update,),
1382+
)
1383+
1384+
expected = """
1385+
{
1386+
"snapshot-id": 3055729675574597004,
1387+
"statistics-path": "s3://bucket/warehouse/stats.puffin",
1388+
"file-size-in-bytes": 124
1389+
}"""
1390+
1391+
assert len(new_metadata.partition_statistics) == 1
1392+
1393+
updated_statistics = [stat for stat in new_metadata.partition_statistics if stat.snapshot_id == snapshot_id]
1394+
1395+
assert len(updated_statistics) == 1
1396+
assert json.loads(updated_statistics[0].model_dump_json()) == json.loads(expected)
1397+
1398+
1399+
def test_remove_partition_statistics_update(table_v2_with_statistics: Table) -> None:
1400+
# Add partition statistics file.
1401+
snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id
1402+
1403+
partition_statistics_file = PartitionStatisticsFile(
1404+
snapshot_id=snapshot_id,
1405+
statistics_path="s3://bucket/warehouse/stats.puffin",
1406+
file_size_in_bytes=124,
1407+
)
1408+
1409+
update = SetPartitionStatisticsUpdate(
1410+
partition_statistics=partition_statistics_file,
1411+
)
1412+
1413+
new_metadata = update_table_metadata(
1414+
table_v2_with_statistics.metadata,
1415+
(update,),
1416+
)
1417+
assert len(new_metadata.partition_statistics) == 1
1418+
1419+
# Remove the same partition statistics file.
1420+
remove_update = RemovePartitionStatisticsUpdate(snapshot_id=snapshot_id)
1421+
1422+
remove_metadata = update_table_metadata(
1423+
new_metadata,
1424+
(remove_update,),
1425+
)
1426+
1427+
assert len(remove_metadata.partition_statistics) == 0
1428+
1429+
1430+
def test_remove_partition_statistics_update_with_invalid_snapshot_id(table_v2_with_statistics: Table) -> None:
1431+
# Remove the same partition statistics file.
1432+
with pytest.raises(
1433+
ValueError,
1434+
match="Partition Statistics with snapshot id 123456789 does not exist",
1435+
):
1436+
update_table_metadata(
1437+
table_v2_with_statistics.metadata,
1438+
(RemovePartitionStatisticsUpdate(snapshot_id=123456789),),
1439+
)

0 commit comments

Comments
 (0)