Skip to content

Commit 7bdbd90

Browse files
authored
Merge branch 'master' into add-columndatatype-rule
2 parents 026be6c + 309650c commit 7bdbd90

14 files changed

+1431
-14
lines changed

README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ Deequ also supports [DQDL](https://docs.aws.amazon.com/glue/latest/dg/dqdl.html)
158158
- **ColumnExists**: `ColumnExists "column"`
159159
- **RowCountMatch**: `RowCountMatch "referenceDataset" >= 0.9`
160160
- **DataFreshness**: `DataFreshness "Order_Date" <= 24 hours`
161+
- **Composite Rules**: Combine multiple rules with `and` / `or` operators
162+
- Simple: `(RowCount > 0) and (IsComplete "column")`
163+
- Nested: `(Rule1) or ((Rule2) and (Rule3))`
161164

162165
### Scala Example
163166

@@ -223,6 +226,57 @@ Dataset<Row> results = EvaluateDataQuality.process(df, ruleset);
223226
results.show();
224227
```
225228

229+
### Composite Rules Example
230+
231+
Composite rules allow you to combine multiple data quality checks using logical operators (`and`, `or`). This enables complex validation scenarios:
232+
233+
```scala
234+
import com.amazon.deequ.dqdl.EvaluateDataQuality
235+
import org.apache.spark.sql.SparkSession
236+
237+
val spark = SparkSession.builder()
238+
.appName("Composite Rules Example")
239+
.master("local[*]")
240+
.getOrCreate()
241+
242+
import spark.implicits._
243+
244+
val df = Seq(
245+
(1, "Alice", 25, "alice@example.com"),
246+
(2, "Bob", 30, "bob@example.com"),
247+
(3, "Charlie", 35, "charlie@example.com")
248+
).toDF("id", "name", "age", "email")
249+
250+
// Simple AND: Both conditions must be true
251+
val andRule = """Rules=[(RowCount > 0) and (IsComplete "email")]"""
252+
val andResults = EvaluateDataQuality.process(df, andRule)
253+
andResults.show()
254+
255+
// Simple OR: At least one condition must be true
256+
val orRule = """Rules=[(RowCount > 100) or (IsUnique "id")]"""
257+
val orResults = EvaluateDataQuality.process(df, orRule)
258+
orResults.show()
259+
260+
// Nested composition: Complex logic with multiple levels
261+
val nestedRule = """Rules=[
262+
((IsComplete "name") and (IsComplete "email")) or
263+
((RowCount > 0) and (IsUnique "id"))
264+
]"""
265+
val nestedResults = EvaluateDataQuality.process(df, nestedRule)
266+
nestedResults.show()
267+
268+
// Multiple composite rules in one ruleset
269+
val multipleRules = """Rules=[
270+
(RowCount > 0) and (IsComplete "id"),
271+
(IsUnique "id") or (IsUnique "email"),
272+
((Mean "age" > 20) and (Mean "age" < 50)) or (RowCount < 10)
273+
]"""
274+
val multipleResults = EvaluateDataQuality.process(df, multipleRules)
275+
multipleResults.show()
276+
```
277+
278+
**Note:** Composite rules currently support dataset-level evaluation only. Row-level evaluation (identifying which specific rows pass/fail) is not yet implemented.
279+
226280
## Citation
227281

228282
If you would like to reference this package in a research paper, please cite:

src/main/scala/com/amazon/deequ/dqdl/execution/DQDLExecutor.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
55
* use this file except in compliance with the License. A copy of the License
@@ -16,8 +16,8 @@
1616

1717
package com.amazon.deequ.dqdl.execution
1818

19-
import com.amazon.deequ.dqdl.execution.executors.{ColumnDataTypeExecutor, DataFreshnessExecutor, DeequRulesExecutor, ReferentialIntegrityExecutor, RowCountMatchExecutor, UnsupportedRulesExecutor}
20-
import com.amazon.deequ.dqdl.model.{ColumnDataTypeExecutableRule, DataFreshnessExecutableRule, DeequExecutableRule, ExecutableRule, Failed, ReferentialIntegrityExecutableRule, RowCountMatchExecutableRule, RuleOutcome, UnsupportedExecutableRule}
19+
import com.amazon.deequ.dqdl.execution.executors.{AggregateMatchExecutor, ColumnNamesMatchPatternExecutor, CompositeRulesExecutor, ColumnDataTypeExecutor, DataFreshnessExecutor, DatasetMatchExecutor, DeequRulesExecutor, ReferentialIntegrityExecutor, RowCountMatchExecutor, UnsupportedRulesExecutor}
20+
import com.amazon.deequ.dqdl.model.{AggregateMatchExecutableRule, ColumnNamesMatchPatternExecutableRule, CompositeExecutableRule, ColumnDataTypeExecutableRule, DataFreshnessExecutableRule, DatasetMatchExecutableRule, DeequExecutableRule, ExecutableRule, Failed, ReferentialIntegrityExecutableRule, RowCountMatchExecutableRule, RuleOutcome, UnsupportedExecutableRule}
2121
import org.apache.spark.sql.DataFrame
2222
import software.amazon.glue.dqdl.model.DQRule
2323

@@ -35,11 +35,15 @@ object DQDLExecutor {
3535
// Map from rule class to its executor
3636
private val executors = Map[Class[_ <: ExecutableRule], RuleExecutor[_ <: ExecutableRule]](
3737
classOf[DeequExecutableRule] -> DeequRulesExecutor,
38+
classOf[CompositeExecutableRule] -> CompositeRulesExecutor,
3839
classOf[UnsupportedExecutableRule] -> UnsupportedRulesExecutor,
3940
classOf[RowCountMatchExecutableRule] -> RowCountMatchExecutor,
4041
classOf[ReferentialIntegrityExecutableRule] -> ReferentialIntegrityExecutor,
4142
classOf[DataFreshnessExecutableRule] -> DataFreshnessExecutor,
4243
classOf[ColumnDataTypeExecutableRule] -> ColumnDataTypeExecutor
44+
classOf[ColumnNamesMatchPatternExecutableRule] -> ColumnNamesMatchPatternExecutor,
45+
classOf[DatasetMatchExecutableRule] -> DatasetMatchExecutor,
46+
classOf[AggregateMatchExecutableRule] -> AggregateMatchExecutor
4347
)
4448

4549
def executeRules(rules: Seq[ExecutableRule], df: DataFrame,
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License
6+
* is located at
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.deequ.dqdl.execution.executors
18+
19+
import com.amazon.deequ.dqdl.execution.DQDLExecutor
20+
import com.amazon.deequ.dqdl.model.{AggregateMatchExecutableRule, AggregateOperation, Avg, Failed, Passed, RuleOutcome, Sum}
21+
import org.apache.spark.sql.DataFrame
22+
import org.apache.spark.sql.functions.{avg, sum}
23+
import org.apache.spark.sql.types.DoubleType
24+
import software.amazon.glue.dqdl.model.DQRule
25+
26+
import scala.util.{Failure, Success, Try}
27+
28+
/**
 * Executor for AggregateMatch rules: compares two aggregate values (sum or avg of a
 * column, each possibly computed on a different data source) and asserts on their ratio.
 */
object AggregateMatchExecutor extends DQDLExecutor.RuleExecutor[AggregateMatchExecutableRule] {

  // Alias under which the primary DataFrame is made visible alongside additional sources.
  private val PrimaryAlias = "primary"

  /**
   * Executes each AggregateMatch rule against the primary DataFrame plus any
   * additional data sources.
   *
   * @param rules                 rules to evaluate
   * @param df                    the primary dataset (registered under alias "primary")
   * @param additionalDataSources named reference datasets for cross-dataset aggregates
   * @return one outcome per rule, keyed by the underlying DQRule
   */
  override def executeRules(rules: Seq[AggregateMatchExecutableRule], df: DataFrame,
                            additionalDataSources: Map[String, DataFrame] = Map.empty): Map[DQRule, RuleOutcome] = {
    val dataSources = additionalDataSources + (PrimaryAlias -> df)
    rules.map { rule =>
      rule.dqRule -> evaluateRule(rule, dataSources)
    }.toMap
  }

  /**
   * Computes both aggregates, derives their ratio, and applies the rule's assertion.
   * Any evaluation error (missing source, Spark failure) yields a Failed outcome
   * with a message instead of throwing.
   */
  private def evaluateRule(rule: AggregateMatchExecutableRule,
                           dataSources: Map[String, DataFrame]): RuleOutcome = {
    val result = for {
      first <- evaluateAggregate(rule.firstAggregateOperation, dataSources)
      second <- evaluateAggregate(rule.secondAggregateOperation, dataSources)
    } yield computeRatio(first, second)

    result match {
      case Right(ratio) =>
        // Avoid Option.get: only attach the evaluated metric when a name is present,
        // rather than throwing NoSuchElementException on a missing metric name.
        val metrics = rule.evaluatedMetricName
          .map(name => Map(name -> ratio))
          .getOrElse(Map.empty[String, Double])
        if (rule.assertion(ratio)) {
          RuleOutcome(rule.dqRule, Passed, None, metrics)
        } else {
          RuleOutcome(rule.dqRule, Failed,
            Some(s"Value: $ratio does not meet the constraint requirement."), metrics)
        }
      case Left(errorMsg) =>
        RuleOutcome(rule.dqRule, Failed, Some(errorMsg))
    }
  }

  /**
   * Evaluates a single aggregate (avg or sum) on the data source named by the
   * operation's alias. Returns Left with a diagnostic when the alias is unknown
   * or the Spark action fails.
   */
  private def evaluateAggregate(op: AggregateOperation,
                                dataSources: Map[String, DataFrame]): Either[String, Double] = {
    dataSources.get(op.dataSourceAlias) match {
      case Some(ds) =>
        val colOp = op match {
          case Avg(_, _) => avg(op.column).cast(DoubleType)
          case Sum(_, _) => sum(op.column).cast(DoubleType)
        }
        Try(ds.select(colOp).collect().head.getAs[Double](0)) match {
          case Success(v) => Right(v)
          // Include the exception message, not just the class name, so failures are actionable.
          case Failure(ex) => Left(s"Exception: ${ex.getClass.getName}: ${ex.getMessage}")
        }
      case None =>
        Left(s"${op.dataSourceAlias} not found in additional sources")
    }
  }

  /**
   * Ratio of the two aggregates with defined edge cases:
   * 0/0 is treated as a perfect match (1.0); x/0 for x != 0 as a total mismatch (0.0).
   */
  private def computeRatio(first: Double, second: Double): Double = {
    if (first == 0 && second == 0) 1.0 else if (second == 0) 0.0 else first / second
  }
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License
6+
* is located at
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.deequ.dqdl.execution.executors
18+
19+
import com.amazon.deequ.dqdl.execution.DQDLExecutor
20+
import com.amazon.deequ.dqdl.model.{ColumnNamesMatchPatternExecutableRule, Failed, Passed, RuleOutcome}
21+
import org.apache.spark.sql.DataFrame
22+
import software.amazon.glue.dqdl.model.DQRule
23+
24+
import scala.util.matching.Regex
25+
import scala.util.Try
26+
27+
/**
 * Executor for ColumnNamesMatchPattern rules: checks that every column name of the
 * dataset matches a configured regular expression.
 */
object ColumnNamesMatchPatternExecutor extends DQDLExecutor.RuleExecutor[ColumnNamesMatchPatternExecutableRule] {

  /**
   * Evaluates each rule's pattern against the DataFrame's column names.
   *
   * A rule Passes when all columns match; otherwise it Fails listing the offenders.
   * An invalid regex or an empty schema produces a Failed outcome for that rule
   * instead of throwing, so one bad rule cannot abort the whole ruleset.
   *
   * @param rules                 rules to evaluate
   * @param df                    dataset whose column names are checked
   * @param additionalDataSources unused by this executor
   * @return one outcome per rule, keyed by the underlying DQRule
   */
  override def executeRules(rules: Seq[ColumnNamesMatchPatternExecutableRule], df: DataFrame,
                            additionalDataSources: Map[String, DataFrame] = Map.empty): Map[DQRule, RuleOutcome] = {
    rules.map { rule =>
      val outcome = Try(new Regex(rule.pattern)) match {
        case scala.util.Failure(_) =>
          // Surface the bad pattern as a rule failure rather than throwing mid-batch,
          // consistent with how other executors report per-rule errors.
          RuleOutcome(rule.dqRule, Failed,
            Some(s"Invalid regex pattern '${rule.pattern}' for ColumnNamesMatchPattern rule"))
        case scala.util.Success(pattern) =>
          val columns = df.columns
          val columnCount = columns.length
          if (columnCount == 0) {
            // Guard the ratio below against 0/0 (NaN) on a schema-less DataFrame.
            RuleOutcome(rule.dqRule, Failed, Some("Dataset has no columns to match against the pattern."))
          } else {
            val unmatchedColumns = columns.filter { col => pattern.findAllMatchIn(col).isEmpty }
            val metricValue = (columnCount - unmatchedColumns.length).toDouble / columnCount.toDouble
            val evaluatedMetric = Map("Dataset.*.ColumnNamesPatternMatchRatio" -> metricValue)

            if (unmatchedColumns.isEmpty) {
              RuleOutcome(rule.dqRule, Passed, None, evaluatedMetric)
            } else {
              RuleOutcome(rule.dqRule, Failed,
                Some(s"Columns that do not match the pattern: ${unmatchedColumns.mkString(", ")}"),
                evaluatedMetric)
            }
          }
      }
      rule.dqRule -> outcome
    }.toMap
  }
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/**
2+
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License
6+
* is located at
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.deequ.dqdl.execution.executors
18+
19+
import com.amazon.deequ.dqdl.execution.DQDLExecutor
20+
import com.amazon.deequ.dqdl.model.{CompositeExecutableRule, ExecutableRule, RuleOutcome}
21+
import com.amazon.deequ.dqdl.translation.RuleOutcomeTranslator
22+
import org.apache.spark.sql.DataFrame
23+
import software.amazon.glue.dqdl.model.DQRule
24+
25+
/**
 * Executor for composite rules that combine multiple rules with AND/OR operators.
 *
 * Evaluation proceeds in three steps:
 *   1. Walk every composite's rule tree and collect the distinct leaf
 *      (non-composite) rules.
 *   2. Run those leaves once through the standard dispatcher, so a leaf shared
 *      by several composites is never executed twice.
 *   3. Fold the leaf outcomes back into one outcome per composite using the
 *      rule's AND/OR structure.
 *
 * Example composite rules:
 *   - `(RowCount > 0) and (IsComplete "column")`
 *   - `(ColumnValues "col" > 10) or (IsUnique "col")`
 *   - `((Rule1) and (Rule2)) or (Rule3)` (nested composition)
 *
 * Note: Currently only supports dataset-level evaluation.
 */
object CompositeRulesExecutor extends DQDLExecutor.RuleExecutor[CompositeExecutableRule] {

  /**
   * Evaluates composite rules by executing their leaf rules once and composing
   * the results with the rules' logical operators.
   *
   * @param rules                 the composite rules to execute
   * @param df                    the DataFrame to evaluate rules against
   * @param additionalDataSources additional DataFrames for dataset comparison rules
   * @return map of rules to their composed outcomes
   */
  override def executeRules(rules: Seq[CompositeExecutableRule],
                            df: DataFrame,
                            additionalDataSources: Map[String, DataFrame] = Map.empty): Map[DQRule, RuleOutcome] = {
    // Every distinct leaf across all composites, executed exactly once.
    val leafRules = rules.flatMap(leavesOf).distinct
    val leafOutcomes = DQDLExecutor.executeRules(leafRules, df, additionalDataSources)

    // Compose the leaf outcomes into one outcome per composite rule.
    rules
      .map(composite => composite.dqRule -> RuleOutcomeTranslator.collectOutcome(composite.dqRule, leafOutcomes))
      .toMap
  }

  /**
   * Recursively collects the leaf (non-composite) rules nested anywhere inside
   * the given composite, at any depth.
   */
  private def leavesOf(rule: CompositeExecutableRule): Seq[ExecutableRule] =
    rule.nestedRules.flatMap {
      case nested: CompositeExecutableRule => leavesOf(nested)
      case leaf => Seq(leaf)
    }
}

0 commit comments

Comments
 (0)