Skip to content

Commit 5bd82b4

Browse files
authored
Add iceberg catalog support for dbt-bigquery (#1105)
1 parent eb33650 commit 5bd82b4

File tree

15 files changed

+568
-0
lines changed

15 files changed

+568
-0
lines changed

.github/workflows/_integration-tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ jobs:
117117
BIGQUERY_TEST_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_TEST_SERVICE_ACCOUNT_JSON }}
118118
BIGQUERY_TEST_ALT_DATABASE: ${{ vars.BIGQUERY_TEST_ALT_DATABASE }}
119119
BIGQUERY_TEST_NO_ACCESS_DATABASE: ${{ vars.BIGQUERY_TEST_NO_ACCESS_DATABASE }}
120+
BIGQUERY_TEST_ICEBERG_BUCKET: ${{ vars.BIGQUERY_TEST_ICEBERG_BUCKET }}
120121
DBT_TEST_USER_1: ${{ vars.DBT_TEST_USER_1 }}
121122
DBT_TEST_USER_2: ${{ vars.DBT_TEST_USER_2 }}
122123
DBT_TEST_USER_3: ${{ vars.DBT_TEST_USER_3 }}
@@ -155,6 +156,7 @@ jobs:
155156
BIGQUERY_TEST_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_TEST_SERVICE_ACCOUNT_JSON }}
156157
BIGQUERY_TEST_ALT_DATABASE: ${{ vars.BIGQUERY_TEST_ALT_DATABASE }}
157158
BIGQUERY_TEST_NO_ACCESS_DATABASE: ${{ vars.BIGQUERY_TEST_NO_ACCESS_DATABASE }}
159+
BIGQUERY_TEST_ICEBERG_BUCKET: ${{ vars.BIGQUERY_TEST_ICEBERG_BUCKET }}
158160
DBT_TEST_USER_1: ${{ vars.DBT_TEST_USER_1 }}
159161
DBT_TEST_USER_2: ${{ vars.DBT_TEST_USER_2 }}
160162
DBT_TEST_USER_3: ${{ vars.DBT_TEST_USER_3 }}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: Add support for managed Iceberg catalogs
3+
time: 2025-05-19T20:46:51.087397-04:00
4+
custom:
5+
Author: mikealfare
6+
Issue: "1105"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from dbt.adapters.bigquery.catalogs._generic import (
2+
BigQueryCatalogIntegration,
3+
BigQueryCatalogRelation,
4+
)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
from dbt.adapters.catalogs import (
5+
CatalogIntegration,
6+
CatalogIntegrationConfig,
7+
)
8+
from dbt.adapters.contracts.relation import RelationConfig
9+
10+
from dbt.adapters.bigquery import constants, parse_model
11+
12+
13+
@dataclass
class BigQueryCatalogRelation:
    # Resolved catalog settings for one BigQuery relation. Field defaults
    # mirror the built-in info-schema catalog so models without an explicit
    # catalog keep their historical behavior.
    catalog_type: str = constants.DEFAULT_INFO_SCHEMA_CATALOG.catalog_type
    catalog_name: Optional[str] = constants.DEFAULT_INFO_SCHEMA_CATALOG.name
    table_format: Optional[str] = constants.INFO_SCHEMA_TABLE_FORMAT
    file_format: Optional[str] = constants.INFO_SCHEMA_FILE_FORMAT
    external_volume: Optional[str] = None

    @property
    def storage_uri(self) -> Optional[str]:
        """BigQuery-flavored alias for ``external_volume``."""
        return self.external_volume

    @storage_uri.setter
    def storage_uri(self, value: Optional[str]) -> None:
        # Keep the alias writable so callers may use either spelling.
        self.external_volume = value
