Skip to content

Commit 3eecdad

Browse files
authored
Add close option to Catalog (#2390)
1 parent 3c7c279 commit 3eecdad

File tree

4 files changed

+115
-2
lines changed

4 files changed

+115
-2
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from enum import Enum
2727
from typing import (
2828
TYPE_CHECKING,
29+
Any,
2930
Callable,
3031
Dict,
3132
List,
@@ -793,6 +794,33 @@ def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadat
793794
removed_previous_metadata_files.difference_update(current_metadata_files)
794795
delete_files(io, removed_previous_metadata_files, METADATA)
795796

797+
def close(self) -> None: # noqa: B027
798+
"""Close the catalog and release any resources.
799+
800+
This method should be called when the catalog is no longer needed to ensure
801+
proper cleanup of resources like database connections, file handles, etc.
802+
803+
Default implementation does nothing. Override in subclasses that need cleanup.
804+
"""
805+
806+
def __enter__(self) -> "Catalog":
807+
"""Enter the context manager.
808+
809+
Returns:
810+
Catalog: The catalog instance.
811+
"""
812+
return self
813+
814+
def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
815+
"""Exit the context manager and close the catalog.
816+
817+
Args:
818+
exc_type: Exception type if an exception occurred.
819+
exc_val: Exception value if an exception occurred.
820+
exc_tb: Exception traceback if an exception occurred.
821+
"""
822+
self.close()
823+
796824
def __repr__(self) -> str:
797825
"""Return the string representation of the Catalog class."""
798826
return f"{self.name} ({self.__class__})"

pyiceberg/catalog/sql.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,3 +733,14 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool:
733733

734734
def drop_view(self, identifier: Union[str, Identifier]) -> None:
735735
raise NotImplementedError
736+
737+
def close(self) -> None:
738+
"""Close the catalog and release database connections.
739+
740+
This method closes the SQLAlchemy engine and disposes of all connection pools.
741+
This ensures that any cached connections are properly closed, which is especially
742+
important for blobfuse scenarios where file handles need to be closed for
743+
data to be flushed to persistent storage.
744+
"""
745+
if hasattr(self, "engine"):
746+
self.engine.dispose()

tests/catalog/test_base.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
)
4949
from pyiceberg.transforms import IdentityTransform
5050
from pyiceberg.typedef import EMPTY_DICT, Properties
51-
from pyiceberg.types import IntegerType, LongType, NestedField
51+
from pyiceberg.types import IntegerType, LongType, NestedField, StringType
5252

5353

5454
@pytest.fixture
@@ -631,3 +631,24 @@ def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) ->
631631
table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction()
632632

633633
assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json"
634+
635+
636+
class TestCatalogClose:
637+
"""Test catalog close functionality."""
638+
639+
def test_in_memory_catalog_close(self, catalog: InMemoryCatalog) -> None:
640+
"""Test that InMemoryCatalog close method works."""
641+
# Should not raise any exception
642+
catalog.close()
643+
644+
def test_in_memory_catalog_context_manager(self, catalog: InMemoryCatalog) -> None:
645+
"""Test that InMemoryCatalog works as a context manager."""
646+
with InMemoryCatalog("test") as cat:
647+
assert cat.name == "test"
648+
# Create a namespace and table to test functionality
649+
cat.create_namespace("test_db")
650+
schema = Schema(NestedField(1, "name", StringType(), required=True))
651+
cat.create_table(("test_db", "test_table"), schema)
652+
653+
# InMemoryCatalog inherits close from SqlCatalog, so engine should be disposed
654+
assert hasattr(cat, "engine")

tests/catalog/test_sql.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
)
6161
from pyiceberg.transforms import IdentityTransform
6262
from pyiceberg.typedef import Identifier
63-
from pyiceberg.types import IntegerType, strtobool
63+
from pyiceberg.types import IntegerType, NestedField, StringType, strtobool
6464

6565
CATALOG_TABLES = [c.__tablename__ for c in SqlCatalogBaseTable.__subclasses__()]
6666

@@ -1704,3 +1704,56 @@ def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Sche
17041704
assert not os.path.exists(original_metadata_location[len("file://") :])
17051705
assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :])
17061706
assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])
1707+
1708+
1709+
class TestSqlCatalogClose:
1710+
"""Test SqlCatalog close functionality."""
1711+
1712+
def test_sql_catalog_close(self, catalog_sqlite: SqlCatalog) -> None:
1713+
"""Test that SqlCatalog close method properly disposes the engine."""
1714+
# Verify engine exists
1715+
assert hasattr(catalog_sqlite, "engine")
1716+
1717+
# Close the catalog
1718+
catalog_sqlite.close()
1719+
1720+
# Verify engine is disposed by checking that the engine still exists
1721+
assert hasattr(catalog_sqlite, "engine")
1722+
1723+
def test_sql_catalog_context_manager(self, warehouse: Path) -> None:
1724+
"""Test that SqlCatalog works as a context manager."""
1725+
with SqlCatalog("test", uri="sqlite:///:memory:", warehouse=str(warehouse)) as catalog:
1726+
# Verify engine exists
1727+
assert hasattr(catalog, "engine")
1728+
1729+
# Create a namespace and table to test functionality
1730+
catalog.create_namespace("test_db")
1731+
schema = Schema(NestedField(1, "name", StringType(), required=True))
1732+
catalog.create_table(("test_db", "test_table"), schema)
1733+
1734+
# Verify engine is disposed after exiting context
1735+
assert hasattr(catalog, "engine")
1736+
1737+
def test_sql_catalog_context_manager_with_exception(self) -> None:
1738+
"""Test that SqlCatalog context manager properly closes even with exceptions."""
1739+
catalog = None
1740+
try:
1741+
with SqlCatalog("test", uri="sqlite:///:memory:") as cat:
1742+
catalog = cat
1743+
# Verify engine exists
1744+
assert hasattr(catalog, "engine")
1745+
raise ValueError("Test exception")
1746+
except ValueError:
1747+
pass
1748+
1749+
# Verify engine is disposed even after exception
1750+
assert catalog is not None
1751+
assert hasattr(catalog, "engine")
1752+
1753+
def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> None:
1754+
"""Test that multiple close calls on SqlCatalog are safe."""
1755+
# First close
1756+
catalog_sqlite.close()
1757+
1758+
# Second close should not raise an exception
1759+
catalog_sqlite.close()

0 commit comments

Comments
 (0)