diff --git a/dbt-bigquery/.changes/unreleased/Features-20250529-092932.yaml b/dbt-bigquery/.changes/unreleased/Features-20250529-092932.yaml new file mode 100644 index 000000000..b3dcce758 --- /dev/null +++ b/dbt-bigquery/.changes/unreleased/Features-20250529-092932.yaml @@ -0,0 +1,6 @@ +kind: Features +body: generate storage_uri from external_volume, base_location_root and base_location_subpath +time: 2025-05-29T09:29:32.913024-07:00 +custom: + Author: colin-rogers-dbt + Issue: "1129" diff --git a/dbt-bigquery/.changes/unreleased/Under the Hood-20250603-114705.yaml b/dbt-bigquery/.changes/unreleased/Under the Hood-20250603-114705.yaml new file mode 100644 index 000000000..89efb7103 --- /dev/null +++ b/dbt-bigquery/.changes/unreleased/Under the Hood-20250603-114705.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: populate file_format from top level integration config field +time: 2025-06-03T11:47:05.292026-07:00 +custom: + Author: colin-rogers-dbt + Issue: "1129" diff --git a/dbt-bigquery/.changes/unreleased/Under the Hood-20250603-114746.yaml b/dbt-bigquery/.changes/unreleased/Under the Hood-20250603-114746.yaml new file mode 100644 index 000000000..8e0140d56 --- /dev/null +++ b/dbt-bigquery/.changes/unreleased/Under the Hood-20250603-114746.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Rename and separate out info schema and biglake catalog integrations +time: 2025-06-03T11:47:46.831408-07:00 +custom: + Author: colin-rogers-dbt + Issue: "1129" diff --git a/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/__init__.py b/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/__init__.py index 137880308..0e8b66dee 100644 --- a/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/__init__.py +++ b/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/__init__.py @@ -1,4 +1,3 @@ -from dbt.adapters.bigquery.catalogs._generic import ( - BigQueryCatalogIntegration, - BigQueryCatalogRelation, -) +from dbt.adapters.bigquery.catalogs._biglake_metastore import BigLakeCatalogIntegration 
class BigLakeCatalogIntegration(CatalogIntegration):
    """Catalog integration registered under `constants.BIGLAKE_CATALOG_TYPE`.

    Builds a `BigQueryCatalogRelation` whose `storage_uri` is either taken
    verbatim from the model config or derived from this integration's
    `external_volume` plus the model's schema and name.
    """

    catalog_type = constants.BIGLAKE_CATALOG_TYPE
    allows_writes = True

    def build_relation(self, model: RelationConfig) -> BigQueryCatalogRelation:
        """Build the catalog relation for a model.

        Args:
            model: `config.model` (not `model`) from the jinja context

        Returns:
            A `BigQueryCatalogRelation` carrying this integration's settings
            and the storage URI resolved by `_calculate_storage_uri`.
        """
        return BigQueryCatalogRelation(
            catalog_type=self.catalog_type,
            catalog_name=self.catalog_name,
            table_format=self.table_format,
            file_format=self.file_format,
            external_volume=self.external_volume,
            storage_uri=self._calculate_storage_uri(model),
        )

    def _calculate_storage_uri(self, model: RelationConfig) -> Optional[str]:
        """Resolve the storage location for the model's table.

        Precedence:
            1. `storage_uri` from the model config, returned unchanged.
            2. `{external_volume}/{base_location_root or "_dbt"}/{schema}/{name}`
               with `/{base_location_subpath}` appended when that key is set.
            3. `None` when neither a model-level URI nor an
               `external_volume` is available.

        Args:
            model: `config.model` (not `model`) from the jinja context

        Returns:
            The resolved URI, or `None` if it cannot be determined.
        """
        # A missing or empty config only means "no model-level overrides";
        # the integration-level external_volume must still apply.
        config = model.config or {}

        if model_storage_uri := config.get("storage_uri"):
            return model_storage_uri

        if not self.external_volume:
            return None

        # Drop trailing slashes so "gs://bucket/" does not yield "gs://bucket//...".
        volume = self.external_volume.rstrip("/")
        prefix = config.get("base_location_root") or "_dbt"
        storage_uri = f"{volume}/{prefix}/{model.schema}/{model.name}"
        if suffix := config.get("base_location_subpath"):
            storage_uri = f"{storage_uri}/{suffix}"
        return storage_uri