28+
29+
30+
class BigQueryCatalogIntegration(CatalogIntegration):
    """Generic, writable catalog integration for BigQuery.

    Resolves per-model catalog settings into a ``BigQueryCatalogRelation``.
    """

    catalog_type = constants.GENERIC_CATALOG_TYPE
    allows_writes = True

    def __init__(self, config: CatalogIntegrationConfig) -> None:
        super().__init__(config)
        # Default to the info-schema file format when the user's catalog
        # config does not name one explicitly.
        configured_format = config.adapter_properties.get("file_format")
        self.file_format: Optional[str] = (
            configured_format if configured_format is not None else constants.INFO_SCHEMA_FILE_FORMAT
        )

    @property
    def storage_uri(self) -> Optional[str]:
        """BigQuery-flavored alias for ``external_volume``."""
        return self.external_volume

    @storage_uri.setter
    def storage_uri(self, value: Optional[str]) -> None:
        self.external_volume = value

    def build_relation(self, model: RelationConfig) -> BigQueryCatalogRelation:
        """
        Args:
            model: `config.model` (not `model`) from the jinja context
        """
        # A storage_uri set on the model itself wins over the integration-level one.
        model_storage_uri = parse_model.storage_uri(model)
        return BigQueryCatalogRelation(
            catalog_type=self.catalog_type,
            catalog_name=self.catalog_name,
            table_format=self.table_format,
            file_format=self.file_format,
            external_volume=model_storage_uri or self.external_volume,
        )
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from types import SimpleNamespace


ADAPTER_TYPE = "bigquery"


# Table formats: BigQuery-native ("default") vs. managed Iceberg.
INFO_SCHEMA_TABLE_FORMAT = "default"
ICEBERG_TABLE_FORMAT = "iceberg"


# File formats: "default" for native tables; Iceberg tables store parquet.
INFO_SCHEMA_FILE_FORMAT = "default"
PARQUET_FILE_FORMAT = "parquet"


GENERIC_CATALOG_TYPE = "generic"


def _default_catalog(name: str, table_format: str, file_format: str) -> SimpleNamespace:
    # Build a catalog descriptor shaped like a CatalogIntegrationConfig;
    # both defaults share everything except names and formats.
    return SimpleNamespace(
        name=name,
        catalog_name=name,
        catalog_type=GENERIC_CATALOG_TYPE,
        table_format=table_format,
        external_volume=None,
        adapter_properties={"file_format": file_format},
    )


# Registered on every adapter instance for backward compatibility.
DEFAULT_INFO_SCHEMA_CATALOG = _default_catalog(
    "info_schema", INFO_SCHEMA_TABLE_FORMAT, INFO_SCHEMA_FILE_FORMAT
)
DEFAULT_ICEBERG_CATALOG = _default_catalog(
    "managed_iceberg", ICEBERG_TABLE_FORMAT, PARQUET_FILE_FORMAT
)

