Commit 835dbe1 (1 parent: 4f02298)

Add BigQuery Metastore Catalog (#2068)

# Rationale for this change

This PR brings BigQuery Metastore support to Python after it was merged into the [Java implementation](apache/iceberg#12808). It allows Iceberg catalog functionality to be backed by BigQuery, and supports creating/deleting/listing namespaces (datasets in BigQuery terminology), creating/deleting/listing tables, and registering tables. This is my first sizable PR to iceberg-python, so any advice would be appreciated!

# Are these changes tested?

Integration and unit tests are included.

# Are there any user-facing changes?

Introduces a new Catalog type.
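To make the supported operations above concrete, here is a minimal sketch of the new catalog in use. The `type` value and property keys follow this PR's loader and integration tests; the project id, bucket, dataset, and table names are placeholders:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

# Placeholder project id and warehouse bucket; the property keys mirror the
# integration tests in this PR.
catalog = load_catalog(
    "bq",
    **{
        "type": "bigquery",
        "gcp.bigquery.project-id": "my-gcp-project",
        "warehouse": "gs://my-iceberg-bucket/",
    },
)

schema = Schema(
    NestedField(1, "id", LongType(), required=True),
    NestedField(2, "data", StringType(), required=False),
)

catalog.create_namespace("my_dataset")  # a namespace maps to a BigQuery dataset
table = catalog.create_table(("my_dataset", "my_table"), schema)
print(catalog.list_tables("my_dataset"))
catalog.drop_table(("my_dataset", "my_table"))
catalog.drop_namespace("my_dataset")
```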

File tree

8 files changed: +2248 −1268 lines


mkdocs/docs/index.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -46,6 +46,7 @@ You can mix and match optional dependencies depending on your needs:
 | hive-kerberos | Support for Hive metastore in Kerberos environment |
 | glue          | Support for AWS Glue |
 | dynamodb      | Support for AWS DynamoDB |
+| bigquery      | Support for Google Cloud BigQuery |
 | sql-postgres  | Support for SQL Catalog backed by Postgresql |
 | sql-sqlite    | Support for SQL Catalog backed by SQLite |
 | pyarrow       | PyArrow as a FileIO implementation to interact with the object store |
```

poetry.lock

Lines changed: 1423 additions & 1268 deletions (generated file; diff not rendered by default)

pyiceberg/catalog/__init__.py

Lines changed: 11 additions & 0 deletions

```diff
@@ -118,6 +118,7 @@ class CatalogType(Enum):
     DYNAMODB = "dynamodb"
     SQL = "sql"
     IN_MEMORY = "in-memory"
+    BIGQUERY = "bigquery"
 
 
 def load_rest(name: str, conf: Properties) -> Catalog:
@@ -173,13 +174,23 @@ def load_in_memory(name: str, conf: Properties) -> Catalog:
         raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc
 
 
+def load_bigquery(name: str, conf: Properties) -> Catalog:
+    try:
+        from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog
+
+        return BigQueryMetastoreCatalog(name, **conf)
+    except ImportError as exc:
+        raise NotInstalledError("BigQuery support not installed: pip install 'pyiceberg[bigquery]'") from exc
+
+
 AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
     CatalogType.REST: load_rest,
     CatalogType.HIVE: load_hive,
     CatalogType.GLUE: load_glue,
     CatalogType.DYNAMODB: load_dynamodb,
     CatalogType.SQL: load_sql,
     CatalogType.IN_MEMORY: load_in_memory,
+    CatalogType.BIGQUERY: load_bigquery,
 }
```

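For illustration, a hedged sketch of how this registry entry gets exercised: `load_catalog` resolves a `type: bigquery` property to `CatalogType.BIGQUERY` and dispatches to `load_bigquery`, which raises `NotInstalledError` when the optional dependency is absent. The property value below is a placeholder:

```python
from pyiceberg.catalog import AVAILABLE_CATALOGS, CatalogType
from pyiceberg.exceptions import NotInstalledError

# The "bigquery" string from a catalog's "type" property maps onto the new
# enum member, which the registry resolves to load_bigquery().
loader = AVAILABLE_CATALOGS[CatalogType("bigquery")]

try:
    # With the extra installed this returns a BigQueryMetastoreCatalog;
    # otherwise load_bigquery() raises NotInstalledError.
    catalog = loader("bq", {"gcp.bigquery.project-id": "my-gcp-project"})
except NotInstalledError as exc:
    print(f"Missing optional dependency: {exc}")
```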
pyiceberg/catalog/bigquery_metastore.py

Lines changed: 422 additions & 0 deletions (large diff; not rendered by default)

pyproject.toml

Lines changed: 6 additions & 0 deletions

```diff
@@ -73,6 +73,7 @@ ray = [
 python-snappy = { version = ">=0.6.0,<1.0.0", optional = true }
 thrift = { version = ">=0.13.0,<1.0.0", optional = true }
 boto3 = { version = ">=1.24.59", optional = true }
+google-cloud-bigquery = { version = "^3.33.0", optional = true }
 s3fs = { version = ">=2023.1.0", optional = true }
 adlfs = { version = ">=2024.7.0", optional = true }
 gcsfs = { version = ">=2023.1.0", optional = true }
@@ -288,6 +289,10 @@ ignore_missing_imports = true
 module = "pyiceberg_core.*"
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = "google.*"
+ignore_missing_imports = true
+
 [tool.poetry.scripts]
 pyiceberg = "pyiceberg.cli.console:run"
 
@@ -314,6 +319,7 @@ s3fs = ["s3fs"]
 glue = ["boto3"]
 adlfs = ["adlfs"]
 dynamodb = ["boto3"]
+bigquery = ["google-cloud-bigquery"]
 zstandard = ["zstandard"]
 sql-postgres = ["sqlalchemy", "psycopg2-binary"]
 sql-sqlite = ["sqlalchemy"]
```
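With the new extra, `pip install 'pyiceberg[bigquery]'` (the same hint `load_bigquery` prints on `ImportError`) pulls in `google-cloud-bigquery`. Once installed, the catalog can also be constructed directly rather than through `load_catalog`; a minimal sketch, with property keys taken from the integration tests below and placeholder values:

```python
from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog

# Placeholder project id and warehouse bucket; the property keys are the
# ones used by this PR's integration tests.
catalog = BigQueryMetastoreCatalog(
    "bq",
    **{
        "gcp.bigquery.project-id": "my-gcp-project",
        "warehouse": "gs://my-iceberg-bucket/",
    },
)
```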
Lines changed: 174 additions & 0 deletions

```python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import pytest
from pytest_mock import MockFixture

from pyiceberg.catalog.bigquery_metastore import BigQueryMetastoreCatalog
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
from tests.conftest import BQ_TABLE_METADATA_LOCATION_REGEX


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="requires GCP_CREDENTIALS to be set")
def test_create_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name,
        **{
            "gcp.bigquery.project-id": "alexstephen-test-1",
            "warehouse": "gs://alexstephen-test-bq-bucket/",
            "gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
        },
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    assert table.name() == identifier
    assert BQ_TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)

    tables_in_namespace = test_catalog.list_tables(namespace=gcp_dataset_name)
    assert identifier in tables_in_namespace


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="requires GCP_CREDENTIALS to be set")
def test_drop_table_with_database_location(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_catalog"
    identifier = (gcp_dataset_name, table_name)
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name,
        **{
            "gcp.bigquery.project-id": "alexstephen-test-1",
            "warehouse": "gs://alexstephen-test-bq-bucket/",
            "gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
        },
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)
    test_catalog.create_table(identifier, table_schema_nested)
    test_catalog.drop_table(identifier)

    tables_in_namespace_after_drop = test_catalog.list_tables(namespace=gcp_dataset_name)
    assert identifier not in tables_in_namespace_after_drop

    # Expect that the table no longer exists.
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="requires GCP_CREDENTIALS to be set")
def test_create_and_drop_namespace(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    # Create namespace.
    catalog_name = "test_bq_catalog"
    test_catalog = BigQueryMetastoreCatalog(
        catalog_name,
        **{
            "gcp.bigquery.project-id": "alexstephen-test-1",
            "warehouse": "gs://alexstephen-test-bq-bucket/",
            "gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
        },
    )
    test_catalog.create_namespace(namespace=gcp_dataset_name)

    # Ensure that the namespace exists.
    namespaces = test_catalog.list_namespaces()
    assert (gcp_dataset_name,) in namespaces

    # Drop the namespace and ensure it does not exist.
    test_catalog.drop_namespace(namespace=gcp_dataset_name)
    namespaces_after_drop = test_catalog.list_namespaces()
    assert (gcp_dataset_name,) not in namespaces_after_drop

    # Verify with load_namespace_properties as well.
    with pytest.raises(NoSuchNamespaceError):
        test_catalog.load_namespace_properties(gcp_dataset_name)


@pytest.mark.skipif(os.environ.get("GCP_CREDENTIALS") is None, reason="requires GCP_CREDENTIALS to be set")
def test_register_table(
    mocker: MockFixture, _bucket_initialize: None, table_schema_nested: Schema, gcp_dataset_name: str, table_name: str
) -> None:
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog_name = "test_bq_register_catalog"
    identifier = (gcp_dataset_name, table_name)
    warehouse_path = "gs://alexstephen-test-bq-bucket/"  # Matches conftest BUCKET_NAME for GCS interaction.

    test_catalog = BigQueryMetastoreCatalog(
        catalog_name,
        **{
            "gcp.bigquery.project-id": "alexstephen-test-1",
            "warehouse": warehouse_path,
            "gcp.bigquery.credentials-info": os.environ["GCP_CREDENTIALS"],
        },
    )

    test_catalog.create_namespace(namespace=gcp_dataset_name)

    # Manually create a metadata file in GCS.
    table_gcs_location = f"{warehouse_path.rstrip('/')}/{gcp_dataset_name}.db/{table_name}"
    # Construct a unique metadata file name similar to how pyiceberg would.
    metadata_file_name = "00000-aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa.metadata.json"
    metadata_gcs_path = f"{table_gcs_location}/metadata/{metadata_file_name}"

    metadata = new_table_metadata(
        location=table_gcs_location,
        schema=table_schema_nested,
        properties={},
        partition_spec=UNPARTITIONED_PARTITION_SPEC,
        sort_order=UNSORTED_SORT_ORDER,
    )
    io = load_file_io(properties=test_catalog.properties, location=metadata_gcs_path)
    ToOutputFile.table_metadata(metadata, io.new_output(metadata_gcs_path), overwrite=True)

    # Register the table.
    registered_table = test_catalog.register_table(identifier, metadata_gcs_path)

    assert registered_table.name() == identifier
    assert registered_table.metadata_location == metadata_gcs_path
    assert registered_table.metadata.location == table_gcs_location
    assert BQ_TABLE_METADATA_LOCATION_REGEX.match(registered_table.metadata_location)

    # Verify the table exists and is loadable.
    loaded_table = test_catalog.load_table(identifier)
    assert loaded_table.name() == registered_table.name()
    assert loaded_table.metadata_location == metadata_gcs_path

    # Clean up.
    test_catalog.drop_table(identifier)
    test_catalog.drop_namespace(gcp_dataset_name)
```
