Skip to content

Commit c030c5a

Browse files
author
Joshua Zexter
committed
Add DatasetMatch DQDL Rule Support
1 parent 6f376b4 commit c030c5a

File tree

6 files changed

+379
-8
lines changed

6 files changed

+379
-8
lines changed

src/main/scala/com/amazon/deequ/dqdl/execution/DQDLExecutor.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.amazon.deequ.dqdl.execution
1818

19-
import com.amazon.deequ.dqdl.execution.executors.{DeequRulesExecutor, ReferentialIntegrityExecutor, RowCountMatchExecutor, UnsupportedRulesExecutor}
20-
import com.amazon.deequ.dqdl.model.{DeequExecutableRule, ExecutableRule, Failed, ReferentialIntegrityExecutableRule, RowCountMatchExecutableRule, RuleOutcome, UnsupportedExecutableRule}
19+
import com.amazon.deequ.dqdl.execution.executors.{DatasetMatchExecutor, DeequRulesExecutor, ReferentialIntegrityExecutor, RowCountMatchExecutor, UnsupportedRulesExecutor}
20+
import com.amazon.deequ.dqdl.model.{DatasetMatchExecutableRule, DeequExecutableRule, ExecutableRule, Failed, ReferentialIntegrityExecutableRule, RowCountMatchExecutableRule, RuleOutcome, UnsupportedExecutableRule}
2121
import org.apache.spark.sql.DataFrame
2222
import software.amazon.glue.dqdl.model.DQRule
2323

@@ -37,7 +37,8 @@ object DQDLExecutor {
3737
classOf[DeequExecutableRule] -> DeequRulesExecutor,
3838
classOf[UnsupportedExecutableRule] -> UnsupportedRulesExecutor,
3939
classOf[RowCountMatchExecutableRule] -> RowCountMatchExecutor,
40-
classOf[ReferentialIntegrityExecutableRule] -> ReferentialIntegrityExecutor
40+
classOf[ReferentialIntegrityExecutableRule] -> ReferentialIntegrityExecutor,
41+
classOf[DatasetMatchExecutableRule] -> DatasetMatchExecutor
4142
)
4243

