Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b8d8d81
add repro test
colin-rogers-dbt Apr 1, 2025
2b95bdb
Revert "add repro test"
colin-rogers-dbt Apr 30, 2025
56add79
add file_format as a top level integration config field and update st…
colin-rogers-dbt May 28, 2025
b318d0f
Merge branch 'main' into dbt-bigquery/use-external_volume-for-uri
colin-rogers-dbt May 28, 2025
7a1b26a
add test case
colin-rogers-dbt May 29, 2025
d2c3754
add change log entry
colin-rogers-dbt May 29, 2025
d49f785
Merge remote-tracking branch 'fork/dbt-bigquery/use-external_volume-f…
colin-rogers-dbt May 29, 2025
cd7cc5f
update FakeCatalogIntegrationConfig
colin-rogers-dbt May 29, 2025
ae2edfa
add repro test
colin-rogers-dbt Apr 1, 2025
c9bde2e
Revert "add repro test"
colin-rogers-dbt Apr 30, 2025
3277f7e
Merge branch 'main' into dbt-bigquery/use-external_volume-for-uri
colin-rogers-dbt May 29, 2025
1820af5
swap generic for biglake_metastore
colin-rogers-dbt May 30, 2025
756ce42
split out info schema catalog integration type for bigquery
colin-rogers-dbt Jun 2, 2025
ada14ee
add info schema
colin-rogers-dbt Jun 2, 2025
accb9c1
add base_location_subpath + unit tests
colin-rogers-dbt Jun 2, 2025
f43998e
merge main
colin-rogers-dbt Jun 3, 2025
62c4d5a
add / update changelogs
colin-rogers-dbt Jun 3, 2025
9e307be
move changelogs to the right directory
colin-rogers-dbt Jun 3, 2025
647a91b
move changelogs to the right directory
colin-rogers-dbt Jun 3, 2025
54e4abf
handle base_location_subpath being None
colin-rogers-dbt Jun 3, 2025
08d36b6
update catalog defaults to info_schema
colin-rogers-dbt Jun 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Generate `storage_uri` from `external_volume`, `base_location_root`, and `base_location_subpath`
time: 2025-05-29T09:29:32.913024-07:00
custom:
Author: colin-rogers-dbt
Issue: "1129"
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Populate `file_format` from the top-level integration config field
time: 2025-06-03T11:47:05.292026-07:00
custom:
Author: colin-rogers-dbt
Issue: "1129"
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Rename and separate the info-schema and BigLake catalog integrations
time: 2025-06-03T11:47:46.831408-07:00
custom:
Author: colin-rogers-dbt
Issue: "1129"
7 changes: 3 additions & 4 deletions dbt-bigquery/src/dbt/adapters/bigquery/catalogs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dbt.adapters.bigquery.catalogs._generic import (
BigQueryCatalogIntegration,
BigQueryCatalogRelation,
)
from dbt.adapters.bigquery.catalogs._biglake_metastore import BigLakeCatalogIntegration
from dbt.adapters.bigquery.catalogs._info_schema import BigQueryInfoSchemaCatalogIntegration
from dbt.adapters.bigquery.catalogs._relation import BigQueryCatalogRelation
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Optional

from dbt.adapters.catalogs import CatalogIntegration
from dbt.adapters.contracts.relation import RelationConfig

from dbt.adapters.bigquery import constants
from dbt.adapters.bigquery.catalogs._relation import BigQueryCatalogRelation


class BigLakeCatalogIntegration(CatalogIntegration):
    """Catalog integration for BigQuery's BigLake Metastore.

    Allows writes and resolves a storage URI for each relation from the
    integration's ``external_volume`` combined with model-level settings.
    """

    catalog_type = constants.BIGLAKE_CATALOG_TYPE
    allows_writes = True

    def build_relation(self, model: RelationConfig) -> BigQueryCatalogRelation:
        """Build the catalog relation for a model.

        Args:
            model: `config.model` (not `model`) from the jinja context
        """
        relation = BigQueryCatalogRelation(
            catalog_type=self.catalog_type,
            catalog_name=self.catalog_name,
            table_format=self.table_format,
            file_format=self.file_format,
            external_volume=self.external_volume,
            storage_uri=self._calculate_storage_uri(model),
        )
        return relation

    def _calculate_storage_uri(self, model: RelationConfig) -> Optional[str]:
        """Resolve the storage URI, preferring an explicit model-level override.

        Falls back to ``<external_volume>/<base_location_root or "_dbt">/
        <schema>/<name>[/<base_location_subpath>]``; returns ``None`` when
        neither a model config nor an external volume is available.
        """
        config = model.config
        if not config:
            return None

        explicit_uri = config.get("storage_uri")
        if explicit_uri:
            # The model pinned its own URI; use it verbatim.
            return explicit_uri

        if not self.external_volume:
            # Nothing to derive a default location from.
            return None

        root = config.get("base_location_root") or "_dbt"
        uri = f"{self.external_volume}/{root}/{model.schema}/{model.name}"
        subpath = config.get("base_location_subpath")
        if subpath:
            uri = f"{uri}/{subpath}"
        return uri