1aacf522c..000000000 --- a/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/_generic.py +++ /dev/null @@ -1,50 +0,0 @@ -from dataclasses import dataclass -from typing import Optional - -from dbt.adapters.catalogs import CatalogIntegration -from dbt.adapters.contracts.relation import RelationConfig - -from dbt.adapters.bigquery import constants, parse_model - - -@dataclass -class BigQueryCatalogRelation: - catalog_type: str = constants.DEFAULT_INFO_SCHEMA_CATALOG.catalog_type - catalog_name: Optional[str] = constants.DEFAULT_INFO_SCHEMA_CATALOG.name - table_format: Optional[str] = constants.INFO_SCHEMA_TABLE_FORMAT - file_format: Optional[str] = constants.INFO_SCHEMA_FILE_FORMAT - external_volume: Optional[str] = None - - @property - def storage_uri(self) -> Optional[str]: - return self.external_volume - - @storage_uri.setter - def storage_uri(self, value: Optional[str]) -> None: - self.external_volume = value - - -class BigQueryCatalogIntegration(CatalogIntegration): - catalog_type = constants.GENERIC_CATALOG_TYPE - allows_writes = True - - @property - def storage_uri(self) -> Optional[str]: - return self.external_volume - - @storage_uri.setter - def storage_uri(self, value: Optional[str]) -> None: - self.external_volume = value - - def build_relation(self, model: RelationConfig) -> BigQueryCatalogRelation: - """ - Args: - model: `config.model` (not `model`) from the jinja context - """ - return BigQueryCatalogRelation( - catalog_type=self.catalog_type, - catalog_name=self.catalog_name, - table_format=self.table_format, - file_format=self.file_format, - external_volume=parse_model.storage_uri(model) or self.external_volume, - ) diff --git a/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/_info_schema.py b/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/_info_schema.py new file mode 100644 index 000000000..b6aecf447 --- /dev/null +++ b/dbt-bigquery/src/dbt/adapters/bigquery/catalogs/_info_schema.py @@ -0,0 +1,25 @@ +from dbt.adapters.catalogs import 
class BigQueryInfoSchemaCatalogIntegration(CatalogIntegration):
    """Catalog integration for the default info-schema catalog.

    Relations built here never carry a storage location: both
    `external_volume` and `storage_uri` are always `None`.
    """

    catalog_type = constants.DEFAULT_INFO_SCHEMA_CATALOG.catalog_type
    allows_writes = True

    def build_relation(self, model: RelationConfig) -> BigQueryCatalogRelation:
        """Build the catalog relation from this integration's own settings.

        Args:
            model: `config.model` (not `model`) from the jinja context;
                it is accepted for interface parity and not read here.

        Returns:
            A `BigQueryCatalogRelation` with no external volume or storage URI.
        """
        relation_fields = {
            "catalog_type": self.catalog_type,
            "catalog_name": self.catalog_name,
            "table_format": self.table_format,
            "file_format": self.file_format,
            # Deliberately unset: this integration passes no storage location.
            "external_volume": None,
            "storage_uri": None,
        }
        return BigQueryCatalogRelation(**relation_fields)