4344
def executeRules(rules: Seq[ExecutableRule], df: DataFrame,
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License
6+
* is located at
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.deequ.dqdl.execution.executors
18+
19+
import com.amazon.deequ.comparison.{DataSynchronization, DatasetMatchFailed}
20+
import com.amazon.deequ.dqdl.execution.DQDLExecutor
21+
import com.amazon.deequ.dqdl.model.{DatasetMatchExecutableRule, Failed, Passed, RuleOutcome}
22+
import org.apache.spark.sql.DataFrame
23+
import org.apache.spark.sql.functions.{col, lit}
24+
import software.amazon.glue.dqdl.model.DQRule
25+
26+
import scala.util.{Failure, Success, Try}
27+
28+
/**
 * Executes `DatasetMatch` rules: the primary DataFrame is compared against a reference
 * DataFrame looked up by alias in the additional data sources, and the share of primary rows
 * flagged as matching is checked against the rule's assertion.
 *
 * Every rule always yields a [[RuleOutcome]]; non-fatal exceptions raised while building or
 * evaluating the comparison are converted into a `Failed` outcome rather than propagated, so
 * one broken rule cannot abort the evaluation of the others.
 */
object DatasetMatchExecutor extends DQDLExecutor.RuleExecutor[DatasetMatchExecutableRule] {

  /**
   * Evaluates each rule against `df`.
   *
   * @param rules                 the DatasetMatch rules to evaluate
   * @param df                    the primary dataset
   * @param additionalDataSources reference datasets keyed by alias
   * @return one outcome per rule, keyed by the originating [[DQRule]]
   */
  override def executeRules(rules: Seq[DatasetMatchExecutableRule], df: DataFrame,
                            additionalDataSources: Map[String, DataFrame]): Map[DQRule, RuleOutcome] = {
    rules.map { rule =>
      val outcome = additionalDataSources.get(rule.referenceDatasetAlias) match {
        case Some(referenceDF) => evaluateAgainstReference(rule, df, referenceDF)
        case None =>
          RuleOutcome(rule.dqRule, Failed,
            Some(s"${rule.referenceDatasetAlias} not found in additional sources"))
      }
      rule.dqRule -> outcome
    }.toMap
  }

  /** Runs the row-level comparison for one rule and maps any non-fatal exception to `Failed`. */
  private def evaluateAgainstReference(rule: DatasetMatchExecutableRule,
                                       df: DataFrame,
                                       referenceDF: DataFrame): RuleOutcome = {
    // Without explicit match columns, compare every primary column that is not a key column.
    val matchCols = rule.matchColumnMappings.orElse {
      Some(df.columns.filterNot(rule.keyColumnMappings.contains).map(c => c -> c).toMap)
    }
    // Random name so the outcome column cannot collide with an existing column.
    val outcomeCol = java.util.UUID.randomUUID().toString

    // The Try spans the Spark actions (count) and the assertion as well as the construction of
    // the comparison: DataFrames are lazy, so most failures only surface when count() runs.
    Try {
      DataSynchronization.columnMatchRowLevel(
        df, referenceDF, rule.keyColumnMappings, matchCols, Some(outcomeCol)) match {
        case Left(DatasetMatchFailed(errorMessage, _, _)) =>
          RuleOutcome(rule.dqRule, Failed, Some(errorMessage))
        case Right(augmentedDF) =>
          scoreMatch(rule, df, augmentedDF, outcomeCol)
      }
    } match {
      case Success(outcome) => outcome
      case Failure(ex) =>
        RuleOutcome(rule.dqRule, Failed,
          Some(s"Exception thrown while evaluating rule: ${ex.getClass.getName}"))
    }
  }

  /** Computes the matched-row ratio over the primary dataset and applies the rule's assertion. */
  private def scoreMatch(rule: DatasetMatchExecutableRule,
                         df: DataFrame,
                         augmentedDF: DataFrame,
                         outcomeCol: String): RuleOutcome = {
    val totalCount = df.count()
    if (totalCount == 0L) {
      // 0 / 0 would produce NaN, which is meaningless both as a metric and in the message.
      RuleOutcome(rule.dqRule, Failed,
        Some("Primary dataset is empty; the dataset match ratio cannot be computed."))
    } else {
      val matchedCount = augmentedDF.filter(col(outcomeCol) === lit(true)).count()
      val ratio = matchedCount.toDouble / totalCount
      val metrics = Map(s"Dataset.${rule.referenceDatasetAlias}.DatasetMatch" -> ratio)

      if (rule.assertion(ratio)) RuleOutcome(rule.dqRule, Passed, None, metrics)
      else RuleOutcome(rule.dqRule, Failed,
        Some(s"Value: $ratio does not meet the constraint requirement."), metrics)
    }
  }
}

src/main/scala/com/amazon/deequ/dqdl/model/ExecutableRule.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ case class ReferentialIntegrityExecutableRule(dqRule: DQRule,
5757
Some(s"Column.$referenceDatasetAlias.ReferentialIntegrity")
5858
}
5959

60+
/**
 * Executable form of a DQDL `DatasetMatch` rule.
 *
 * @param dqRule                the DQDL rule this executable rule was built from
 * @param referenceDatasetAlias alias used to look up the reference dataset
 * @param keyColumnMappings     key column mappings between the primary and reference datasets
 * @param matchColumnMappings   optional mappings of the columns to compare; `None` when the
 *                              rule does not specify any
 * @param assertion             predicate applied to the computed match ratio
 */
case class DatasetMatchExecutableRule(
    dqRule: DQRule,
    referenceDatasetAlias: String,
    keyColumnMappings: Map[String, String],
    matchColumnMappings: Option[Map[String, String]],
    assertion: Double => Boolean
) extends ExecutableRule {

  // Name under which the match ratio for this reference dataset is reported.
  override val evaluatedMetricName: Option[String] =
    Some(s"Dataset.${referenceDatasetAlias}.DatasetMatch")
}
68+
6069
case class DeequMetricMapping(entity: String,
6170
instance: String,
6271
name: String,

src/main/scala/com/amazon/deequ/dqdl/translation/DQDLRuleTranslator.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import com.amazon.deequ.dqdl.translation.rules.ColumnLengthRule
3535
import com.amazon.deequ.dqdl.translation.rules.ColumnExistsRule
3636
import com.amazon.deequ.dqdl.translation.rules.RowCountMatchRule
3737
import com.amazon.deequ.dqdl.translation.rules.ReferentialIntegrityRule
38+
import com.amazon.deequ.dqdl.translation.rules.DatasetMatchRule
3839
import software.amazon.glue.dqdl.model.DQRule
3940
import software.amazon.glue.dqdl.model.DQRuleset
4041

@@ -87,6 +88,11 @@ object DQDLRuleTranslator {
8788
case Right(executableRule) => executableRule
8889
case Left(message) => UnsupportedExecutableRule(rule, Some(message))
8990
}
91+
case "DatasetMatch" =>
92+
DatasetMatchRule.toExecutableRule(rule) match {
93+
case Right(executableRule) => executableRule
94+
case Left(message) => UnsupportedExecutableRule(rule, Some(message))
95+
}
9096
case _ =>
9197
translateRule(rule) match {
9298
case Right(deequExecutableRule) => deequExecutableRule
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License
6+
* is located at
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.deequ.dqdl.translation.rules
18+
19+
import com.amazon.deequ.dqdl.execution.DefaultOperandEvaluator
20+
import com.amazon.deequ.dqdl.model.DatasetMatchExecutableRule
21+
import software.amazon.glue.dqdl.model.DQRule
22+
import software.amazon.glue.dqdl.model.condition.number.NumberBasedCondition
23+
24+
import scala.collection.JavaConverters._
25+
26+
/**
 * Translates a DQDL `DatasetMatch` rule into a [[DatasetMatchExecutableRule]].
 *
 * All translation problems (missing parameters, malformed column mappings, an unexpected
 * condition type) are reported as a `Left` message instead of being thrown.
 */
object DatasetMatchRule {

  /**
   * Builds the executable rule from the rule's parameters and condition.
   *
   * Reads the `ReferenceDatasetAlias` and `KeyColumnMappings` parameters (both required) and
   * the optional `MatchColumnMappings` parameter; double quotes are stripped from every
   * parameter value before it is interpreted.
   *
   * @param rule the parsed DQDL rule
   * @return the executable rule, or a message describing why the rule could not be translated
   */
  def toExecutableRule(rule: DQRule): Either[String, DatasetMatchExecutableRule] = {
    // Materialise a strict map: mapValues would return a lazy view that re-runs the
    // replacement on every lookup.
    val params: Map[String, String] = rule.getParameters.asScala.map {
      case (name, value) => name -> value.replaceAll("\"", "")
    }.toMap

    for {
      referenceAlias <- requiredParam(params, "ReferenceDatasetAlias")
      rawKeyMappings <- requiredParam(params, "KeyColumnMappings")
      keyMap <- parseColumnMappings(rawKeyMappings).toRight("Could not parse KeyColumnMappings")
      matchMappings <- parseOptionalMappings(params.get("MatchColumnMappings"))
      condition <- numberCondition(rule)
    } yield {
      val assertion: Double => Boolean = d => condition.evaluate(d, rule, DefaultOperandEvaluator)
      DatasetMatchExecutableRule(rule, referenceAlias, keyMap, matchMappings, assertion)
    }
  }

  /** Looks up a mandatory parameter, reporting its absence as a `Left`. */
  private def requiredParam(params: Map[String, String], name: String): Either[String, String] =
    params.get(name).toRight(s"Missing required parameter $name")

  /** Returns the rule's condition when it is number-based; any other condition is a `Left`. */
  private def numberCondition(rule: DQRule): Either[String, NumberBasedCondition] =
    rule.getCondition match {
      case condition: NumberBasedCondition => Right(condition)
      case _ => Left("DatasetMatch requires a number-based condition")
    }

  /**
   * Parses the optional match-column parameter. An absent or blank value means "no explicit
   * match columns" (`Right(None)`); a value that is present but malformed is an error rather
   * than a silent fallback to "no explicit match columns".
   */
  private def parseOptionalMappings(raw: Option[String]): Either[String, Option[Map[String, String]]] =
    raw.map(_.trim).filter(_.nonEmpty) match {
      case None => Right(None)
      case Some(text) =>
        parseColumnMappings(text) match {
          case Some(parsed) => Right(Some(parsed))
          case None => Left("Could not parse MatchColumnMappings")
        }
    }

  /**
   * Parses a comma-separated list of column mappings. Each entry is either `source->target`
   * or a bare column name, which maps to itself.
   *
   * @return `None` when the input is blank or any entry is malformed
   */
  private def parseColumnMappings(mappings: String): Option[Map[String, String]] = {
    val entries = mappings.split(",").map(parseMapping)
    if (mappings.trim.isEmpty || entries.isEmpty || entries.exists(_.isEmpty)) None
    else Some(entries.flatten.toMap)
  }

  /** Parses a single mapping entry; blank column names and extra `->` separators are rejected. */
  private def parseMapping(mapping: String): Option[(String, String)] =
    // limit -1 keeps trailing empty parts so that "a->" is seen as malformed, not as "a".
    mapping.split("->", -1).map(_.trim) match {
      case Array(column) if column.nonEmpty => Some(column -> column)
      case Array(source, target) if source.nonEmpty && target.nonEmpty => Some(source -> target)
      case _ => None
    }
}

0 commit comments

Comments
 (0)