diff --git a/docs/assertions/snowflake/snowflake_dmfs.md b/docs/assertions/snowflake/snowflake_dmfs.md
index dd406ef22b2a39..7c14c17437d009 100644
--- a/docs/assertions/snowflake/snowflake_dmfs.md
+++ b/docs/assertions/snowflake/snowflake_dmfs.md
@@ -180,7 +180,7 @@
 snowsql -f dmf_definitions.sql
 snowsql -f dmf_associations.sql
 ```
-:::NOTE
-Scheduling Data Metric Function on table incurs Serverless Credit Usage in Snowflake. Refer [Billing and Pricing](https://docs.snowflake.com/en/user-guide/data-quality-intro#billing-and-pricing) for more details. Please ensure you DROP Data Metric Function created via dmf_associations.sql if the assertion is no longer in use.
+:::note
+Scheduling a Data Metric Function on a table incurs serverless credit usage in Snowflake; see [Billing and Pricing](https://docs.snowflake.com/en/user-guide/data-quality-intro#billing-and-pricing) for details. Be sure to DROP any Data Metric Functions created via dmf_associations.sql once the corresponding assertions are no longer in use.
 :::
@@ -208,6 +208,127 @@
 either via CLI or the UI visible as normal assertions.
 
 `datahub ingest -c snowflake.yml`
 
+## Ingesting External (User-Created) DMFs
+
+In addition to DataHub-created DMFs, you can ingest results from your own custom Snowflake Data Metric Functions. "External" here means DMFs that were created directly in Snowflake without using DataHub's assertion compiler; they exist outside of DataHub's management.
+
+### Why Use External DMFs?
+
+You might want to ingest external DMFs if:
+
+- **Pre-existing DMFs**: You already have DMFs in Snowflake that were created before adopting DataHub, and you want to see their results in DataHub without recreating them
+- **Custom logic**: You need DMF logic that isn't supported by DataHub's assertion compiler (e.g., complex multi-table checks)
+- **Team workflows**: Different teams manage DMFs directly in Snowflake, but you want centralized visibility in DataHub
+- **Gradual adoption**: You want to start monitoring existing data quality checks in DataHub before fully migrating to DataHub-managed assertions
+
+### Enabling External DMF Ingestion
+
+To ingest external DMFs, add the `include_externally_managed_dmfs` flag to your Snowflake recipe:
+
+```yaml
+source:
+  type: snowflake
+  config:
+    # ... connection config ...
+
+    # Enable assertion results ingestion (required)
+    include_assertion_results: true
+
+    # Enable external DMF ingestion (new)
+    include_externally_managed_dmfs: true
+
+    # Time window for assertion results
+    start_time: "-7 days"
+```
+
+Both flags must be enabled for external DMF ingestion to work.
+
+### Requirements for External DMFs
+
+**External DMFs must return `1` for SUCCESS and `0` for FAILURE.**
+
+DataHub interprets the `VALUE` column from Snowflake's `DATA_QUALITY_MONITORING_RESULTS` table as:
+
+- `VALUE = 1` → Assertion **PASSED**
+- `VALUE = 0` → Assertion **FAILED**
+
+DataHub cannot interpret arbitrary return values (is "100 null rows" good or bad?), so you must build the pass/fail logic into the DMF itself.
+
+:::warning What if my DMF returns other values?
+If your DMF returns values other than 0 or 1, DataHub will mark the assertion result as **ERROR**:
+
+- `VALUE = 1` → **PASSED**
+- `VALUE = 0` → **FAILED**
+- Any other `VALUE` (e.g., `5`, `100`, `-1`) → **ERROR**
+
+The ERROR state indicates that the DMF is not configured correctly for DataHub ingestion. You can identify these cases by:
+
+1. Checking the ingestion logs for warnings like: `DMF 'my_dmf' returned invalid value 100. Expected 1 (pass) or 0 (fail). Marking as ERROR.`
+2. Looking for assertions with ERROR status in the DataHub UI
+:::
+
+#### Example: Writing External DMFs Correctly
+
+**WRONG** - returns a raw count (DataHub can't interpret this):
+
+```sql
+CREATE DATA METRIC FUNCTION my_null_check(ARGT TABLE(col VARCHAR))
+RETURNS NUMBER AS
+$$
+  SELECT COUNT(*) FROM ARGT WHERE col IS NULL
+$$;
+-- Returns: 0, 5, 100, etc. - DataHub can't determine pass/fail!
+```
+
+**CORRECT** - returns 1 (pass) or 0 (fail):
+
+```sql
+CREATE DATA METRIC FUNCTION my_null_check(ARGT TABLE(col VARCHAR))
+RETURNS NUMBER AS
+$$
+  SELECT CASE WHEN COUNT(*) = 0 THEN 1 ELSE 0 END
+  FROM ARGT WHERE col IS NULL
+$$;
+-- Returns: 1 if no nulls (pass), 0 if any nulls (fail)
+```
+
+**CORRECT** - with a threshold:
+
+```sql
+CREATE DATA METRIC FUNCTION my_null_check_threshold(ARGT TABLE(col VARCHAR))
+RETURNS NUMBER AS
+$$
+  SELECT CASE WHEN COUNT(*) <= 10 THEN 1 ELSE 0 END
+  FROM ARGT WHERE col IS NULL
+$$;
+-- Returns: 1 if ≤10 nulls (pass), 0 if >10 nulls (fail)
+```
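+
+Once the DMF returns 1/0, it still has to be associated with a table and given a schedule before Snowflake writes any rows to `DATA_QUALITY_MONITORING_RESULTS` for DataHub to read. As a minimal sketch, assuming the `my_null_check_threshold` function above and a hypothetical `my_db.public.orders` table with an `amount` column:
+
+```sql
+-- DMF results are only produced on a schedule; set one on the table first.
+ALTER TABLE my_db.public.orders SET DATA_METRIC_SCHEDULE = '60 MINUTE';
+
+-- Attach the user-created DMF to the column it should check.
+ALTER TABLE my_db.public.orders
+  ADD DATA METRIC FUNCTION my_null_check_threshold ON (amount);
+
+-- Optional: confirm results are flowing before running DataHub ingestion.
+SELECT MEASUREMENT_TIME, METRIC_NAME, VALUE
+FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS
+WHERE TABLE_NAME = 'ORDERS'
+ORDER BY MEASUREMENT_TIME DESC;
+```
+
+The exact schedule options and required privileges are covered in Snowflake's data quality documentation; the key point is that an unscheduled DMF never produces result rows for DataHub to ingest.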
+
+### How External DMFs Differ from DataHub-Created DMFs
+
+| Aspect             | DataHub-Created DMFs                                        | External DMFs                             |
+| ------------------ | ----------------------------------------------------------- | ----------------------------------------- |
+| **Naming**         | Prefixed with `datahub__`                                    | Any name                                  |
+| **Definition**     | Created via `datahub assertions compile`                     | Created manually in Snowflake             |
+| **Assertion Type** | Based on YAML definition (Freshness, Volume, etc.)           | CUSTOM                                    |
+| **Source**         | NATIVE (defined in DataHub)                                  | EXTERNAL                                  |
+| **URN Generation** | Extracted from the DMF name (after the `datahub__` prefix)   | Generated from Snowflake's `REFERENCE_ID` |
+
+### How External DMFs Appear in DataHub UI
+
+External DMFs appear in DataHub with:
+
+- **Assertion Type**: CUSTOM
+- **Source**: EXTERNAL
+- **Platform Instance**: Snowflake platform instance (if configured)
+- **Description**: "External Snowflake DMF: {dmf_name}"
+- **Custom Properties**:
+  - `snowflake_dmf_name`: The DMF function name
+  - `snowflake_reference_id`: Snowflake's unique identifier for the DMF-table binding
+  - `snowflake_dmf_columns`: Comma-separated list of columns the DMF operates on
+
+You can view external DMF assertions in the **Quality** tab of the associated dataset in the DataHub UI. They show pass/fail history alongside any DataHub-created assertions.
+
 ## Caveats
 
-- Currently, Snowflake supports at most 1000 DMF-table associations at the moment so you can not define more than 1000 assertions for snowflake.
+- Snowflake currently supports at most 1000 DMF-table associations, so you cannot define more than 1000 assertions for Snowflake.
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py
index 9b61bba73e0915..95e02fe7a7b59c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py
@@ -1,15 +1,18 @@
+import json
 import logging
 from datetime import datetime
-from typing import Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional
 
-from pydantic import BaseModel
+from pydantic import BaseModel, field_validator
 
 from datahub.emitter.mce_builder import (
     make_assertion_urn,
     make_data_platform_urn,
     make_dataplatform_instance_urn,
+    make_schema_field_urn,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.emitter.mcp_builder import DatahubKey
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
 from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
@@ -25,11 +28,30 @@
     AssertionRunStatus,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
+from datahub.metadata.schema_classes import (
+    AssertionInfoClass,
+    AssertionSourceClass,
+    AssertionSourceTypeClass,
+    AssertionTypeClass,
+    CustomAssertionInfoClass,
+)
 from datahub.utilities.time import datetime_to_ts_millis
 
 logger: logging.Logger = logging.getLogger(__name__)
 
 
+class SnowflakeExternalDmfKey(DatahubKey):
+    """Key for generating deterministic GUIDs for external Snowflake DMFs.
+
+    Uses Snowflake's REFERENCE_ID, which uniquely identifies the
+    DMF-table-column association.
+    """
+
+    platform: str = "snowflake"
+    reference_id: str
+    instance: Optional[str] = None
+
+
 class DataQualityMonitoringResult(BaseModel):
     MEASUREMENT_TIME: datetime
     METRIC_NAME: str
@@ -37,6 +59,24 @@ class DataQualityMonitoringResult(BaseModel):
     TABLE_SCHEMA: str
     TABLE_DATABASE: str
     VALUE: int
+    REFERENCE_ID: str
+    ARGUMENT_NAMES: List[str]
+
+    @field_validator("ARGUMENT_NAMES", mode="before")
+    @classmethod
+    def parse_argument_names(cls, v: object) -> List[str]:
+        """Parse ARGUMENT_NAMES from a JSON string.
+
+        Snowflake returns this column as a JSON-encoded string like '["col1", "col2"]'.
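+        Values that are already lists pass through unchanged; unparseable or
+        non-list input falls back to an empty list.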
+ """ + if isinstance(v, str): + try: + parsed = json.loads(v) + if isinstance(parsed, list): + return parsed + except json.JSONDecodeError: + logger.debug(f"Failed to parse ARGUMENT_NAMES as JSON: {v}") + return [] class SnowflakeAssertionsHandler: @@ -56,20 +96,19 @@ def __init__( def get_assertion_workunits( self, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: + include_external = self.config.include_externally_managed_dmfs + cur = self.connection.query( SnowflakeQuery.dmf_assertion_results( datetime_to_ts_millis(self.config.start_time), datetime_to_ts_millis(self.config.end_time), + include_external=include_external, ) ) for db_row in cur: - mcp = self._process_result_row(db_row, discovered_datasets) - if mcp: - yield mcp.as_workunit(is_primary_source=False) - - if mcp.entityUrn and mcp.entityUrn not in self._urns_processed: - self._urns_processed.append(mcp.entityUrn) - yield self._gen_platform_instance_wu(mcp.entityUrn) + workunits = self._process_result_row(db_row, discovered_datasets) + for wu in workunits: + yield wu def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit: # Construct a MetadataChangeProposalWrapper object for assertion platform @@ -87,34 +126,122 @@ def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit: ), ).as_workunit(is_primary_source=False) + def _generate_external_dmf_guid(self, result: DataQualityMonitoringResult) -> str: + """Generate a stable, deterministic GUID for external DMFs.""" + key = SnowflakeExternalDmfKey( + reference_id=result.REFERENCE_ID, + instance=self.config.platform_instance, + ) + return key.guid() + + def _create_assertion_info_workunit( + self, + assertion_urn: str, + dataset_urn: str, + dmf_name: str, + argument_names: List[str], + reference_id: str, + ) -> MetadataWorkUnit: + """Create AssertionInfo for external DMFs.""" + # Field URN is only set for single-column DMFs. Multi-column DMFs are + # treated as table-level assertions with columns stored in custom properties. + field_urn: Optional[str] = None + if argument_names and len(argument_names) == 1: + field_urn = make_schema_field_urn(dataset_urn, argument_names[0]) + + custom_properties: Dict[str, str] = { + "snowflake_dmf_name": dmf_name, + "snowflake_reference_id": reference_id, + } + # Store all columns in custom properties regardless of count + if argument_names: + custom_properties["snowflake_dmf_columns"] = ",".join(argument_names) + + assertion_info = AssertionInfoClass( + type=AssertionTypeClass.CUSTOM, + customAssertion=CustomAssertionInfoClass( + type="Snowflake Data Metric Function", + entity=dataset_urn, + field=field_urn, + ), + source=AssertionSourceClass( + type=AssertionSourceTypeClass.EXTERNAL, + ), + description=f"External Snowflake DMF: {dmf_name}", + customProperties=custom_properties, + ) + + return MetadataChangeProposalWrapper( + entityUrn=assertion_urn, + aspect=assertion_info, + ).as_workunit(is_primary_source=False) + def _process_result_row( self, result_row: dict, discovered_datasets: List[str] - ) -> Optional[MetadataChangeProposalWrapper]: + ) -> List[MetadataWorkUnit]: + """Process a single DMF result row. 
Returns list of workunits.""" + workunits: List[MetadataWorkUnit] = [] + try: result = DataQualityMonitoringResult.model_validate(result_row) - assertion_guid = result.METRIC_NAME.split("__")[-1].lower() - status = bool(result.VALUE) # 1 if PASS, 0 if FAIL + + is_datahub_dmf = result.METRIC_NAME.lower().startswith("datahub__") + + if is_datahub_dmf: + assertion_guid = result.METRIC_NAME.split("__")[-1].lower() + else: + assertion_guid = self._generate_external_dmf_guid(result) + + assertion_urn = make_assertion_urn(assertion_guid) + assertee = self.identifiers.get_dataset_identifier( result.TABLE_NAME, result.TABLE_SCHEMA, result.TABLE_DATABASE ) - if assertee in discovered_datasets: - return MetadataChangeProposalWrapper( - entityUrn=make_assertion_urn(assertion_guid), - aspect=AssertionRunEvent( - timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME), - runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"), - asserteeUrn=self.identifiers.gen_dataset_urn(assertee), - status=AssertionRunStatus.COMPLETE, - assertionUrn=make_assertion_urn(assertion_guid), - result=AssertionResult( - type=( - AssertionResultType.SUCCESS - if status - else AssertionResultType.FAILURE - ) - ), - ), + if assertee not in discovered_datasets: + return [] + + dataset_urn = self.identifiers.gen_dataset_urn(assertee) + + if result.VALUE == 1: + result_type = AssertionResultType.SUCCESS + elif result.VALUE == 0: + result_type = AssertionResultType.FAILURE + else: + result_type = AssertionResultType.ERROR + logger.warning( + f"DMF '{result.METRIC_NAME}' returned invalid value {result.VALUE}. " + "Expected 1 (pass) or 0 (fail). Marking as ERROR." + ) + + if not is_datahub_dmf and assertion_urn not in self._urns_processed: + assertion_info_wu = self._create_assertion_info_workunit( + assertion_urn=assertion_urn, + dataset_urn=dataset_urn, + dmf_name=result.METRIC_NAME, + argument_names=result.ARGUMENT_NAMES, + reference_id=result.REFERENCE_ID, ) + workunits.append(assertion_info_wu) + + run_event_mcp = MetadataChangeProposalWrapper( + entityUrn=assertion_urn, + aspect=AssertionRunEvent( + timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME), + runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"), + asserteeUrn=dataset_urn, + status=AssertionRunStatus.COMPLETE, + assertionUrn=assertion_urn, + result=AssertionResult(type=result_type), + ), + ) + workunits.append(run_event_mcp.as_workunit(is_primary_source=False)) + + if assertion_urn not in self._urns_processed: + self._urns_processed.append(assertion_urn) + workunits.append(self._gen_platform_instance_wu(assertion_urn)) + + return workunits + except Exception as e: self.report.report_warning("assertion-result-parse-failure", str(e)) - return None + return [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 03d70b306a15cc..c7a6c317898b1b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -441,8 +441,20 @@ class SnowflakeV2Config( include_assertion_results: bool = Field( default=False, - description="Whether to ingest assertion run results for assertions created using Datahub" - " assertions CLI in snowflake", + description="Whether to ingest assertion run results for assertions " + "[created using DataHub assertions CLI](/docs/assertions/snowflake/snowflake_dmfs) " + "in Snowflake. 
Also required for external DMF ingestion.", + ) + + include_externally_managed_dmfs: bool = Field( + default=False, + description="Ingest user-created Snowflake DMFs (not created via DataHub) " + "as external assertions. Requires `include_assertion_results: true`. " + "When enabled, all DMFs (not just datahub__* prefixed) " + "will be ingested with their execution results. " + "IMPORTANT: External DMFs must return 1 for SUCCESS and 0 for FAILURE. " + "DataHub interprets VALUE=1 as passed, VALUE=0 as failed. " + "See [Snowflake DMF Assertions](/docs/assertions/snowflake/snowflake_dmfs) for details.", ) pushdown_deny_usernames: List[str] = Field( @@ -499,6 +511,15 @@ def validate_include_column_lineage(cls, v, info): ) return v + @field_validator("include_externally_managed_dmfs", mode="after") + @classmethod + def validate_include_externally_managed_dmfs(cls, v, info): + if not info.data.get("include_assertion_results") and v: + raise ValueError( + "include_assertion_results must be True for include_externally_managed_dmfs to be set." + ) + return v + @model_validator(mode="after") def validate_unsupported_configs(self) -> "SnowflakeV2Config": if ( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index ccd148789caf95..5ca7c306338014 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -1126,9 +1126,20 @@ def table_upstreams_only( """ @staticmethod - def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str: - pattern = r"datahub\\_\\_%" - escape_pattern = r"\\" + def dmf_assertion_results( + start_time_millis: int, + end_time_millis: int, + include_external: bool = False, + ) -> str: + # When include_external=True, don't filter by pattern + pattern_filter = "" + if not include_external: + pattern = r"datahub\\_\\_%" + escape_pattern = r"\\" + pattern_filter = ( + f"AND METRIC_NAME ilike '{pattern}' escape '{escape_pattern}'" + ) + return f""" SELECT MEASUREMENT_TIME AS "MEASUREMENT_TIME", @@ -1136,15 +1147,16 @@ def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str: TABLE_NAME AS "TABLE_NAME", TABLE_SCHEMA AS "TABLE_SCHEMA", TABLE_DATABASE AS "TABLE_DATABASE", + REFERENCE_ID AS "REFERENCE_ID", + ARGUMENT_NAMES AS "ARGUMENT_NAMES", VALUE::INT AS "VALUE" FROM SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS WHERE MEASUREMENT_TIME >= to_timestamp_ltz({start_time_millis}, 3) AND MEASUREMENT_TIME < to_timestamp_ltz({end_time_millis}, 3) - AND METRIC_NAME ilike '{pattern}' escape '{escape_pattern}' - ORDER BY MEASUREMENT_TIME ASC; - + {pattern_filter} + ORDER BY MEASUREMENT_TIME ASC; """ @staticmethod diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_assertion.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_assertion.py new file mode 100644 index 00000000000000..ddbe247ee9d524 --- /dev/null +++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_assertion.py @@ -0,0 +1,545 @@ +from datetime import datetime +from unittest.mock import MagicMock + +import pytest + +from datahub.ingestion.source.snowflake.snowflake_assertion import ( + DataQualityMonitoringResult, + SnowflakeAssertionsHandler, +) +from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery +from datahub.metadata.com.linkedin.pegasus2avro.assertion import AssertionResultType +from 
datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance +from datahub.metadata.schema_classes import ( + AssertionSourceTypeClass, + AssertionTypeClass, +) + + +class TestDataQualityMonitoringResultModel: + """Test the Pydantic model for DMF results.""" + + def test_parses_argument_names_from_json_string(self): + """Model should parse ARGUMENT_NAMES from JSON string (Snowflake format).""" + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "null_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_abc123", + "ARGUMENT_NAMES": '["amount", "quantity"]', + } + result = DataQualityMonitoringResult.model_validate(row) + assert result.REFERENCE_ID == "ref_abc123" + assert result.ARGUMENT_NAMES == ["amount", "quantity"] + + def test_parses_empty_argument_names(self): + """Model should return empty list for empty JSON array.""" + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "table_level_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_abc123", + "ARGUMENT_NAMES": "[]", + } + result = DataQualityMonitoringResult.model_validate(row) + assert result.ARGUMENT_NAMES == [] + + +class TestDmfAssertionResultsQuery: + """Test dmf_assertion_results query generation.""" + + def test_query_filters_datahub_prefix_by_default(self): + """Default query should filter for datahub__* DMFs only.""" + query = SnowflakeQuery.dmf_assertion_results( + start_time_millis=1000, + end_time_millis=2000, + include_external=False, + ) + assert "datahub" in query and "%" in query + assert "METRIC_NAME ilike" in query + assert "REFERENCE_ID" in query + assert "ARGUMENT_NAMES" in query + + def test_query_includes_all_dmfs_when_external_enabled(self): + """With include_external=True, no pattern filter.""" + query = SnowflakeQuery.dmf_assertion_results( + start_time_millis=1000, + end_time_millis=2000, + include_external=True, + ) + assert "ilike" not in query + assert "REFERENCE_ID" in query + assert "ARGUMENT_NAMES" in query + + +class TestExternalDmfGuidGeneration: + """Test GUID generation for external DMFs using REFERENCE_ID.""" + + @pytest.fixture + def handler(self): + """Create a handler with mocked dependencies.""" + config = MagicMock() + config.platform_instance = None + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + return SnowflakeAssertionsHandler(config, report, connection, identifiers) + + def test_guid_is_deterministic(self, handler): + """Same REFERENCE_ID should always produce same GUID.""" + result = DataQualityMonitoringResult( + MEASUREMENT_TIME=datetime.now(), + METRIC_NAME="null_check", + TABLE_NAME="orders", + TABLE_SCHEMA="public", + TABLE_DATABASE="my_db", + VALUE=1, + REFERENCE_ID="ref_abc123", + ARGUMENT_NAMES="[]", + ) + guid1 = handler._generate_external_dmf_guid(result) + guid2 = handler._generate_external_dmf_guid(result) + assert guid1 == guid2 + + def test_guid_differs_for_different_reference_ids(self, handler): + """Different REFERENCE_IDs should produce different URNs.""" + result1 = DataQualityMonitoringResult( + MEASUREMENT_TIME=datetime.now(), + METRIC_NAME="null_check", + TABLE_NAME="orders", + TABLE_SCHEMA="public", + TABLE_DATABASE="my_db", + VALUE=1, + REFERENCE_ID="ref_123", + ARGUMENT_NAMES="[]", + ) + result2 = DataQualityMonitoringResult( + 
MEASUREMENT_TIME=datetime.now(), + METRIC_NAME="null_check", + TABLE_NAME="orders", + TABLE_SCHEMA="public", + TABLE_DATABASE="my_db", + VALUE=1, + REFERENCE_ID="ref_456", + ARGUMENT_NAMES="[]", + ) + guid1 = handler._generate_external_dmf_guid(result1) + guid2 = handler._generate_external_dmf_guid(result2) + assert guid1 != guid2 + + def test_guid_includes_platform_instance(self): + """Platform instance should affect GUID when configured.""" + config_with_instance = MagicMock() + config_with_instance.platform_instance = "prod" + config_with_instance.include_externally_managed_dmfs = True + + config_without_instance = MagicMock() + config_without_instance.platform_instance = None + config_without_instance.include_externally_managed_dmfs = True + + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + + handler_with = SnowflakeAssertionsHandler( + config_with_instance, report, connection, identifiers + ) + handler_without = SnowflakeAssertionsHandler( + config_without_instance, report, connection, identifiers + ) + + result = DataQualityMonitoringResult( + MEASUREMENT_TIME=datetime.now(), + METRIC_NAME="null_check", + TABLE_NAME="orders", + TABLE_SCHEMA="public", + TABLE_DATABASE="my_db", + VALUE=1, + REFERENCE_ID="ref_abc123", + ARGUMENT_NAMES="[]", + ) + + guid_with = handler_with._generate_external_dmf_guid(result) + guid_without = handler_without._generate_external_dmf_guid(result) + assert guid_with != guid_without + + +class TestAssertionInfoCreation: + """Test AssertionInfo aspect creation for external DMFs.""" + + @pytest.fixture + def handler(self): + """Create a handler with mocked dependencies.""" + config = MagicMock() + config.platform_instance = None + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + return SnowflakeAssertionsHandler(config, report, connection, identifiers) + + def test_assertion_info_has_correct_type_and_source(self, handler): + """External DMFs should use CUSTOM type and EXTERNAL source.""" + wu = handler._create_assertion_info_workunit( + assertion_urn="urn:li:assertion:test123", + dataset_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)", + dmf_name="null_check", + argument_names=[], + reference_id="ref_abc123", + ) + assertion_info = wu.metadata.aspect + assert assertion_info.type == AssertionTypeClass.CUSTOM + assert assertion_info.source.type == AssertionSourceTypeClass.EXTERNAL + assert assertion_info.customProperties["snowflake_dmf_name"] == "null_check" + assert assertion_info.customProperties["snowflake_reference_id"] == "ref_abc123" + + def test_field_urn_set_for_single_column(self, handler): + """Field URN should be set when DMF operates on single column.""" + wu = handler._create_assertion_info_workunit( + assertion_urn="urn:li:assertion:test123", + dataset_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)", + dmf_name="null_check", + argument_names=["amount"], + reference_id="ref_abc123", + ) + assertion_info = wu.metadata.aspect + assert assertion_info.customAssertion.field is not None + assert "amount" in assertion_info.customAssertion.field + + def test_field_urn_none_for_multi_column(self, handler): + """Field URN should be None when DMF operates on multiple columns.""" + wu = handler._create_assertion_info_workunit( + assertion_urn="urn:li:assertion:test123", + 
dataset_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)", + dmf_name="compare_columns", + argument_names=["col1", "col2"], + reference_id="ref_abc123", + ) + assertion_info = wu.metadata.aspect + assert assertion_info.customAssertion.field is None + assert assertion_info.customProperties["snowflake_dmf_columns"] == "col1,col2" + + +class TestMixedDmfProcessing: + """Test processing both DataHub and external DMFs together.""" + + @pytest.fixture + def handler(self): + """Create a handler with mocked dependencies.""" + config = MagicMock() + config.platform_instance = None + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + identifiers.get_dataset_identifier.return_value = "my_db.public.orders" + identifiers.gen_dataset_urn.return_value = ( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)" + ) + return SnowflakeAssertionsHandler(config, report, connection, identifiers) + + def test_datahub_dmf_extracts_guid_from_name(self, handler): + """DataHub DMFs (datahub__*) should extract GUID from name and not emit AssertionInfo.""" + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "datahub__abc123", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_xyz", + "ARGUMENT_NAMES": '["col1"]', + } + discovered = ["my_db.public.orders"] + workunits = handler._process_result_row(row, discovered) + + # Should have AssertionRunEvent and DataPlatformInstance (no AssertionInfo) + assert len(workunits) == 2 + aspect_names = [wu.metadata.aspectName for wu in workunits] + assert "assertionInfo" not in aspect_names + assert "abc123" in workunits[0].metadata.entityUrn + + def test_external_dmf_emits_assertion_info(self, handler): + """External DMFs should emit AssertionInfo.""" + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "my_custom_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_abc123", + "ARGUMENT_NAMES": '["amount"]', + } + discovered = ["my_db.public.orders"] + workunits = handler._process_result_row(row, discovered) + + # Should have AssertionInfo, AssertionRunEvent, and DataPlatformInstance + assert len(workunits) == 3 + aspect_names = [wu.metadata.aspectName for wu in workunits] + assert "assertionInfo" in aspect_names + + +class TestDataPlatformInstance: + """Test DataPlatformInstance aspect generation.""" + + def test_data_platform_instance_emitted_for_external_dmf(self): + """External DMFs should emit DataPlatformInstance aspect.""" + config = MagicMock() + config.platform_instance = "my_instance" + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + identifiers.get_dataset_identifier.return_value = "my_db.public.orders" + identifiers.gen_dataset_urn.return_value = ( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)" + ) + handler = SnowflakeAssertionsHandler(config, report, connection, identifiers) + + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "my_custom_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_abc123", + "ARGUMENT_NAMES": '["amount"]', + } + discovered = ["my_db.public.orders"] + workunits = 
handler._process_result_row(row, discovered) + + # Find DataPlatformInstance workunit using type-safe filtering + platform_instance_wus = [ + wu for wu in workunits if wu.get_aspect_of_type(DataPlatformInstance) + ] + assert len(platform_instance_wus) == 1 + + aspect = platform_instance_wus[0].get_aspect_of_type(DataPlatformInstance) + assert aspect is not None + assert aspect.platform == "urn:li:dataPlatform:snowflake" + assert ( + aspect.instance + == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,my_instance)" + ) + + def test_data_platform_instance_emitted_for_datahub_dmf(self): + """DataHub DMFs should also emit DataPlatformInstance aspect.""" + config = MagicMock() + config.platform_instance = "prod" + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + identifiers.get_dataset_identifier.return_value = "my_db.public.orders" + identifiers.gen_dataset_urn.return_value = ( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)" + ) + handler = SnowflakeAssertionsHandler(config, report, connection, identifiers) + + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "datahub__abc123", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_xyz", + "ARGUMENT_NAMES": '["col1"]', + } + discovered = ["my_db.public.orders"] + workunits = handler._process_result_row(row, discovered) + + # Find DataPlatformInstance workunit using type-safe filtering + platform_instance_wus = [ + wu for wu in workunits if wu.get_aspect_of_type(DataPlatformInstance) + ] + assert len(platform_instance_wus) == 1 + + aspect = platform_instance_wus[0].get_aspect_of_type(DataPlatformInstance) + assert aspect is not None + assert aspect.platform == "urn:li:dataPlatform:snowflake" + assert ( + aspect.instance + == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:snowflake,prod)" + ) + + def test_data_platform_instance_without_instance_configured(self): + """DataPlatformInstance should have None instance when not configured.""" + config = MagicMock() + config.platform_instance = None + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + identifiers.get_dataset_identifier.return_value = "my_db.public.orders" + identifiers.gen_dataset_urn.return_value = ( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)" + ) + handler = SnowflakeAssertionsHandler(config, report, connection, identifiers) + + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "my_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_123", + "ARGUMENT_NAMES": "[]", + } + discovered = ["my_db.public.orders"] + workunits = handler._process_result_row(row, discovered) + + # Find DataPlatformInstance workunit using type-safe filtering + platform_instance_wus = [ + wu for wu in workunits if wu.get_aspect_of_type(DataPlatformInstance) + ] + assert len(platform_instance_wus) == 1 + + aspect = platform_instance_wus[0].get_aspect_of_type(DataPlatformInstance) + assert aspect is not None + assert aspect.platform == "urn:li:dataPlatform:snowflake" + assert aspect.instance is None + + def test_data_platform_instance_emitted_once_per_assertion(self): + """DataPlatformInstance should only be emitted once per unique assertion.""" + config 
= MagicMock() + config.platform_instance = "my_instance" + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + identifiers.get_dataset_identifier.return_value = "my_db.public.orders" + identifiers.gen_dataset_urn.return_value = ( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)" + ) + handler = SnowflakeAssertionsHandler(config, report, connection, identifiers) + + # Process same DMF twice (simulating multiple results for same assertion) + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "my_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_123", + "ARGUMENT_NAMES": "[]", + } + discovered = ["my_db.public.orders"] + + # First call + workunits1 = handler._process_result_row(row, discovered) + platform_instance_wus1 = [ + wu for wu in workunits1 if wu.get_aspect_of_type(DataPlatformInstance) + ] + assert len(platform_instance_wus1) == 1 + + # Second call with same assertion + workunits2 = handler._process_result_row(row, discovered) + platform_instance_wus2 = [ + wu for wu in workunits2 if wu.get_aspect_of_type(DataPlatformInstance) + ] + # Should not emit DataPlatformInstance again + assert len(platform_instance_wus2) == 0 + + +class TestAssertionResultTypes: + """Test assertion result type mapping based on VALUE.""" + + @pytest.fixture + def handler(self): + """Create a handler with mocked dependencies.""" + config = MagicMock() + config.platform_instance = None + config.include_externally_managed_dmfs = True + report = MagicMock() + connection = MagicMock() + identifiers = MagicMock() + identifiers.platform = "snowflake" + identifiers.get_dataset_identifier.return_value = "my_db.public.orders" + identifiers.gen_dataset_urn.return_value = ( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.public.orders,PROD)" + ) + return SnowflakeAssertionsHandler(config, report, connection, identifiers) + + def test_value_1_is_success(self, handler): + """VALUE=1 should result in SUCCESS.""" + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "my_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 1, + "REFERENCE_ID": "ref_123", + "ARGUMENT_NAMES": "[]", + } + discovered = ["my_db.public.orders"] + workunits = handler._process_result_row(row, discovered) + + run_event_wu = [ + wu for wu in workunits if wu.metadata.aspectName == "assertionRunEvent" + ][0] + assert run_event_wu.metadata.aspect.result.type == AssertionResultType.SUCCESS + + def test_value_0_is_failure(self, handler): + """VALUE=0 should result in FAILURE.""" + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": "my_check", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": 0, + "REFERENCE_ID": "ref_123", + "ARGUMENT_NAMES": "[]", + } + discovered = ["my_db.public.orders"] + workunits = handler._process_result_row(row, discovered) + + run_event_wu = [ + wu for wu in workunits if wu.metadata.aspectName == "assertionRunEvent" + ][0] + assert run_event_wu.metadata.aspect.result.type == AssertionResultType.FAILURE + + def test_other_values_are_error(self, handler): + """VALUES other than 0 or 1 should result in ERROR.""" + for invalid_value in [5, 100, -1, 999]: + # Reset handler state for each iteration + handler._urns_processed = [] + + row = { + "MEASUREMENT_TIME": datetime.now(), + "METRIC_NAME": 
f"my_check_{invalid_value}", + "TABLE_NAME": "orders", + "TABLE_SCHEMA": "public", + "TABLE_DATABASE": "my_db", + "VALUE": invalid_value, + "REFERENCE_ID": f"ref_{invalid_value}", + "ARGUMENT_NAMES": "[]", + } + discovered = ["my_db.public.orders"] + workunits = handler._process_result_row(row, discovered) + + run_event_wu = [ + wu for wu in workunits if wu.metadata.aspectName == "assertionRunEvent" + ][0] + assert run_event_wu.metadata.aspect.result.type == AssertionResultType.ERROR