diff --git a/dbt-bigquery/.changes/unreleased/Features-20251015-174512.yaml b/dbt-bigquery/.changes/unreleased/Features-20251015-174512.yaml new file mode 100644 index 000000000..2f85c697f --- /dev/null +++ b/dbt-bigquery/.changes/unreleased/Features-20251015-174512.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add BigQuery dataset replication configuration support +time: 2025-10-15T17:45:12.24332-04:00 +custom: + Author: gavin-burns-US + Issue: "1396" diff --git a/dbt-bigquery/.changes/unreleased/Fixes-20251009-141523.yaml b/dbt-bigquery/.changes/unreleased/Fixes-20251009-141523.yaml deleted file mode 100644 index 37448fc7f..000000000 --- a/dbt-bigquery/.changes/unreleased/Fixes-20251009-141523.yaml +++ /dev/null @@ -1,7 +0,0 @@ -kind: Fixes -body: Fix BigFrames python model + oauth + notebook template by adding a Request to - creds.refresh calll -time: 2025-10-09T14:15:23.578214-07:00 -custom: - Author: colin-rogers-dbt - Issue: "1379" diff --git a/dbt-bigquery/hatch.toml b/dbt-bigquery/hatch.toml index b6d2667b0..00a8c69db 100644 --- a/dbt-bigquery/hatch.toml +++ b/dbt-bigquery/hatch.toml @@ -34,7 +34,7 @@ setup = [ ] code-quality = "pre-commit run --all-files" unit-tests = "python -m pytest {args:tests/unit}" -integration-tests = "python -m pytest --profile service_account {args:tests/functional}" +integration-tests = "python -m pytest --profile service_account --log-level=${LOG_LEVEL:-DEBUG} {args:tests/functional}" docker-dev = [ "docker build -f docker/dev.Dockerfile -t dbt-bigquery-dev .", "docker run --rm -it --name dbt-bigquery-dev -v $(shell pwd):/opt/code dbt-bigquery-dev", diff --git a/dbt-bigquery/src/dbt/adapters/bigquery/connections.py b/dbt-bigquery/src/dbt/adapters/bigquery/connections.py index 3092a7e50..343530299 100644 --- a/dbt-bigquery/src/dbt/adapters/bigquery/connections.py +++ b/dbt-bigquery/src/dbt/adapters/bigquery/connections.py @@ -554,16 +554,46 @@ def drop_dataset(self, database, schema) -> None: retry=self._retry.create_reopen_with_deadline(conn), ) - def create_dataset(self, database, schema) -> Dataset: + def create_dataset( + self, database, schema, dataset_replicas=None, primary_replica=None + ) -> Dataset: conn = self.get_thread_connection() client: Client = conn.handle with self.exception_handler("create dataset"): - return client.create_dataset( + dataset = client.create_dataset( dataset=self.dataset_ref(database, schema), exists_ok=True, retry=self._retry.create_reopen_with_deadline(conn), ) + # Apply replication configuration if specified and valid + if dataset_replicas: + # Basic validation to avoid malformed configs triggering DDL + if not isinstance(dataset_replicas, list) or not all( + isinstance(loc, str) and loc for loc in dataset_replicas + ): + logger.warning( + f"Invalid dataset_replicas for {database}.{schema}: {dataset_replicas}. Skipping replication configuration." + ) + return dataset + if primary_replica is not None and not isinstance(primary_replica, str): + logger.warning( + f"Invalid primary_replica for {database}.{schema}: {primary_replica}. Skipping primary configuration." + ) + primary_replica = None + + from dbt.adapters.bigquery.dataset import apply_dataset_replication + + apply_dataset_replication( + client=client, + project=database, + dataset=schema, + desired_replicas=dataset_replicas, + desired_primary=primary_replica, + ) + + return dataset + def list_dataset(self, database: str): # The database string we get here is potentially quoted. # Strip that off for the API call. 
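For context on the DDL this produces: with dataset_replicas=['us-central1', 'us-west1'] and primary_replica='us-west1' on a dataset with no existing replicas, apply_dataset_replication (added in dataset.py below) would issue statements along these lines. The project and dataset names here are illustrative, and the exact statements depend on what SCHEMATA_REPLICAS reports:

    ALTER SCHEMA `my-project.my_dataset` ADD REPLICA `us-central1`;
    ALTER SCHEMA `my-project.my_dataset` ADD REPLICA `us-west1`;
    ALTER SCHEMA `my-project.my_dataset` SET OPTIONS (primary_replica = 'us-west1');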
diff --git a/dbt-bigquery/src/dbt/adapters/bigquery/dataset.py b/dbt-bigquery/src/dbt/adapters/bigquery/dataset.py
index a4504294a..0564291d8 100644
--- a/dbt-bigquery/src/dbt/adapters/bigquery/dataset.py
+++ b/dbt-bigquery/src/dbt/adapters/bigquery/dataset.py
@@ -1,6 +1,7 @@
-from typing import List
+from typing import Dict, List, Optional, Any
 
-from google.cloud.bigquery import AccessEntry, Dataset
+from google.cloud.bigquery import AccessEntry, Client, Dataset
+from google.api_core import exceptions as google_exceptions
 
 from dbt.adapters.events.logging import AdapterLogger
@@ -45,3 +46,112 @@ def add_access_entry_to_dataset(dataset: Dataset, access_entry: AccessEntry) ->
     access_entries.append(access_entry)
     dataset.access_entries = access_entries
     return dataset
+
+
+def get_dataset_replication_config(client: Client, project: str, dataset: str) -> Dict[str, Any]:
+    """Query current replication configuration from INFORMATION_SCHEMA."""
+    # SCHEMATA_REPLICAS is a region-scoped INFORMATION_SCHEMA view, so look up
+    # the dataset's location and qualify the query with `region-<location>`,
+    # filtering on schema_name.
+    try:
+        location = client.get_dataset(f"{project}.{dataset}").location.lower()
+        query = (
+            f"SELECT replica_name, replica_primary_assigned "
+            f"FROM `{project}`.`region-{location}`.INFORMATION_SCHEMA.SCHEMATA_REPLICAS "
+            f"WHERE schema_name = '{dataset}'"
+        )
+        replicas: List[str] = []
+        primary: Optional[str] = None
+        for row in client.query(query).result():
+            replicas.append(row.replica_name)
+            if row.replica_primary_assigned:
+                primary = row.replica_name
+        return {"replicas": replicas, "primary": primary}
+    except (
+        google_exceptions.NotFound,
+        google_exceptions.BadRequest,
+        google_exceptions.GoogleAPIError,
+    ) as exc:
+        logger.warning(f"Unable to fetch replication info for `{project}.{dataset}`: {exc}")
+        return {"replicas": [], "primary": None}
+
+
+def needs_replication_update(
+    current_config: Dict[str, Any],
+    desired_replicas: List[str],
+    desired_primary: Optional[str] = None,
+) -> bool:
+    """Determine if replication configuration needs to be updated.
+ + Args: + current_config (Dict[str, Any]): Current config from get_dataset_replication_config + desired_replicas (List[str]): Desired replica locations + desired_primary (Optional[str]): Desired primary replica location + + Returns: + bool: True if update is needed, False otherwise + """ + current_replicas = set(current_config.get("replicas", [])) + desired_replicas_set = set(desired_replicas) + + if current_replicas != desired_replicas_set: + return True + + return bool(desired_primary and current_config.get("primary") != desired_primary) + + +def apply_dataset_replication( + client: Client, + project: str, + dataset: str, + desired_replicas: List[str], + desired_primary: Optional[str] = None, +) -> None: + """Apply replication configuration using ALTER SCHEMA DDL.""" + current = get_dataset_replication_config(client, project, dataset) + + if not needs_replication_update(current, desired_replicas, desired_primary): + logger.debug(f"Dataset {project}.{dataset} replication already configured correctly") + return + + logger.info(f"Configuring replication for dataset {project}.{dataset}") + + current_replicas = set(current.get("replicas", [])) + desired_replicas_set = set(desired_replicas) + + # Add new replicas + to_add = desired_replicas_set - current_replicas + for location in to_add: + sql = f"ALTER SCHEMA `{project}.{dataset}` ADD REPLICA `{location}`" + logger.info(f"Adding replica: {location}") + try: + client.query(sql).result() + except google_exceptions.GoogleAPIError as e: + # Ignore "already exists", warn otherwise + if "already exists" not in str(e).lower(): + logger.warning(f"Failed to add replica {location}: {e}") + + # Remove old replicas + to_remove = current_replicas - desired_replicas_set + for location in to_remove: + sql = f"ALTER SCHEMA `{project}.{dataset}` DROP REPLICA `{location}`" + logger.info(f"Dropping replica: {location}") + try: + client.query(sql).result() + except google_exceptions.GoogleAPIError as e: + logger.warning(f"Failed to drop replica {location}: {e}") + + # Set primary replica if specified and different + if desired_primary: + if desired_primary not in desired_replicas_set: + logger.warning( + f"Desired primary replica '{desired_primary}' is not in desired replicas {sorted(desired_replicas_set)}. " + "Skipping setting primary replica." 
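+                # A primary replica outside the desired set would make the ALTER SCHEMA
+                # promotion fail, so warn and skip instead of aborting the run.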
+            )
+        elif current.get("primary") != desired_primary:
+            # BigQuery promotes a replica via the primary_replica schema option;
+            # the replica must already exist (and have finished creating).
+            sql = (
+                f"ALTER SCHEMA `{project}.{dataset}` "
+                f"SET OPTIONS (primary_replica = '{desired_primary}')"
+            )
+            logger.info(f"Setting primary replica: {desired_primary}")
+            try:
+                client.query(sql).result()
+            except google_exceptions.GoogleAPIError as e:
+                logger.warning(f"Failed to set primary replica '{desired_primary}': {e}")
diff --git a/dbt-bigquery/src/dbt/adapters/bigquery/impl.py b/dbt-bigquery/src/dbt/adapters/bigquery/impl.py
index 8efc62ab5..035fdd359 100644
--- a/dbt-bigquery/src/dbt/adapters/bigquery/impl.py
+++ b/dbt-bigquery/src/dbt/adapters/bigquery/impl.py
@@ -122,6 +122,8 @@ class BigqueryConfig(AdapterConfig):
     submission_method: Optional[str] = None
     notebook_template_id: Optional[str] = None
     enable_change_history: Optional[bool] = None
+    dataset_replicas: Optional[List[str]] = None
+    primary_replica: Optional[str] = None
 
 
 class BigQueryAdapter(BaseAdapter):
@@ -344,6 +346,34 @@ def create_schema(self, relation: BigQueryRelation) -> None:
         # we can't update the cache here, as if the schema already existed we
         # don't want to (incorrectly) say that it's empty
 
+    @available.parse_none
+    def create_dataset_with_replication(
+        self,
+        relation: BigQueryRelation,
+        dataset_replicas: Optional[List[str]] = None,
+        primary_replica: Optional[str] = None,
+    ) -> None:
+        """Create dataset and apply replication configuration if specified.
+
+        This method is called from the bigquery__create_schema macro to handle
+        both dataset creation and replication configuration.
+
+        Args:
+            relation: The relation representing the schema/dataset
+            dataset_replicas: Optional list of replica locations
+            primary_replica: Optional primary replica location
+        """
+        database = relation.database
+        schema = relation.schema
+
+        # Create the dataset (this handles exists_ok internally)
+        self.connections.create_dataset(
+            database=database,
+            schema=schema,
+            dataset_replicas=dataset_replicas,
+            primary_replica=primary_replica,
+        )
+
     def drop_schema(self, relation: BigQueryRelation) -> None:
         # still use a client method, rather than SQL 'drop schema ... cascade'
diff --git a/dbt-bigquery/src/dbt/include/bigquery/macros/adapters/schema.sql b/dbt-bigquery/src/dbt/include/bigquery/macros/adapters/schema.sql
new file mode 100644
index 000000000..8182f49a6
--- /dev/null
+++ b/dbt-bigquery/src/dbt/include/bigquery/macros/adapters/schema.sql
@@ -0,0 +1,22 @@
+{% macro bigquery__create_schema(relation) -%}
+  {%- set dataset_replicas = config.get('dataset_replicas') -%}
+  {%- set primary_replica = config.get('primary_replica') -%}
+
+  {# Normalize dataset_replicas to a list of strings #}
+  {%- if dataset_replicas is string -%}
+    {# Allow comma-separated strings #}
+    {%- set dataset_replicas = dataset_replicas.split(',') | map('trim') | list -%}
+  {%- endif -%}
+  {%- if dataset_replicas is not none and dataset_replicas is not sequence -%}
+    {{ log("Invalid dataset_replicas config; expected list or comma-separated string. Skipping replication.", info=True) }}
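+    {# Degrade gracefully: create the dataset without replication instead of failing #}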
Skipping replication.", info=True) }} + {%- set dataset_replicas = none -%} + {%- endif -%} + + {%- if dataset_replicas -%} + {{ log("Configuring dataset " ~ relation.schema ~ " with replicas: " ~ dataset_replicas | join(', '), info=True) }} + {%- if primary_replica -%} + {{ log(" Primary replica: " ~ primary_replica, info=True) }} + {%- endif -%} + {%- endif -%} + + {% do adapter.create_dataset_with_replication(relation, dataset_replicas, primary_replica) %} +{% endmacro %} diff --git a/dbt-bigquery/tests/unit/test_dataset.py b/dbt-bigquery/tests/unit/test_dataset.py index 750cc139f..d37bbe2da 100644 --- a/dbt-bigquery/tests/unit/test_dataset.py +++ b/dbt-bigquery/tests/unit/test_dataset.py @@ -1,4 +1,8 @@ -from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset +from dbt.adapters.bigquery.dataset import ( + add_access_entry_to_dataset, + is_access_entry_in_dataset, + needs_replication_update, +) from dbt.adapters.bigquery import BigQueryRelation from google.cloud.bigquery import Dataset, AccessEntry, DatasetReference @@ -88,3 +92,35 @@ def test_is_access_entry_in_dataset_returns_false_if_entry_not_in_dataset(): dataset = Dataset(dataset_ref) access_entry = AccessEntry(None, "table", entity) assert not is_access_entry_in_dataset(dataset, access_entry) + + +def test_needs_replication_update_returns_true_when_replicas_differ(): + current_config = {"replicas": ["us-east1", "us-west1"], "primary": None} + desired_replicas = ["us-east1", "us-west1", "europe-west1"] + assert needs_replication_update(current_config, desired_replicas) + + +def test_needs_replication_update_returns_true_when_primary_differs(): + current_config = {"replicas": ["us-east1", "us-west1"], "primary": "us-east1"} + desired_replicas = ["us-east1", "us-west1"] + desired_primary = "us-west1" + assert needs_replication_update(current_config, desired_replicas, desired_primary) + + +def test_needs_replication_update_returns_false_when_config_matches(): + current_config = {"replicas": ["us-east1", "us-west1"], "primary": "us-east1"} + desired_replicas = ["us-east1", "us-west1"] + desired_primary = "us-east1" + assert not needs_replication_update(current_config, desired_replicas, desired_primary) + + +def test_needs_replication_update_returns_false_when_replicas_match_no_primary(): + current_config = {"replicas": ["us-east1", "us-west1"], "primary": None} + desired_replicas = ["us-east1", "us-west1"] + assert not needs_replication_update(current_config, desired_replicas) + + +def test_needs_replication_update_handles_empty_current_config(): + current_config = {"replicas": [], "primary": None} + desired_replicas = ["us-east1"] + assert needs_replication_update(current_config, desired_replicas)