
Commit 43aa36f

feat(snowflake): add support for external DMF assertion ingestion (#16058)
Authored-by: Rajat Singh <254366865+rajatoss@users.noreply.github.com>
1 parent 1ee2d68 commit 43aa36f

File tree

5 files changed (+865, -39 lines)


docs/assertions/snowflake/snowflake_dmfs.md

Lines changed: 122 additions & 1 deletion
@@ -180,7 +180,7 @@ snowsql -f dmf_definitions.sql
 snowsql -f dmf_associations.sql
 ```
 
-:::NOTE
+::: NOTE
 Scheduling a Data Metric Function on a table incurs Serverless Credit Usage in Snowflake. Refer to [Billing and Pricing](https://docs.snowflake.com/en/user-guide/data-quality-intro#billing-and-pricing) for more details.
 Please ensure you DROP the Data Metric Functions created via dmf_associations.sql if the assertion is no longer in use.
 :::
@@ -208,6 +208,127 @@ either via CLI or the UI visible as normal assertions.
 
 `datahub ingest -c snowflake.yml`
 
+## Ingesting External (User-Created) DMFs
+
+In addition to DataHub-created DMFs, you can also ingest results from your own custom Snowflake Data Metric Functions. "External" here means DMFs that were created directly in Snowflake without using DataHub's assertion compiler - they exist outside of DataHub's management.
+
+### Why Use External DMFs?
+
+You might want to ingest external DMFs if:
+
+- **Pre-existing DMFs**: You already have DMFs in Snowflake that were created before adopting DataHub, and you want to see their results in DataHub without recreating them
+- **Custom logic**: You need DMF logic that isn't supported by DataHub's assertion compiler (e.g., complex multi-table checks)
+- **Team workflows**: Different teams manage DMFs directly in Snowflake, but you want centralized visibility in DataHub
+- **Gradual adoption**: You want to start monitoring existing data quality checks in DataHub before fully migrating to DataHub-managed assertions
+
+### Enabling External DMF Ingestion
+
+To ingest external DMFs, add the `include_externally_managed_dmfs` flag to your Snowflake recipe:
+
+```yaml
+source:
+  type: snowflake
+  config:
+    # ... connection config ...
+
+    # Enable assertion results ingestion (required)
+    include_assertion_results: true
+
+    # Enable external DMF ingestion (new)
+    include_externally_managed_dmfs: true
+
+    # Time window for assertion results
+    start_time: "-7 days"
+```
+
+Both flags must be enabled for external DMF ingestion to work.
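Before running ingestion, it can help to confirm that your external DMFs are actually producing results inside the configured time window, since that window is what DataHub reads. A minimal sketch of such a check, assuming results are exposed through Snowflake's `SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS` view (the fully qualified name is an assumption; this doc only references `DATA_QUALITY_MONITORING_RESULTS`) and a 7-day window matching the recipe above:

```sql
-- Sketch: verify that DMF results exist in the ingestion window.
-- The fully qualified view name and the 7-day window are assumptions;
-- adjust them to match your account and recipe.
SELECT measurement_time, metric_name, table_database, table_schema, table_name, value
FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
WHERE measurement_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
ORDER BY measurement_time DESC;
```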
+
+### Requirements for External DMFs
+
+**External DMFs must return `1` for SUCCESS and `0` for FAILURE.**
+
+DataHub interprets the `VALUE` column from Snowflake's `DATA_QUALITY_MONITORING_RESULTS` table as:
+
+- `VALUE = 1` → Assertion **PASSED**
+- `VALUE = 0` → Assertion **FAILED**
+
+This is because DataHub cannot interpret arbitrary return values (e.g., "100 null rows" - is that good or bad?). You must build the pass/fail logic into your DMF.
+
+::: warning What if my DMF returns other values?
+If your DMF returns values other than 0 or 1, DataHub will mark the assertion result as **ERROR**:
+
+- `VALUE = 1` → **PASSED**
+- `VALUE = 0` → **FAILED**
+- `VALUE != 0 and VALUE != 1` (e.g., 5, 100, -1) → **ERROR**
+
+The ERROR state indicates that the DMF is not configured correctly for DataHub ingestion. You can identify these cases by:
+
+1. Checking the ingestion logs for warnings like: `DMF 'my_dmf' returned invalid value 100. Expected 1 (pass) or 0 (fail). Marking as ERROR.`
+2. Looking for assertions with ERROR status in the DataHub UI
+:::
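Beyond the ingestion logs and the DataHub UI, you can also look for offending DMFs directly in Snowflake. A hedged sketch, again assuming the `SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS` view described above:

```sql
-- Sketch: list recent DMF results whose VALUE is neither 0 nor 1.
-- These are the rows DataHub will report with an ERROR status.
SELECT metric_name, table_database, table_schema, table_name, value, measurement_time
FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
WHERE value NOT IN (0, 1)
  AND measurement_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
ORDER BY measurement_time DESC;
```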
+
+#### Example: Writing External DMFs Correctly
+
+**WRONG** - Returns raw count (DataHub can't interpret this):
+
+```sql
+CREATE DATA METRIC FUNCTION my_null_check(ARGT TABLE(col VARCHAR))
+RETURNS NUMBER AS
+$$
+SELECT COUNT(*) FROM ARGT WHERE col IS NULL
+$$;
+-- Returns: 0, 5, 100, etc. - DataHub can't determine pass/fail!
+```
+
+**CORRECT** - Returns 1 (pass) or 0 (fail):
+
+```sql
+CREATE DATA METRIC FUNCTION my_null_check(ARGT TABLE(col VARCHAR))
+RETURNS NUMBER AS
+$$
+SELECT CASE WHEN COUNT(*) = 0 THEN 1 ELSE 0 END
+FROM ARGT WHERE col IS NULL
+$$;
+-- Returns: 1 if no nulls (pass), 0 if has nulls (fail)
+```
+
+**CORRECT** - With threshold:
+
+```sql
+CREATE DATA METRIC FUNCTION my_null_check_threshold(ARGT TABLE(col VARCHAR))
+RETURNS NUMBER AS
+$$
+SELECT CASE WHEN COUNT(*) <= 10 THEN 1 ELSE 0 END
+FROM ARGT WHERE col IS NULL
+$$;
+-- Returns: 1 if ≤10 nulls (pass), 0 if >10 nulls (fail)
+```
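A DMF only produces results once it has been associated with a table and given a schedule; for external DMFs these steps are performed manually in Snowflake rather than via the generated `dmf_associations.sql`. A hedged sketch of those steps, using placeholder table, column, and function names (verify the exact syntax against Snowflake's data metric function documentation):

```sql
-- Sketch: schedule DMF evaluation on the table, then attach the DMF to a column.
-- Table, schema, column, and function names are placeholders.
ALTER TABLE my_db.my_schema.orders
  SET DATA_METRIC_SCHEDULE = '60 MINUTE';

ALTER TABLE my_db.my_schema.orders
  ADD DATA METRIC FUNCTION my_db.my_schema.my_null_check_threshold
  ON (order_id);
```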
+
+### How External DMFs Differ from DataHub-Created DMFs
+
+| Aspect             | DataHub-Created DMFs                               | External DMFs                             |
+| ------------------ | -------------------------------------------------- | ----------------------------------------- |
+| **Naming**         | Prefixed with `datahub__`                          | Any name                                  |
+| **Definition**     | Created via `datahub assertions compile`           | Created manually in Snowflake             |
+| **Assertion Type** | Based on YAML definition (Freshness, Volume, etc.) | CUSTOM                                    |
+| **Source**         | NATIVE (defined in DataHub)                        | EXTERNAL                                  |
+| **URN Generation** | Extracted from DMF name (`datahub__<guid>`)        | Generated from Snowflake's `REFERENCE_ID` |
+
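Because external assertion URNs are derived from Snowflake's `REFERENCE_ID`, it can be useful to preview which external DMFs will be picked up and which identifier each one maps to. A sketch of such a check, assuming the results view also exposes the `REFERENCE_ID` column that the ingestion reads (availability may vary; verify against your account):

```sql
-- Sketch: preview external (non-DataHub) DMF results and the REFERENCE_ID that
-- their DataHub assertion URNs are derived from.
SELECT DISTINCT metric_name, table_database, table_schema, table_name, reference_id
FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
WHERE metric_name NOT ILIKE 'datahub__%';
```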
+### How External DMFs Appear in DataHub UI
+
+External DMFs appear in DataHub with:
+
+- **Assertion Type**: CUSTOM
+- **Source**: EXTERNAL
+- **Platform Instance**: Snowflake platform instance (if configured)
+- **Description**: "External Snowflake DMF: {dmf_name}"
+- **Custom Properties**:
+  - `snowflake_dmf_name`: The DMF function name
+  - `snowflake_reference_id`: Snowflake's unique identifier for the DMF-table binding
+  - `snowflake_dmf_columns`: Comma-separated list of columns the DMF operates on
+
+You can view external DMF assertions in the **Quality** tab of the associated dataset in the DataHub UI. They will show pass/fail history alongside any DataHub-created assertions.
+
 ## Caveats
 
 - Currently, Snowflake supports at most 1000 DMF-table associations, so you cannot define more than 1000 assertions for Snowflake.
Lines changed: 157 additions & 30 deletions
@@ -1,15 +1,18 @@
+import json
 import logging
 from datetime import datetime
-from typing import Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional
 
-from pydantic import BaseModel
+from pydantic import BaseModel, field_validator
 
 from datahub.emitter.mce_builder import (
     make_assertion_urn,
     make_data_platform_urn,
     make_dataplatform_instance_urn,
+    make_schema_field_urn,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.emitter.mcp_builder import DatahubKey
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
 from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
@@ -25,18 +28,55 @@
     AssertionRunStatus,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
+from datahub.metadata.schema_classes import (
+    AssertionInfoClass,
+    AssertionSourceClass,
+    AssertionSourceTypeClass,
+    AssertionTypeClass,
+    CustomAssertionInfoClass,
+)
 from datahub.utilities.time import datetime_to_ts_millis
 
 logger: logging.Logger = logging.getLogger(__name__)
 
 
+class SnowflakeExternalDmfKey(DatahubKey):
+    """Key for generating deterministic GUIDs for external Snowflake DMFs.
+
+    Uses Snowflake's REFERENCE_ID which uniquely identifies the
+    DMF-table-column association.
+    """
+
+    platform: str = "snowflake"
+    reference_id: str
+    instance: Optional[str] = None
+
+
 class DataQualityMonitoringResult(BaseModel):
     MEASUREMENT_TIME: datetime
     METRIC_NAME: str
     TABLE_NAME: str
     TABLE_SCHEMA: str
     TABLE_DATABASE: str
     VALUE: int
+    REFERENCE_ID: str
+    ARGUMENT_NAMES: List[str]
+
+    @field_validator("ARGUMENT_NAMES", mode="before")
+    @classmethod
+    def parse_argument_names(cls, v: object) -> List[str]:
+        """Parse ARGUMENT_NAMES from JSON string.
+
+        Snowflake returns this column as a JSON-encoded string like '["col1", "col2"]'.
+        """
+        if isinstance(v, str):
+            try:
+                parsed = json.loads(v)
+                if isinstance(parsed, list):
+                    return parsed
+            except json.JSONDecodeError:
+                logger.debug(f"Failed to parse ARGUMENT_NAMES as JSON: {v}")
+        return []
 
 
 class SnowflakeAssertionsHandler:
@@ -56,20 +96,19 @@ def __init__(
     def get_assertion_workunits(
         self, discovered_datasets: List[str]
     ) -> Iterable[MetadataWorkUnit]:
+        include_external = self.config.include_externally_managed_dmfs
+
         cur = self.connection.query(
             SnowflakeQuery.dmf_assertion_results(
                 datetime_to_ts_millis(self.config.start_time),
                 datetime_to_ts_millis(self.config.end_time),
+                include_external=include_external,
             )
         )
         for db_row in cur:
-            mcp = self._process_result_row(db_row, discovered_datasets)
-            if mcp:
-                yield mcp.as_workunit(is_primary_source=False)
-
-                if mcp.entityUrn and mcp.entityUrn not in self._urns_processed:
-                    self._urns_processed.append(mcp.entityUrn)
-                    yield self._gen_platform_instance_wu(mcp.entityUrn)
+            workunits = self._process_result_row(db_row, discovered_datasets)
+            for wu in workunits:
+                yield wu
 
     def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:
         # Construct a MetadataChangeProposalWrapper object for assertion platform
@@ -87,34 +126,122 @@ def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:
                 ),
             ),
         ).as_workunit(is_primary_source=False)
 
+    def _generate_external_dmf_guid(self, result: DataQualityMonitoringResult) -> str:
+        """Generate a stable, deterministic GUID for external DMFs."""
+        key = SnowflakeExternalDmfKey(
+            reference_id=result.REFERENCE_ID,
+            instance=self.config.platform_instance,
+        )
+        return key.guid()
+
+    def _create_assertion_info_workunit(
+        self,
+        assertion_urn: str,
+        dataset_urn: str,
+        dmf_name: str,
+        argument_names: List[str],
+        reference_id: str,
+    ) -> MetadataWorkUnit:
+        """Create AssertionInfo for external DMFs."""
+        # Field URN is only set for single-column DMFs. Multi-column DMFs are
+        # treated as table-level assertions with columns stored in custom properties.
+        field_urn: Optional[str] = None
+        if argument_names and len(argument_names) == 1:
+            field_urn = make_schema_field_urn(dataset_urn, argument_names[0])
+
+        custom_properties: Dict[str, str] = {
+            "snowflake_dmf_name": dmf_name,
+            "snowflake_reference_id": reference_id,
+        }
+        # Store all columns in custom properties regardless of count
+        if argument_names:
+            custom_properties["snowflake_dmf_columns"] = ",".join(argument_names)
+
+        assertion_info = AssertionInfoClass(
+            type=AssertionTypeClass.CUSTOM,
+            customAssertion=CustomAssertionInfoClass(
+                type="Snowflake Data Metric Function",
+                entity=dataset_urn,
+                field=field_urn,
+            ),
+            source=AssertionSourceClass(
+                type=AssertionSourceTypeClass.EXTERNAL,
+            ),
+            description=f"External Snowflake DMF: {dmf_name}",
+            customProperties=custom_properties,
+        )
+
+        return MetadataChangeProposalWrapper(
+            entityUrn=assertion_urn,
+            aspect=assertion_info,
+        ).as_workunit(is_primary_source=False)
+
     def _process_result_row(
         self, result_row: dict, discovered_datasets: List[str]
-    ) -> Optional[MetadataChangeProposalWrapper]:
+    ) -> List[MetadataWorkUnit]:
+        """Process a single DMF result row. Returns list of workunits."""
+        workunits: List[MetadataWorkUnit] = []
+
         try:
             result = DataQualityMonitoringResult.model_validate(result_row)
-            assertion_guid = result.METRIC_NAME.split("__")[-1].lower()
-            status = bool(result.VALUE)  # 1 if PASS, 0 if FAIL
+
+            is_datahub_dmf = result.METRIC_NAME.lower().startswith("datahub__")
+
+            if is_datahub_dmf:
+                assertion_guid = result.METRIC_NAME.split("__")[-1].lower()
+            else:
+                assertion_guid = self._generate_external_dmf_guid(result)
+
+            assertion_urn = make_assertion_urn(assertion_guid)
+
             assertee = self.identifiers.get_dataset_identifier(
                 result.TABLE_NAME, result.TABLE_SCHEMA, result.TABLE_DATABASE
             )
-            if assertee in discovered_datasets:
-                return MetadataChangeProposalWrapper(
-                    entityUrn=make_assertion_urn(assertion_guid),
-                    aspect=AssertionRunEvent(
-                        timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
-                        runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
-                        asserteeUrn=self.identifiers.gen_dataset_urn(assertee),
-                        status=AssertionRunStatus.COMPLETE,
-                        assertionUrn=make_assertion_urn(assertion_guid),
-                        result=AssertionResult(
-                            type=(
-                                AssertionResultType.SUCCESS
-                                if status
-                                else AssertionResultType.FAILURE
-                            )
-                        ),
-                    ),
+            if assertee not in discovered_datasets:
+                return []
+
+            dataset_urn = self.identifiers.gen_dataset_urn(assertee)
+
+            if result.VALUE == 1:
+                result_type = AssertionResultType.SUCCESS
+            elif result.VALUE == 0:
+                result_type = AssertionResultType.FAILURE
+            else:
+                result_type = AssertionResultType.ERROR
+                logger.warning(
+                    f"DMF '{result.METRIC_NAME}' returned invalid value {result.VALUE}. "
+                    "Expected 1 (pass) or 0 (fail). Marking as ERROR."
+                )
+
+            if not is_datahub_dmf and assertion_urn not in self._urns_processed:
+                assertion_info_wu = self._create_assertion_info_workunit(
+                    assertion_urn=assertion_urn,
+                    dataset_urn=dataset_urn,
+                    dmf_name=result.METRIC_NAME,
+                    argument_names=result.ARGUMENT_NAMES,
+                    reference_id=result.REFERENCE_ID,
                 )
+                workunits.append(assertion_info_wu)
+
+            run_event_mcp = MetadataChangeProposalWrapper(
+                entityUrn=assertion_urn,
+                aspect=AssertionRunEvent(
+                    timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
+                    runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
+                    asserteeUrn=dataset_urn,
+                    status=AssertionRunStatus.COMPLETE,
+                    assertionUrn=assertion_urn,
+                    result=AssertionResult(type=result_type),
+                ),
+            )
+            workunits.append(run_event_mcp.as_workunit(is_primary_source=False))
+
+            if assertion_urn not in self._urns_processed:
+                self._urns_processed.append(assertion_urn)
+                workunits.append(self._gen_platform_instance_wu(assertion_urn))
+
+            return workunits
         except Exception as e:
             self.report.report_warning("assertion-result-parse-failure", str(e))
-            return None
+            return []
