Skip to content

Commit a272bf7

Browse files
mark-major and sungwy authored
Glue and Hive catalog return only Iceberg tables (#1145)
* glue and hive returns iceberg tables
* fix formatting
* Update pyiceberg/catalog/glue.py

Co-authored-by: Sung Yun <[email protected]>

---------

Co-authored-by: Sung Yun <[email protected]>
1 parent 1c50f53 commit a272bf7

File tree

5 files changed

+49
-17
lines changed

5 files changed

+49
-17
lines changed

pyiceberg/catalog/dynamodb.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -397,7 +397,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
397397
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
398398

399399
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
400-
"""List tables under the given namespace in the catalog (including non-Iceberg tables).
400+
"""List Iceberg tables under the given namespace in the catalog.
401401
402402
Args:
403403
namespace (str | Identifier): Namespace identifier to search.

pyiceberg/catalog/glue.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -670,7 +670,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
670670
self.glue.delete_database(Name=database_name)
671671

672672
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
673-
"""List tables under the given namespace in the catalog (including non-Iceberg tables).
673+
"""List Iceberg tables under the given namespace in the catalog.
674674
675675
Args:
676676
namespace (str | Identifier): Namespace identifier to search.
@@ -698,7 +698,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
698698

699699
except self.glue.exceptions.EntityNotFoundException as e:
700700
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
701-
return [(database_name, table["Name"]) for table in table_list]
701+
return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]
702702

703703
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
704704
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
@@ -781,3 +781,7 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
781781

782782
def drop_view(self, identifier: Union[str, Identifier]) -> None:
783783
raise NotImplementedError
784+
785+
@staticmethod
786+
def __is_iceberg_table(table: TableTypeDef) -> bool:
787+
return table["Parameters"] is not None and table["Parameters"][TABLE_TYPE].lower() == ICEBERG

pyiceberg/catalog/hive.py

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -631,7 +631,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
631631
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e
632632

633633
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
634-
"""List tables under the given namespace in the catalog (including non-Iceberg tables).
634+
"""List Iceberg tables under the given namespace in the catalog.
635635
636636
When the database doesn't exist, it will just return an empty list.
637637
@@ -646,7 +646,13 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
646646
"""
647647
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
648648
with self._client as open_client:
649-
return [(database_name, table_name) for table_name in open_client.get_all_tables(db_name=database_name)]
649+
return [
650+
(database_name, table.tableName)
651+
for table in open_client.get_table_objects_by_name(
652+
dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
653+
)
654+
if table.parameters[TABLE_TYPE].lower() == ICEBERG
655+
]
650656

651657
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
652658
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

tests/catalog/test_glue.py

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -448,9 +448,23 @@ def test_list_tables(
448448
) -> None:
449449
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
450450
test_catalog.create_namespace(namespace=database_name)
451+
452+
non_iceberg_table_name = "non_iceberg_table"
453+
glue_client = boto3.client("glue", endpoint_url=moto_endpoint_url)
454+
glue_client.create_table(
455+
DatabaseName=database_name,
456+
TableInput={
457+
"Name": non_iceberg_table_name,
458+
"TableType": "EXTERNAL_TABLE",
459+
"Parameters": {"table_type": "noniceberg"},
460+
},
461+
)
462+
451463
for table_name in table_list:
452464
test_catalog.create_table((database_name, table_name), table_schema_nested)
453465
loaded_table_list = test_catalog.list_tables(database_name)
466+
467+
assert (database_name, non_iceberg_table_name) not in loaded_table_list
454468
for table_name in table_list:
455469
assert (database_name, table_name) in loaded_table_list
456470

tests/catalog/test_hive.py

Lines changed: 20 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
# pylint: disable=protected-access,redefined-outer-name
1818
import copy
1919
import uuid
20+
from copy import deepcopy
2021
from unittest.mock import MagicMock, call, patch
2122

2223
import pytest
@@ -905,23 +906,30 @@ def test_drop_database_does_not_exists() -> None:
905906
assert "Database does not exists: does_not_exists" in str(exc_info.value)
906907

907908

908-
def test_list_tables() -> None:
909+
def test_list_tables(hive_table: HiveTable) -> None:
909910
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
910911

912+
tbl1 = deepcopy(hive_table)
913+
tbl1.tableName = "table1"
914+
tbl1.dbName = "database"
915+
tbl2 = deepcopy(hive_table)
916+
tbl2.tableName = "table2"
917+
tbl2.dbName = "database"
918+
tbl3 = deepcopy(hive_table)
919+
tbl3.tableName = "table3"
920+
tbl3.dbName = "database"
921+
tbl3.parameters["table_type"] = "non_iceberg"
922+
911923
catalog._client = MagicMock()
912-
catalog._client.__enter__().get_all_tables.return_value = ["table1", "table2"]
924+
catalog._client.__enter__().get_all_tables.return_value = ["table1", "table2", "table3"]
925+
catalog._client.__enter__().get_table_objects_by_name.return_value = [tbl1, tbl2, tbl3]
913926

914-
assert catalog.list_tables("database") == [
915-
(
916-
"database",
917-
"table1",
918-
),
919-
(
920-
"database",
921-
"table2",
922-
),
923-
]
927+
got_tables = catalog.list_tables("database")
928+
assert got_tables == [("database", "table1"), ("database", "table2")]
924929
catalog._client.__enter__().get_all_tables.assert_called_with(db_name="database")
930+
catalog._client.__enter__().get_table_objects_by_name.assert_called_with(
931+
dbname="database", tbl_names=["table1", "table2", "table3"]
932+
)
925933

926934

927935
def test_list_namespaces() -> None:

0 commit comments

Comments
 (0)