
Commit 293c764

ghansemwojtyczka and Marcin Wojtyczka authored
Update has_valid_schema check to accept a reference dataframe or table (#960)
## Changes

This PR introduces reference datasets (either tables or DataFrames) for the `has_valid_schema` check function. The behavior is as follows:

- When `ref_dfs` is created in-code and `ref_df_name` is specified, the valid schema is determined from the reference DataFrame.
- When `ref_table` is specified, the valid schema is determined by loading the reference table as a Spark DataFrame.

Specifying multiple valid schema sources (e.g. `expected_schema` and `ref_df_name` or `ref_table`) raises an `InvalidParameterError`.

### Linked issues

Resolves #959

### Tests

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
- [ ] added end-to-end tests
- [ ] added performance tests

---------

Co-authored-by: Marcin Wojtyczka <[email protected]>
1 parent 76aed0b commit 293c764
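For illustration only, a minimal sketch of the two new schema sources described above, written against the `DQDatasetRule` and `check_funcs.has_valid_schema` names that appear in the documentation examples in this diff; the import paths, catalog/table name, and reference DataFrame key are assumptions for the sketch, not part of the commit.

```python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQDatasetRule  # assumed import path

# Valid schema derived from a reference table, loaded as a Spark DataFrame at check time
schema_from_table = DQDatasetRule(
    criticality="error",
    check_func=check_funcs.has_valid_schema,
    check_func_kwargs={"ref_table": "catalog1.schema1.reference_table"},
)

# Valid schema derived from a reference DataFrame passed via the ref_dfs dictionary
schema_from_df = DQDatasetRule(
    criticality="error",
    check_func=check_funcs.has_valid_schema,
    check_func_kwargs={"ref_df_name": "my_ref_df"},
)

# Specifying more than one schema source raises InvalidParameterError, e.g.:
# check_funcs.has_valid_schema(
#     expected_schema="id INT, name STRING",
#     ref_table="catalog1.schema1.reference_table",
# )
```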

File tree

5 files changed, +225 -16 lines changed


docs/dqx/docs/reference/quality_checks.mdx

Lines changed: 74 additions & 1 deletion
@@ -1396,7 +1396,7 @@ You can also define your own custom dataset-level checks (see [Creating custom c
| `sql_query` | Checks whether the condition column produced by a SQL query is satisfied. The check supports two modes: **Row-level validation** (when `merge_columns` is provided) - query results are joined back to the input DataFrame to mark specific rows; **Dataset-level validation** (when `merge_columns` is None or empty) - the check result applies to all rows (or filtered rows if `row_filter` is used), making it ideal for aggregate validations with custom metrics. The query must return a boolean condition column (True = fail, False = pass). For row-level checks: if merge columns aren't unique, multiple query rows can attach to a single input row, potentially causing false positives. Performance tip: for complex queries, writing a custom dataset-level rule is usually more performant than `sql_query` check. | `query`: query string, must return condition column (and merge columns if provided); `input_placeholder`: name to be used in the sql query as `{{ input_placeholder }}` to refer to the input DataFrame, optional reference DataFrames are referred by the name provided in the dictionary of reference DataFrames (e.g. `{{ ref_df_key }}`, dictionary of DataFrames can be passed when applying checks); `merge_columns`: (optional) list of columns used for merging with the input DataFrame which must exist in the input DataFrame and be present in output of the sql query; when not provided (None or empty list), the check result applies to all rows in the dataset (dataset-level validation); `condition_column`: name of the column indicating a violation (False = pass, True = fail); `msg`: (optional) message to output; `name`: (optional) name of the resulting check (it can be overwritten by `name` specified at the check level); `negate`: if the condition should be negated |
| `compare_datasets` | Compares two DataFrames at both row and column levels, providing detailed information about differences, including new or missing rows and column-level changes. Only columns present in both the source and reference DataFrames are compared. Use with caution if `check_missing_records` is enabled, as this may increase the number of rows in the output beyond the original input DataFrame. The comparison does not support Map types (any column comparison on map type is skipped automatically). Comparing datasets is valuable for validating data during migrations, detecting drift, performing regression testing, or verifying synchronization between source and target systems. | `columns`: columns to use for row matching with the reference DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'df.columns'; `ref_columns`: list of columns in the reference DataFrame or Table to row match against the source DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'ref_df.columns'; note that `columns` are matched with `ref_columns` by position, so the order of the provided columns in both lists must be exactly aligned; `exclude_columns`: (optional) list of columns to exclude from the value comparison but not from row matching (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'); the `exclude_columns` field does not alter the list of columns used to determine row matches (columns), it only controls which columns are skipped during the value comparison; `ref_df_name`: (optional) name of the reference DataFrame (dictionary of DataFrames can be passed when applying checks); `ref_table`: (optional) fully qualified reference table name; either `ref_df_name` or `ref_table` must be provided but never both; the number of passed `columns` and `ref_columns` must match and keys are checks in the given order; `check_missing_records`: perform a FULL OUTER JOIN to identify records that are missing from source or reference DataFrames, default is False; use with caution as this may increase the number of rows in the output, as unmatched rows from both sides are included; `null_safe_row_matching`: (optional) treat NULLs as equal when matching rows using `columns` and `ref_columns` (default: True); `null_safe_column_value_matching`: (optional) treat NULLs as equal when comparing column values (default: True) |
| `is_data_fresh_per_time_window` | Freshness check that validates whether at least X records arrive within every Y-minute time window. | `column`: timestamp column (can be a string column name or a column expression); `window_minutes`: time window in minutes to check for data arrival; `min_records_per_window`: minimum number of records expected per time window; `lookback_windows`: (optional) number of time windows to look back from `curr_timestamp`, it filters records to include only those within the specified number of time windows from `curr_timestamp` (if no lookback is provided, the check is applied to the entire dataset); `curr_timestamp`: (optional) current timestamp column (if not provided, current_timestamp() function is used) |
-| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match |
+| `has_valid_schema` | Schema check that validates whether the DataFrame schema matches an expected schema. In non-strict mode, validates that all expected columns exist with compatible types (allows extra columns). In strict mode, validates exact schema match (same columns, same order, same types) for all columns by default or for all columns specified in `columns`. This check is applied at the dataset level and reports schema violations for all rows in the DataFrame when incompatibilities are detected. | `expected_schema`: (optional) expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object; `ref_df_name`: (optional) name of the reference DataFrame to load the schema from (dictionary of DataFrames can be passed when applying checks); `ref_table`: (optional) fully qualified reference table name to load the schema from (e.g. "catalog.schema.table"); exactly one of `expected_schema`, `ref_df_name`, or `ref_table` must be provided; `columns`: (optional) list of columns to validate (if not provided, all columns are considered); `strict`: (optional) whether to perform strict schema validation (default: False) - False: validates that all expected columns exist with compatible types, True: validates exact schema match |
| `has_no_outliers` | Checks whether the values in the input column contain any outliers. This function implements a median absolute deviation (MAD) algorithm to find outliers. | `column`: column of type numeric to check (can be a string column name or a column expression); |

**Compare datasets check**
@@ -1742,6 +1742,38 @@ Complex data types are supported as well.
        - id
        - name

+# has_valid_schema check using reference table
+- criticality: error
+  check:
+    function: has_valid_schema
+    arguments:
+      ref_table: "catalog1.schema1.reference_table"
+
+# has_valid_schema check using reference table with strict mode
+- criticality: error
+  check:
+    function: has_valid_schema
+    arguments:
+      ref_table: "catalog1.schema1.reference_table"
+      strict: true
+
+# has_valid_schema check using reference DataFrame
+- criticality: error
+  check:
+    function: has_valid_schema
+    arguments:
+      ref_df_name: "my_ref_df"
+
+# has_valid_schema check using reference DataFrame with specific columns
+- criticality: warn
+  check:
+    function: has_valid_schema
+    arguments:
+      ref_df_name: "my_ref_df"
+      columns:
+        - id
+        - name
+
# apply check to multiple columns
- criticality: error
  check:
@@ -2165,6 +2197,44 @@ checks = [
        },
    ),

+    # has_valid_schema check using reference table
+    DQDatasetRule(
+        criticality="error",
+        check_func=check_funcs.has_valid_schema,
+        check_func_kwargs={
+            "ref_table": "catalog1.schema1.reference_table",
+        },
+    ),
+
+    # has_valid_schema check using reference table with strict mode
+    DQDatasetRule(
+        criticality="error",
+        check_func=check_funcs.has_valid_schema,
+        check_func_kwargs={
+            "ref_table": "catalog1.schema1.reference_table",
+            "strict": True,
+        },
+    ),
+
+    # has_valid_schema check using reference DataFrame
+    DQDatasetRule(
+        criticality="error",
+        check_func=check_funcs.has_valid_schema,
+        check_func_kwargs={
+            "ref_df_name": "my_ref_df",
+        },
+    ),
+
+    # has_valid_schema check using reference DataFrame with specific columns
+    DQDatasetRule(
+        criticality="warn",
+        check_func=check_funcs.has_valid_schema,
+        check_func_kwargs={
+            "ref_df_name": "my_ref_df",
+            "columns": ["id", "name"],
+        },
+    ),
+
    # apply check to multiple columns
    *DQForEachColRule(
        check_func=check_funcs.is_unique,  # 'columns' as first argument
@@ -2198,6 +2268,9 @@ The reference DataFrames are used in selected Dataset-level checks:
* `compare_datasets`: required for this check if `ref_df_name` argument is specified and not `ref_table`, e.g. `ref_df_name="ref_df_key"`.
  The value of `ref_df_name` must match the key in the `ref_dfs` dictionary.

+* `has_valid_schema`: required for this check if `ref_df_name` argument is specified and not `ref_table` or `expected_schema`, e.g. `ref_df_name="ref_df_key"`.
+  The value of `ref_df_name` must match the key in the `ref_dfs` dictionary. The schema from the reference DataFrame is used to validate the input DataFrame schema.
+
* `sql_query`: the reference DataFrames are registered as temporary views and can be used in the sql query.

For example, if you have a reference DataFrame named `ref_df_key`, you can use it in the SQL query as `{{ ref_df_key }}`:

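As a hedged aside on the `ref_dfs` wiring described in the hunk above: a minimal sketch of passing reference DataFrames when applying checks. It assumes the engine's `apply_checks` accepts a `ref_dfs` dictionary, as the surrounding documentation describes; the `DQEngine` import path, the `spark`/`input_df`/`checks` variables (notebook context), and the table name are assumptions.

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine  # assumed import path

dq_engine = DQEngine(WorkspaceClient())

# The dictionary key must match the ref_df_name used by the has_valid_schema rule.
# `spark`, `input_df`, and `checks` are assumed to exist in the notebook/session context.
ref_dfs = {"my_ref_df": spark.table("catalog1.schema1.reference_table")}

checked_df = dq_engine.apply_checks(input_df, checks, ref_dfs=ref_dfs)
```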
src/databricks/labs/dqx/check_funcs.py

Lines changed: 32 additions & 5 deletions
@@ -1865,7 +1865,9 @@ def apply(df: DataFrame) -> DataFrame:

@register_rule("dataset")
def has_valid_schema(
-    expected_schema: str | types.StructType,
+    expected_schema: str | types.StructType | None = None,
+    ref_df_name: str | None = None,
+    ref_table: str | None = None,
    columns: list[str | Column] | None = None,
    strict: bool = False,
) -> tuple[Column, Callable]:
@@ -1879,6 +1881,8 @@ def has_valid_schema(

    Args:
        expected_schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType object.
+        ref_df_name: Name of the reference DataFrame (used when passing DataFrames directly).
+        ref_table: Name of the reference table to load the schema from (e.g. "catalog.schema.table")
        columns: Optional list of columns to validate (default: all columns are considered)
        strict: Whether to perform strict schema validation (default: False).
            - False: Validates that all expected columns exist with compatible types (allows extra columns)
@@ -1890,31 +1894,54 @@ def has_valid_schema(
        - A closure that applies the schema check and adds the necessary condition columns.

    Raises:
-        InvalidParameterError: If the schema string is invalid or cannot be parsed, or if
-            the input schema is neither a string nor a StructType.
+        InvalidParameterError:
+            - If the *expected_schema* string is invalid or cannot be parsed
+            - If *expected_schema* is neither a string nor a StructType
+            - If more than one of *expected_schema*, *ref_df_name*, or *ref_table* are specified
+            - If none of *expected_schema*, *ref_df_name*, or *ref_table* are specified
+
+    Note:
+        Exactly one of *expected_schema*, *ref_df_name*, or *ref_table* must be specified.
    """
+    expected_params = ["expected_schema", "ref_df_name", "ref_table"]
+    non_null_params = dict(filter(lambda param: param[0] in expected_params and param[1] is not None, locals().items()))
+
+    if len(non_null_params) != 1:
+        raise InvalidParameterError(
+            "Must specify one of 'expected_schema', 'ref_df_name', or 'ref_table' when using 'has_valid_schema'"
+        )

    column_names: list[str] | None = None
    if columns:
        column_names = [get_column_name_or_alias(col) if not isinstance(col, str) else col for col in columns]

-    _expected_schema = _get_schema(expected_schema, column_names)
+    expected_schema = _get_schema(expected_schema or types.StructType(), column_names)
+
    unique_str = uuid.uuid4().hex  # make sure any column added to the dataframe is unique
    condition_col = f"__schema_condition_{unique_str}"
    message_col = f"__schema_message_{unique_str}"

-    def apply(df: DataFrame) -> DataFrame:
+    def apply(df: DataFrame, spark: SparkSession, ref_dfs: dict[str, DataFrame]) -> DataFrame:
        """
        Apply the schema compatibility check logic to the DataFrame.

        Adds columns indicating whether the DataFrame schema is incompatible with the expected schema.

        Args:
            df: The input DataFrame to validate for schema compatibility.
+            spark: SparkSession used to get the reference table schema
+            ref_dfs: A dictionary mapping reference DataFrame names to DataFrame objects.

        Returns:
            The DataFrame with additional condition and message columns for schema validation.
        """
+
+        if ref_df_name or ref_table:
+            ref_df = _get_ref_df(ref_df_name, ref_table, ref_dfs, spark)
+            _expected_schema = _get_schema(ref_df.schema, column_names)
+        else:
+            _expected_schema = expected_schema
+
        actual_schema = df.select(*columns).schema if columns else df.schema

        if strict:

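For readers skimming the hunk above, a small standalone sketch of the same "exactly one of" validation pattern, written without the `locals()` filtering used in the commit; the function name and error type here are hypothetical and this is not DQX code.

```python
def pick_schema_source(
    expected_schema: str | None = None,
    ref_df_name: str | None = None,
    ref_table: str | None = None,
) -> tuple[str, str]:
    """Return the single provided schema source, failing unless exactly one is set."""
    provided = {
        name: value
        for name, value in {
            "expected_schema": expected_schema,
            "ref_df_name": ref_df_name,
            "ref_table": ref_table,
        }.items()
        if value is not None
    }
    if len(provided) != 1:
        raise ValueError("Must specify one of 'expected_schema', 'ref_df_name', or 'ref_table'")
    return next(iter(provided.items()))


print(pick_schema_source(ref_table="catalog1.schema1.reference_table"))
# ('ref_table', 'catalog1.schema1.reference_table')
```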