@@ -0,0 +1,6 @@
kind: Features
body: Add BigQuery dataset replication configuration support
time: 2025-10-15T17:45:12.24332-04:00
custom:
Author: gavin-burns-US
Issue: "1396"
7 changes: 0 additions & 7 deletions dbt-bigquery/.changes/unreleased/Fixes-20251009-141523.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion dbt-bigquery/hatch.toml
@@ -34,7 +34,7 @@ setup = [
]
code-quality = "pre-commit run --all-files"
unit-tests = "python -m pytest {args:tests/unit}"
integration-tests = "python -m pytest --profile service_account {args:tests/functional}"
integration-tests = "python -m pytest --profile service_account --log-level=${LOG_LEVEL:-DEBUG} {args:tests/functional}"
docker-dev = [
"docker build -f docker/dev.Dockerfile -t dbt-bigquery-dev .",
"docker run --rm -it --name dbt-bigquery-dev -v $(shell pwd):/opt/code dbt-bigquery-dev",
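The new flag lets the functional-test log level be overridden from the environment, falling back to DEBUG (for example, LOG_LEVEL=INFO hatch run integration-tests); this assumes the ${LOG_LEVEL:-DEBUG} default is expanded by a POSIX-style shell.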
34 changes: 32 additions & 2 deletions dbt-bigquery/src/dbt/adapters/bigquery/connections.py
@@ -554,16 +554,46 @@ def drop_dataset(self, database, schema) -> None:
retry=self._retry.create_reopen_with_deadline(conn),
)

def create_dataset(self, database, schema) -> Dataset:
def create_dataset(
self, database, schema, dataset_replicas=None, primary_replica=None
) -> Dataset:
conn = self.get_thread_connection()
client: Client = conn.handle
with self.exception_handler("create dataset"):
return client.create_dataset(
dataset = client.create_dataset(
dataset=self.dataset_ref(database, schema),
exists_ok=True,
retry=self._retry.create_reopen_with_deadline(conn),
)

# Apply replication configuration if specified and valid
if dataset_replicas:
# Basic validation to avoid malformed configs triggering DDL
if not isinstance(dataset_replicas, list) or not all(
isinstance(loc, str) and loc for loc in dataset_replicas
):
logger.warning(
f"Invalid dataset_replicas for {database}.{schema}: {dataset_replicas}. Skipping replication configuration."
)
return dataset
if primary_replica is not None and not isinstance(primary_replica, str):
logger.warning(
f"Invalid primary_replica for {database}.{schema}: {primary_replica}. Skipping primary configuration."
)
primary_replica = None

from dbt.adapters.bigquery.dataset import apply_dataset_replication

apply_dataset_replication(
client=client,
project=database,
dataset=schema,
desired_replicas=dataset_replicas,
desired_primary=primary_replica,
)

return dataset

def list_dataset(self, database: str):
# The database string we get here is potentially quoted.
# Strip that off for the API call.
114 changes: 112 additions & 2 deletions dbt-bigquery/src/dbt/adapters/bigquery/dataset.py
@@ -1,6 +1,7 @@
from typing import List
from typing import Dict, List, Optional, Any

from google.cloud.bigquery import AccessEntry, Dataset
from google.cloud.bigquery import AccessEntry, Client, Dataset
from google.api_core import exceptions as google_exceptions

from dbt.adapters.events.logging import AdapterLogger

@@ -45,3 +46,112 @@ def add_access_entry_to_dataset(dataset: Dataset, access_entry: AccessEntry) ->
access_entries.append(access_entry)
dataset.access_entries = access_entries
return dataset


def get_dataset_replication_config(client: Client, project: str, dataset: str) -> Dict[str, Any]:
"""Query current replication configuration from INFORMATION_SCHEMA."""
# Query the dataset-scoped INFORMATION_SCHEMA; no extra WHERE needed.
query = (
f"SELECT replica_location, is_primary_replica "
f"FROM `{project}.{dataset}.INFORMATION_SCHEMA.SCHEMATA_REPLICAS`"
)
try:
result_iter = client.query(query).result()
replicas: List[str] = []
primary: Optional[str] = None
for row in result_iter:
replicas.append(row.replica_location)
if row.is_primary_replica:
primary = row.replica_location
return {"replicas": replicas, "primary": primary}
except (
google_exceptions.NotFound,
google_exceptions.BadRequest,
google_exceptions.GoogleAPIError,
) as exc:
logger.warning(f"Unable to fetch replication info for `{project}.{dataset}`: {exc}")
return {"replicas": [], "primary": None}


def needs_replication_update(
current_config: Dict[str, Any],
desired_replicas: List[str],
desired_primary: Optional[str] = None,
) -> bool:
"""Determine if replication configuration needs to be updated.
Args:
current_config (Dict[str, Any]): Current config from get_dataset_replication_config
desired_replicas (List[str]): Desired replica locations
desired_primary (Optional[str]): Desired primary replica location
Returns:
bool: True if update is needed, False otherwise
"""
current_replicas = set(current_config.get("replicas", []))
desired_replicas_set = set(desired_replicas)

if current_replicas != desired_replicas_set:
return True

return bool(desired_primary and current_config.get("primary") != desired_primary)


def apply_dataset_replication(
client: Client,
project: str,
dataset: str,
desired_replicas: List[str],
desired_primary: Optional[str] = None,
) -> None:
"""Apply replication configuration using ALTER SCHEMA DDL."""
current = get_dataset_replication_config(client, project, dataset)

if not needs_replication_update(current, desired_replicas, desired_primary):
logger.debug(f"Dataset {project}.{dataset} replication already configured correctly")
return

logger.info(f"Configuring replication for dataset {project}.{dataset}")

current_replicas = set(current.get("replicas", []))
desired_replicas_set = set(desired_replicas)

# Add new replicas
to_add = desired_replicas_set - current_replicas
for location in to_add:
sql = f"ALTER SCHEMA `{project}.{dataset}` ADD REPLICA `{location}`"
logger.info(f"Adding replica: {location}")
try:
client.query(sql).result()
except google_exceptions.GoogleAPIError as e:
# Ignore "already exists", warn otherwise
if "already exists" not in str(e).lower():
logger.warning(f"Failed to add replica {location}: {e}")

# Remove old replicas
to_remove = current_replicas - desired_replicas_set
for location in to_remove:
sql = f"ALTER SCHEMA `{project}.{dataset}` DROP REPLICA `{location}`"
logger.info(f"Dropping replica: {location}")
try:
client.query(sql).result()
except google_exceptions.GoogleAPIError as e:
logger.warning(f"Failed to drop replica {location}: {e}")

# Set primary replica if specified and different
if desired_primary:
if desired_primary not in desired_replicas_set:
logger.warning(
f"Desired primary replica '{desired_primary}' is not in desired replicas {sorted(desired_replicas_set)}. "
"Skipping setting primary replica."
)
elif current.get("primary") != desired_primary:
sql = (
f"ALTER SCHEMA `{project}.{dataset}` "
f"SET OPTIONS (default_replica = `{desired_primary}`)"
)
logger.info(f"Setting primary replica: {desired_primary}")
try:
client.query(sql).result()
except google_exceptions.GoogleAPIError as e:
logger.warning(f"Failed to set primary replica '{desired_primary}': {e}")
30 changes: 30 additions & 0 deletions dbt-bigquery/src/dbt/adapters/bigquery/impl.py
@@ -122,6 +122,8 @@ class BigqueryConfig(AdapterConfig):
submission_method: Optional[str] = None
notebook_template_id: Optional[str] = None
enable_change_history: Optional[bool] = None
dataset_replicas: Optional[List[str]] = None
primary_replica: Optional[str] = None


class BigQueryAdapter(BaseAdapter):
@@ -344,6 +346,34 @@ def create_schema(self, relation: BigQueryRelation) -> None:
# we can't update the cache here, as if the schema already existed we
# don't want to (incorrectly) say that it's empty

@available.parse_none
def create_dataset_with_replication(
self,
relation: BigQueryRelation,
dataset_replicas: Optional[List[str]] = None,
primary_replica: Optional[str] = None,
) -> None:
"""Create dataset and apply replication configuration if specified.
This method is called from the bigquery__create_schema macro to handle
both dataset creation and replication configuration.
Args:
relation: The relation representing the schema/dataset
dataset_replicas: Optional list of replica locations
primary_replica: Optional primary replica location
"""
database = relation.database
schema = relation.schema

# Create the dataset (this handles exists_ok internally)
self.connections.create_dataset(
database=database,
schema=schema,
dataset_replicas=dataset_replicas,
primary_replica=primary_replica,
)

def drop_schema(self, relation: BigQueryRelation) -> None:
# still use a client method, rather than SQL 'drop schema ... cascade'
database = relation.database
@@ -0,0 +1,23 @@
{% macro bigquery__create_schema(relation) -%}
{%- set dataset_replicas = config.get('dataset_replicas') -%}
{%- set primary_replica = config.get('primary_replica') -%}

{# Normalize dataset_replicas to a list of strings #}
{%- if dataset_replicas is string -%}
{# Allow comma-separated strings #}
{%- set dataset_replicas = dataset_replicas.split(',') | map('trim') | list -%}
{%- endif -%}
{%- if dataset_replicas is not none and dataset_replicas is not sequence -%}
{{ log("Invalid dataset_replicas config; expected list or comma-separated string. Skipping replication.", info=True) }}
{%- set dataset_replicas = none -%}
{%- endif -%}

{%- if dataset_replicas -%}
{{ log("Configuring dataset " ~ relation.schema ~ " with replicas: " ~ dataset_replicas | join(', '), info=True) }}
{%- if primary_replica -%}
{{ log(" Primary replica: " ~ primary_replica, info=True) }}
{%- endif -%}
{%- endif -%}

{% do adapter.create_dataset_with_replication(relation, dataset_replicas, primary_replica) %}
{% endmacro %}
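With this macro in place, a model or dbt_project.yml entry could opt in by setting, for example, dataset_replicas: ['us-east1', 'europe-west1'] and primary_replica: 'us-east1' (location names here are illustrative); per the normalization above, a comma-separated string such as 'us-east1, europe-west1' is also accepted for dataset_replicas.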
38 changes: 37 additions & 1 deletion dbt-bigquery/tests/unit/test_dataset.py
@@ -1,4 +1,8 @@
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery.dataset import (
add_access_entry_to_dataset,
is_access_entry_in_dataset,
needs_replication_update,
)
from dbt.adapters.bigquery import BigQueryRelation

from google.cloud.bigquery import Dataset, AccessEntry, DatasetReference
@@ -88,3 +92,35 @@ def test_is_access_entry_in_dataset_returns_false_if_entry_not_in_dataset():
dataset = Dataset(dataset_ref)
access_entry = AccessEntry(None, "table", entity)
assert not is_access_entry_in_dataset(dataset, access_entry)


def test_needs_replication_update_returns_true_when_replicas_differ():
current_config = {"replicas": ["us-east1", "us-west1"], "primary": None}
desired_replicas = ["us-east1", "us-west1", "europe-west1"]
assert needs_replication_update(current_config, desired_replicas)


def test_needs_replication_update_returns_true_when_primary_differs():
current_config = {"replicas": ["us-east1", "us-west1"], "primary": "us-east1"}
desired_replicas = ["us-east1", "us-west1"]
desired_primary = "us-west1"
assert needs_replication_update(current_config, desired_replicas, desired_primary)


def test_needs_replication_update_returns_false_when_config_matches():
current_config = {"replicas": ["us-east1", "us-west1"], "primary": "us-east1"}
desired_replicas = ["us-east1", "us-west1"]
desired_primary = "us-east1"
assert not needs_replication_update(current_config, desired_replicas, desired_primary)


def test_needs_replication_update_returns_false_when_replicas_match_no_primary():
current_config = {"replicas": ["us-east1", "us-west1"], "primary": None}
desired_replicas = ["us-east1", "us-west1"]
assert not needs_replication_update(current_config, desired_replicas)


def test_needs_replication_update_handles_empty_current_config():
current_config = {"replicas": [], "primary": None}
desired_replicas = ["us-east1"]
assert needs_replication_update(current_config, desired_replicas)
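
As a possible follow-up (not part of this diff), a sketch of how get_dataset_replication_config could be covered with a mocked client; the fake rows mirror the replica_location and is_primary_replica columns the helper reads:

from types import SimpleNamespace
from unittest import mock

from dbt.adapters.bigquery.dataset import get_dataset_replication_config


def test_get_dataset_replication_config_parses_rows():
    rows = [
        SimpleNamespace(replica_location="us-east1", is_primary_replica=True),
        SimpleNamespace(replica_location="europe-west1", is_primary_replica=False),
    ]
    client = mock.MagicMock()
    # client.query(...).result() yields the fake INFORMATION_SCHEMA rows.
    client.query.return_value.result.return_value = rows

    config = get_dataset_replication_config(client, "my-project", "analytics")

    assert config == {"replicas": ["us-east1", "europe-west1"], "primary": "us-east1"}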