Skip to content

Commit 89f944e

Browse files
Add query comments to system queries (#1091)
Co-authored-by: Colin Rogers <[email protected]>
1 parent 30cb79b commit 89f944e

File tree

4 files changed

+112

-8
lines changed
.changes changelog entry (filename not captured in this page extract)

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Fixes
2+
body: Use execute wrapper instead of raw execute to add the query comment as query header
3+
time: 2025-05-13T01:02:33.891023-07:00
4+
custom:
5+
Author: versusfacit Kayrnt
6+
Issue: "1090"

dbt-bigquery/src/dbt/adapters/bigquery/connections.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,28 @@ def raw_execute(
274274
limit=limit,
275275
)
276276

277+
def raw_execute_with_comment(
278+
self,
279+
sql: str,
280+
use_legacy_sql: bool = False,
281+
limit: Optional[int] = None,
282+
dry_run: bool = False,
283+
):
284+
"""
285+
A lightweight wrapper over raw_execute that prepends the dbt query comment.
286+
287+
This exists as a "third way" between raw_execute (fully manual, no preprocessing)
288+
and execute (postprocessing and formatting). This is useful when you need query
289+
auditing but no Adapter Response.
290+
"""
291+
sql = self._add_query_comment(sql)
292+
return self.raw_execute(
293+
sql,
294+
use_legacy_sql=use_legacy_sql,
295+
limit=limit,
296+
dry_run=dry_run,
297+
)
298+
277299
def execute(
278300
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None
279301
) -> Tuple[BigQueryAdapterResponse, "agate.Table"]:

dbt-bigquery/src/dbt/adapters/bigquery/impl.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ def get_column_schema_from_query(self, sql: str) -> List[BigQueryColumn]:
446446
:param str sql: The sql to execute.
447447
:return: List[BigQueryColumn]
448448
"""
449-
_, iterator = self.connections.raw_execute(sql)
449+
_, iterator = self.connections.raw_execute_with_comment(sql)
450450
columns = [self.Column.create_from_field(field) for field in iterator.schema]
451451
flattened_columns = []
452452
for column in columns:
@@ -458,7 +458,7 @@ def get_columns_in_select_sql(self, select_sql: str) -> List[BigQueryColumn]:
458458
try:
459459
conn = self.connections.get_thread_connection()
460460
client = conn.handle
461-
query_job, iterator = self.connections.raw_execute(select_sql)
461+
query_job, iterator = self.connections.raw_execute_with_comment(select_sql)
462462
query_table = client.get_table(query_job.destination)
463463
return self._get_dbt_columns_from_bq_table(query_table)
464464

dbt-bigquery/tests/functional/test_incremental_materialization.py

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1+
import json
12
import pytest
2-
from dbt.tests.util import run_dbt
3+
import re
4+
from dbt.tests.util import run_dbt, run_dbt_and_capture
35

46
# This is a short term hack, we need to go back
57
# and make adapter implementations of:
68
# https://github.com/dbt-labs/dbt-core/pull/6330
79

10+
11+
_COMMENT_RE = re.compile(r'/\*\s*{[^}]*"dbt_version"\s*:\s*"[^"]+"[^}]*}\s*\*/')
12+
813
_INCREMENTAL_MODEL = """
914
{{
1015
config(
@@ -14,16 +19,41 @@
1419
1520
{% if not is_incremental() %}
1621
17-
select 10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
18-
select 30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
22+
select
23+
10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour
24+
union all select
25+
30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
1926
2027
{% else %}
2128
22-
select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
23-
select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
29+
select
30+
20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour
31+
union all select
32+
40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
2433
2534
{% endif %}
26-
-- Test Comment To Prevent Reccurence of https://github.com/dbt-labs/dbt-core/issues/6485
35+
-- Test Comment To Prevent Recurrence of
36+
-- https://github.com/dbt-labs/dbt-core/issues/6485
37+
"""
38+
39+
INCREMENTAL_MODEL_COPY_PARTITIONS = """
40+
{{
41+
config(
42+
materialized='incremental',
43+
incremental_strategy='insert_overwrite',
44+
partition_by={
45+
'field': '_partition',
46+
'granularity': 'day',
47+
'data_type': 'timestamp',
48+
'time_ingestion_partitioning': True,
49+
'copy_partitions': True,
50+
},
51+
on_schema_change='append_new_columns'
52+
)
53+
}}
54+
SELECT
55+
timestamp_trunc(current_timestamp(), day) AS _partition,
56+
'some value' AS col1
2757
"""
2858

2959

@@ -39,3 +69,49 @@ def test_incremental_model_succeeds(self, project):
3969
assert len(results) == 1
4070
results = run_dbt(["run"])
4171
assert len(results) == 1
72+
73+
74+
class TestAllQueriesHaveDbtComment:
75+
@pytest.fixture(scope="class")
76+
def models(self):
77+
return {"my_incremental_model.sql": INCREMENTAL_MODEL_COPY_PARTITIONS}
78+
79+
def _extract_executed_sql(self, raw_logs: str) -> list[str]:
80+
"""
81+
Return every SQL script that dbt 1.4+ actually sent to BigQuery.
82+
83+
In JSON logs each statement is logged by an event whose `data`
84+
payload is a dict containing a key `"sql"`.
85+
"""
86+
scripts: list[str] = []
87+
for line in raw_logs.splitlines():
88+
try:
89+
parsed = json.loads(line)
90+
except json.JSONDecodeError:
91+
continue
92+
93+
data = parsed.get("data")
94+
if isinstance(data, dict) and "sql" in data:
95+
sql = str(data["sql"]).strip()
96+
if sql:
97+
scripts.append(sql)
98+
return scripts
99+
100+
def _has_structured_comment(self, sql: str) -> bool:
101+
"""True iff the first non-blank line is the structured dbt comment."""
102+
first_line = sql.lstrip().splitlines()[0]
103+
return bool(_COMMENT_RE.fullmatch(first_line))
104+
105+
def test_every_query_has_comment(self, project):
106+
run_dbt(["run"])
107+
_, raw_logs = run_dbt_and_capture(["--debug", "--log-format=json", "run"])
108+
109+
executed_sqls = self._extract_executed_sql(raw_logs)
110+
assert executed_sqls, "No SQL was captured from the dbt logs"
111+
112+
missing = [sql for sql in executed_sqls if not self._has_structured_comment(sql)]
113+
114+
assert not missing, (
115+
f"{len(missing)} queries are missing structured dbt comments.\n\n"
116+
+ "\n\n---\n\n".join(missing)
117+
)

0 commit comments

Comments (0)