Skip to content

Commit 1c50f53

Browse files
authored
Add flag to allow disabling creation of catalog tables (#1155)
* Remove unnecessary _ensure_tables_exist method as this is already the default behavior of Metadata.create_all() * Added tests for creating Catalog tables when no, some or all tables already exist * add init_catalog_tables flag to SQLCatalog * add _ensure_tables_exists back until postgres integration tests completed * fixed tests, added flag to docs
1 parent 90ff39e commit 1c50f53

File tree

3 files changed

+91
-4
lines changed

3 files changed

+91
-4
lines changed

mkdocs/docs/configuration.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ catalog:
231231

232232
### SQL Catalog
233233

234-
The SQL catalog requires a database for its backend. PyIceberg supports PostgreSQL and SQLite through psycopg2. The database connection has to be configured using the `uri` property. See SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls):
234+
The SQL catalog requires a database for its backend. PyIceberg supports PostgreSQL (through psycopg2) and SQLite. The database connection has to be configured using the `uri` property. The `init_catalog_tables` property is optional and defaults to `true`. If it is set to `false`, the catalog tables will not be created when the `SqlCatalog` is initialized. See SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls):
235235

236236
For PostgreSQL:
237237

@@ -240,6 +240,7 @@ catalog:
240240
default:
241241
type: sql
242242
uri: postgresql+psycopg2://username:password@localhost/mydatabase
243+
init_catalog_tables: false
243244
```
244245

245246
In the case of SQLite:
@@ -256,6 +257,7 @@ catalog:
256257
default:
257258
type: sql
258259
uri: sqlite:////tmp/pyiceberg.db
260+
init_catalog_tables: false
259261
```
260262

261263
| Key | Example | Default | Description |

pyiceberg/catalog/sql.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676

7777
# Default for the `echo` catalog property; the parsed value is passed to create_engine(echo=...).
DEFAULT_ECHO_VALUE = "false"
7878
# Default for the `pool_pre_ping` catalog property; parsed with strtobool and passed to create_engine(pool_pre_ping=...).
DEFAULT_POOL_PRE_PING_VALUE = "false"
79+
# Default for the `init_catalog_tables` catalog property (a string parsed with strtobool).
# When it evaluates to true, the catalog's __init__ calls _ensure_tables_exist().
DEFAULT_INIT_CATALOG_TABLES = "true"
7980

8081

8182
class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase):
@@ -123,10 +124,12 @@ def __init__(self, name: str, **properties: str):
123124
echo_str = str(self.properties.get("echo", DEFAULT_ECHO_VALUE)).lower()
124125
echo = strtobool(echo_str) if echo_str != "debug" else "debug"
125126
pool_pre_ping = strtobool(self.properties.get("pool_pre_ping", DEFAULT_POOL_PRE_PING_VALUE))
127+
init_catalog_tables = strtobool(self.properties.get("init_catalog_tables", DEFAULT_INIT_CATALOG_TABLES))
126128

127129
self.engine = create_engine(uri_prop, echo=echo, pool_pre_ping=pool_pre_ping)
128130

129-
self._ensure_tables_exist()
131+
if init_catalog_tables:
132+
self._ensure_tables_exist()
130133

131134
def _ensure_tables_exist(self) -> None:
132135
with Session(self.engine) as session:

tests/catalog/test_sql.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,26 @@
1717

1818
import os
1919
from pathlib import Path
20-
from typing import Any, Generator, List
20+
from typing import Any, Generator, List, cast
2121

2222
import pyarrow as pa
2323
import pytest
2424
from pydantic_core import ValidationError
2525
from pytest_lazyfixture import lazy_fixture
26+
from sqlalchemy import Engine, create_engine, inspect
2627
from sqlalchemy.exc import ArgumentError, IntegrityError
2728

