Skip to content

Commit 4f69a98

Browse files
Feature/dbt bigquery/dataset replication add
add native BigQuery dataset replication configuration support to dbt, allowing users to configure replicas directly in dbt_project.yml, schema files, or model configs without custom hooks.
1 parent 6f89d7c commit 4f69a98

File tree

5 files changed

+234
-5
lines changed

5 files changed

+234
-5
lines changed

dbt-bigquery/src/dbt/adapters/bigquery/connections.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -554,16 +554,46 @@ def drop_dataset(self, database, schema) -> None:
554554
retry=self._retry.create_reopen_with_deadline(conn),
555555
)
556556

557-
def create_dataset(self, database, schema) -> Dataset:
557+
def create_dataset(
558+
self, database, schema, dataset_replicas=None, primary_replica=None
559+
) -> Dataset:
558560
conn = self.get_thread_connection()
559561
client: Client = conn.handle
560562
with self.exception_handler("create dataset"):
561-
return client.create_dataset(
563+
dataset = client.create_dataset(
562564
dataset=self.dataset_ref(database, schema),
563565
exists_ok=True,
564566
retry=self._retry.create_reopen_with_deadline(conn),
565567
)
566568

569+
# Apply replication configuration if specified and valid
570+
if dataset_replicas:
571+
# Basic validation to avoid malformed configs triggering DDL
572+
if not isinstance(dataset_replicas, list) or not all(
573+
isinstance(loc, str) and loc for loc in dataset_replicas
574+
):
575+
logger.warning(
576+
f"Invalid dataset_replicas for {database}.{schema}: {dataset_replicas}. Skipping replication configuration."
577+
)
578+
return dataset
579+
if primary_replica is not None and not isinstance(primary_replica, str):
580+
logger.warning(
581+
f"Invalid primary_replica for {database}.{schema}: {primary_replica}. Skipping primary configuration."
582+
)
583+
primary_replica = None
584+
585+
from dbt.adapters.bigquery.dataset import apply_dataset_replication
586+
587+
apply_dataset_replication(
588+
client=client,
589+
project=database,
590+
dataset=schema,
591+
desired_replicas=dataset_replicas,
592+
desired_primary=primary_replica,
593+
)
594+
595+
return dataset
596+
567597
def list_dataset(self, database: str):
568598
# The database string we get here is potentially quoted.
569599
# Strip that off for the API call.

dbt-bigquery/src/dbt/adapters/bigquery/dataset.py

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from typing import List
1+
from typing import Dict, List, Optional, Any
22

3-
from google.cloud.bigquery import AccessEntry, Dataset
3+
from google.cloud.bigquery import AccessEntry, Client, Dataset
4+
from google.api_core import exceptions as google_exceptions
45

56
from dbt.adapters.events.logging import AdapterLogger
67

@@ -45,3 +46,112 @@ def add_access_entry_to_dataset(dataset: Dataset, access_entry: AccessEntry) ->
4546
access_entries.append(access_entry)
4647
dataset.access_entries = access_entries
4748
return dataset
49+
50+
51+
def get_dataset_replication_config(client: Client, project: str, dataset: str) -> Dict[str, Any]:
52+
"""Query current replication configuration from INFORMATION_SCHEMA."""
53+
# Query the dataset-scoped INFORMATION_SCHEMA; no extra WHERE needed.
54+
query = (
55+
f"SELECT replica_location, is_primary_replica "
56+
f"FROM `{project}.{dataset}.INFORMATION_SCHEMA.SCHEMATA_REPLICAS`"
57+
)
58+
try:
59+
result_iter = client.query(query).result()
60+
replicas: List[str] = []
61+
primary: Optional[str] = None
62+
for row in result_iter:
63+
replicas.append(row.replica_location)
64+
if row.is_primary_replica:
65+
primary = row.replica_location
66+
return {"replicas": replicas, "primary": primary}
67+
except (
68+
google_exceptions.NotFound,
69+
google_exceptions.BadRequest,
70+
google_exceptions.GoogleAPIError,
71+
) as exc:
72+
logger.warning(f"Unable to fetch replication info for `{project}.{dataset}`: {exc}")
73+
return {"replicas": [], "primary": None}
74+
75+
76+
def needs_replication_update(
77+
current_config: Dict[str, Any],
78+
desired_replicas: List[str],
79+
desired_primary: Optional[str] = None,
80+
) -> bool:
81+
"""Determine if replication configuration needs to be updated.
82+
83+
Args:
84+
current_config (Dict[str, Any]): Current config from get_dataset_replication_config
85+
desired_replicas (List[str]): Desired replica locations
86+
desired_primary (Optional[str]): Desired primary replica location
87+
88+
Returns:
89+
bool: True if update is needed, False otherwise
90+
"""
91+
current_replicas = set(current_config.get("replicas", []))
92+
desired_replicas_set = set(desired_replicas)
93+
94+
if current_replicas != desired_replicas_set:
95+
return True
96+
97+
return bool(desired_primary and current_config.get("primary") != desired_primary)
98+
99+
100+
def apply_dataset_replication(
101+
client: Client,
102+
project: str,
103+
dataset: str,
104+
desired_replicas: List[str],
105+
desired_primary: Optional[str] = None,
106+
) -> None:
107+
"""Apply replication configuration using ALTER SCHEMA DDL."""
108+
current = get_dataset_replication_config(client, project, dataset)
109+
110+
if not needs_replication_update(current, desired_replicas, desired_primary):
111+
logger.debug(f"Dataset {project}.{dataset} replication already configured correctly")
112+
return
113+
114+
logger.info(f"Configuring replication for dataset {project}.{dataset}")
115+
116+
current_replicas = set(current.get("replicas", []))
117+
desired_replicas_set = set(desired_replicas)
118+
119+
# Add new replicas
120+
to_add = desired_replicas_set - current_replicas
121+
for location in to_add:
122+
sql = f"ALTER SCHEMA `{project}.{dataset}` ADD REPLICA `{location}`"
123+
logger.info(f"Adding replica: {location}")
124+
try:
125+
client.query(sql).result()
126+
except google_exceptions.GoogleAPIError as e:
127+
# Ignore "already exists", warn otherwise
128+
if "already exists" not in str(e).lower():
129+
logger.warning(f"Failed to add replica {location}: {e}")
130+
131+
# Remove old replicas
132+
to_remove = current_replicas - desired_replicas_set
133+
for location in to_remove:
134+
sql = f"ALTER SCHEMA `{project}.{dataset}` DROP REPLICA `{location}`"
135+
logger.info(f"Dropping replica: {location}")
136+
try:
137+
client.query(sql).result()
138+
except google_exceptions.GoogleAPIError as e:
139+
logger.warning(f"Failed to drop replica {location}: {e}")
140+
141+
# Set primary replica if specified and different
142+
if desired_primary:
143+
if desired_primary not in desired_replicas_set:
144+
logger.warning(
145+
f"Desired primary replica '{desired_primary}' is not in desired replicas {sorted(desired_replicas_set)}. "
146+
"Skipping setting primary replica."
147+
)
148+
elif current.get("primary") != desired_primary:
149+
sql = (
150+
f"ALTER SCHEMA `{project}.{dataset}` "
151+
f"SET OPTIONS (default_replica = `{desired_primary}`)"
152+
)
153+
logger.info(f"Setting primary replica: {desired_primary}")
154+
try:
155+
client.query(sql).result()
156+
except google_exceptions.GoogleAPIError as e:
157+
logger.warning(f"Failed to set primary replica '{desired_primary}': {e}")

dbt-bigquery/src/dbt/adapters/bigquery/impl.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ class BigqueryConfig(AdapterConfig):
122122
submission_method: Optional[str] = None
123123
notebook_template_id: Optional[str] = None
124124
enable_change_history: Optional[bool] = None
125+
dataset_replicas: Optional[List[str]] = None
126+
primary_replica: Optional[str] = None
125127

126128

127129
class BigQueryAdapter(BaseAdapter):
@@ -344,6 +346,34 @@ def create_schema(self, relation: BigQueryRelation) -> None:
344346
# we can't update the cache here, as if the schema already existed we
345347
# don't want to (incorrectly) say that it's empty
346348

349+
@available.parse_none
350+
def create_dataset_with_replication(
351+
self,
352+
relation: BigQueryRelation,
353+
dataset_replicas: Optional[List[str]] = None,
354+
primary_replica: Optional[str] = None,
355+
) -> None:
356+
"""Create dataset and apply replication configuration if specified.
357+
358+
This method is called from the bigquery__create_schema macro to handle
359+
both dataset creation and replication configuration.
360+
361+
Args:
362+
relation: The relation representing the schema/dataset
363+
dataset_replicas: Optional list of replica locations
364+
primary_replica: Optional primary replica location
365+
"""
366+
database = relation.database
367+
schema = relation.schema
368+
369+
# Create the dataset (this handles exists_ok internally)
370+
self.connections.create_dataset(
371+
database=database,
372+
schema=schema,
373+
dataset_replicas=dataset_replicas,
374+
primary_replica=primary_replica,
375+
)
376+
347377
def drop_schema(self, relation: BigQueryRelation) -> None:
348378
# still use a client method, rather than SQL 'drop schema ... cascade'
349379
database = relation.database
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{% macro bigquery__create_schema(relation) -%}
2+
{%- set dataset_replicas = config.get('dataset_replicas') -%}
3+
{%- set primary_replica = config.get('primary_replica') -%}
4+
5+
{# Normalize dataset_replicas to a list of strings #}
6+
{%- if dataset_replicas is string -%}
7+
{# Allow comma-separated strings #}
8+
{%- set dataset_replicas = dataset_replicas.split(',') | map('trim') | list -%}
9+
{%- endif -%}
10+
{%- if dataset_replicas is not none and dataset_replicas is not sequence -%}
11+
{{ log("Invalid dataset_replicas config; expected list or comma-separated string. Skipping replication.", info=True) }}
12+
{%- set dataset_replicas = none -%}
13+
{%- endif -%}
14+
15+
{%- if dataset_replicas -%}
16+
{{ log("Configuring dataset " ~ relation.schema ~ " with replicas: " ~ dataset_replicas | join(', '), info=True) }}
17+
{%- if primary_replica -%}
18+
{{ log(" Primary replica: " ~ primary_replica, info=True) }}
19+
{%- endif -%}
20+
{%- endif -%}
21+
22+
{% do adapter.create_dataset_with_replication(relation, dataset_replicas, primary_replica) %}
23+
{% endmacro %}

dbt-bigquery/tests/unit/test_dataset.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
1+
from dbt.adapters.bigquery.dataset import (
2+
add_access_entry_to_dataset,
3+
is_access_entry_in_dataset,
4+
needs_replication_update,
5+
)
26
from dbt.adapters.bigquery import BigQueryRelation
37

48
from google.cloud.bigquery import Dataset, AccessEntry, DatasetReference
@@ -88,3 +92,35 @@ def test_is_access_entry_in_dataset_returns_false_if_entry_not_in_dataset():
8892
dataset = Dataset(dataset_ref)
8993
access_entry = AccessEntry(None, "table", entity)
9094
assert not is_access_entry_in_dataset(dataset, access_entry)
95+
96+
97+
def test_needs_replication_update_returns_true_when_replicas_differ():
98+
current_config = {"replicas": ["us-east1", "us-west1"], "primary": None}
99+
desired_replicas = ["us-east1", "us-west1", "europe-west1"]
100+
assert needs_replication_update(current_config, desired_replicas)
101+
102+
103+
def test_needs_replication_update_returns_true_when_primary_differs():
104+
current_config = {"replicas": ["us-east1", "us-west1"], "primary": "us-east1"}
105+
desired_replicas = ["us-east1", "us-west1"]
106+
desired_primary = "us-west1"
107+
assert needs_replication_update(current_config, desired_replicas, desired_primary)
108+
109+
110+
def test_needs_replication_update_returns_false_when_config_matches():
111+
current_config = {"replicas": ["us-east1", "us-west1"], "primary": "us-east1"}
112+
desired_replicas = ["us-east1", "us-west1"]
113+
desired_primary = "us-east1"
114+
assert not needs_replication_update(current_config, desired_replicas, desired_primary)
115+
116+
117+
def test_needs_replication_update_returns_false_when_replicas_match_no_primary():
118+
current_config = {"replicas": ["us-east1", "us-west1"], "primary": None}
119+
desired_replicas = ["us-east1", "us-west1"]
120+
assert not needs_replication_update(current_config, desired_replicas)
121+
122+
123+
def test_needs_replication_update_handles_empty_current_config():
124+
current_config = {"replicas": [], "primary": None}
125+
desired_replicas = ["us-east1"]
126+
assert needs_replication_update(current_config, desired_replicas)

0 commit comments

Comments
 (0)