"biglake_metastore" DEFAULT_INFO_SCHEMA_CATALOG = SimpleNamespace( name="info_schema", catalog_name="info_schema", - catalog_type=GENERIC_CATALOG_TYPE, + catalog_type="INFO_SCHEMA", # these don't show up in BigQuery; this is a dbt convention table_format=INFO_SCHEMA_TABLE_FORMAT, external_volume=None, file_format=INFO_SCHEMA_FILE_FORMAT, + adapter_properties={}, ) DEFAULT_ICEBERG_CATALOG = SimpleNamespace( name="managed_iceberg", catalog_name="managed_iceberg", - catalog_type=GENERIC_CATALOG_TYPE, + catalog_type=BIGLAKE_CATALOG_TYPE, table_format=ICEBERG_TABLE_FORMAT, external_volume=None, file_format=PARQUET_FILE_FORMAT, + adapter_properties={}, ) diff --git a/dbt-bigquery/src/dbt/adapters/bigquery/impl.py b/dbt-bigquery/src/dbt/adapters/bigquery/impl.py index cda412e42..50cbfebe5 100644 --- a/dbt-bigquery/src/dbt/adapters/bigquery/impl.py +++ b/dbt-bigquery/src/dbt/adapters/bigquery/impl.py @@ -57,7 +57,8 @@ from dbt.adapters.bigquery import constants, parse_model from dbt.adapters.bigquery.catalogs import ( - BigQueryCatalogIntegration, + BigLakeCatalogIntegration, + BigQueryInfoSchemaCatalogIntegration, BigQueryCatalogRelation, ) from dbt.adapters.bigquery.column import BigQueryColumn, get_nested_column_data_types @@ -136,7 +137,7 @@ class BigQueryAdapter(BaseAdapter): AdapterSpecificConfigs = BigqueryConfig - CATALOG_INTEGRATIONS = [BigQueryCatalogIntegration] + CATALOG_INTEGRATIONS = [BigLakeCatalogIntegration, BigQueryInfoSchemaCatalogIntegration] CONSTRAINT_SUPPORT = { ConstraintType.check: ConstraintSupport.NOT_SUPPORTED, ConstraintType.not_null: ConstraintSupport.ENFORCED, @@ -816,7 +817,7 @@ def get_table_options( if catalog_relation.table_format == constants.ICEBERG_TABLE_FORMAT: opts["table_format"] = f"'{catalog_relation.table_format}'" opts["file_format"] = f"'{catalog_relation.file_format}'" - opts["storage_uri"] = f"'{catalog_relation.external_volume}'" + opts["storage_uri"] = f"'{catalog_relation.storage_uri}'" return opts diff --git 
a/dbt-bigquery/src/dbt/adapters/bigquery/parse_model.py b/dbt-bigquery/src/dbt/adapters/bigquery/parse_model.py index 83ae22693..a843d6dec 100644 --- a/dbt-bigquery/src/dbt/adapters/bigquery/parse_model.py +++ b/dbt-bigquery/src/dbt/adapters/bigquery/parse_model.py @@ -19,7 +19,3 @@ def catalog_name(model: RelationConfig) -> Optional[str]: return _catalog return constants.DEFAULT_INFO_SCHEMA_CATALOG.name - - -def storage_uri(model: RelationConfig) -> Optional[str]: - return model.config.get("storage_uri") if model.config else None diff --git a/dbt-bigquery/tests/functional/adapter/catalog_integrations/test_catalog_integration.py b/dbt-bigquery/tests/functional/adapter/catalog_integrations/test_catalog_integration.py new file mode 100644 index 000000000..d8e78b78e --- /dev/null +++ b/dbt-bigquery/tests/functional/adapter/catalog_integrations/test_catalog_integration.py @@ -0,0 +1,67 @@ +import os +from datetime import datetime as dt +import pytest +from dbt.tests.adapter.catalog_integrations.test_catalog_integration import ( + BaseCatalogIntegrationValidation, +) +from dbt.tests.util import run_dbt + +_BQ_BUCKET = f"gs://{os.getenv('BIGQUERY_TEST_ICEBERG_BUCKET')}" +_STATIC_URI = f"{_BQ_BUCKET}/{str(dt.now())}" + +MODEL__BASIC_ICEBERG_TABLE = """ + {{ config(materialized='table', catalog='basic_iceberg_catalog') }} + select 1 as id + """ + +MODEL__SPECIFY_LOCATION_TABLE = """ + {{ config(materialized='table', catalog='basic_iceberg_catalog', + base_location_root='custom_location') }} + select 1 as id + """ + +MODEL__SPECIFY_URI_TABLE = ( + """ + {{ config(materialized='table', catalog='basic_iceberg_catalog', + storage_uri='""" + + _STATIC_URI + + """') }} + select 1 as id + """ +) + + +class TestGenericCatalogIntegration(BaseCatalogIntegrationValidation): + + @pytest.fixture(scope="class") + def catalogs(self): + return { + "catalogs": [ + { + "name": "basic_iceberg_catalog", + "active_write_integration": "basic_iceberg_catalog_integration", + "write_integrations": 
class TestBigLakeCatalogIntegration(unittest.TestCase):
    """Unit tests for `BigLakeCatalogIntegration._calculate_storage_uri`."""

    def setUp(self):
        self.integration = BigLakeCatalogIntegration(
            config=SimpleNamespace(
                name="test_biglake_catalog_integration",
                external_volume="test_external_volume",
                # Use the catalog type the integration is actually registered
                # under, rather than an unrelated "biglake" literal.
                catalog_type="biglake_metastore",
                catalog_name="test_catalog_name",
                table_format="test_table_format",
                file_format="test_file_format",
            )
        )
        # NOTE(review): probably redundant with the config above; kept so the
        # tests do not depend on how the base constructor reads the config.
        self.integration.external_volume = "test_external_volume"

    @staticmethod
    def _make_model(config):
        """Return a mocked model with a fixed schema/name and the given config."""
        model = MagicMock(spec=RelationConfig)
        model.config = config
        model.schema = "test_schema"
        model.name = "test_model_name"
        return model

    def _assert_storage_uri(self, config, expected_uri):
        """Assert the URI calculated for a model carrying `config`."""
        result = self.integration._calculate_storage_uri(self._make_model(config))
        self.assertEqual(expected_uri, result)

    def test_storage_uri_no_inputs(self):
        # Non-empty config that contains none of the storage-related keys.
        self._assert_storage_uri(
            {"has": "a_value"},
            "test_external_volume/_dbt/test_schema/test_model_name",
        )

    def test_storage_uri_base_location_root(self):
        self._assert_storage_uri(
            {"base_location_root": "foo"},
            "test_external_volume/foo/test_schema/test_model_name",
        )

    def test_storage_uri_base_location_subpath(self):
        self._assert_storage_uri(
            {"base_location_subpath": "bar"},
            "test_external_volume/_dbt/test_schema/test_model_name/bar",
        )

    def test_storage_uri_base_location_root_and_subpath(self):
        self._assert_storage_uri(
            {"base_location_root": "foo", "base_location_subpath": "bar"},
            "test_external_volume/foo/test_schema/test_model_name/bar",
        )

    def test_storage_uri_from_model_config(self):
        # An explicit model-level storage_uri wins over anything derived.
        self._assert_storage_uri({"storage_uri": "custom_storage_uri"}, "custom_storage_uri")

    def test_storage_uri_without_external_volume(self):
        # With no model-level URI and no external volume there is nothing to
        # derive a location from.
        self.integration.external_volume = None
        model = self._make_model({"base_location_root": "foo"})
        self.assertIsNone(self.integration._calculate_storage_uri(model))