Skip to content

Commit 2a8b260

Browse files
authored
Add pool_pre_ping param to SQLCatalog (#886)
* Add pool_pre_ping param to SQLCatalog and fix echo parsing logic
* Remove print statements
* Add unit tests for catalog params
* PR review changes
* Slight code optimization
* Adopting code simplification from PR review
* Convert parameterized tests to for loop
1 parent a8d3f17 commit 2a8b260

File tree

3 files changed

+61
-4
lines changed

3 files changed

+61
-4
lines changed

mkdocs/docs/configuration.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,12 @@ catalog:
222222
uri: sqlite:////tmp/pyiceberg.db
223223
```
224224

225+
| Key | Example | Default | Description |
226+
| ------------- | ------------------------------------------------------------ | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
227+
| uri | postgresql+psycopg2://username:password@localhost/mydatabase | | SQLAlchemy backend URL for the catalog database (see [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls)) |
228+
| echo | true | false | SQLAlchemy engine [echo param](https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine.params.echo) to log all statements to the default log handler |
229+
| pool_pre_ping | true | false | SQLAlchemy engine [pool_pre_ping param](https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine.params.pool_pre_ping) to test connections for liveness upon each checkout |
230+
225231
## Hive Catalog
226232

227233
```yaml

pyiceberg/catalog/sql.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,14 @@
6464
from pyiceberg.table.metadata import new_table_metadata
6565
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6666
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
67+
from pyiceberg.types import strtobool
6768

6869
if TYPE_CHECKING:
6970
import pyarrow as pa
7071

72+
# Fallback for the "echo" catalog property when it is not set. Kept as a
# string because the property value is lower-cased and then parsed with
# strtobool (or passed through unchanged when it equals "debug").
DEFAULT_ECHO_VALUE = "false"
73+
# Fallback for the "pool_pre_ping" catalog property when it is not set. Kept
# as a string because the property value is parsed with strtobool before it
# is handed to create_engine.
DEFAULT_POOL_PRE_PING_VALUE = "false"
74+
7175

7276
class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase):
7377
pass
@@ -110,8 +114,12 @@ def __init__(self, name: str, **properties: str):
110114

111115
if not (uri_prop := self.properties.get("uri")):
112116
raise NoSuchPropertyException("SQL connection URI is required")
113-
echo = bool(self.properties.get("echo", False))
114-
self.engine = create_engine(uri_prop, echo=echo)
117+
118+
echo_str = str(self.properties.get("echo", DEFAULT_ECHO_VALUE)).lower()
119+
echo = strtobool(echo_str) if echo_str != "debug" else "debug"
120+
pool_pre_ping = strtobool(self.properties.get("pool_pre_ping", DEFAULT_POOL_PRE_PING_VALUE))
121+
122+
self.engine = create_engine(uri_prop, echo=echo, pool_pre_ping=pool_pre_ping)
115123

116124
self._ensure_tables_exist()
117125

tests/catalog/test_sql.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from pyiceberg.catalog import (
2929
Catalog,
3030
)
31-
from pyiceberg.catalog.sql import SqlCatalog
31+
from pyiceberg.catalog.sql import DEFAULT_ECHO_VALUE, DEFAULT_POOL_PRE_PING_VALUE, SqlCatalog
3232
from pyiceberg.exceptions import (
3333
CommitFailedException,
3434
NamespaceAlreadyExistsError,
@@ -52,7 +52,7 @@
5252
)
5353
from pyiceberg.transforms import IdentityTransform
5454
from pyiceberg.typedef import Identifier
55-
from pyiceberg.types import IntegerType
55+
from pyiceberg.types import IntegerType, strtobool
5656

5757

5858
@pytest.fixture(scope="module")
@@ -168,6 +168,49 @@ def test_creation_with_unsupported_uri(catalog_name: str) -> None:
168168
SqlCatalog(catalog_name, uri="unsupported:xxx")
169169

170170

171+
def test_creation_with_echo_parameter(catalog_name: str, warehouse: Path) -> None:
    """Check that the ``echo`` catalog property ends up on the engine.

    Each scenario builds a ``SqlCatalog`` against the same SQLite file and
    compares ``catalog.engine._echo`` with the expected value. A scenario
    whose ``echo_param`` is ``None`` omits the property entirely, so the
    default (``DEFAULT_ECHO_VALUE`` parsed by ``strtobool``) is exercised.
    """
    # Pairs of (echo_param, expected_echo_value).
    scenarios = (
        (None, strtobool(DEFAULT_ECHO_VALUE)),
        ("debug", "debug"),
        ("true", True),
        ("false", False),
    )
    # Properties shared by every scenario; copied per iteration so that an
    # "echo" entry from one scenario never leaks into the next.
    base_properties = {
        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
        "warehouse": f"file://{warehouse}",
    }

    for echo_param, expected_echo_value in scenarios:
        properties = (
            dict(base_properties)
            if echo_param is None  # None stands for "use the default"
            else {**base_properties, "echo": echo_param}
        )
        catalog = SqlCatalog(catalog_name, **properties)
        assert catalog.engine._echo == expected_echo_value, (
            f"Assertion failed: expected echo value {expected_echo_value}, "
            f"but got {catalog.engine._echo}. For echo_param={echo_param}"
        )
188+
189+
190+
def test_creation_with_pool_pre_ping_parameter(catalog_name: str, warehouse: Path) -> None:
    """Check that the ``pool_pre_ping`` catalog property ends up on the pool.

    Each scenario builds a ``SqlCatalog`` against the same SQLite file and
    compares ``catalog.engine.pool._pre_ping`` with the expected value. A
    scenario whose ``pool_pre_ping_param`` is ``None`` omits the property, so
    the default (``DEFAULT_POOL_PRE_PING_VALUE`` parsed by ``strtobool``) is
    exercised.
    """
    # Pairs of (pool_pre_ping_param, expected_pool_pre_ping_value).
    scenarios = (
        (None, strtobool(DEFAULT_POOL_PRE_PING_VALUE)),
        ("true", True),
        ("false", False),
    )
    # Properties shared by every scenario; copied per iteration so that a
    # "pool_pre_ping" entry from one scenario never leaks into the next.
    base_properties = {
        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
        "warehouse": f"file://{warehouse}",
    }

    for pool_pre_ping_param, expected_pool_pre_ping_value in scenarios:
        properties = (
            dict(base_properties)
            if pool_pre_ping_param is None  # None stands for "use the default"
            else {**base_properties, "pool_pre_ping": pool_pre_ping_param}
        )
        catalog = SqlCatalog(catalog_name, **properties)
        assert catalog.engine.pool._pre_ping == expected_pool_pre_ping_value, (
            f"Assertion failed: expected pool_pre_ping value {expected_pool_pre_ping_value}, "
            f"but got {catalog.engine.pool._pre_ping}. For pool_pre_ping_param={pool_pre_ping_param}"
        )
212+
213+
171214
@pytest.mark.parametrize(
172215
"catalog",
173216
[

0 commit comments

Comments
 (0)