Skip to content

Commit deb1b89

Browse files
committed
Add tests for Schema Evolution on Catalogs
1 parent f4e23e5 commit deb1b89

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

tests/integration/test_catalog.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from pyiceberg.catalog.rest import RestCatalog
2727
from pyiceberg.catalog.sql import SqlCatalog
2828
from pyiceberg.exceptions import (
29+
CommitFailedException,
2930
NamespaceAlreadyExistsError,
3031
NamespaceNotEmptyError,
3132
NoSuchNamespaceError,
@@ -34,6 +35,7 @@
3435
)
3536
from pyiceberg.io import WAREHOUSE
3637
from pyiceberg.schema import Schema
38+
from pyiceberg.types import LongType, StringType
3739
from tests.conftest import clean_up
3840

3941

@@ -343,3 +345,67 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str)
343345
else:
344346
assert k in update_report.removed
345347
assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
348+
349+
350+
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_schema(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    """Check that a column added through a schema update is present after reloading the table."""
    test_catalog.create_namespace(database_name)
    table_id = (database_name, table_name)
    created = test_catalog.create_table(table_id, table_schema_nested)

    # Stage the new column and commit it in a single fluent expression.
    created.update_schema().add_column("new_col", LongType()).commit()

    # Look the field up on a freshly loaded handle rather than on the one that made the change.
    reloaded_schema = test_catalog.load_table(table_id).schema()
    assert reloaded_schema.find_field("new_col", case_sensitive=False)
363+
364+
365+
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_schema_conflict(
    test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    """Check that a schema update staged before a competing commit fails with CommitFailedException.

    After the failed commit, the schema loaded from the catalog must equal the
    schema held by the table handle that performed the competing commit.
    """
    test_catalog.create_namespace(database_name)
    table_id = (database_name, table_name)
    first_handle = test_catalog.create_table(table_id, table_schema_nested)

    # Stage (but do not commit) the update that is expected to lose the race.
    stale_update = first_handle.update_schema().add_column("new_col", LongType())

    # A second handle commits a different schema change first, so the staged one becomes stale.
    # NOTE: relies on the nested test schema having `foo` and `bar` columns, with `bar` deletable.
    second_handle = test_catalog.load_table(table_id)
    winning_update = second_handle.update_schema(allow_incompatible_changes=True)
    winning_update.set_identifier_fields("foo")
    winning_update.update_column("foo", required=True)
    winning_update.delete_column("bar").commit()

    # Committing the stale update must now be rejected.
    with pytest.raises(CommitFailedException, match="Requirement failed: current schema"):
        stale_update.commit()

    # The catalog should still hold exactly what the winning commit produced.
    actual_schema = test_catalog.load_table(table_id).schema()
    expected_schema = second_handle.schema()
    assert actual_schema == expected_schema
391+
392+
393+
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_schema_then_revert(
    test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    """Check that adding three string columns and then deleting them restores the original struct."""
    temp_columns = ("col1", "col2", "col3")

    test_catalog.create_namespace(database_name)
    table_id = (database_name, table_name)
    created = test_catalog.create_table(table_id, table_schema_nested)
    struct_before = created.schema().as_struct()

    # Add every temporary column within a single schema update, then commit once.
    additions = created.update_schema()
    for column in temp_columns:
        additions = additions.add_column(column, StringType())
    additions.commit()

    # Drop the same columns again, working from a freshly loaded table handle.
    removals = test_catalog.load_table(table_id).update_schema()
    for column in temp_columns:
        removals = removals.delete_column(column)
    removals.commit()

    struct_after = test_catalog.load_table(table_id).schema().as_struct()
    assert struct_after == struct_before

0 commit comments

Comments
 (0)