Skip to content

Commit 055938d

Browse files
author
Jack Ye
authored
Support loading custom catalog impl (#947)
* Support loading custom catalog impl * add more tests * fix lint * fix typo * fix typo 2
1 parent 9b6503d commit 055938d

File tree

4 files changed

+88
-6
lines changed

4 files changed

+88
-6
lines changed

mkdocs/docs/configuration.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,13 @@ For the FileIO there are several configuration options available:
139139

140140
## Catalogs
141141

142-
PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB.
142+
PyIceberg currently has native catalog type support for REST, SQL, Hive, Glue and DynamoDB.
143+
Alternatively, you can also directly set the catalog implementation:
144+
145+
| Key | Example | Description |
146+
| --------------- | ---------------------------- | ------------------------------------------------------------------------------------------------ |
147+
| type | rest | Type of catalog, one of `rest`, `sql`, `hive`, `glue`, `dynamodb`. Defaults to `rest` |
148+
| py-catalog-impl | mypackage.mymodule.MyCatalog | Sets the catalog explicitly to an implementation, and will fail explicitly if it can't be loaded |
143149

144150
There are three ways to pass in configuration:
145151

@@ -379,6 +385,18 @@ catalog:
379385

380386
<!-- prettier-ignore-end -->
381387

388+
### Custom Catalog Implementations
389+
390+
If you want to load any custom catalog implementation, you can set catalog configurations like the following:
391+
392+
```yaml
393+
catalog:
394+
default:
395+
py-catalog-impl: mypackage.mymodule.MyCatalog
396+
custom-key1: value1
397+
custom-key2: value2
398+
```
399+
382400
## Unified AWS Credentials
383401

384402
You can explicitly set the AWS credentials for both Glue/DynamoDB Catalog and S3 FileIO by configuring `client.*` properties. For example:

pyiceberg/catalog/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from __future__ import annotations
1919

20+
import importlib
2021
import logging
2122
import re
2223
import uuid
@@ -77,6 +78,7 @@
7778

7879
TOKEN = "token"
7980
TYPE = "type"
81+
PY_CATALOG_IMPL = "py-catalog-impl"
8082
ICEBERG = "iceberg"
8183
TABLE_TYPE = "table_type"
8284
WAREHOUSE_LOCATION = "warehouse"
@@ -233,6 +235,19 @@ def load_catalog(name: Optional[str] = None, **properties: Optional[str]) -> Cat
233235
catalog_type: Optional[CatalogType]
234236
provided_catalog_type = conf.get(TYPE)
235237

238+
if catalog_impl := properties.get(PY_CATALOG_IMPL):
239+
if provided_catalog_type:
240+
raise ValueError(
241+
"Must not set both catalog type and py-catalog-impl configurations, "
242+
f"but found type {provided_catalog_type} and py-catalog-impl {catalog_impl}"
243+
)
244+
245+
if catalog := _import_catalog(name, catalog_impl, properties):
246+
logger.info("Loaded Catalog: %s", catalog_impl)
247+
return catalog
248+
else:
249+
raise ValueError(f"Could not initialize Catalog: {catalog_impl}")
250+
236251
catalog_type = None
237252
if provided_catalog_type and isinstance(provided_catalog_type, str):
238253
catalog_type = CatalogType[provided_catalog_type.upper()]
@@ -283,6 +298,20 @@ def delete_data_files(io: FileIO, manifests_to_delete: List[ManifestFile]) -> No
283298
deleted_files[path] = True
284299

285300

301+
def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Optional[Catalog]:
    """Import a catalog class from its dotted path and instantiate it.

    Args:
        name: Catalog name, passed as the first positional argument to the class.
        catalog_impl: Fully qualified class path, e.g. ``mypackage.mymodule.MyCatalog``.
        properties: Catalog properties, forwarded to the class as keyword arguments.

    Returns:
        The instantiated catalog, or None when the module cannot be imported
        or does not expose the requested class.

    Raises:
        ValueError: If ``catalog_impl`` contains no module part (no dot).
    """
    try:
        path_parts = catalog_impl.split(".")
        if len(path_parts) < 2:
            raise ValueError(f"py-catalog-impl should be full path (module.CustomCatalog), got: {catalog_impl}")
        module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
        module = importlib.import_module(module_name)
        # A module that imports fine but lacks the class is reported the same
        # way as a missing module, instead of leaking a raw AttributeError.
        class_ = getattr(module, class_name, None)
        if class_ is None:
            logger.warning("Could not find class %s in module %s", class_name, module_name)
            return None
        return class_(name, **properties)
    except ModuleNotFoundError:
        logger.warning("Could not initialize Catalog: %s", catalog_impl)
        return None
313+
314+
286315
@dataclass
287316
class PropertiesUpdateSummary:
288317
removed: List[str]

tests/catalog/test_base.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@
3232
from pydantic_core import ValidationError
3333
from pytest_lazyfixture import lazy_fixture
3434

35-
from pyiceberg.catalog import (
36-
Catalog,
37-
MetastoreCatalog,
38-
PropertiesUpdateSummary,
39-
)
35+
from pyiceberg.catalog import Catalog, MetastoreCatalog, PropertiesUpdateSummary, load_catalog
4036
from pyiceberg.exceptions import (
4137
NamespaceAlreadyExistsError,
4238
NamespaceNotEmptyError,
@@ -295,6 +291,30 @@ def given_catalog_has_a_table(
295291
)
296292

297293

294+
def test_load_catalog_impl_not_full_path() -> None:
    """A py-catalog-impl value without a module part is rejected with a ValueError."""
    impl_config = {"py-catalog-impl": "CustomCatalog"}
    with pytest.raises(ValueError) as raised:
        load_catalog("catalog", **impl_config)

    expected_message = "py-catalog-impl should be full path (module.CustomCatalog), got: CustomCatalog"
    assert expected_message in str(raised.value)
299+
300+
301+
def test_load_catalog_impl_does_not_exist() -> None:
    """A py-catalog-impl pointing at a module that cannot be imported raises a ValueError."""
    impl_config = {"py-catalog-impl": "pyiceberg.does.not.exist.Catalog"}
    with pytest.raises(ValueError) as raised:
        load_catalog("catalog", **impl_config)

    expected_message = "Could not initialize Catalog: pyiceberg.does.not.exist.Catalog"
    assert expected_message in str(raised.value)
306+
307+
308+
def test_load_catalog_has_type_and_impl() -> None:
    """Setting both `type` and `py-catalog-impl` is rejected with a ValueError."""
    conflicting_config = {"py-catalog-impl": "pyiceberg.does.not.exist.Catalog", "type": "sql"}
    with pytest.raises(ValueError) as raised:
        load_catalog("catalog", **conflicting_config)

    expected_message = (
        "Must not set both catalog type and py-catalog-impl configurations, "
        "but found type sql and py-catalog-impl pyiceberg.does.not.exist.Catalog"
    )
    assert expected_message in str(raised.value)
316+
317+
298318
def test_namespace_from_tuple() -> None:
299319
# Given
300320
identifier = ("com", "organization", "department", "my_table")

tests/catalog/test_sql.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from pyiceberg.catalog import (
2929
Catalog,
30+
load_catalog,
3031
)
3132
from pyiceberg.catalog.sql import DEFAULT_ECHO_VALUE, DEFAULT_POOL_PRE_PING_VALUE, SqlCatalog
3233
from pyiceberg.exceptions import (
@@ -210,6 +211,20 @@ def test_creation_with_pool_pre_ping_parameter(catalog_name: str, warehouse: Pat
210211
)
211212

212213

214+
def test_creation_from_impl(catalog_name: str, warehouse: Path) -> None:
    """Loading via py-catalog-impl with the SqlCatalog class path yields a SqlCatalog."""
    impl_properties = {
        "py-catalog-impl": "pyiceberg.catalog.sql.SqlCatalog",
        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
        "warehouse": f"file://{warehouse}",
    }
    loaded_catalog = load_catalog(catalog_name, **impl_properties)
    assert isinstance(loaded_catalog, SqlCatalog)
226+
227+
213228
@pytest.mark.parametrize(
214229
"catalog",
215230
[

0 commit comments

Comments
 (0)