Merged
36 changes: 35 additions & 1 deletion docs/dqx/docs/reference/quality_checks.mdx
@@ -1395,7 +1395,7 @@ You can also define your own custom dataset-level checks (see [Creating custom c
| `sql_query` | Checks whether the condition column produced by a SQL query is satisfied. The check expects the query to return a boolean condition column indicating whether a record violates the requirement (True = fail, False = pass), and one or more merge columns so that results can be joined back to the input DataFrame to preserve all original records. Important considerations: if merge columns aren't unique, multiple query rows can attach to a single input row, potentially causing false positives. Performance tip: since the check must join back to the input DataFrame to retain original records, writing a custom dataset-level rule is usually more performant than the `sql_query` check. | `query`: query string, must return all merge columns and the condition column; `input_placeholder`: name to be used in the sql query as `{{ input_placeholder }}` to refer to the input DataFrame; optional reference DataFrames are referred to by the name provided in the dictionary of reference DataFrames (e.g. `{{ ref_df_key }}`; a dictionary of DataFrames can be passed when applying checks); `merge_columns`: list of columns used for merging with the input DataFrame, which must exist in the input DataFrame and be present in the output of the sql query; `condition_column`: name of the column indicating a violation (False = pass, True = fail); `msg`: (optional) message to output; `name`: (optional) name of the resulting check (it can be overwritten by `name` specified at the check level); `negate`: whether the condition should be negated |
| `compare_datasets` | Compares two DataFrames at both row and column levels, providing detailed information about differences, including new or missing rows and column-level changes. Only columns present in both the source and reference DataFrames are compared. Use with caution if `check_missing_records` is enabled, as this may increase the number of rows in the output beyond the original input DataFrame. The comparison does not support Map types (any column comparison on a map type is skipped automatically). Comparing datasets is valuable for validating data during migrations, detecting drift, performing regression testing, or verifying synchronization between source and target systems. | `columns`: columns to use for row matching with the reference DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if you don't have primary keys or want to match against all columns, you can pass 'df.columns'; `ref_columns`: list of columns in the reference DataFrame or Table to row match against the source DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if you don't have primary keys or want to match against all columns, you can pass 'ref_df.columns'; note that `columns` are matched with `ref_columns` by position, so the order of the provided columns in both lists must be exactly aligned; `exclude_columns`: (optional) list of columns to exclude from the value comparison but not from row matching (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'); the `exclude_columns` field does not alter the list of columns used to determine row matches (`columns`); it only controls which columns are skipped during the value comparison; `ref_df_name`: (optional) name of the reference DataFrame (a dictionary of DataFrames can be passed when applying checks); `ref_table`: (optional) fully qualified reference table name; either `ref_df_name` or `ref_table` must be provided but never both; the number of passed `columns` and `ref_columns` must match, and keys are checked in the given order; `check_missing_records`: perform a FULL OUTER JOIN to identify records that are missing from the source or reference DataFrames, default is False; use with caution as this may increase the number of rows in the output, since unmatched rows from both sides are included; `null_safe_row_matching`: (optional) treat NULLs as equal when matching rows using `columns` and `ref_columns` (default: True); `null_safe_column_value_matching`: (optional) treat NULLs as equal when comparing column values (default: True) |
| `is_data_fresh_per_time_window` | Freshness check that validates whether at least X records arrive within every Y-minute time window. | `column`: timestamp column (can be a string column name or a column expression); `window_minutes`: time window in minutes to check for data arrival; `min_records_per_window`: minimum number of records expected per time window; `lookback_windows`: (optional) number of time windows to look back from `curr_timestamp`, it filters records to include only those within the specified number of time windows from `curr_timestamp` (if no lookback is provided, the check is applied to the entire dataset); `curr_timestamp`: (optional) current timestamp column (if not provided, current_timestamp() function is used) |
| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match |
| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: (optional) expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `ref_table`: (optional) fully qualified reference table name to load the schema from (e.g. "catalog.schema.table"); either `expected_schema` or `ref_table` must be provided but never both; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match |
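
For illustration only (this example is not part of the PR's diff), here is a minimal sketch of how the `sql_query` check described above might be defined in Python, following the `DQDatasetRule` pattern used later on this page. The import paths, table and column names, and the query itself are assumptions made for the example.

```python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQDatasetRule  # import path assumed from this repository's layout

# Flag orders whose total amount is negative; the condition column must be True for failing rows.
checks = [
    DQDatasetRule(
        criticality="error",
        check_func=check_funcs.sql_query,
        check_func_kwargs={
            "query": """
                SELECT order_id, SUM(amount) < 0 AS negative_total
                FROM {{ input_view }}
                GROUP BY order_id
            """,
            "input_placeholder": "input_view",    # referenced as {{ input_view }} in the query
            "merge_columns": ["order_id"],        # must exist in the input and be returned by the query
            "condition_column": "negative_total",
            "msg": "total order amount is negative",
        },
    ),
]
```

Because the check joins the query result back to the input on `merge_columns`, a non-unique `order_id` in the query output would attach multiple result rows to a single input row, which is the false-positive scenario mentioned above.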

**Compare datasets check**

@@ -1729,6 +1729,21 @@ Complex data types are supported as well.
- id
- name

# has_valid_schema check using reference table
- criticality: error
check:
function: has_valid_schema
arguments:
ref_table: "catalog1.schema1.reference_table"

# has_valid_schema check using reference table with strict mode
- criticality: error
check:
function: has_valid_schema
arguments:
ref_table: "catalog1.schema1.reference_table"
strict: true

# apply check to multiple columns
- criticality: error
check:
@@ -2124,6 +2139,25 @@ checks = [
},
),

# has_valid_schema check using reference table
DQDatasetRule(
criticality="error",
check_func=check_funcs.has_valid_schema,
check_func_kwargs={
"ref_table": "catalog1.schema1.reference_table",
},
),

# has_valid_schema check using reference table with strict mode
DQDatasetRule(
criticality="error",
check_func=check_funcs.has_valid_schema,
check_func_kwargs={
"ref_table": "catalog1.schema1.reference_table",
"strict": True,
},
),

# apply check to multiple columns
*DQForEachColRule(
check_func=check_funcs.is_unique, # 'columns' as first argument
39 changes: 33 additions & 6 deletions src/databricks/labs/dqx/check_funcs.py
@@ -1702,7 +1702,8 @@ def apply(df: DataFrame) -> DataFrame:

@register_rule("dataset")
def has_valid_schema(
expected_schema: str | types.StructType,
expected_schema: str | types.StructType | None = None,
ref_table: str | None = None,
columns: list[str | Column] | None = None,
strict: bool = False,
) -> tuple[Column, Callable]:
@@ -1716,6 +1717,7 @@ def has_valid_schema(

Args:
expected_schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object.
ref_table: Name of the reference table to load the schema from (e.g. "catalog.schema.table")
columns: Optional list of columns to validate (default: all columns are considered)
strict: Whether to perform strict schema validation (default: False).
- False: Validates that all expected columns exist with compatible types (allows extra columns)
@@ -1727,31 +1729,56 @@ def has_valid_schema(
- A closure that applies the schema check and adds the necessary condition columns.

Raises:
InvalidParameterError: If the schema string is invalid or cannot be parsed, or if
the input schema is neither a string nor a StructType.
InvalidParameterError:
- If the *expected_schema* string is invalid or cannot be parsed
- If *expected_schema* is neither a string nor a StructType
- If both *expected_schema* and *ref_table* are specified
- If neither *expected_schema* nor *ref_table* are specified

Note:
Exactly one of *expected_schema* or *ref_table* must be specified.
"""
if expected_schema and ref_table:
raise InvalidParameterError(
"Cannot specify both 'expected_schema' and 'ref_table' when using 'has_valid_schema'"
)

if not expected_schema and not ref_table:
raise InvalidParameterError(
"Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'"
)

column_names: list[str] | None = None
if columns:
column_names = [get_column_name_or_alias(col) if not isinstance(col, str) else col for col in columns]

_expected_schema = _get_schema(expected_schema, column_names)
unique_str = uuid.uuid4().hex # make sure any column added to the dataframe is unique
condition_col = f"__schema_condition_{unique_str}"
message_col = f"__schema_message_{unique_str}"

def apply(df: DataFrame) -> DataFrame:
def apply(df: DataFrame, spark: SparkSession) -> DataFrame:
Contributor review comment: we should make it compatible with other methods that take a reference table; there should be an option to provide the reference DataFrame as well, similar to the foreign_key check.

"""
Apply the schema compatibility check logic to the DataFrame.

Adds columns indicating whether the DataFrame schema is incompatible with the expected schema.

Args:
df: The input DataFrame to validate for schema compatibility.
spark: SparkSession used to get the reference table schema

Returns:
The DataFrame with additional condition and message columns for schema validation.
"""

if ref_table:
ref_df = _get_ref_df(None, ref_table, None, spark)
_expected_schema = _get_schema(ref_df.schema, column_names)

elif expected_schema:
_expected_schema = _get_schema(expected_schema, column_names)

else:
raise ValueError("Must specify one of 'expected_schema' or 'ref_table' when using 'has_valid_schema'")

actual_schema = df.select(*columns).schema if columns else df.schema

if strict:
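
The inline review comment above suggests also accepting a reference DataFrame, similar to the `foreign_key` check, in addition to a reference table. As a rough, standalone sketch of that idea (the function and parameter names below are hypothetical, not DQX's API), the expected schema could be resolved from exactly one of three sources:

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import types as T


def resolve_expected_schema(
    spark: SparkSession,
    expected_schema: str | T.StructType | None = None,
    ref_table: str | None = None,
    ref_df_name: str | None = None,
    ref_dfs: dict[str, DataFrame] | None = None,
) -> T.StructType:
    """Resolve the expected schema from exactly one of: a DDL string/StructType, a reference table, or a named reference DataFrame."""
    provided = sum(source is not None for source in (expected_schema, ref_table, ref_df_name))
    if provided != 1:
        raise ValueError("Provide exactly one of 'expected_schema', 'ref_table' or 'ref_df_name'")
    if ref_table:
        return spark.table(ref_table).schema
    if ref_df_name:
        if not ref_dfs or ref_df_name not in ref_dfs:
            raise ValueError(f"Reference DataFrame '{ref_df_name}' was not provided")
        return ref_dfs[ref_df_name].schema
    if isinstance(expected_schema, T.StructType):
        return expected_schema
    return T.StructType.fromDDL(expected_schema)  # StructType.fromDDL requires PySpark >= 3.5
```

The merged change supports only `expected_schema` and `ref_table`, with the mutual-exclusivity validation shown at the top of `has_valid_schema`.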
62 changes: 53 additions & 9 deletions tests/integration/test_dataset_checks.py
@@ -1600,7 +1600,7 @@ def test_has_valid_schema_permissive_mode_extra_column(spark):

expected_schema = "a string, b int" # Expected schema without extra column
condition, apply_method = has_valid_schema(expected_schema)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", "c", condition)

expected_condition_df = spark.createDataFrame(
@@ -1669,7 +1669,7 @@ def test_has_valid_schema_permissive_mode_type_widening(spark):

expected_schema = "a varchar(10), b long, c decimal(5, 1), d float, e timestamp, f timestamp, g boolean, h binary, i array<char(1)>, j map<varchar(10), int>, k struct<field1: varchar(5), field2: byte, field3: timestamp>, l map<string, string>, invalid_col int"
condition, apply_method = has_valid_schema(expected_schema)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select(
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "invalid_col", condition
)
@@ -1753,7 +1753,7 @@ def test_has_valid_schema_permissive_mode_missing_column(spark):
]
)
condition, apply_method = has_valid_schema(expected_schema)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", condition)

expected_condition_df = spark.createDataFrame(
@@ -1785,7 +1785,7 @@ def test_has_valid_schema_permissive_mode_incompatible_column_type(spark):

expected_schema = "a string, b int"
condition, apply_method = has_valid_schema(expected_schema)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", condition)

expected_condition_df = spark.createDataFrame(
@@ -1817,7 +1817,7 @@ def test_has_valid_schema_strict_mode_missing_column(spark):

expected_schema = "a string, b int, c double"
condition, apply_method = has_valid_schema(expected_schema, strict=True)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", condition)

expected_condition_df = spark.createDataFrame(
@@ -1849,7 +1849,7 @@ def test_has_valid_schema_strict_mode_extra_column(spark):

expected_schema = "a string, b int"
condition, apply_method = has_valid_schema(expected_schema, strict=True)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", "c", condition)

expected_condition_df = spark.createDataFrame(
@@ -1883,7 +1883,7 @@ def test_has_valid_schema_strict_mode_wrong_column_order(spark):

expected_schema = "a string, b int"
condition, apply_method = has_valid_schema(expected_schema, strict=True)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("b", "a", condition)

expected_condition_df = spark.createDataFrame(
@@ -1915,7 +1915,7 @@ def test_has_valid_schema_with_specified_columns(spark):

expected_schema = "a string, b int, c string, e int"
condition, apply_method = has_valid_schema(expected_schema, columns=["a", "b"], strict=False)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", "c", "d", condition)

expected_condition_df = spark.createDataFrame(
@@ -1939,7 +1939,7 @@ def test_has_valid_schema_with_specific_columns_mismatch(spark: SparkSession):

expected_schema = "a string, b int, c string"
condition, apply_method = has_valid_schema(expected_schema, columns=["a", "b"], strict=True)
actual_apply_df = apply_method(test_df)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", "c", condition)

expected_condition_df = spark.createDataFrame(
@@ -1960,3 +1960,47 @@ def test_has_valid_schema_with_specific_columns_mismatch(spark: SparkSession):
"a string, b string, c double, has_invalid_schema string",
)
assert_df_equality(actual_condition_df, expected_condition_df, ignore_nullable=True)


def test_has_valid_schema_with_ref_table(spark, make_schema, make_random):
catalog_name = TEST_CATALOG
schema = make_schema(catalog_name=catalog_name)
ref_table_name = f"{catalog_name}.{schema.name}.{make_random(8).lower()}"

ref_df = spark.createDataFrame(
[
["ref1", 100],
["ref2", 200],
],
"a string, b int",
)
ref_df.write.mode("overwrite").saveAsTable(ref_table_name)

test_df = spark.createDataFrame(
[
["str1", "not_an_int"],
["str2", "also_not_int"],
],
"a string, b string",
)

condition, apply_method = has_valid_schema(ref_table=ref_table_name)
actual_apply_df = apply_method(test_df, spark)
actual_condition_df = actual_apply_df.select("a", "b", condition)

expected_condition_df = spark.createDataFrame(
[
[
"str1",
"not_an_int",
"Schema validation failed: Column 'b' has incompatible type, expected 'integer', got 'string'",
],
[
"str2",
"also_not_int",
"Schema validation failed: Column 'b' has incompatible type, expected 'integer', got 'string'",
],
],
"a string, b string, has_invalid_schema string",
)
assert_df_equality(actual_condition_df, expected_condition_df, ignore_nullable=True)
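

The new parameter validation in `has_valid_schema` (rejecting calls with both sources or neither) is not exercised by the integration tests above. A hedged sketch of unit tests for it could look like the following; the `InvalidParameterError` import path is an assumption inferred from the diff, not confirmed here.

```python
import pytest

from databricks.labs.dqx.check_funcs import has_valid_schema
from databricks.labs.dqx.errors import InvalidParameterError  # import path assumed


def test_has_valid_schema_rejects_both_expected_schema_and_ref_table():
    with pytest.raises(InvalidParameterError, match="Cannot specify both"):
        has_valid_schema(expected_schema="a string, b int", ref_table="catalog1.schema1.reference_table")


def test_has_valid_schema_requires_expected_schema_or_ref_table():
    with pytest.raises(InvalidParameterError, match="Must specify one of"):
        has_valid_schema()
```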