50 changes: 0 additions & 50 deletions dbt-bigquery/src/dbt/adapters/bigquery/catalogs/_generic.py

This file was deleted.

25 changes: 25 additions & 0 deletions dbt-bigquery/src/dbt/adapters/bigquery/catalogs/_info_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dbt.adapters.catalogs import CatalogIntegration
from dbt.adapters.contracts.relation import RelationConfig

from dbt.adapters.bigquery import constants
from dbt.adapters.bigquery.catalogs._relation import BigQueryCatalogRelation


class BigQueryInfoSchemaCatalogIntegration(CatalogIntegration):
    """Catalog integration for plain (information-schema) BigQuery tables.

    Allows writes; since no external storage is involved, relations built
    here carry neither an external volume nor a storage URI.
    """

    catalog_type = constants.DEFAULT_INFO_SCHEMA_CATALOG.catalog_type
    allows_writes = True

    def build_relation(self, model: RelationConfig) -> BigQueryCatalogRelation:
        """Build the catalog relation for a model.

        Args:
            model: `config.model` (not `model`) from the jinja context
        """
        # Info-schema relations never point at external storage, so the
        # storage-related fields are explicitly unset.
        relation = BigQueryCatalogRelation(
            catalog_type=self.catalog_type,
            catalog_name=self.catalog_name,
            table_format=self.table_format,
            file_format=self.file_format,
            external_volume=None,
            storage_uri=None,
        )
        return relation
16 changes: 16 additions & 0 deletions dbt-bigquery/src/dbt/adapters/bigquery/catalogs/_relation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import dataclass
from typing import Optional

from dbt.adapters.catalogs import CatalogRelation

from dbt.adapters.bigquery import constants


@dataclass
class BigQueryCatalogRelation(CatalogRelation):
    """Resolved catalog settings for a single BigQuery relation.

    Defaults describe a plain (info-schema) BigQuery table; catalog
    integrations override these fields when building relations.
    """

    # dbt-side catalog type label; defaults to the info-schema convention
    catalog_type: str = constants.DEFAULT_INFO_SCHEMA_CATALOG.catalog_type
    # name of the catalog this relation belongs to
    catalog_name: Optional[str] = constants.DEFAULT_INFO_SCHEMA_CATALOG.name
    table_format: Optional[str] = constants.INFO_SCHEMA_TABLE_FORMAT
    file_format: Optional[str] = constants.INFO_SCHEMA_FILE_FORMAT
    # external storage settings; only populated for external-storage catalogs
    external_volume: Optional[str] = None
    storage_uri: Optional[str] = None
8 changes: 5 additions & 3 deletions dbt-bigquery/src/dbt/adapters/bigquery/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,24 @@
PARQUET_FILE_FORMAT = "parquet"


GENERIC_CATALOG_TYPE = "generic"
BIGLAKE_CATALOG_TYPE = "biglake_metastore"


DEFAULT_INFO_SCHEMA_CATALOG = SimpleNamespace(
name="info_schema",
catalog_name="info_schema",
catalog_type=GENERIC_CATALOG_TYPE,
catalog_type="INFO_SCHEMA", # these don't show up in BigQuery; this is a dbt convention
table_format=INFO_SCHEMA_TABLE_FORMAT,
external_volume=None,
file_format=INFO_SCHEMA_FILE_FORMAT,
adapter_properties={},
)
DEFAULT_ICEBERG_CATALOG = SimpleNamespace(
name="managed_iceberg",
catalog_name="managed_iceberg",
catalog_type=GENERIC_CATALOG_TYPE,
catalog_type=BIGLAKE_CATALOG_TYPE,
table_format=ICEBERG_TABLE_FORMAT,
external_volume=None,
file_format=PARQUET_FILE_FORMAT,
adapter_properties={},
)
7 changes: 4 additions & 3 deletions dbt-bigquery/src/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@

from dbt.adapters.bigquery import constants, parse_model
from dbt.adapters.bigquery.catalogs import (
BigQueryCatalogIntegration,
BigLakeCatalogIntegration,
BigQueryInfoSchemaCatalogIntegration,
BigQueryCatalogRelation,
)
from dbt.adapters.bigquery.column import BigQueryColumn, get_nested_column_data_types
Expand Down Expand Up @@ -136,7 +137,7 @@ class BigQueryAdapter(BaseAdapter):

AdapterSpecificConfigs = BigqueryConfig

