From 44396f23fac0106ae5732749aa0baa5759d0fef7 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 16 Sep 2025 13:48:39 -0700 Subject: [PATCH 1/7] Add tests for Schema Evolution on Catalogs --- tests/integration/test_catalog.py | 66 +++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 587c13b35b..253d475fe5 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -26,6 +26,7 @@ from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import ( + CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchNamespaceError, @@ -34,6 +35,7 @@ ) from pyiceberg.io import WAREHOUSE from pyiceberg.schema import Schema +from pyiceberg.types import LongType, StringType from tests.conftest import clean_up @@ -343,3 +345,67 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str) else: assert k in update_report.removed assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"] + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_update_table_schema(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None: + identifier = (database_name, table_name) + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(identifier, table_schema_nested) + + update = table.update_schema().add_column("new_col", LongType()) + update.commit() + + loaded = test_catalog.load_table(identifier) + + assert loaded.schema().find_field("new_col", case_sensitive=False) + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_update_table_schema_conflict( + test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str +) -> None: + identifier = (database_name, table_name) + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(identifier, table_schema_nested) + + update = table.update_schema().add_column("new_col", LongType()) + + # update the schema concurrently so that the original update fails + concurrent_table = test_catalog.load_table(identifier) + # The test schema is assumed to have a `bar` column that can be deleted. + concurrent_update = concurrent_table.update_schema(allow_incompatible_changes=True) + concurrent_update.set_identifier_fields("foo") + concurrent_update.update_column("foo", required=True) + concurrent_update.delete_column("bar") + concurrent_update.commit() + + # attempt to commit the original update + with pytest.raises(CommitFailedException, match="Requirement failed: current schema"): + update.commit() + + loaded = test_catalog.load_table(identifier) + assert loaded.schema() == concurrent_table.schema() + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_update_table_schema_then_revert( + test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str +) -> None: + identifier = (database_name, table_name) + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(identifier, table_schema_nested) + original_schema_struct = table.schema().as_struct() + + table.update_schema().add_column("col1", StringType()).add_column("col2", StringType()).add_column( + "col3", StringType() + ).commit() + + table_with_cols = test_catalog.load_table(identifier) + table_with_cols.update_schema().delete_column("col1").delete_column("col2").delete_column("col3").commit() + + reverted_table = test_catalog.load_table(identifier) + assert reverted_table.schema().as_struct() == original_schema_struct From 839e774af11f2d29e88c9c51186f62554ff24c8e Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 22 Sep 2025 15:06:35 -0700 Subject: [PATCH 2/7] PR changes --- tests/integration/test_catalog.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 253d475fe5..d4b4de130e 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -376,11 +376,10 @@ def test_update_table_schema_conflict( # update the schema concurrently so that the original update fails concurrent_table = test_catalog.load_table(identifier) # The test schema is assumed to have a `bar` column that can be deleted. - concurrent_update = concurrent_table.update_schema(allow_incompatible_changes=True) - concurrent_update.set_identifier_fields("foo") - concurrent_update.update_column("foo", required=True) - concurrent_update.delete_column("bar") - concurrent_update.commit() + with concurrent_table.update_schema(allow_incompatible_changes=True) as concurrent_update: + concurrent_update.set_identifier_fields("foo") + concurrent_update.update_column("foo", required=True) + concurrent_update.delete_column("bar") # attempt to commit the original update with pytest.raises(CommitFailedException, match="Requirement failed: current schema"): From 5193429259b034911e5395f3ff5f586cca16b44e Mon Sep 17 00:00:00 2001 From: Alex Stephen <1325798+rambleraptor@users.noreply.github.com> Date: Tue, 7 Oct 2025 11:43:40 -0700 Subject: [PATCH 3/7] Update tests/integration/test_catalog.py Co-authored-by: Fokko Driesprong --- tests/integration/test_catalog.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index d4b4de130e..091ed4a9d4 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -354,8 +354,8 @@ def test_update_table_schema(test_catalog: Catalog, table_schema_nested: Schema, test_catalog.create_namespace(database_name) table = test_catalog.create_table(identifier, table_schema_nested) - update = table.update_schema().add_column("new_col", LongType()) - update.commit() + with table.update_schema() as update: + update.add_column("new_col", LongType()) loaded = test_catalog.load_table(identifier) From bfa841889f024058e238a1f6fca52787d5c7e0e6 Mon Sep 17 00:00:00 2001 From: Alex Stephen <1325798+rambleraptor@users.noreply.github.com> Date: Tue, 7 Oct 2025 11:43:48 -0700 Subject: [PATCH 4/7] Update tests/integration/test_catalog.py Co-authored-by: Fokko Driesprong --- tests/integration/test_catalog.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 091ed4a9d4..a29708fb8a 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -357,9 +357,7 @@ def test_update_table_schema(test_catalog: Catalog, table_schema_nested: Schema, with table.update_schema() as update: update.add_column("new_col", LongType()) - loaded = test_catalog.load_table(identifier) - - assert loaded.schema().find_field("new_col", case_sensitive=False) + assert table.schema().find_field("new_col", case_sensitive=False) @pytest.mark.integration From 65f0845698932b283f17640fae8b6143619f741c Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 7 Oct 2025 13:43:10 -0700 Subject: [PATCH 5/7] test change --- tests/integration/test_catalog.py | 68 +++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index a29708fb8a..a4b0a82583 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -35,7 +35,7 @@ ) from pyiceberg.io import WAREHOUSE from pyiceberg.schema import Schema -from pyiceberg.types import LongType, StringType +from pyiceberg.types import BooleanType, FloatType, IntegerType, ListType, LongType, MapType, NestedField, StringType, StructType from tests.conftest import clean_up @@ -373,23 +373,75 @@ def test_update_table_schema_conflict( # update the schema concurrently so that the original update fails concurrent_table = test_catalog.load_table(identifier) - # The test schema is assumed to have a `bar` column that can be deleted. + with concurrent_table.update_schema(allow_incompatible_changes=True) as concurrent_update: - concurrent_update.set_identifier_fields("foo") - concurrent_update.update_column("foo", required=True) - concurrent_update.delete_column("bar") + concurrent_update.add_column("new_col", StringType()) # attempt to commit the original update with pytest.raises(CommitFailedException, match="Requirement failed: current schema"): update.commit() - loaded = test_catalog.load_table(identifier) - assert loaded.schema() == concurrent_table.schema() + assert concurrent_table.schema() == Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + NestedField( + field_id=4, + name="qux", + field_type=ListType(type="list", element_id=8, element_type=StringType(), element_required=True), + required=True, + ), + NestedField( + field_id=5, + name="quux", + field_type=MapType( + type="map", + key_id=9, + key_type=StringType(), + value_id=10, + value_type=MapType( + type="map", key_id=11, key_type=StringType(), value_id=12, value_type=IntegerType(), value_required=True + ), + value_required=True, + ), + required=True, + ), + NestedField( + field_id=6, + name="location", + field_type=ListType( + type="list", + element_id=13, + element_type=StructType( + fields=( + NestedField(field_id=14, name="latitude", field_type=FloatType(), required=False), + NestedField(field_id=15, name="longitude", field_type=FloatType(), required=False), + ) + ), + element_required=True, + ), + required=True, + ), + NestedField( + field_id=7, + name="person", + field_type=StructType( + fields=( + NestedField(field_id=16, name="name", field_type=StringType(), required=False), + NestedField(field_id=17, name="age", field_type=IntegerType(), required=True), + ) + ), + required=False, + ), + NestedField(field_id=18, name="new_col", field_type=StringType(), required=False), + schema_id=1, + identifier_field_ids=[2], + ) @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) -def test_update_table_schema_then_revert( +def test_update_table_schema_then_change_back( test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str ) -> None: identifier = (database_name, table_name) From 87a816c441ddd560b0cefc12be9dce32aa4d9364 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 7 Oct 2025 13:46:01 -0700 Subject: [PATCH 6/7] formatting change --- tests/integration/test_catalog.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index a4b0a82583..371e158ceb 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -449,9 +449,11 @@ def test_update_table_schema_then_change_back( table = test_catalog.create_table(identifier, table_schema_nested) original_schema_struct = table.schema().as_struct() - table.update_schema().add_column("col1", StringType()).add_column("col2", StringType()).add_column( - "col3", StringType() - ).commit() + table.update_schema() \ + .add_column("col1", StringType()) \ + .add_column("col2", StringType()) \ + .add_column("col3", StringType()) \ + .commit() table_with_cols = test_catalog.load_table(identifier) table_with_cols.update_schema().delete_column("col1").delete_column("col2").delete_column("col3").commit() From bb598e584c13ada328203b2d42c62bb965f8f05b Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 7 Oct 2025 13:46:57 -0700 Subject: [PATCH 7/7] formatting --- tests/integration/test_catalog.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 371e158ceb..a4b0a82583 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -449,11 +449,9 @@ def test_update_table_schema_then_change_back( table = test_catalog.create_table(identifier, table_schema_nested) original_schema_struct = table.schema().as_struct() - table.update_schema() \ - .add_column("col1", StringType()) \ - .add_column("col2", StringType()) \ - .add_column("col3", StringType()) \ - .commit() + table.update_schema().add_column("col1", StringType()).add_column("col2", StringType()).add_column( + "col3", StringType() + ).commit() table_with_cols = test_catalog.load_table(identifier) table_with_cols.update_schema().delete_column("col1").delete_column("col2").delete_column("col3").commit()