dbt-bigquery/src/dbt/adapters/bigquery/impl.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from dbt_common.events.functions import fire_event
3434
import dbt_common.exceptions
3535
import dbt_common.exceptions.base
36+
from dbt_common.exceptions import DbtInternalError
3637
from dbt_common.utils import filter_null_values
3738
from dbt.adapters.base import (
3839
AdapterConfig,
@@ -47,12 +48,18 @@
4748
from dbt.adapters.base.impl import FreshnessResponse
4849
from dbt.adapters.cache import _make_ref_key_dict
4950
from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support
51+
from dbt.adapters.catalogs import CatalogRelation
5052
from dbt.adapters.contracts.connection import AdapterResponse
5153
from dbt.adapters.contracts.macros import MacroResolverProtocol
5254
from dbt.adapters.contracts.relation import RelationConfig
5355
from dbt.adapters.events.logging import AdapterLogger
5456
from dbt.adapters.events.types import SchemaCreation, SchemaDrop
5557

58+
from dbt.adapters.bigquery import constants, parse_model
59+
from dbt.adapters.bigquery.catalogs import (
60+
BigQueryCatalogIntegration,
61+
BigQueryCatalogRelation,
62+
)
5663
from dbt.adapters.bigquery.column import BigQueryColumn, get_nested_column_data_types
5764
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager
5865
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
@@ -129,6 +136,7 @@ class BigQueryAdapter(BaseAdapter):
129136

130137
AdapterSpecificConfigs = BigqueryConfig
131138

139+
CATALOG_INTEGRATIONS = [BigQueryCatalogIntegration]
132140
CONSTRAINT_SUPPORT = {
133141
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
134142
ConstraintType.not_null: ConstraintSupport.ENFORCED,
@@ -147,6 +155,8 @@ class BigQueryAdapter(BaseAdapter):
147155
def __init__(self, config, mp_context: SpawnContext) -> None:
148156
super().__init__(config, mp_context)
149157
self.connections: BigQueryConnectionManager = self.connections
158+
self.add_catalog_integration(constants.DEFAULT_INFO_SCHEMA_CATALOG)
159+
self.add_catalog_integration(constants.DEFAULT_ICEBERG_CATALOG)
150160

151161
###
152162
# Implementations of abstract methods
@@ -797,6 +807,17 @@ def get_table_options(
797807
if config.get("partition_expiration_days") is not None:
798808
opts["partition_expiration_days"] = config.get("partition_expiration_days")
799809

810+
relation_config = getattr(config, "model", None)
811+
if not temporary and (
812+
catalog_relation := self.build_catalog_relation(relation_config)
813+
):
814+
if not isinstance(catalog_relation, BigQueryCatalogRelation):
815+
raise DbtInternalError("Unexpected catalog relation")
816+
if catalog_relation.table_format == constants.ICEBERG_TABLE_FORMAT:
817+
opts["table_format"] = f"'{catalog_relation.table_format}'"
818+
opts["file_format"] = f"'{catalog_relation.file_format}'"
819+
opts["storage_uri"] = f"'{catalog_relation.external_volume}'"
820+
800821
return opts
801822

802823
@available.parse(lambda *a, **k: {})
@@ -986,3 +1007,23 @@ def validate_sql(self, sql: str) -> AdapterResponse:
9861007
:param str sql: The sql to validate
9871008
"""
9881009
return self.connections.dry_run(sql)
1010+
1011+
@available
def build_catalog_relation(self, model: RelationConfig) -> Optional[CatalogRelation]:
    """
    Builds a relation for a given configuration.

    This method uses the provided configuration to determine the appropriate catalog
    integration and config parser for building the relation. It defaults to the information schema
    catalog if none is provided in the configuration for backward compatibility.

    Args:
        model (RelationConfig): `config.model` (not `model`) from the jinja context

    Returns:
        Any: The constructed relation object generated through the catalog integration and parser
    """
    catalog = parse_model.catalog_name(model)
    if not catalog:
        # No usable model config at all — nothing to resolve.
        return None
    integration = self.get_catalog_integration(catalog)
    return integration.build_relation(model)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from typing import Optional
2+
3+
from dbt.adapters.contracts.relation import RelationConfig
4+
5+
from dbt.adapters.bigquery import constants
6+
7+
8+
def catalog_name(model: "RelationConfig") -> Optional[str]:
    """Return the catalog name configured on *model*.

    Returns None when the model carries no usable config; falls back to the
    built-in info-schema catalog name when a config exists but names no
    catalog (backward compatibility).
    """
    # `mypy` rejects the seemingly-equivalent `getattr(model, "config", None)`
    # form, so keep the explicit hasattr guard.
    if not hasattr(model, "config") or not model.config:
        return None

    return model.config.get("catalog") or constants.DEFAULT_INFO_SCHEMA_CATALOG.name
17+
18+
19+
def storage_uri(model: "RelationConfig") -> Optional[str]:
    """Return the model's configured `storage_uri`, or None when the model
    has no config or the key is unset."""
    if not model.config:
        return None
    return model.config.get("storage_uri")

dbt-bigquery/src/dbt/include/bigquery/macros/adapters.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
{%- set columns = '(' ~ table_dest_columns_csv ~ ')' -%}
1212
{%- endif -%}
1313

14+
{%- set catalog_relation = adapter.build_catalog_relation(config.model) -%}
15+
1416
{{ sql_header if sql_header is not none }}
1517

1618
create or replace table {{ relation }}
@@ -26,6 +28,7 @@
2628
{{ partition_by(partition_config) }}
2729
{{ cluster_by(raw_cluster_by) }}
2830

31+
{% if catalog_relation.table_format == 'iceberg' and not temporary %}with connection default{% endif %}
2932
{{ bigquery_table_options(config, model, temporary) }}
3033

3134
{#-- PARTITION BY cannot be used with the AS query_statement clause.

dbt-bigquery/test.env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
BIGQUERY_TEST_NO_ACCESS_DATABASE=
66
# Authentication method required to hookup to BigQuery via client library.
77
BIGQUERY_TEST_SERVICE_ACCOUNT_JSON='{}'
8+
# A bucket with write permissions for storing iceberg tables
9+
BIGQUERY_TEST_ICEBERG_BUCKET=
810

911
# tests for local ci/cd
1012
DBT_TEST_USER_1="group:[email protected]"

dbt-bigquery/tests/functional/adapter/catalog_integration_tests/__init__.py

Whitespace-only changes.

0 commit comments

Comments (0)