97 changes: 97 additions & 0 deletions docs/assertions/snowflake/snowflake_dmfs.md
@@ -208,6 +208,103 @@ either via CLI or the UI visible as normal assertions.

`datahub ingest -c snowflake.yml`

## Ingesting External (User-Created) DMFs

In addition to DataHub-created DMFs, you can ingest results from your own custom Snowflake Data Metric Functions. This lets you monitor data quality checks that were defined outside of DataHub.

### Enabling External DMF Ingestion

To ingest external DMFs, add the `include_external_dmf_assertions` flag to your Snowflake recipe:

```yaml
source:
  type: snowflake
  config:
    # ... connection config ...

    # Enable assertion results ingestion (required)
    include_assertion_results: true

    # Enable external DMF ingestion (new)
    include_external_dmf_assertions: true

    # Time window for assertion results
    start_time: "-7 days"
```

Both flags must be enabled for external DMF ingestion to work; the recipe fails validation if `include_external_dmf_assertions` is set without `include_assertion_results`.

### Requirements for External DMFs

**External DMFs must return `1` for SUCCESS and `0` for FAILURE.**

DataHub interprets the `VALUE` column from Snowflake's `DATA_QUALITY_MONITORING_RESULTS` table as:

- `VALUE = 1` → Assertion **PASSED**
- `VALUE = 0` → Assertion **FAILED**

This is because DataHub cannot interpret arbitrary return values (e.g., "100 null rows" - is that good or bad?). You must build the pass/fail logic into your DMF.

#### Example: Writing External DMFs Correctly

**WRONG** - Returns raw count (DataHub can't interpret this):

```sql
CREATE DATA METRIC FUNCTION my_null_check(ARGT TABLE(col VARCHAR))
RETURNS NUMBER AS
$$
SELECT COUNT(*) FROM ARGT WHERE col IS NULL
$$;
-- Returns: 0, 5, 100, etc. - DataHub can't determine pass/fail!
```

**CORRECT** - Returns 1 (pass) or 0 (fail):

```sql
CREATE DATA METRIC FUNCTION my_null_check(ARGT TABLE(col VARCHAR))
RETURNS NUMBER AS
$$
SELECT CASE WHEN COUNT(*) = 0 THEN 1 ELSE 0 END
FROM ARGT WHERE col IS NULL
$$;
-- Returns: 1 if no nulls (pass), 0 if has nulls (fail)
```

**CORRECT** - With threshold:

```sql
CREATE DATA METRIC FUNCTION my_null_check_threshold(ARGT TABLE(col VARCHAR))
RETURNS NUMBER AS
$$
SELECT CASE WHEN COUNT(*) <= 10 THEN 1 ELSE 0 END
FROM ARGT WHERE col IS NULL
$$;
-- Returns: 1 if ≤10 nulls (pass), 0 if >10 nulls (fail)
```
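For reference, DataHub's pass/fail interpretation is simply a boolean cast of `VALUE` (a minimal sketch mirroring the handler logic in this PR; the function name is illustrative):

```python
def interpret_dmf_value(value: int) -> str:
    # Any nonzero VALUE is truthy, which is why external DMFs must return
    # exactly 1 (pass) or 0 (fail) rather than raw counts.
    return "SUCCESS" if bool(value) else "FAILURE"
```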

### How External DMFs Differ from DataHub-Created DMFs

| Aspect | DataHub-Created DMFs | External DMFs |
| ------------------ | ------------------------------------------------------- | ----------------------------------------- |
| **Naming** | Prefixed with `datahub__` | Any name |
| **Definition** | Created via `datahub assertions compile` | Created manually by user |
| **Assertion Type** | Based on assertion definition (Freshness, Volume, etc.) | CUSTOM |
| **Source** | INFERRED | EXTERNAL |
| **URN Generation** | Extracted from DMF name | Generated from Snowflake's `REFERENCE_ID` |

> **Reviewer (Contributor), on the Source row:** Regarding the Source:
>
> https://docs.datahub.com/docs/generated/metamodel/entities/assertion#assertion-source
>
> The assertionInfo aspect includes an AssertionSource that identifies the origin of the assertion:
>
> - NATIVE: Defined directly in DataHub (DataHub Cloud feature)
> - EXTERNAL: Ingested from external tools (Great Expectations, dbt, Snowflake, etc.)
> - INFERRED: Generated by ML-based inference systems (DataHub Cloud feature)
>
> External assertions should have a corresponding dataPlatformInstance aspect that identifies the specific platform instance they originated from.
>
> My concerns:
>
> - INFERRED seems to be limited to ML-based inference systems; we may be deviating from its original purpose.
> - We are missing the DataPlatformInstance aspect, which is the one that identifies the origin, according to the docs.

> **Author (Member):** Updated; yes, it should be NATIVE for DataHub-created DMFs.
> The code already emits the DataPlatformInstance aspect for every unique assertion URN (both DataHub-created and external DMFs).
>
> Added in the doc.

> **Reviewer (Contributor):** cc @jayacryl around the source choice here
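
For reference, the URN derivation difference above boils down to the following (a minimal sketch mirroring the handler logic in this PR; the helper function name is illustrative):

```python
from typing import Optional

from datahub.emitter.mce_builder import datahub_guid, make_assertion_urn


def assertion_urn_for(
    metric_name: str, reference_id: str, platform_instance: Optional[str] = None
) -> str:
    if metric_name.lower().startswith("datahub__"):
        # DataHub-created DMFs embed the assertion GUID in the function name.
        guid = metric_name.split("__")[-1].lower()
    else:
        # External DMFs get a stable GUID derived from Snowflake's REFERENCE_ID,
        # optionally scoped by platform instance.
        guid_dict = {"platform": "snowflake", "reference_id": reference_id}
        if platform_instance:
            guid_dict["instance"] = platform_instance
        guid = datahub_guid(guid_dict)
    return make_assertion_urn(guid)
```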

### How External DMFs Appear in DataHub UI

External DMFs appear in DataHub with:

- **Assertion Type**: CUSTOM
- **Source**: EXTERNAL
- **Description**: "External Snowflake DMF: {dmf_name}"
- **Custom Properties**:
- `snowflake_dmf_name`: The DMF function name
- `snowflake_dmf_columns`: Comma-separated list of columns the DMF operates on

You can view external DMF assertions in the **Quality** tab of the associated dataset in the DataHub UI. They will show pass/fail history alongside any DataHub-created assertions.

## Caveats

- Snowflake currently supports at most 1000 DMF-table associations, so you cannot define more than 1000 assertions for Snowflake.
@@ -1,13 +1,16 @@
import json
import logging
from datetime import datetime
from typing import Iterable, List, Optional
from typing import Dict, Iterable, List, Optional

from pydantic import BaseModel
from pydantic import BaseModel, field_validator

from datahub.emitter.mce_builder import (
    datahub_guid,
    make_assertion_urn,
    make_data_platform_urn,
    make_dataplatform_instance_urn,
    make_schema_field_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand All @@ -25,6 +28,13 @@
    AssertionRunStatus,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
from datahub.metadata.schema_classes import (
    AssertionInfoClass,
    AssertionSourceClass,
    AssertionSourceTypeClass,
    AssertionTypeClass,
    CustomAssertionInfoClass,
)
from datahub.utilities.time import datetime_to_ts_millis

logger: logging.Logger = logging.getLogger(__name__)
@@ -37,6 +47,24 @@ class DataQualityMonitoringResult(BaseModel):
    TABLE_SCHEMA: str
    TABLE_DATABASE: str
    VALUE: int
    REFERENCE_ID: str
    ARGUMENT_NAMES: List[str]

@field_validator("ARGUMENT_NAMES", mode="before")
@classmethod
def parse_argument_names(cls, v: object) -> List[str]:
"""Parse ARGUMENT_NAMES from JSON string.

Snowflake returns this column as a JSON-encoded string like '["col1", "col2"]'.
"""
if isinstance(v, str):
try:
parsed = json.loads(v)
if isinstance(parsed, list):
return parsed
except json.JSONDecodeError:
logger.debug(f"Failed to parse ARGUMENT_NAMES as JSON: {v}")
return []
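    # Example (hypothetical values): a raw Snowflake row arrives with
    # ARGUMENT_NAMES as a JSON-encoded string and validates cleanly:
    #   DataQualityMonitoringResult.model_validate(
    #       {..., "ARGUMENT_NAMES": '["ORDER_ID"]', "VALUE": 1, ...}
    #   ).ARGUMENT_NAMES == ["ORDER_ID"]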


class SnowflakeAssertionsHandler:
@@ -56,20 +84,19 @@ def __init__(
    def get_assertion_workunits(
        self, discovered_datasets: List[str]
    ) -> Iterable[MetadataWorkUnit]:
        include_external = self.config.include_external_dmf_assertions

        cur = self.connection.query(
            SnowflakeQuery.dmf_assertion_results(
                datetime_to_ts_millis(self.config.start_time),
                datetime_to_ts_millis(self.config.end_time),
                include_external=include_external,
            )
        )
        for db_row in cur:
            mcp = self._process_result_row(db_row, discovered_datasets)
            if mcp:
                yield mcp.as_workunit(is_primary_source=False)

            if mcp.entityUrn and mcp.entityUrn not in self._urns_processed:
                self._urns_processed.append(mcp.entityUrn)
                yield self._gen_platform_instance_wu(mcp.entityUrn)
            workunits = self._process_result_row(db_row, discovered_datasets)
            for wu in workunits:
                yield wu

    def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:
        # Construct a MetadataChangeProposalWrapper object for assertion platform
@@ -87,34 +114,122 @@ def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:
            ),
        ).as_workunit(is_primary_source=False)

    def _generate_external_dmf_guid(self, result: DataQualityMonitoringResult) -> str:
        """Generate a stable, deterministic GUID for external DMFs.

        Uses Snowflake's REFERENCE_ID which uniquely identifies the
        DMF-table-column association.
        """
        guid_dict: Dict[str, str] = {
            "platform": "snowflake",
            "reference_id": result.REFERENCE_ID,
        }
        if self.config.platform_instance:
            guid_dict["instance"] = self.config.platform_instance

        return datahub_guid(guid_dict)

    def _create_assertion_info_workunit(
        self,
        assertion_urn: str,
        dataset_urn: str,
        dmf_name: str,
        argument_names: List[str],
    ) -> MetadataWorkUnit:
        """Create AssertionInfo for external DMFs."""
        # Field URN is only set for single-column DMFs. Multi-column DMFs are
        # treated as table-level assertions with columns stored in custom properties.
        field_urn: Optional[str] = None
        if argument_names and len(argument_names) == 1:
            field_urn = make_schema_field_urn(dataset_urn, argument_names[0])

        custom_properties: Dict[str, str] = {
            "snowflake_dmf_name": dmf_name,
        }
        # Store all columns in custom properties regardless of count
        if argument_names:
            custom_properties["snowflake_dmf_columns"] = ",".join(argument_names)

        assertion_info = AssertionInfoClass(
            type=AssertionTypeClass.CUSTOM,
            customAssertion=CustomAssertionInfoClass(
                type="Snowflake Data Metric Function",
                entity=dataset_urn,
                field=field_urn,
            ),
            source=AssertionSourceClass(
                type=AssertionSourceTypeClass.EXTERNAL,
            ),
            description=f"External Snowflake DMF: {dmf_name}",
            customProperties=custom_properties,
        )

        return MetadataChangeProposalWrapper(
            entityUrn=assertion_urn,
            aspect=assertion_info,
        ).as_workunit(is_primary_source=False)

    def _process_result_row(
        self, result_row: dict, discovered_datasets: List[str]
    ) -> Optional[MetadataChangeProposalWrapper]:
    ) -> List[MetadataWorkUnit]:
        """Process a single DMF result row. Returns list of workunits."""
        workunits: List[MetadataWorkUnit] = []

        try:
            result = DataQualityMonitoringResult.model_validate(result_row)
            assertion_guid = result.METRIC_NAME.split("__")[-1].lower()
            status = bool(result.VALUE)  # 1 if PASS, 0 if FAIL

            is_datahub_dmf = result.METRIC_NAME.lower().startswith("datahub__")

            if is_datahub_dmf:
                assertion_guid = result.METRIC_NAME.split("__")[-1].lower()
            else:
                assertion_guid = self._generate_external_dmf_guid(result)

            assertion_urn = make_assertion_urn(assertion_guid)

            assertee = self.identifiers.get_dataset_identifier(
                result.TABLE_NAME, result.TABLE_SCHEMA, result.TABLE_DATABASE
            )
            if assertee in discovered_datasets:
                return MetadataChangeProposalWrapper(
                    entityUrn=make_assertion_urn(assertion_guid),
                    aspect=AssertionRunEvent(
                        timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
                        runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
                        asserteeUrn=self.identifiers.gen_dataset_urn(assertee),
                        status=AssertionRunStatus.COMPLETE,
                        assertionUrn=make_assertion_urn(assertion_guid),
                        result=AssertionResult(
                            type=(
                                AssertionResultType.SUCCESS
                                if status
                                else AssertionResultType.FAILURE
                            )
                        ),
                    ),
            if assertee not in discovered_datasets:
                return []

            dataset_urn = self.identifiers.gen_dataset_urn(assertee)
            status = bool(result.VALUE)  # 1 if PASS, 0 if FAIL

            if not is_datahub_dmf and assertion_urn not in self._urns_processed:
                assertion_info_wu = self._create_assertion_info_workunit(
                    assertion_urn=assertion_urn,
                    dataset_urn=dataset_urn,
                    dmf_name=result.METRIC_NAME,
                    argument_names=result.ARGUMENT_NAMES,
                )
                workunits.append(assertion_info_wu)

            run_event_mcp = MetadataChangeProposalWrapper(
                entityUrn=assertion_urn,
                aspect=AssertionRunEvent(
                    timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
                    runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
                    asserteeUrn=dataset_urn,
                    status=AssertionRunStatus.COMPLETE,
                    assertionUrn=assertion_urn,
                    result=AssertionResult(
                        type=(
                            AssertionResultType.SUCCESS
                            if status
                            else AssertionResultType.FAILURE
                        )
                    ),
                ),
            )
            workunits.append(run_event_mcp.as_workunit(is_primary_source=False))

            if assertion_urn not in self._urns_processed:
                self._urns_processed.append(assertion_urn)
                workunits.append(self._gen_platform_instance_wu(assertion_urn))

            return workunits

        except Exception as e:
            self.report.report_warning("assertion-result-parse-failure", str(e))
            return None
            return []
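# Per result row, the updated handler emits up to three workunits:
#   1. AssertionInfo        - once per external DMF assertion URN (first sighting)
#   2. AssertionRunEvent    - for every result row
#   3. DataPlatformInstance - once per assertion URN (DataHub-created and external)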
@@ -445,6 +445,16 @@ class SnowflakeV2Config(
" assertions CLI in snowflake",
)

    include_external_dmf_assertions: bool = Field(
        default=False,
        description="Ingest user-created Snowflake DMFs (not created via DataHub) "
        "as external assertions. Requires `include_assertion_results: true`. "
        "When enabled, all DMFs (not just datahub__* prefixed) "
        "will be ingested with their execution results. "
        "IMPORTANT: External DMFs must return 1 for SUCCESS and 0 for FAILURE. "
        "DataHub interprets VALUE=1 as passed, VALUE=0 as failed.",
    )

    pushdown_deny_usernames: List[str] = Field(
        default=[],
        description="List of snowflake usernames (SQL LIKE patterns, e.g., 'SERVICE_%', '%_PROD', 'TEST_USER') which will NOT be considered for lineage/usage/queries extraction. "
@@ -499,6 +509,15 @@ def validate_include_column_lineage(cls, v, info):
        )
        return v

@field_validator("include_external_dmf_assertions", mode="after")
@classmethod
def validate_include_external_dmf_assertions(cls, v, info):
if not info.data.get("include_assertion_results") and v:
raise ValueError(
"include_assertion_results must be True for include_external_dmf_assertions to be set."
)
return v
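    # e.g. a recipe that enables include_external_dmf_assertions without
    # include_assertion_results fails fast with the error above.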

@model_validator(mode="after")
def validate_unsupported_configs(self) -> "SnowflakeV2Config":
if (