Add Data Quarantine Capability for Invalid Records in Data Quality Actions
Summary
Enhance the existing data quality transform action to support a quarantine pattern where invalid records are flagged and preserved in a separate partition instead of being dropped. This allows data engineers to analyze, debug, and potentially reprocess quarantined records.
Current Behavior
Lakehouse Plumber's data quality actions support three failure modes:
- `warn` - Log warnings but process all records
- `drop` - Remove records that violate expectations
- `fail` - Stop pipeline execution on violations

When using `drop`, invalid records are permanently removed, with no way to inspect or recover them.
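To make the trade-off concrete, here is an illustrative pure-Python sketch (not Lakehouse Plumber's actual implementation) of how the three existing modes treat violating records; `apply_failure_mode` and the sample rows are hypothetical:

```python
def apply_failure_mode(records, predicate, mode):
    """Simulate 'warn', 'drop', and 'fail' semantics on plain dicts."""
    violations = [r for r in records if not predicate(r)]
    if mode == "warn":
        # Log and keep everything.
        if violations:
            print(f"WARN: {len(violations)} record(s) violate the expectation")
        return records
    if mode == "drop":
        # Invalid records are removed and cannot be recovered downstream.
        return [r for r in records if predicate(r)]
    if mode == "fail":
        if violations:
            raise ValueError("expectation violated; stopping pipeline")
        return records
    raise ValueError(f"unknown failure mode: {mode}")

rows = [{"pickup_zip": "10001"}, {"pickup_zip": None}]
valid = lambda r: r["pickup_zip"] is not None
print(apply_failure_mode(rows, valid, "drop"))  # the None row is gone
```

With `drop`, the second row simply disappears, which is exactly the loss of information the quarantine mode below is meant to avoid.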
Proposed Enhancement
Add a fourth failure mode, `quarantine`, that:
- Adds an `is_quarantined` boolean column to the output table
- Marks records as `true` if they violate any quarantine rules, `false` otherwise
- Stores all records (valid + invalid) in a partitioned table using `partition_cols=["is_quarantined"]`
- Automatically generates companion views for easy access:
  - `{target}_valid` - Only records with `is_quarantined = false`
  - `{target}_quarantined` - Only records with `is_quarantined = true`
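The flagging semantics can be sketched in pure Python (a hypothetical illustration; the real feature would evaluate a Spark SQL expression inside the generated DLT table, as shown further below):

```python
def flag_quarantined(records, rules):
    """Add is_quarantined=True to any record failing at least one rule."""
    return [
        {**r, "is_quarantined": not all(rule(r) for rule in rules.values())}
        for r in records
    ]

# Hypothetical rules mirroring the expectations file in this issue.
rules = {
    "valid_pickup_zip": lambda r: r.get("pickup_zip") is not None,
    "valid_dropoff_zip": lambda r: r.get("dropoff_zip") is not None,
}
rows = [
    {"pickup_zip": "10001", "dropoff_zip": "10002"},
    {"pickup_zip": None, "dropoff_zip": "10002"},
]
flagged = flag_quarantined(rows, rules)
```

Note that every record is kept; the flag only decides which partition (and which companion view) it lands in.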
Example Configuration
actions:
- name: trips_data_quality
type: transform
transform_type: data_quality
source: raw_trips_data
target: trips_data_quarantine
readMode: stream
expectations_file: "expectations/trips_quality.json"
quarantine_mode: true # NEW FLAG
description: "Apply quality checks with quarantine"Expectations File (expectations/trips_quality.json):
```json
{
  "version": "1.0",
  "table": "trips",
  "expectations": [
    {
      "name": "valid_pickup_zip",
      "expression": "pickup_zip IS NOT NULL",
      "failureAction": "quarantine"
    },
    {
      "name": "valid_dropoff_zip",
      "expression": "dropoff_zip IS NOT NULL",
      "failureAction": "quarantine"
    }
  ]
}
```

Generated Code Pattern
The above configuration would generate code similar to:
```python
import dlt
from pyspark.sql.functions import expr

# Define quarantine rules
rules = {
    "valid_pickup_zip": "(pickup_zip IS NOT NULL)",
    "valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}

# A record is quarantined if it fails any rule
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
    name="trips_data_quarantine",
    temporary=False,
    partition_cols=["is_quarantined"],
    comment="Trips data with quarantine tracking",
)
@dlt.expect_all(rules)  # Still track metrics for all rules
def trips_data_quarantine():
    return (
        spark.readStream.table("raw_trips_data")
        .withColumn("is_quarantined", expr(quarantine_rules))
    )

@dlt.view(name="trips_data_quarantine_valid")
def trips_data_quarantine_valid():
    """Valid trips records only"""
    return spark.read.table("trips_data_quarantine").filter("is_quarantined = false")

@dlt.view(name="trips_data_quarantine_quarantined")
def trips_data_quarantine_quarantined():
    """Quarantined trips records for analysis"""
    return spark.read.table("trips_data_quarantine").filter("is_quarantined = true")
```

Benefits
- Observability - Preserve invalid records for root cause analysis
- Data Recovery - Reprocess quarantined records after fixing upstream issues
- Debugging - Understand patterns in data quality violations
- Compliance - Audit trail of rejected records
- Gradual Quality Improvement - Deploy pipelines without losing data while fixing quality issues
Implementation Notes
- Should work alongside the existing `fail`, `drop`, and `warn` actions
- The quarantine table should be a regular DLT table (not temporary) for persistence
- Consider adding a `quarantine_reason` column to capture which rule(s) caused quarantine
- The `_valid` and `_quarantined` views should be generated automatically when `quarantine_mode: true`
- Partitioning by `is_quarantined` improves query performance when filtering
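The suggested `quarantine_reason` column could collect the names of every rule a record fails. A hypothetical pure-Python sketch (the generated Spark code would instead build this with SQL expressions):

```python
def quarantine_reason(record, rules):
    """Return the list of rule names the record violates (empty if valid)."""
    return [name for name, rule in rules.items() if not rule(record)]

# Hypothetical rules mirroring the expectations file in this issue.
rules = {
    "valid_pickup_zip": lambda r: r.get("pickup_zip") is not None,
    "valid_dropoff_zip": lambda r: r.get("dropoff_zip") is not None,
}
bad_row = {"pickup_zip": None, "dropoff_zip": None}
good_row = {"pickup_zip": "10001", "dropoff_zip": "10002"}
```

An empty list for valid records keeps the column cheap to store while making per-rule failure analysis a simple explode-and-group query.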
Related Components
- `src/lhp/utils/dqe.py` - DQE parser needs quarantine action support
- `src/lhp/generators/transform/data_quality.py` - Code generation for the quarantine pattern
- `docs/actions_reference.rst` - Documentation update
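The parsing step could be sketched as follows, assuming a JSON layout like the example above; `load_quarantine_rules` is a hypothetical helper, not the actual `dqe.py` API:

```python
import json

def load_quarantine_rules(raw_json):
    """Extract {name: expression} for expectations whose failureAction is 'quarantine'."""
    spec = json.loads(raw_json)
    return {
        e["name"]: "({0})".format(e["expression"])
        for e in spec.get("expectations", [])
        if e.get("failureAction") == "quarantine"
    }

sample = """
{
  "version": "1.0",
  "table": "trips",
  "expectations": [
    {"name": "valid_pickup_zip", "expression": "pickup_zip IS NOT NULL", "failureAction": "quarantine"},
    {"name": "has_fare", "expression": "fare > 0", "failureAction": "drop"}
  ]
}
"""
rules = load_quarantine_rules(sample)
```

Filtering on `failureAction` lets quarantine rules coexist with `warn`/`drop`/`fail` expectations in the same file, as the implementation notes require.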
Acceptance Criteria
- Add `quarantine` as a valid `failureAction` in expectations
- Support the `quarantine_mode: true` flag in data quality actions
- Generate the table with an `is_quarantined` column and partitioning
- Auto-generate `{target}_valid` and `{target}_quarantined` views
- Update documentation with quarantine examples
- Add unit tests for quarantine code generation
- Add integration test with sample quarantine pipeline
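The code-generation criteria above could be unit-tested roughly like this; both helpers are hypothetical stand-ins for logic that would live in `data_quality.py`:

```python
def build_quarantine_expr(rules):
    """Compose the combined predicate: a record is quarantined if it fails any rule."""
    return "NOT({0})".format(" AND ".join(rules.values()))

def companion_view_names(target):
    """Names of the auto-generated valid/quarantined views for a target table."""
    return (f"{target}_valid", f"{target}_quarantined")

# Rules as they appear in the generated code pattern above.
rules = {
    "valid_pickup_zip": "(pickup_zip IS NOT NULL)",
    "valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}
quarantine_expr = build_quarantine_expr(rules)
views = companion_view_names("trips_data_quarantine")
```

Asserting on the exact composed expression and view names keeps the generator's output contract explicit and cheap to test without a Spark session.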