From 2734aab5cf8a08f4c6f3f4b0bd9c6f60609f4fef Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Fri, 5 Dec 2025 14:17:24 -0500 Subject: [PATCH 1/9] Update has_valid_schema check to accept a reference table --- docs/dqx/docs/reference/quality_checks.mdx | 36 ++++++++++++- src/databricks/labs/dqx/check_funcs.py | 39 +++++++++++--- tests/integration/test_dataset_checks.py | 62 ++++++++++++++++++---- tests/unit/test_dataset_checks.py | 29 ++++++++++ 4 files changed, 150 insertions(+), 16 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index cf364d5c6..13cd7062b 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -1395,7 +1395,7 @@ You can also define your own custom dataset-level checks (see [Creating custom c | `sql_query` | Checks whether the condition column produced by a SQL query is satisfied. The check expects the query to return a boolean condition column indicating whether a record meets the requirement (True = fail, False = pass), and one or more merge columns so that results can be joined back to the input DataFrame to preserve all original records. Important considerations: if merge columns aren't unique, multiple query rows can attach to a single input row, potentially causing false positives. Performance tip: since the check must join back to the input DataFrame to retain original records, writing a custom dataset-level rule is usually more performant than `sql_query` check. | `query`: query string, must return all merge columns and condition column; `input_placeholder`: name to be used in the sql query as `{{ input_placeholder }}` to refer to the input DataFrame, optional reference DataFrames are referred by the name provided in the dictionary of reference DataFrames (e.g. `{{ ref_df_key }}`, dictionary of DataFrames can be passed when applying checks); `merge_columns`: list of columns used for merging with the input DataFrame which must exist in the input DataFrame and be present in output of the sql query; `condition_column`: name of the column indicating a violation (False = pass, True = fail); `msg`: (optional) message to output; `name`: (optional) name of the resulting check (it can be overwritten by `name` specified at the check level); `negate`: if the condition should be negated | | `compare_datasets` | Compares two DataFrames at both row and column levels, providing detailed information about differences, including new or missing rows and column-level changes. Only columns present in both the source and reference DataFrames are compared. Use with caution if `check_missing_records` is enabled, as this may increase the number of rows in the output beyond the original input DataFrame. The comparison does not support Map types (any column comparison on map type is skipped automatically). Comparing datasets is valuable for validating data during migrations, detecting drift, performing regression testing, or verifying synchronization between source and target systems. | `columns`: columns to use for row matching with the reference DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'df.columns'; `ref_columns`: list of columns in the reference DataFrame or Table to row match against the source DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'ref_df.columns'; note that `columns` are matched with `ref_columns` by position, so the order of the provided columns in both lists must be exactly aligned; `exclude_columns`: (optional) list of columns to exclude from the value comparison but not from row matching (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'); the `exclude_columns` field does not alter the list of columns used to determine row matches (columns), it only controls which columns are skipped during the value comparison; `ref_df_name`: (optional) name of the reference DataFrame (dictionary of DataFrames can be passed when applying checks); `ref_table`: (optional) fully qualified reference table name; either `ref_df_name` or `ref_table` must be provided but never both; the number of passed `columns` and `ref_columns` must match and keys are checks in the given order; `check_missing_records`: perform a FULL OUTER JOIN to identify records that are missing from source or reference DataFrames, default is False; use with caution as this may increase the number of rows in the output, as unmatched rows from both sides are included; `null_safe_row_matching`: (optional) treat NULLs as equal when matching rows using `columns` and `ref_columns` (default: True); `null_safe_column_value_matching`: (optional) treat NULLs as equal when comparing column values (default: True) | | `is_data_fresh_per_time_window` | Freshness check that validates whether at least X records arrive within every Y-minute time window. | `column`: timestamp column (can be a string column name or a column expression); `window_minutes`: time window in minutes to check for data arrival; `min_records_per_window`: minimum number of records expected per time window; `lookback_windows`: (optional) number of time windows to look back from `curr_timestamp`, it filters records to include only those within the specified number of time windows from `curr_timestamp` (if no lookback is provided, the check is applied to the entire dataset); `curr_timestamp`: (optional) current timestamp column (if not provided, current_timestamp() function is used) | -| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match | +| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: (optional) expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `ref_table`: (optional) fully qualified reference table name to load the schema from (e.g. "catalog.schema.table"); either `expected_schema` or `ref_table` must be provided but never both; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match | **Compare datasets check** @@ -1729,6 +1729,21 @@ Complex data types are supported as well. - id - name +# has_valid_schema check using reference table +- criticality: error + check: + function: has_valid_schema + arguments: + ref_table: "catalog1.schema1.reference_table" + +# has_valid_schema check using reference table with strict mode +- criticality: error + check: + function: has_valid_schema + arguments: + ref_table: "catalog1.schema1.reference_table" + strict: true + # apply check to multiple columns - criticality: error check: @@ -2124,6 +2139,25 @@ checks = [ }, ), + # has_valid_schema check using reference table + DQDatasetRule( + criticality="error", + check_func=check_funcs.has_valid_schema, + check_func_kwargs={ + "ref_table": "catalog1.schema1.reference_table", + }, + ), + + # has_valid_schema check using reference table with strict mode + DQDatasetRule( + criticality="error", + check_func=check_funcs.has_valid_schema, + check_func_kwargs={ + "ref_table": "catalog1.schema1.reference_table", + "strict": True, + }, + ), + # apply check to multiple columns *DQForEachColRule( check_func=check_funcs.is_unique, # 'columns' as first argument diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 40def0770..2caa34e18 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1702,7 +1702,8 @@ def apply(df: DataFrame) -> DataFrame: @register_rule("dataset") def has_valid_schema( - expected_schema: str | types.StructType, + expected_schema: str | types.StructType | None = None, + ref_table: str | None = None, columns: list[str | Column] | None = None, strict: bool = False, ) -> tuple[Column, Callable]: @@ -1716,6 +1717,7 @@ def has_valid_schema( Args: expected_schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object. + ref_table: Name of the reference table to load the schema from (e.g. "catalog.schema.table") columns: Optional list of columns to validate (default: all columns are considered) strict: Whether to perform strict schema validation (default: False). - False: Validates that all expected columns exist with compatible types (allows extra columns) @@ -1727,20 +1729,33 @@ def has_valid_schema( - A closure that applies the schema check and adds the necessary condition columns. Raises: - InvalidParameterError: If the schema string is invalid or cannot be parsed, or if - the input schema is neither a string nor a StructType. + InvalidParameterError: + - If the *expected_schema* string is invalid or cannot be parsed + - If *expected_schema* is neither a string nor a StructType + - If both *expected_schema* and *ref_table* are specified + - If neither *expected_schema* nor *ref_table* are specified + + Note: + Exactly one of *expected_schema* or *ref_table* must be specified. """ + if expected_schema and ref_table: + raise InvalidParameterError( + "Cannot specify both 'expected_schema' and 'ref_table' when using 'has_valid_schema'" + ) + + if not expected_schema and not ref_table: + raise InvalidParameterError( + "Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'" + ) column_names: list[str] | None = None if columns: column_names = [get_column_name_or_alias(col) if not isinstance(col, str) else col for col in columns] - - _expected_schema = _get_schema(expected_schema, column_names) unique_str = uuid.uuid4().hex # make sure any column added to the dataframe is unique condition_col = f"__schema_condition_{unique_str}" message_col = f"__schema_message_{unique_str}" - def apply(df: DataFrame) -> DataFrame: + def apply(df: DataFrame, spark: SparkSession) -> DataFrame: """ Apply the schema compatibility check logic to the DataFrame. @@ -1748,10 +1763,22 @@ def apply(df: DataFrame) -> DataFrame: Args: df: The input DataFrame to validate for schema compatibility. + spark: SparkSession used to get the reference table schema Returns: The DataFrame with additional condition and message columns for schema validation. """ + + if ref_table: + ref_df = _get_ref_df(None, ref_table, None, spark) + _expected_schema = _get_schema(ref_df.schema, column_names) + + elif expected_schema: + _expected_schema = _get_schema(expected_schema, column_names) + + else: + raise ValueError("Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'") + actual_schema = df.select(*columns).schema if columns else df.schema if strict: diff --git a/tests/integration/test_dataset_checks.py b/tests/integration/test_dataset_checks.py index 99eafa264..812cb9d26 100644 --- a/tests/integration/test_dataset_checks.py +++ b/tests/integration/test_dataset_checks.py @@ -1600,7 +1600,7 @@ def test_has_valid_schema_permissive_mode_extra_column(spark): expected_schema = "a string, b int" # Expected schema without extra column condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("a", "b", "c", condition) expected_condition_df = spark.createDataFrame( @@ -1669,7 +1669,7 @@ def test_has_valid_schema_permissive_mode_type_widening(spark): expected_schema = "a varchar(10), b long, c decimal(5, 1), d float, e timestamp, f timestamp, g boolean, h binary, i array, j map, k struct, l map, invalid_col int" condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select( "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "invalid_col", condition ) @@ -1753,7 +1753,7 @@ def test_has_valid_schema_permissive_mode_missing_column(spark): ] ) condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("a", "b", condition) expected_condition_df = spark.createDataFrame( @@ -1785,7 +1785,7 @@ def test_has_valid_schema_permissive_mode_incompatible_column_type(spark): expected_schema = "a string, b int" condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("a", "b", condition) expected_condition_df = spark.createDataFrame( @@ -1817,7 +1817,7 @@ def test_has_valid_schema_strict_mode_missing_column(spark): expected_schema = "a string, b int, c double" condition, apply_method = has_valid_schema(expected_schema, strict=True) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("a", "b", condition) expected_condition_df = spark.createDataFrame( @@ -1849,7 +1849,7 @@ def test_has_valid_schema_strict_mode_extra_column(spark): expected_schema = "a string, b int" condition, apply_method = has_valid_schema(expected_schema, strict=True) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("a", "b", "c", condition) expected_condition_df = spark.createDataFrame( @@ -1883,7 +1883,7 @@ def test_has_valid_schema_strict_mode_wrong_column_order(spark): expected_schema = "a string, b int" condition, apply_method = has_valid_schema(expected_schema, strict=True) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("b", "a", condition) expected_condition_df = spark.createDataFrame( @@ -1915,7 +1915,7 @@ def test_has_valid_schema_with_specified_columns(spark): expected_schema = "a string, b int, c string, e int" condition, apply_method = has_valid_schema(expected_schema, columns=["a", "b"], strict=False) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("a", "b", "c", "d", condition) expected_condition_df = spark.createDataFrame( @@ -1939,7 +1939,7 @@ def test_has_valid_schema_with_specific_columns_mismatch(spark: SparkSession): expected_schema = "a string, b int, c string" condition, apply_method = has_valid_schema(expected_schema, columns=["a", "b"], strict=True) - actual_apply_df = apply_method(test_df) + actual_apply_df = apply_method(test_df, spark) actual_condition_df = actual_apply_df.select("a", "b", "c", condition) expected_condition_df = spark.createDataFrame( @@ -1960,3 +1960,47 @@ def test_has_valid_schema_with_specific_columns_mismatch(spark: SparkSession): "a string, b string, c double, has_invalid_schema string", ) assert_df_equality(actual_condition_df, expected_condition_df, ignore_nullable=True) + + +def test_has_valid_schema_with_ref_table(spark, make_schema, make_random): + catalog_name = TEST_CATALOG + schema = make_schema(catalog_name=catalog_name) + ref_table_name = f"{catalog_name}.{schema.name}.{make_random(8).lower()}" + + ref_df = spark.createDataFrame( + [ + ["ref1", 100], + ["ref2", 200], + ], + "a string, b int", + ) + ref_df.write.mode("overwrite").saveAsTable(ref_table_name) + + test_df = spark.createDataFrame( + [ + ["str1", "not_an_int"], + ["str2", "also_not_int"], + ], + "a string, b string", + ) + + condition, apply_method = has_valid_schema(ref_table=ref_table_name) + actual_apply_df = apply_method(test_df, spark) + actual_condition_df = actual_apply_df.select("a", "b", condition) + + expected_condition_df = spark.createDataFrame( + [ + [ + "str1", + "not_an_int", + "Schema validation failed: Column 'b' has incompatible type, expected 'integer', got 'string'", + ], + [ + "str2", + "also_not_int", + "Schema validation failed: Column 'b' has incompatible type, expected 'integer', got 'string'", + ], + ], + "a string, b string, has_invalid_schema string", + ) + assert_df_equality(actual_condition_df, expected_condition_df, ignore_nullable=True) diff --git a/tests/unit/test_dataset_checks.py b/tests/unit/test_dataset_checks.py index 58180f159..2bf8e80b1 100644 --- a/tests/unit/test_dataset_checks.py +++ b/tests/unit/test_dataset_checks.py @@ -187,3 +187,32 @@ def test_is_data_fresh_per_time_window_exceptions( min_records_per_window=min_records_per_window, lookback_windows=lookback_windows, ) + + +@pytest.mark.parametrize( + "expected_schema, ref_table, expected_exception, expected_message", + [ + ( + "a: string, b: int", + "catalog.schema.table", + InvalidParameterError, + "Cannot specify both 'expected_schema' and 'ref_table' when using 'has_valid_schema'", + ), + ( + None, + None, + InvalidParameterError, + "Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'", + ), + ], +) +def test_has_valid_schema_exceptions(expected_schema, ref_table, expected_exception, expected_message): + with pytest.raises(expected_exception, match=expected_message): + DQDatasetRule( + criticality="warn", + check_func=check_funcs.has_valid_schema, + check_func_kwargs={ + "expected_schema": expected_schema, + "ref_table": ref_table, + }, + ) From ca90746c76672a4addd554dce99c63dd59e778f3 Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Fri, 5 Dec 2025 16:12:48 -0500 Subject: [PATCH 2/9] Update error type --- src/databricks/labs/dqx/check_funcs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 2caa34e18..b8ae42f0a 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1777,7 +1777,9 @@ def apply(df: DataFrame, spark: SparkSession) -> DataFrame: _expected_schema = _get_schema(expected_schema, column_names) else: - raise ValueError("Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'") + raise InvalidParameterError( + "Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'" + ) actual_schema = df.select(*columns).schema if columns else df.schema From 9e3d990cddd3200ddb6d3882f3af8b41e5e316e6 Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Fri, 5 Dec 2025 17:38:43 -0500 Subject: [PATCH 3/9] Validate explicitly provided schema before apply --- src/databricks/labs/dqx/check_funcs.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index b8ae42f0a..5ed0f038a 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1751,6 +1751,10 @@ def has_valid_schema( column_names: list[str] | None = None if columns: column_names = [get_column_name_or_alias(col) if not isinstance(col, str) else col for col in columns] + + if expected_schema: + expected_schema = _get_schema(expected_schema, column_names) + unique_str = uuid.uuid4().hex # make sure any column added to the dataframe is unique condition_col = f"__schema_condition_{unique_str}" message_col = f"__schema_message_{unique_str}" @@ -1774,7 +1778,7 @@ def apply(df: DataFrame, spark: SparkSession) -> DataFrame: _expected_schema = _get_schema(ref_df.schema, column_names) elif expected_schema: - _expected_schema = _get_schema(expected_schema, column_names) + _expected_schema = expected_schema else: raise InvalidParameterError( From e3197cb8ba82be9cf7ca7bc66439c0b91f4d547b Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Fri, 5 Dec 2025 17:46:08 -0500 Subject: [PATCH 4/9] Fix implementation --- src/databricks/labs/dqx/check_funcs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 5ed0f038a..e7888ce22 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1753,7 +1753,7 @@ def has_valid_schema( column_names = [get_column_name_or_alias(col) if not isinstance(col, str) else col for col in columns] if expected_schema: - expected_schema = _get_schema(expected_schema, column_names) + expected_schema_struct = _get_schema(expected_schema, column_names) unique_str = uuid.uuid4().hex # make sure any column added to the dataframe is unique condition_col = f"__schema_condition_{unique_str}" @@ -1778,7 +1778,7 @@ def apply(df: DataFrame, spark: SparkSession) -> DataFrame: _expected_schema = _get_schema(ref_df.schema, column_names) elif expected_schema: - _expected_schema = expected_schema + _expected_schema = expected_schema_struct else: raise InvalidParameterError( From e1183b91896db3b436fabdfdc3df5ef348bcbdc8 Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Sun, 7 Dec 2025 21:30:11 -0500 Subject: [PATCH 5/9] Support schema validation with a reference DataFrame --- docs/dqx/docs/reference/quality_checks.mdx | 41 ++++++++++++++- src/databricks/labs/dqx/check_funcs.py | 32 +++++------- tests/integration/test_dataset_checks.py | 59 ++++++++++++++++++---- tests/unit/test_dataset_checks.py | 27 +++++----- 4 files changed, 114 insertions(+), 45 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index e28f088b2..dbc901b54 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -1396,7 +1396,7 @@ You can also define your own custom dataset-level checks (see [Creating custom c | `sql_query` | Checks whether the condition column produced by a SQL query is satisfied. The check expects the query to return a boolean condition column indicating whether a record meets the requirement (True = fail, False = pass), and one or more merge columns so that results can be joined back to the input DataFrame to preserve all original records. Important considerations: if merge columns aren't unique, multiple query rows can attach to a single input row, potentially causing false positives. Performance tip: since the check must join back to the input DataFrame to retain original records, writing a custom dataset-level rule is usually more performant than `sql_query` check. | `query`: query string, must return all merge columns and condition column; `input_placeholder`: name to be used in the sql query as `{{ input_placeholder }}` to refer to the input DataFrame, optional reference DataFrames are referred by the name provided in the dictionary of reference DataFrames (e.g. `{{ ref_df_key }}`, dictionary of DataFrames can be passed when applying checks); `merge_columns`: list of columns used for merging with the input DataFrame which must exist in the input DataFrame and be present in output of the sql query; `condition_column`: name of the column indicating a violation (False = pass, True = fail); `msg`: (optional) message to output; `name`: (optional) name of the resulting check (it can be overwritten by `name` specified at the check level); `negate`: if the condition should be negated | | `compare_datasets` | Compares two DataFrames at both row and column levels, providing detailed information about differences, including new or missing rows and column-level changes. Only columns present in both the source and reference DataFrames are compared. Use with caution if `check_missing_records` is enabled, as this may increase the number of rows in the output beyond the original input DataFrame. The comparison does not support Map types (any column comparison on map type is skipped automatically). Comparing datasets is valuable for validating data during migrations, detecting drift, performing regression testing, or verifying synchronization between source and target systems. | `columns`: columns to use for row matching with the reference DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'df.columns'; `ref_columns`: list of columns in the reference DataFrame or Table to row match against the source DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'ref_df.columns'; note that `columns` are matched with `ref_columns` by position, so the order of the provided columns in both lists must be exactly aligned; `exclude_columns`: (optional) list of columns to exclude from the value comparison but not from row matching (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'); the `exclude_columns` field does not alter the list of columns used to determine row matches (columns), it only controls which columns are skipped during the value comparison; `ref_df_name`: (optional) name of the reference DataFrame (dictionary of DataFrames can be passed when applying checks); `ref_table`: (optional) fully qualified reference table name; either `ref_df_name` or `ref_table` must be provided but never both; the number of passed `columns` and `ref_columns` must match and keys are checks in the given order; `check_missing_records`: perform a FULL OUTER JOIN to identify records that are missing from source or reference DataFrames, default is False; use with caution as this may increase the number of rows in the output, as unmatched rows from both sides are included; `null_safe_row_matching`: (optional) treat NULLs as equal when matching rows using `columns` and `ref_columns` (default: True); `null_safe_column_value_matching`: (optional) treat NULLs as equal when comparing column values (default: True) | | `is_data_fresh_per_time_window` | Freshness check that validates whether at least X records arrive within every Y-minute time window. | `column`: timestamp column (can be a string column name or a column expression); `window_minutes`: time window in minutes to check for data arrival; `min_records_per_window`: minimum number of records expected per time window; `lookback_windows`: (optional) number of time windows to look back from `curr_timestamp`, it filters records to include only those within the specified number of time windows from `curr_timestamp` (if no lookback is provided, the check is applied to the entire dataset); `curr_timestamp`: (optional) current timestamp column (if not provided, current_timestamp() function is used) | -| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: (optional) expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `ref_table`: (optional) fully qualified reference table name to load the schema from (e.g. "catalog.schema.table"); either `expected_schema` or `ref_table` must be provided but never both; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match | +| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: (optional) expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `ref_df_name`: (optional) name of the reference DataFrame to load the schema from (dictionary of DataFrames can be passed when applying checks); `ref_table`: (optional) fully qualified reference table name to load the schema from (e.g. "catalog.schema.table"); exactly one of `expected_schema`, `ref_df_name`, or `ref_table` must be provided; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match | | `has_no_outliers` | Checks whether the values in the input column contain any outliers. This function implements a median absolute deviation (MAD) algorithm to find outliers. | `column`: column of type numeric to check (can be a string column name or a column expression); | **Compare datasets check** @@ -1745,6 +1745,23 @@ Complex data types are supported as well. ref_table: "catalog1.schema1.reference_table" strict: true +# has_valid_schema check using reference DataFrame +- criticality: error + check: + function: has_valid_schema + arguments: + ref_df_name: "my_ref_df" + +# has_valid_schema check using reference DataFrame with specific columns +- criticality: warn + check: + function: has_valid_schema + arguments: + ref_df_name: "my_ref_df" + columns: + - id + - name + # apply check to multiple columns - criticality: error check: @@ -2173,6 +2190,25 @@ checks = [ }, ), + # has_valid_schema check using reference DataFrame + DQDatasetRule( + criticality="error", + check_func=check_funcs.has_valid_schema, + check_func_kwargs={ + "ref_df_name": "my_ref_df", + }, + ), + + # has_valid_schema check using reference DataFrame with specific columns + DQDatasetRule( + criticality="warn", + check_func=check_funcs.has_valid_schema, + check_func_kwargs={ + "ref_df_name": "my_ref_df", + "columns": ["id", "name"], + }, + ), + # apply check to multiple columns *DQForEachColRule( check_func=check_funcs.is_unique, # 'columns' as first argument @@ -2206,6 +2242,9 @@ The reference DataFrames are used in selected Dataset-level checks: * `compare_datasets`: required for this check if `ref_df_name` argument is specified and not `ref_table`, e.g. `ref_df_name="ref_df_key"`. The value of `ref_df_name` must match the key in the `ref_dfs` dictionary. +* `has_valid_schema`: required for this check if `ref_df_name` argument is specified and not `ref_table` or `expected_schema`, e.g. `ref_df_name="ref_df_key"`. + The value of `ref_df_name` must match the key in the `ref_dfs` dictionary. The schema from the reference DataFrame is used to validate the input DataFrame schema. + * `sql_query`: the reference DataFrames are registered as temporary views and can be used in the sql query. For example, if you have a reference DataFrame named `ref_df_key`, you can use it in the SQL query as `{{ ref_df_key }}`: diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index fe78d453d..86f706a62 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1839,6 +1839,7 @@ def apply(df: DataFrame) -> DataFrame: @register_rule("dataset") def has_valid_schema( expected_schema: str | types.StructType | None = None, + ref_df_name: str | None = None, ref_table: str | None = None, columns: list[str | Column] | None = None, strict: bool = False, @@ -1853,6 +1854,7 @@ def has_valid_schema( Args: expected_schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object. + ref_df_name: Name of the reference DataFrame (used when passing DataFrames directly). ref_table: Name of the reference table to load the schema from (e.g. "catalog.schema.table") columns: Optional list of columns to validate (default: all columns are considered) strict: Whether to perform strict schema validation (default: False). @@ -1872,30 +1874,27 @@ def has_valid_schema( - If neither *expected_schema* nor *ref_table* are specified Note: - Exactly one of *expected_schema* or *ref_table* must be specified. + Exactly one of *expected_schema*, *ref_df_name*, or *ref_table* must be specified. """ - if expected_schema and ref_table: - raise InvalidParameterError( - "Cannot specify both 'expected_schema' and 'ref_table' when using 'has_valid_schema'" - ) + expected_params = ["expected_schema", "ref_df_name", "ref_table"] + non_null_params = dict(filter(lambda param: param[0] in expected_params and param[1] is not None, locals().items())) - if not expected_schema and not ref_table: + if len(non_null_params) != 1: raise InvalidParameterError( - "Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'" + "Must specify one of 'expected_schema', 'ref_df_name', or 'ref_table' when using 'has_valid_schema'" ) column_names: list[str] | None = None if columns: column_names = [get_column_name_or_alias(col) if not isinstance(col, str) else col for col in columns] - if expected_schema: - expected_schema_struct = _get_schema(expected_schema, column_names) + expected_schema = _get_schema(expected_schema or types.StructType(), column_names) unique_str = uuid.uuid4().hex # make sure any column added to the dataframe is unique condition_col = f"__schema_condition_{unique_str}" message_col = f"__schema_message_{unique_str}" - def apply(df: DataFrame, spark: SparkSession) -> DataFrame: + def apply(df: DataFrame, spark: SparkSession, ref_dfs: dict[str, DataFrame]) -> DataFrame: """ Apply the schema compatibility check logic to the DataFrame. @@ -1904,22 +1903,17 @@ def apply(df: DataFrame, spark: SparkSession) -> DataFrame: Args: df: The input DataFrame to validate for schema compatibility. spark: SparkSession used to get the reference table schema + ref_dfs: A dictionary mapping reference DataFrame names to DataFrame objects. Returns: The DataFrame with additional condition and message columns for schema validation. """ - if ref_table: - ref_df = _get_ref_df(None, ref_table, None, spark) + if ref_df_name or ref_table: + ref_df = _get_ref_df(ref_df_name, ref_table, ref_dfs, spark) _expected_schema = _get_schema(ref_df.schema, column_names) - - elif expected_schema: - _expected_schema = expected_schema_struct - else: - raise InvalidParameterError( - "Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'" - ) + _expected_schema = expected_schema actual_schema = df.select(*columns).schema if columns else df.schema diff --git a/tests/integration/test_dataset_checks.py b/tests/integration/test_dataset_checks.py index 0f0a753a0..cb2374120 100644 --- a/tests/integration/test_dataset_checks.py +++ b/tests/integration/test_dataset_checks.py @@ -2331,7 +2331,7 @@ def test_has_valid_schema_permissive_mode_extra_column(spark): expected_schema = "a string, b int" # Expected schema without extra column condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("a", "b", "c", condition) expected_condition_df = spark.createDataFrame( @@ -2400,7 +2400,7 @@ def test_has_valid_schema_permissive_mode_type_widening(spark): expected_schema = "a varchar(10), b long, c decimal(5, 1), d float, e timestamp, f timestamp, g boolean, h binary, i array, j map, k struct, l map, invalid_col int" condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select( "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "invalid_col", condition ) @@ -2484,7 +2484,7 @@ def test_has_valid_schema_permissive_mode_missing_column(spark): ] ) condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("a", "b", condition) expected_condition_df = spark.createDataFrame( @@ -2516,7 +2516,7 @@ def test_has_valid_schema_permissive_mode_incompatible_column_type(spark): expected_schema = "a string, b int" condition, apply_method = has_valid_schema(expected_schema) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("a", "b", condition) expected_condition_df = spark.createDataFrame( @@ -2548,7 +2548,7 @@ def test_has_valid_schema_strict_mode_missing_column(spark): expected_schema = "a string, b int, c double" condition, apply_method = has_valid_schema(expected_schema, strict=True) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("a", "b", condition) expected_condition_df = spark.createDataFrame( @@ -2580,7 +2580,7 @@ def test_has_valid_schema_strict_mode_extra_column(spark): expected_schema = "a string, b int" condition, apply_method = has_valid_schema(expected_schema, strict=True) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("a", "b", "c", condition) expected_condition_df = spark.createDataFrame( @@ -2614,7 +2614,7 @@ def test_has_valid_schema_strict_mode_wrong_column_order(spark): expected_schema = "a string, b int" condition, apply_method = has_valid_schema(expected_schema, strict=True) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("b", "a", condition) expected_condition_df = spark.createDataFrame( @@ -2646,7 +2646,7 @@ def test_has_valid_schema_with_specified_columns(spark): expected_schema = "a string, b int, c string, e int" condition, apply_method = has_valid_schema(expected_schema, columns=["a", "b"], strict=False) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("a", "b", "c", "d", condition) expected_condition_df = spark.createDataFrame( @@ -2670,7 +2670,7 @@ def test_has_valid_schema_with_specific_columns_mismatch(spark: SparkSession): expected_schema = "a string, b int, c string" condition, apply_method = has_valid_schema(expected_schema, columns=["a", "b"], strict=True) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) actual_condition_df = actual_apply_df.select("a", "b", "c", condition) expected_condition_df = spark.createDataFrame( @@ -2716,7 +2716,46 @@ def test_has_valid_schema_with_ref_table(spark, make_schema, make_random): ) condition, apply_method = has_valid_schema(ref_table=ref_table_name) - actual_apply_df = apply_method(test_df, spark) + actual_apply_df = apply_method(test_df, spark, {}) + actual_condition_df = actual_apply_df.select("a", "b", condition) + + expected_condition_df = spark.createDataFrame( + [ + [ + "str1", + "not_an_int", + "Schema validation failed: Column 'b' has incompatible type, expected 'integer', got 'string'", + ], + [ + "str2", + "also_not_int", + "Schema validation failed: Column 'b' has incompatible type, expected 'integer', got 'string'", + ], + ], + "a string, b string, has_invalid_schema string", + ) + assert_df_equality(actual_condition_df, expected_condition_df, ignore_nullable=True) + + +def test_has_valid_schema_with_ref_df_name(spark: SparkSession): + ref_df = spark.createDataFrame( + [ + ["ref1", 100], + ["ref2", 200], + ], + "a string, b int", + ) + + test_df = spark.createDataFrame( + [ + ["str1", "not_an_int"], + ["str2", "also_not_int"], + ], + "a string, b string", + ) + + condition, apply_method = has_valid_schema(ref_df_name="my_ref_df") + actual_apply_df = apply_method(test_df, spark, {"my_ref_df": ref_df}) actual_condition_df = actual_apply_df.select("a", "b", condition) expected_condition_df = spark.createDataFrame( diff --git a/tests/unit/test_dataset_checks.py b/tests/unit/test_dataset_checks.py index 2bf8e80b1..639db1c47 100644 --- a/tests/unit/test_dataset_checks.py +++ b/tests/unit/test_dataset_checks.py @@ -190,29 +190,26 @@ def test_is_data_fresh_per_time_window_exceptions( @pytest.mark.parametrize( - "expected_schema, ref_table, expected_exception, expected_message", + "expected_schema, ref_df_name, ref_table", [ - ( - "a: string, b: int", - "catalog.schema.table", - InvalidParameterError, - "Cannot specify both 'expected_schema' and 'ref_table' when using 'has_valid_schema'", - ), - ( - None, - None, - InvalidParameterError, - "Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'", - ), + ("a: string, b: int", None, "catalog.schema.table"), + ("a: string, b: int", "ref_df", None), + (None, "ref_df", "catalog.schema.table"), + ("a: string, b: int", "ref_df", "catalog.schema.table"), + (None, None, None), ], ) -def test_has_valid_schema_exceptions(expected_schema, ref_table, expected_exception, expected_message): - with pytest.raises(expected_exception, match=expected_message): +def test_has_valid_schema_exceptions(expected_schema, ref_df_name, ref_table): + with pytest.raises( + InvalidParameterError, + match="Must specify one of 'expected_schema', 'ref_df_name', or 'ref_table' when using 'has_valid_schema'", + ): DQDatasetRule( criticality="warn", check_func=check_funcs.has_valid_schema, check_func_kwargs={ "expected_schema": expected_schema, + "ref_df_name": ref_df_name, "ref_table": ref_table, }, ) From f7d8ad72511ef2d18c701aa4d1d83bf78b4eb01e Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Mon, 8 Dec 2025 13:51:23 -0500 Subject: [PATCH 6/9] Fix tests --- tests/integration/test_profiler_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_profiler_workflow.py b/tests/integration/test_profiler_workflow.py index 65059eb00..3dd3496ef 100644 --- a/tests/integration/test_profiler_workflow.py +++ b/tests/integration/test_profiler_workflow.py @@ -440,7 +440,7 @@ def test_profiler_workflow_with_ai_rules_generation_with_custom_funcs(ws, spark, break expected_ai_generated_check = { - 'check': {'arguments': {'column': 'name', 'suffix': 'c'}, 'function': 'not_ends_with_suffix'}, + 'check': {'arguments': {'column': 'name', 'suffix': '"c"'}, 'function': 'not_ends_with_suffix'}, 'criticality': 'error', } From 4b0dcf5a06f8ee0d676618aee591416adf68a451 Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Mon, 8 Dec 2025 15:18:07 -0500 Subject: [PATCH 7/9] Fix tests --- tests/integration/test_profiler_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_profiler_workflow.py b/tests/integration/test_profiler_workflow.py index 3dd3496ef..65059eb00 100644 --- a/tests/integration/test_profiler_workflow.py +++ b/tests/integration/test_profiler_workflow.py @@ -440,7 +440,7 @@ def test_profiler_workflow_with_ai_rules_generation_with_custom_funcs(ws, spark, break expected_ai_generated_check = { - 'check': {'arguments': {'column': 'name', 'suffix': '"c"'}, 'function': 'not_ends_with_suffix'}, + 'check': {'arguments': {'column': 'name', 'suffix': 'c'}, 'function': 'not_ends_with_suffix'}, 'criticality': 'error', } From 88d058a6fc8031c9a6196b67352de1c04ce0d524 Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Mon, 8 Dec 2025 18:03:56 -0500 Subject: [PATCH 8/9] Fix docstrings and rename test --- src/databricks/labs/dqx/check_funcs.py | 4 ++-- tests/unit/test_dataset_checks.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 0dcc7e0df..906147f95 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1897,8 +1897,8 @@ def has_valid_schema( InvalidParameterError: - If the *expected_schema* string is invalid or cannot be parsed - If *expected_schema* is neither a string nor a StructType - - If both *expected_schema* and *ref_table* are specified - - If neither *expected_schema* nor *ref_table* are specified + - If more than one of *expected_schema*, *ref_df_name*, or *ref_table* are specified + - If none of *expected_schema*, *ref_df_name*, or *ref_table* are specified Note: Exactly one of *expected_schema*, *ref_df_name*, or *ref_table* must be specified. diff --git a/tests/unit/test_dataset_checks.py b/tests/unit/test_dataset_checks.py index beaaefdb9..7ddb4b18a 100644 --- a/tests/unit/test_dataset_checks.py +++ b/tests/unit/test_dataset_checks.py @@ -242,7 +242,7 @@ def test_is_data_fresh_per_time_window_exceptions( (None, None, None), ], ) -def test_has_valid_schema_exceptions(expected_schema, ref_df_name, ref_table): +def test_has_valid_schema_parameter_validation(expected_schema, ref_df_name, ref_table): with pytest.raises( InvalidParameterError, match="Must specify one of 'expected_schema', 'ref_df_name', or 'ref_table' when using 'has_valid_schema'", From 5effa9dd8dd44334d270388b4dcd57820bddaedc Mon Sep 17 00:00:00 2001 From: Greg Hansen Date: Tue, 9 Dec 2025 08:24:06 -0500 Subject: [PATCH 9/9] Fix test --- tests/integration/test_profiler_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_profiler_workflow.py b/tests/integration/test_profiler_workflow.py index 65059eb00..13fa9149a 100644 --- a/tests/integration/test_profiler_workflow.py +++ b/tests/integration/test_profiler_workflow.py @@ -440,7 +440,7 @@ def test_profiler_workflow_with_ai_rules_generation_with_custom_funcs(ws, spark, break expected_ai_generated_check = { - 'check': {'arguments': {'column': 'name', 'suffix': 'c'}, 'function': 'not_ends_with_suffix'}, + 'check': {'arguments': {'column': 'name', 'suffix': "'c'"}, 'function': 'not_ends_with_suffix'}, 'criticality': 'error', }