2829
from pyiceberg.catalog import (
2930
Catalog,
3031
load_catalog,
3132
)
32-
from pyiceberg.catalog.sql import DEFAULT_ECHO_VALUE, DEFAULT_POOL_PRE_PING_VALUE, SqlCatalog
33+
from pyiceberg.catalog.sql import (
34+
DEFAULT_ECHO_VALUE,
35+
DEFAULT_POOL_PRE_PING_VALUE,
36+
IcebergTables,
37+
SqlCatalog,
38+
SqlCatalogBaseTable,
39+
)
3340
from pyiceberg.exceptions import (
3441
CommitFailedException,
3542
NamespaceAlreadyExistsError,
@@ -54,6 +61,8 @@
5461
from pyiceberg.typedef import Identifier
5562
from pyiceberg.types import IntegerType, strtobool
5663

64+
# Table names declared by the direct subclasses of SqlCatalogBaseTable.
CATALOG_TABLES = [c.__tablename__ for c in SqlCatalogBaseTable.__subclasses__()]
65+
5766

5867
@pytest.fixture(scope="module")
5968
def catalog_name() -> str:
@@ -132,6 +141,16 @@ def catalog_sqlite(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog,
132141
catalog.destroy_tables()
133142

134143

144+
@pytest.fixture(scope="module")
def catalog_uri(warehouse: Path) -> str:
    """Return the SQLite URI of the `sql-catalog.db` file located under ``warehouse``."""
    database_file = f"{warehouse}/sql-catalog.db"
    return f"sqlite:////{database_file}"
147+
148+
149+
@pytest.fixture(scope="module")
def alchemy_engine(catalog_uri: str) -> Engine:
    """Provide a SQLAlchemy engine created from ``catalog_uri``."""
    engine = create_engine(catalog_uri)
    return engine
152+
153+
135154
@pytest.fixture(scope="module")
136155
def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]:
137156
props = {
@@ -225,6 +244,69 @@ def test_creation_from_impl(catalog_name: str, warehouse: Path) -> None:
225244
)
226245

227246

247+
def confirm_no_tables_exist(alchemy_engine: Engine) -> None:
    """Drop every existing catalog table and fail the test if any of them is still present.

    Args:
        alchemy_engine: Engine connected to the database that should end up without catalog tables.
    """
    inspector = inspect(alchemy_engine)
    for c in SqlCatalogBaseTable.__subclasses__():
        if inspector.has_table(c.__tablename__):
            c.__table__.drop(alchemy_engine)

    # Inspect again after the drops so the check cannot be served from results cached before them.
    remaining_tables = [t for t in inspect(alchemy_engine).get_table_names() if t in CATALOG_TABLES]
    if remaining_tables:
        # pytest.raises() expects a callable as its second argument, so it cannot be used to
        # report a failure message; pytest.fail() fails the test explicitly.
        pytest.fail(f"Tables exist, but should not have been created yet: {remaining_tables}")
256+
257+
258+
def confirm_all_tables_exist(catalog: SqlCatalog) -> None:
    """Assert that ``catalog`` is a SqlCatalog and that its database contains every catalog table.

    Args:
        catalog: The catalog whose engine is inspected for the names listed in CATALOG_TABLES.
    """
    # Check the type before touching `catalog.engine`, so a wrong catalog type is reported
    # with this message instead of an attribute error.
    assert isinstance(catalog, SqlCatalog), "Catalog should be a SQLCatalog"

    # Reflect the table names once rather than once per expected table.
    existing_tables = set(inspect(catalog.engine).get_table_names())
    all_tables_exists = all(t in existing_tables for t in CATALOG_TABLES)

    assert all_tables_exists, "Tables should have been created"
266+
267+
268+
def load_catalog_for_catalog_table_creation(catalog_name: str, catalog_uri: str) -> SqlCatalog:
    """Load a SQL catalog for ``catalog_uri`` with `init_catalog_tables` set to "true"."""
    properties = {
        "type": "sql",
        "uri": catalog_uri,
        "init_catalog_tables": "true",
    }
    loaded = load_catalog(catalog_name, **properties)
    return cast(SqlCatalog, loaded)
277+
278+
279+
def test_creation_when_no_tables_exist(alchemy_engine: Engine, catalog_name: str, catalog_uri: str) -> None:
    """Loading the catalog when no catalog table exists must leave all of them created."""
    confirm_no_tables_exist(alchemy_engine)

    sql_catalog = load_catalog_for_catalog_table_creation(catalog_name=catalog_name, catalog_uri=catalog_uri)

    confirm_all_tables_exist(sql_catalog)
283+
284+
285+
def test_creation_when_one_tables_exists(alchemy_engine: Engine, catalog_name: str, catalog_uri: str) -> None:
    """Loading the catalog when only one catalog table exists must leave all of them created."""
    confirm_no_tables_exist(alchemy_engine)

    # Pre-create a single catalog table so the database is only partially initialised.
    inspector = inspect(alchemy_engine)
    IcebergTables.__table__.create(bind=alchemy_engine)
    present_catalog_tables = [name for name in inspector.get_table_names() if name in CATALOG_TABLES]
    assert IcebergTables.__tablename__ in present_catalog_tables

    sql_catalog = load_catalog_for_catalog_table_creation(catalog_name=catalog_name, catalog_uri=catalog_uri)
    confirm_all_tables_exist(sql_catalog)
295+
296+
297+
def test_creation_when_all_tables_exists(alchemy_engine: Engine, catalog_name: str, catalog_uri: str) -> None:
    """Loading the catalog when every catalog table already exists must leave all of them in place."""
    confirm_no_tables_exist(alchemy_engine)

    # Pre-create the full set of catalog tables before the catalog is loaded.
    inspector = inspect(alchemy_engine)
    SqlCatalogBaseTable.metadata.create_all(bind=alchemy_engine)
    for expected_table in CATALOG_TABLES:
        present_catalog_tables = [name for name in inspector.get_table_names() if name in CATALOG_TABLES]
        assert expected_table in present_catalog_tables

    sql_catalog = load_catalog_for_catalog_table_creation(catalog_name=catalog_name, catalog_uri=catalog_uri)
    confirm_all_tables_exist(sql_catalog)
308+
309+
228310
@pytest.mark.parametrize(
229311
"catalog",
230312
[

0 commit comments

Comments
 (0)