
Commit 7d9e7c5

Extract DataProfileScan to a separate module
1 parent: 9937855

File tree

2 files changed: +172 -159 lines changed


dbt/adapters/bigquery/dataplex.py

Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
from dataclasses import dataclass
import hashlib
from typing import Optional

from dbt.adapters.bigquery import BigQueryConnectionManager
from google.cloud import dataplex_v1
from google.protobuf import field_mask_pb2


@dataclass
class DataProfileScanSetting:
    location: str
    scan_id: Optional[str]

    project_id: str
    dataset_id: str
    table_id: str

    sampling_percent: Optional[float]
    row_filter: Optional[str]
    cron: Optional[str]

    def parent(self):
        return f"projects/{self.project_id}/locations/{self.location}"

    def data_scan_name(self):
        return f"{self.parent()}/dataScans/{self.scan_id}"


class DataProfileScan:
    def __init__(self, connections: BigQueryConnectionManager):
        self.connections = connections

    # If the label `dataplex-dp-published-*` is not assigned, we cannot view
    # the results of the Data Profile Scan from BigQuery
    def _update_labels_with_data_profile_scan_labels(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        location: str,
        scan_id: str,
    ):
        table = self.connections.get_bq_table(project_id, dataset_id, table_id)
        original_labels = table.labels
        profile_scan_labels = {
            "dataplex-dp-published-scan": scan_id,
            "dataplex-dp-published-project": project_id,
            "dataplex-dp-published-location": location,
        }
        table.labels = {**original_labels, **profile_scan_labels}
        self.connections.get_thread_connection().handle.update_table(table, ["labels"])

    # scan_id must be unique within the project and no longer than 63 characters,
    # so generate an id that meets the constraints
    def _generate_unique_scan_id(self, dataset_id: str, table_id: str) -> str:
        md5 = hashlib.md5(f"{dataset_id}_{table_id}".encode("utf-8")).hexdigest()
        return f"dbt-{table_id.replace('_', '-')}-{md5}"[:63]

    def _create_or_update_data_profile_scan(
        self,
        client: dataplex_v1.DataScanServiceClient,
        scan_setting: DataProfileScanSetting,
    ):
        data_profile_spec = dataplex_v1.DataProfileSpec(
            sampling_percent=scan_setting.sampling_percent,
            row_filter=scan_setting.row_filter,
        )
        display_name = (
            f"Data Profile Scan for {scan_setting.table_id} in {scan_setting.dataset_id}"
        )
        description = f"This is a Data Profile Scan for {scan_setting.project_id}.{scan_setting.dataset_id}.{scan_setting.table_id}. Created by dbt."
        labels = {
            "managed_by": "dbt",
        }

        if scan_setting.cron:
            trigger = dataplex_v1.Trigger(
                schedule=dataplex_v1.Trigger.Schedule(cron=scan_setting.cron)
            )
        else:
            trigger = dataplex_v1.Trigger(on_demand=dataplex_v1.Trigger.OnDemand())
        execution_spec = dataplex_v1.DataScan.ExecutionSpec(trigger=trigger)

        if all(
            scan.name != scan_setting.data_scan_name()
            for scan in client.list_data_scans(parent=scan_setting.parent())
        ):
            data_scan = dataplex_v1.DataScan(
                data=dataplex_v1.DataSource(
                    resource=f"//bigquery.googleapis.com/projects/{scan_setting.project_id}/datasets/{scan_setting.dataset_id}/tables/{scan_setting.table_id}"
                ),
                data_profile_spec=data_profile_spec,
                execution_spec=execution_spec,
                display_name=display_name,
                description=description,
                labels=labels,
            )
            request = dataplex_v1.CreateDataScanRequest(
                parent=scan_setting.parent(),
                data_scan_id=scan_setting.scan_id,
                data_scan=data_scan,
            )
            client.create_data_scan(request=request).result()
        else:
            request = dataplex_v1.GetDataScanRequest(
                name=scan_setting.data_scan_name(),
            )
            data_scan = client.get_data_scan(request=request)

            data_scan.data_profile_spec = data_profile_spec
            data_scan.execution_spec = execution_spec
            data_scan.display_name = display_name
            data_scan.description = description
            data_scan.labels = labels

            update_mask = field_mask_pb2.FieldMask(
                paths=[
                    "data_profile_spec",
                    "execution_spec",
                    "display_name",
                    "description",
                    "labels",
                ]
            )
            request = dataplex_v1.UpdateDataScanRequest(
                data_scan=data_scan,
                update_mask=update_mask,
            )
            client.update_data_scan(request=request).result()

    def create_or_update_data_profile_scan(self, config):
        project_id = config.get("database")
        dataset_id = config.get("schema")
        table_id = config.get("name")

        data_profile_config = config.get("config").get("data_profile_scan", {})

        # Skip if data_profile_scan is not configured
        if not data_profile_config:
            return None

        client = dataplex_v1.DataScanServiceClient()
        scan_setting = DataProfileScanSetting(
            location=data_profile_config["location"],
            scan_id=data_profile_config.get(
                "scan_id", self._generate_unique_scan_id(dataset_id, table_id)
            ),
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=table_id,
            sampling_percent=data_profile_config.get("sampling_percent", None),
            row_filter=data_profile_config.get("row_filter", None),
            cron=data_profile_config.get("cron", None),
        )

        # Delete existing data profile scan if it is disabled
        if not data_profile_config.get("enabled", True):
            client.delete_data_scan(name=scan_setting.data_scan_name())
            return None

        self._create_or_update_data_profile_scan(client, scan_setting)

        if not scan_setting.cron:
            client.run_data_scan(
                request=dataplex_v1.RunDataScanRequest(name=scan_setting.data_scan_name())
            )

        self._update_labels_with_data_profile_scan_labels(
            project_id, dataset_id, table_id, scan_setting.location, scan_setting.scan_id
        )
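
A note on how this new module is driven: create_or_update_data_profile_scan reads the model's database/schema/name plus a data_profile_scan block under config. Below is a minimal, hypothetical sketch of a call; the key names are taken from the reads above, but the concrete values and the `connections` object (assumed to be the adapter's live BigQueryConnectionManager) are illustrative only.

from dbt.adapters.bigquery.dataplex import DataProfileScan

config = {
    "database": "my-gcp-project",  # becomes project_id
    "schema": "analytics",         # becomes dataset_id
    "name": "orders",              # becomes table_id
    "config": {
        "data_profile_scan": {
            "location": "us-central1",               # required Dataplex location
            # "scan_id": "my-scan",                  # optional; defaults to a generated md5-based id
            "sampling_percent": 10.0,                # optional
            "row_filter": "ordered_at IS NOT NULL",  # optional
            "cron": "TZ=UTC 0 3 * * *",              # optional; omit to run once on demand
            "enabled": True,                         # False deletes an existing scan instead
        }
    },
}

# `connections` is assumed to be a BigQueryConnectionManager with an open
# thread connection, e.g. adapter.connections during a dbt run.
DataProfileScan(connections).create_or_update_data_profile_scan(config)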

dbt/adapters/bigquery/impl.py

Lines changed: 2 additions & 159 deletions
@@ -1,6 +1,5 @@
 from dataclasses import dataclass
 from datetime import datetime
-import hashlib
 import json
 import threading
 from multiprocessing.context import SpawnContext
@@ -54,14 +53,13 @@
 import google.oauth2
 import google.cloud.bigquery
 from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
-from google.cloud import dataplex_v1
 import google.cloud.exceptions
-from google.protobuf import field_mask_pb2
 import pytz

 from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
 from dbt.adapters.bigquery.column import get_nested_column_data_types
 from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
+from dbt.adapters.bigquery.dataplex import DataProfileScan
 from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
 from dbt.adapters.bigquery.python_submissions import (
     ClusterDataprocHelper,
@@ -99,26 +97,6 @@ def render(self):
         return f"{self.project}.{self.dataset}"


-@dataclass
-class DataProfileScanSetting:
-    location: str
-    scan_id: Optional[str]
-
-    project_id: str
-    dataset_id: str
-    table_id: str
-
-    sampling_percent: Optional[float]
-    row_filter: Optional[str]
-    cron: Optional[str]
-
-    def parent(self):
-        return f"projects/{self.project_id}/locations/{self.location}"
-
-    def data_scan_name(self):
-        return f"{self.parent()}/dataScans/{self.scan_id}"
-
-
 def _stub_relation(*args, **kwargs):
     return BigQueryRelation.create(
         database="", schema="", identifier="", quote_policy={}, type=BigQueryRelation.Table
@@ -1023,141 +1001,6 @@ def validate_sql(self, sql: str) -> AdapterResponse:
         """
         return self.connections.dry_run(sql)

-    # If the label `dataplex-dp-published-*` is not assigned, we cannot view the results of the Data Profile Scan from BigQuery
-    def _update_labels_with_data_profile_scan_labels(
-        self,
-        project_id: str,
-        dataset_id: str,
-        table_id: str,
-        location: str,
-        scan_id: str,
-    ):
-        table = self.connections.get_bq_table(project_id, dataset_id, table_id)
-        original_labels = table.labels
-        profile_scan_labels = {
-            "dataplex-dp-published-scan": scan_id,
-            "dataplex-dp-published-project": project_id,
-            "dataplex-dp-published-location": location,
-        }
-        table.labels = {**original_labels, **profile_scan_labels}
-        self.connections.get_thread_connection().handle.update_table(table, ["labels"])
-
-    # scan_id must be unique within the project and no longer than 63 characters,
-    # so generate an id that meets the constraints
-    def _generate_unique_scan_id(self, dataset_id: str, table_id: str) -> str:
-        md5 = hashlib.md5(f"{dataset_id}_{table_id}".encode("utf-8")).hexdigest()
-        return f"dbt-{table_id.replace('_', '-')}-{md5}"[:63]
-
-    def _create_or_update_data_profile_scan(
-        self,
-        client: dataplex_v1.DataScanServiceClient,
-        scan_setting: DataProfileScanSetting,
-    ):
-        data_profile_spec = dataplex_v1.DataProfileSpec(
-            sampling_percent=scan_setting.sampling_percent,
-            row_filter=scan_setting.row_filter,
-        )
-        display_name = (
-            f"Data Profile Scan for {scan_setting.table_id} in {scan_setting.dataset_id}"
-        )
-        description = f"This is a Data Profile Scan for {scan_setting.project_id}.{scan_setting.dataset_id}.{scan_setting.table_id}. Created by dbt."
-        labels = {
-            "managed_by": "dbt",
-        }
-
-        if scan_setting.cron:
-            trigger = dataplex_v1.Trigger(
-                schedule=dataplex_v1.Trigger.Schedule(cron=scan_setting.cron)
-            )
-        else:
-            trigger = dataplex_v1.Trigger(on_demand=dataplex_v1.Trigger.OnDemand())
-        execution_spec = dataplex_v1.DataScan.ExecutionSpec(trigger=trigger)
-
-        if all(
-            scan.name != scan_setting.data_scan_name()
-            for scan in client.list_data_scans(parent=scan_setting.parent())
-        ):
-            data_scan = dataplex_v1.DataScan(
-                data=dataplex_v1.DataSource(
-                    resource=f"//bigquery.googleapis.com/projects/{scan_setting.project_id}/datasets/{scan_setting.dataset_id}/tables/{scan_setting.table_id}"
-                ),
-                data_profile_spec=data_profile_spec,
-                execution_spec=execution_spec,
-                display_name=display_name,
-                description=description,
-                labels=labels,
-            )
-            request = dataplex_v1.CreateDataScanRequest(
-                parent=scan_setting.parent(),
-                data_scan_id=scan_setting.scan_id,
-                data_scan=data_scan,
-            )
-            client.create_data_scan(request=request).result()
-        else:
-            request = dataplex_v1.GetDataScanRequest(
-                name=scan_setting.data_scan_name(),
-            )
-            data_scan = client.get_data_scan(request=request)
-
-            data_scan.data_profile_spec = data_profile_spec
-            data_scan.execution_spec = execution_spec
-            data_scan.display_name = display_name
-            data_scan.description = description
-            data_scan.labels = labels
-
-            update_mask = field_mask_pb2.FieldMask(
-                paths=[
-                    "data_profile_spec",
-                    "execution_spec",
-                    "display_name",
-                    "description",
-                    "labels",
-                ]
-            )
-            request = dataplex_v1.UpdateDataScanRequest(
-                data_scan=data_scan,
-                update_mask=update_mask,
-            )
-            client.update_data_scan(request=request).result()
-
     @available
     def create_or_update_data_profile_scan(self, config):
-        project_id = config.get("database")
-        dataset_id = config.get("schema")
-        table_id = config.get("name")
-
-        data_profile_config = config.get("config").get("data_profile_scan", {})
-
-        # Skip if data_profile_scan is not configured
-        if not data_profile_config:
-            return None
-
-        client = dataplex_v1.DataScanServiceClient()
-        scan_setting = DataProfileScanSetting(
-            location=data_profile_config["location"],
-            scan_id=data_profile_config.get(
-                "scan_id", self._generate_unique_scan_id(dataset_id, table_id)
-            ),
-            project_id=project_id,
-            dataset_id=dataset_id,
-            table_id=table_id,
-            sampling_percent=data_profile_config.get("sampling_percent", None),
-            row_filter=data_profile_config.get("row_filter", None),
-            cron=data_profile_config.get("cron", None),
-        )
-
-        # Delete existing data profile scan if it is disabled
-        if not data_profile_config.get("enabled", True):
-            client.delete_data_scan(name=scan_setting.data_scan_name())
-            return None
-
-        self._create_or_update_data_profile_scan(client, scan_setting)
-
-        if not scan_setting.cron:
-            client.run_data_scan(
-                request=dataplex_v1.RunDataScanRequest(name=scan_setting.data_scan_name())
-            )
-
-        self._update_labels_with_data_profile_scan_labels(
-            project_id, dataset_id, table_id, scan_setting.location, scan_setting.scan_id
-        )
+        DataProfileScan(self.connections).create_or_update_data_profile_scan(config)
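
One design note on the call site above: the @available decorator stays on create_or_update_data_profile_scan in impl.py, so the method remains exposed to dbt's Jinja context, while its body shrinks to a one-line delegation into the new DataProfileScan class; the dataplex_v1, field_mask_pb2, and hashlib imports move out of impl.py along with the logic.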
