Skip to content

Commit ab836f9

Browse files
authored
Add support for ColumnValues DQDL rule (#655)
* Add ColumnValues Rule Support
1 parent cb56bd5 commit ab836f9

File tree

5 files changed

+979
-35
lines changed

5 files changed

+979
-35
lines changed

src/main/scala/com/amazon/deequ/dqdl/translation/DQDLRuleTranslator.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
55
* use this file except in compliance with the License. A copy of the License
@@ -34,6 +34,7 @@ import com.amazon.deequ.dqdl.translation.rules.UniqueValueRatioRule
3434
import com.amazon.deequ.dqdl.translation.rules.UniquenessRule
3535
import com.amazon.deequ.dqdl.translation.rules.ColumnLengthRule
3636
import com.amazon.deequ.dqdl.translation.rules.ColumnExistsRule
37+
import com.amazon.deequ.dqdl.translation.rules.ColumnValuesRule
3738
import com.amazon.deequ.dqdl.translation.rules.RowCountMatchRule
3839
import com.amazon.deequ.dqdl.translation.rules.ReferentialIntegrityRule
3940
import com.amazon.deequ.dqdl.translation.rules.DatasetMatchRule
@@ -70,7 +71,8 @@ object DQDLRuleTranslator {
7071
"CustomSql" -> new CustomSqlRule,
7172
"IsPrimaryKey" -> new IsPrimaryKeyRule,
7273
"ColumnLength" -> new ColumnLengthRule,
73-
"ColumnExists" -> new ColumnExistsRule
74+
"ColumnExists" -> new ColumnExistsRule,
75+
"ColumnValues" -> new ColumnValuesRule
7476
)
7577

7678
/**
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
/**
2+
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License
6+
* is located at
7+
*
8+
* http://aws.amazon.com/apache2.0/
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.amazon.deequ.dqdl.translation.rules
18+
19+
import com.amazon.deequ.checks.Check
20+
import com.amazon.deequ.checks.CheckLevel
21+
import com.amazon.deequ.dqdl.model.DeequMetricMapping
22+
import com.amazon.deequ.dqdl.translation.DQDLRuleConverter
23+
import com.amazon.deequ.dqdl.util.DQDLUtility.addWhereClause
24+
import com.amazon.deequ.dqdl.util.DQDLUtility.isWhereClausePresent
25+
import com.amazon.deequ.dqdl.util.DQDLUtility.requiresToBeQuoted
26+
import software.amazon.glue.dqdl.model.DQRule
27+
import software.amazon.glue.dqdl.model.condition.number.AtomicNumberOperand
28+
import software.amazon.glue.dqdl.model.condition.number.NullNumericOperand
29+
import software.amazon.glue.dqdl.model.condition.number.NumberBasedCondition
30+
import software.amazon.glue.dqdl.model.condition.number.NumberBasedConditionOperator._
31+
import software.amazon.glue.dqdl.model.condition.string.KeywordStringOperand
32+
import software.amazon.glue.dqdl.model.condition.string.QuotedStringOperand
33+
import software.amazon.glue.dqdl.model.condition.string.StringBasedCondition
34+
import software.amazon.glue.dqdl.model.condition.string.StringBasedConditionOperator
35+
36+
import scala.collection.JavaConverters._
37+
38+
/**
 * Translates a DQDL `ColumnValues` rule into a Deequ [[Check]] plus the metric mappings
 * that describe which Deequ metrics back the rule.
 *
 * Numeric conditions (`>`, `>=`, `<`, `<=`, `between`, `not between`, `in`, `not in`, `=`, `!=`)
 * and string conditions (`matches`, `not matches`, `in`, `not in`, `=`, `!=`) are supported.
 * Every unsupported or malformed input is reported as a `Left` with a human readable message
 * instead of an exception.
 */
case class ColumnValuesRule() extends DQDLRuleConverter {

  /** Result shared by every translation step: an error message, or the check with its metrics. */
  private type Translation = Either[String, (Check, Seq[DeequMetricMapping])]

  /**
   * Converts the given rule into a check.
   *
   * @param rule the parsed DQDL rule; its `TargetColumn` parameter names the column under test
   * @return `Right` with the check and metric mappings, or `Left` describing why the rule
   *         cannot be translated (missing target column, unsupported condition type, ...)
   */
  override def convert(rule: DQRule): Either[String, (Check, Seq[DeequMetricMapping])] =
    rule.getParameters.asScala.get("TargetColumn") match {
      case None =>
        // A missing parameter used to surface as a NoSuchElementException.
        Left("ColumnValues rule requires a TargetColumn parameter.")

      case Some(targetColumn) =>
        // SQL predicates need the back-quoted form when the name requires quoting.
        val transformedCol = if (requiresToBeQuoted(targetColumn)) s"`$targetColumn`" else targetColumn
        val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString)

        rule.getCondition match {
          case condition: NumberBasedCondition =>
            mkNumericCheck(check, targetColumn, transformedCol, condition, rule)
          case condition: StringBasedCondition =>
            mkStringCheck(check, targetColumn, transformedCol, condition, rule)
          case other =>
            // `other` may be null, hence the Option wrapper.
            val conditionType = Option(other).map(_.getClass.getSimpleName).getOrElse("null")
            Left(s"Unsupported condition type for ColumnValues rule: $conditionType")
        }
    }

  /**
   * Builds the check for a numeric condition.
   *
   * @param check          the empty check to extend
   * @param targetColumn   raw column name, used for analyzer-based constraints and metrics
   * @param transformedCol column name as it must appear inside SQL predicates
   * @param condition      the numeric condition of the rule
   * @param rule           the rule being translated (supplies the optional where clause)
   */
  private def mkNumericCheck(check: Check, targetColumn: String, transformedCol: String,
                             condition: NumberBasedCondition,
                             rule: DQRule): Translation = {
    val rawOperands = condition.getOperands.asScala
    val operandsSupported = rawOperands.forall {
      case _: AtomicNumberOperand | _: NullNumericOperand => true
      case _ => false
    }

    if (!operandsSupported) {
      Left("ColumnValues rule only supports numeric operands or NULL keyword in conditions.")
    } else if (rawOperands.isEmpty) {
      Left("ColumnValues rule requires at least one operand.")
    } else {
      val hasNullOperand = rawOperands.exists(_.isInstanceOf[NullNumericOperand])
      val numericOperands = rawOperands.collect { case a: AtomicNumberOperand => a.getOperand.toDouble }

      def compliance(sql: String): Translation =
        complianceResult(check, targetColumn, transformedCol, sql, rule)

      // Comparison operators need a real number to compare against. A NULL-only operand list
      // passes the validation above, so it must be rejected here; previously it blew up with
      // NoSuchElementException when the assertion was evaluated.
      def extremum(useMin: Boolean, holds: (Double, Double) => Boolean): Translation =
        numericOperands.headOption match {
          case Some(threshold) =>
            val metrics = if (useMin) minMetric(targetColumn, rule) else maxMetric(targetColumn, rule)
            val resultCheck =
              extremumCheck(check, targetColumn, rule, useMin, value => holds(value, threshold))
            Right((resultCheck, metrics))
          case None =>
            Left(s"ColumnValues rule requires a numeric operand for operator ${condition.getOperator}; " +
              "NULL is not supported.")
        }

      condition.getOperator match {
        case GREATER_THAN => extremum(useMin = true, holds = _ > _)
        case GREATER_THAN_EQUAL_TO => extremum(useMin = true, holds = _ >= _)
        case LESS_THAN => extremum(useMin = false, holds = _ < _)
        case LESS_THAN_EQUAL_TO => extremum(useMin = false, holds = _ <= _)

        case BETWEEN =>
          if (numericOperands.size < 2) {
            Left("BETWEEN requires two operands.")
          } else {
            // Both bounds are exclusive.
            val resultCheck = if (isWhereClausePresent(rule)) {
              val filter = rule.getWhereClause
              check.isContainedIn(targetColumn, numericOperands.head, numericOperands.last,
                includeLowerBound = false, includeUpperBound = false).where(filter)
                .isComplete(targetColumn).where(filter)
            } else {
              check.isContainedIn(targetColumn, numericOperands.head, numericOperands.last,
                includeLowerBound = false, includeUpperBound = false)
                .isComplete(targetColumn)
            }
            Right((resultCheck, complianceMetric(targetColumn, check.description, rule)))
          }

        case NOT_BETWEEN =>
          if (numericOperands.size < 2) {
            Left("NOT BETWEEN requires two operands.")
          } else {
            // Complement of the exclusive BETWEEN above, so the bounds themselves are accepted.
            compliance(s"$transformedCol IS NOT NULL AND " +
              s"($transformedCol <= ${numericOperands.head} OR $transformedCol >= ${numericOperands.last})")
          }

        case IN =>
          val nums = numericOperands.mkString(", ")
          val sql = (numericOperands.nonEmpty, hasNullOperand) match {
            case (true, false) => s"$transformedCol IS NOT NULL AND $transformedCol IN ($nums)"
            case (true, true) => s"$transformedCol IN ($nums) OR $transformedCol IS NULL"
            case (false, true) => s"$transformedCol IS NULL"
            case _ => "FALSE"
          }
          compliance(sql)

        case NOT_IN =>
          val nums = numericOperands.mkString(", ")
          val sql = (numericOperands.nonEmpty, hasNullOperand) match {
            case (true, false) => s"$transformedCol IS NULL OR $transformedCol NOT IN ($nums)"
            case (true, true) => s"$transformedCol NOT IN ($nums) AND $transformedCol IS NOT NULL"
            case (false, true) => s"$transformedCol IS NOT NULL"
            case _ => "TRUE"
          }
          compliance(sql)

        case EQUALS if hasNullOperand =>
          compliance(s"$transformedCol IS NULL")

        case EQUALS =>
          // Safe: operands are non-empty and none of them is NULL, so a number is present.
          val expected = numericOperands.head
          val equalsExpected: Double => Boolean = _ == expected
          // Every value equals `expected` exactly when min == max == expected and no value is null.
          val resultCheck = if (isWhereClausePresent(rule)) {
            val filter = rule.getWhereClause
            check
              .hasMin(targetColumn, equalsExpected).where(filter)
              .hasMax(targetColumn, equalsExpected).where(filter)
              .isComplete(targetColumn).where(filter)
          } else {
            check
              .hasMin(targetColumn, equalsExpected)
              .hasMax(targetColumn, equalsExpected)
              .isComplete(targetColumn)
          }
          Right((resultCheck, minMetric(targetColumn, rule) ++ maxMetric(targetColumn, rule)))

        case NOT_EQUALS if hasNullOperand =>
          compliance(s"$transformedCol IS NOT NULL")

        case NOT_EQUALS =>
          compliance(s"$transformedCol IS NULL OR $transformedCol != ${numericOperands.head}")

        case _ =>
          Left(s"Unsupported operator for ColumnValues numeric condition: ${condition.getOperator}")
      }
    }
  }

  /**
   * Adds a minimum (or maximum) constraint followed by a completeness constraint on the column,
   * applying the rule's where clause to both constraints when one is present.
   *
   * @param useMin    constrain the column minimum when true, the maximum otherwise
   * @param assertion predicate the minimum / maximum must satisfy
   */
  private def extremumCheck(check: Check, targetColumn: String, rule: DQRule, useMin: Boolean,
                            assertion: Double => Boolean): Check =
    if (isWhereClausePresent(rule)) {
      val filter = rule.getWhereClause
      val bounded =
        if (useMin) check.hasMin(targetColumn, assertion).where(filter)
        else check.hasMax(targetColumn, assertion).where(filter)
      bounded.isComplete(targetColumn).where(filter)
    } else {
      val bounded =
        if (useMin) check.hasMin(targetColumn, assertion)
        else check.hasMax(targetColumn, assertion)
      bounded.isComplete(targetColumn)
    }

  /**
   * Wraps a SQL predicate in a `satisfies` constraint that must hold for every row (ratio 1.0)
   * and pairs it with the compliance metric mapping. The rule's where clause is delegated to
   * `addWhereClause`.
   */
  private def complianceResult(check: Check, targetColumn: String, transformedCol: String,
                               sql: String, rule: DQRule): Translation =
    Right((
      addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
        columns = List(transformedCol))),
      complianceMetric(targetColumn, check.description, rule)))

  /**
   * Builds the check for a string condition.
   *
   * @param check          the empty check to extend
   * @param targetColumn   raw column name, used for pattern constraints and metrics
   * @param transformedCol column name as it must appear inside SQL predicates
   * @param condition      the string condition of the rule
   * @param rule           the rule being translated (supplies the optional where clause)
   */
  private def mkStringCheck(check: Check, targetColumn: String, transformedCol: String,
                            condition: StringBasedCondition,
                            rule: DQRule): Translation =
    condition.getOperator match {
      case StringBasedConditionOperator.MATCHES =>
        // The non-capturing group keeps a top-level alternation (`a|b`) inside the anchors;
        // without it `^a|b$` would only anchor each alternative on one side.
        patternResult(check, targetColumn, condition, rule)(pattern => s"^(?:${pattern})$$")

      case StringBasedConditionOperator.NOT_MATCHES =>
        // Same grouping, inside the negative lookahead.
        patternResult(check, targetColumn, condition, rule)(pattern => s"^(?!\\b(?:${pattern})\\b).*$$")

      case StringBasedConditionOperator.IN | StringBasedConditionOperator.EQUALS =>
        val sql = constructComplianceCondition(transformedCol, condition, isNegated = false)
        complianceResult(check, targetColumn, transformedCol, sql, rule)

      case StringBasedConditionOperator.NOT_IN | StringBasedConditionOperator.NOT_EQUALS =>
        val sql = constructComplianceCondition(transformedCol, condition, isNegated = true)
        complianceResult(check, targetColumn, transformedCol, sql, rule)

      case _ =>
        Left(s"Unsupported operator for ColumnValues string condition: ${condition.getOperator}")
    }

  /**
   * Creates a `hasPattern` constraint from the first operand of the condition.
   *
   * @param toRegex turns the user supplied pattern into the full regular expression source
   * @return `Left` when the operand is missing or the resulting expression does not compile
   */
  private def patternResult(check: Check, targetColumn: String, condition: StringBasedCondition,
                            rule: DQRule)(toRegex: String => String): Translation =
    extractPattern(condition) match {
      case Left(error) => Left(error)
      case Right(pattern) =>
        // `.r` compiles eagerly and throws on invalid syntax; report that as a translation error.
        scala.util.Try(toRegex(pattern).r) match {
          case scala.util.Success(fullRegex) =>
            Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex)),
              Seq(DeequMetricMapping("Column", targetColumn, "PatternMatch", "PatternMatch", None, rule = rule))))
          case scala.util.Failure(error) =>
            Left(s"Invalid regular expression for ColumnValues rule: ${error.getMessage}")
        }
    }

  /**
   * Builds the SQL predicate for string `IN` / `=` (or their negations).
   *
   * Keyword operands `NULL`, `EMPTY` and `WHITESPACES_ONLY` are translated to dedicated
   * predicates; quoted operands become an `IN` / `NOT IN` list. Positive clauses are joined
   * with `OR` (default `FALSE`), negated clauses with `AND` (default `TRUE`).
   *
   * @param targetColumn column expression to use inside the predicate
   * @param condition    condition providing the operands
   * @param isNegated    true for `NOT IN` / `!=`
   */
  private def constructComplianceCondition(targetColumn: String, condition: StringBasedCondition,
                                           isNegated: Boolean): String = {
    val operands = condition.getOperands.asScala
    val quotedStrings = operands.collect { case q: QuotedStringOperand => q.getOperand }
    val keywordOperands = operands.collect { case k: KeywordStringOperand => k.formatOperand() }

    val hasNull = keywordOperands.contains("NULL")
    val hasEmpty = keywordOperands.contains("EMPTY")
    val hasWhitespacesOnly = keywordOperands.contains("WHITESPACES_ONLY")
    val valueList = quotedStrings.map(quoteLiteral).mkString(", ")

    // Each candidate clause is paired with the flag that enables it; order matters for the output.
    val candidates: Seq[(Boolean, String)] =
      if (isNegated) {
        Seq(
          hasNull -> s"$targetColumn IS NOT NULL",
          hasEmpty -> s"$targetColumn != ''",
          hasWhitespacesOnly -> s"(LENGTH(TRIM($targetColumn)) > 0 OR LENGTH($targetColumn) = 0)",
          quotedStrings.nonEmpty -> s"($targetColumn IS NULL OR $targetColumn NOT IN ($valueList))")
      } else {
        Seq(
          hasNull -> s"$targetColumn IS NULL",
          hasEmpty -> s"$targetColumn = ''",
          hasWhitespacesOnly -> s"(LENGTH(TRIM($targetColumn)) = 0 AND LENGTH($targetColumn) > 0)",
          quotedStrings.nonEmpty -> s"($targetColumn IS NOT NULL AND $targetColumn IN ($valueList))")
      }

    val clauses = candidates.collect { case (true, clause) => clause }
    if (clauses.isEmpty) {
      if (isNegated) "TRUE" else "FALSE"
    } else {
      clauses.mkString(if (isNegated) " AND " else " OR ")
    }
  }

  /** Renders a value as a single-quoted SQL literal, doubling embedded single quotes. */
  private def quoteLiteral(value: String): String = "'" + value.replace("'", "''") + "'"

  /** Metric mapping for the `Minimum` metric of the column. */
  private def minMetric(targetColumn: String, rule: DQRule): Seq[DeequMetricMapping] =
    Seq(DeequMetricMapping("Column", targetColumn, "Minimum", "Minimum", None, rule = rule))

  /** Metric mapping for the `Maximum` metric of the column. */
  private def maxMetric(targetColumn: String, rule: DQRule): Seq[DeequMetricMapping] =
    Seq(DeequMetricMapping("Column", targetColumn, "Maximum", "Maximum", None, rule = rule))

  /** Metric mapping for the `Compliance` metric; `desc` is passed along as the optional fifth argument. */
  private def complianceMetric(targetColumn: String, desc: String, rule: DQRule): Seq[DeequMetricMapping] =
    Seq(DeequMetricMapping("Column", targetColumn, "ColumnValues.Compliance", "Compliance", Some(desc),
      rule = rule))

  /**
   * Returns the pattern carried by the first operand of the condition: the raw string of a
   * quoted operand, or `toString` of any other operand. An empty operand list yields `Left`.
   */
  private def extractPattern(condition: StringBasedCondition): Either[String, String] =
    condition.getOperands.asScala.headOption match {
      case Some(q: QuotedStringOperand) => Right(q.getOperand)
      case Some(other) => Right(other.toString)
      case None => Left("ColumnValues rule requires a pattern operand.")
    }
}

0 commit comments

Comments
 (0)