CATALOG_INTEGRATIONS = [BigQueryCatalogIntegration]
CATALOG_INTEGRATIONS = [BigLakeCatalogIntegration, BigQueryInfoSchemaCatalogIntegration]
CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.not_null: ConstraintSupport.ENFORCED,
Expand Down Expand Up @@ -816,7 +817,7 @@ def get_table_options(
if catalog_relation.table_format == constants.ICEBERG_TABLE_FORMAT:
opts["table_format"] = f"'{catalog_relation.table_format}'"
opts["file_format"] = f"'{catalog_relation.file_format}'"
opts["storage_uri"] = f"'{catalog_relation.external_volume}'"
opts["storage_uri"] = f"'{catalog_relation.storage_uri}'"

return opts

Expand Down
4 changes: 0 additions & 4 deletions dbt-bigquery/src/dbt/adapters/bigquery/parse_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,3 @@ def catalog_name(model: RelationConfig) -> Optional[str]:
return _catalog

return constants.DEFAULT_INFO_SCHEMA_CATALOG.name


def storage_uri(model: RelationConfig) -> Optional[str]:
return model.config.get("storage_uri") if model.config else None
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
from datetime import datetime as dt
import pytest
from dbt.tests.adapter.catalog_integrations.test_catalog_integration import (
BaseCatalogIntegrationValidation,
)
from dbt.tests.util import run_dbt

_BQ_BUCKET = f"gs://{os.getenv('BIGQUERY_TEST_ICEBERG_BUCKET')}"
_STATIC_URI = f"{_BQ_BUCKET}/{str(dt.now())}"

MODEL__BASIC_ICEBERG_TABLE = """
{{ config(materialized='table', catalog='basic_iceberg_catalog') }}
select 1 as id
"""

MODEL__SPECIFY_LOCATION_TABLE = """
{{ config(materialized='table', catalog='basic_iceberg_catalog',
base_location_root='custom_location') }}
select 1 as id
"""

MODEL__SPECIFY_URI_TABLE = (
"""
{{ config(materialized='table', catalog='basic_iceberg_catalog',
storage_uri='"""
+ _STATIC_URI
+ """') }}
select 1 as id
"""
)


class TestGenericCatalogIntegration(BaseCatalogIntegrationValidation):
    """Functional check that models build against a BigLake catalog integration."""

    @pytest.fixture(scope="class")
    def catalogs(self):
        # Single catalog with one active write integration pointed at the
        # test bucket.
        write_integration = {
            "name": "basic_iceberg_catalog_integration",
            "catalog_type": "biglake_metastore",
            "file_format": "parquet",
            "table_format": "iceberg",
            "external_volume": _BQ_BUCKET,
        }
        catalog = {
            "name": "basic_iceberg_catalog",
            "active_write_integration": "basic_iceberg_catalog_integration",
            "write_integrations": [write_integration],
        }
        return {"catalogs": [catalog]}

    @pytest.fixture(scope="class")
    def models(self):
        # One model per storage-uri resolution path: default, base_location_root
        # override, and an explicit storage_uri.
        return {
            "models": {
                "basic_iceberg_table.sql": MODEL__BASIC_ICEBERG_TABLE,
                "specify_location_table.sql": MODEL__SPECIFY_LOCATION_TABLE,
                "specify_uri_table.sql": MODEL__SPECIFY_URI_TABLE,
            }
        }

    def test_basic_iceberg_catalog_integration(self, project):
        run_dbt(["run"])
70 changes: 70 additions & 0 deletions dbt-bigquery/tests/unit/test_catalog_integrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import unittest
from types import SimpleNamespace
from unittest.mock import MagicMock
from dbt.adapters.bigquery.catalogs import BigLakeCatalogIntegration
from dbt.adapters.contracts.relation import RelationConfig


class TestBigLakeCatalogIntegration(unittest.TestCase):
    """Unit tests for BigLakeCatalogIntegration._calculate_storage_uri."""

    def setUp(self):
        self.integration = BigLakeCatalogIntegration(
            config=SimpleNamespace(
                name="test_biglake_catalog_integration",
                external_volume="test_external_volume",
                catalog_type="biglake",
                catalog_name="test_catalog_name",
                table_format="test_table_format",
                file_format="test_file_format",
            )
        )
        self.integration.external_volume = "test_external_volume"

    @staticmethod
    def _model(config):
        """Return a mocked RelationConfig with the given model config dict."""
        model = MagicMock(spec=RelationConfig)
        model.config = config
        model.schema = "test_schema"
        model.name = "test_model_name"
        return model

    def test_storage_uri_no_inputs(self):
        # No location settings: falls back to the "_dbt" prefix.
        model = self._model({"has": "a_value"})
        self.assertEqual(
            "test_external_volume/_dbt/test_schema/test_model_name",
            self.integration._calculate_storage_uri(model),
        )

    def test_storage_uri_base_location_root(self):
        model = self._model({"base_location_root": "foo"})
        self.assertEqual(
            "test_external_volume/foo/test_schema/test_model_name",
            self.integration._calculate_storage_uri(model),
        )

    def test_storage_uri_base_location_subpath(self):
        model = self._model({"base_location_subpath": "bar"})
        self.assertEqual(
            "test_external_volume/_dbt/test_schema/test_model_name/bar",
            self.integration._calculate_storage_uri(model),
        )

    def test_storage_uri_base_location_root_and_subpath(self):
        model = self._model({"base_location_root": "foo", "base_location_subpath": "bar"})
        self.assertEqual(
            "test_external_volume/foo/test_schema/test_model_name/bar",
            self.integration._calculate_storage_uri(model),
        )

    def test_storage_uri_from_model_config(self):
        # An explicit storage_uri wins over any derived location.
        model = self._model({"storage_uri": "custom_storage_uri"})
        self.assertEqual(
            "custom_storage_uri",
            self.integration._calculate_storage_uri(model),
        )