Skip to content

Commit 6dba28b

Browse files
committed
Fix AnalyzerOptions handling in DQDL rule converters for WHERE clause and NULL behavior
1 parent a47469c commit 6dba28b

File tree

12 files changed

+318
-60
lines changed

12 files changed

+318
-60
lines changed

src/main/scala/com/amazon/deequ/dqdl/translation/DQDLRuleConverter.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,29 @@
1616

1717
package com.amazon.deequ.dqdl.translation
1818

19+
import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome, NullBehavior}
1920
import com.amazon.deequ.checks.Check
2021
import com.amazon.deequ.dqdl.execution.DefaultOperandEvaluator
2122
import com.amazon.deequ.dqdl.model.DeequMetricMapping
23+
import com.amazon.deequ.dqdl.util.DQDLUtility.isWhereClausePresent
2224
import software.amazon.glue.dqdl.model.DQRule
2325
import software.amazon.glue.dqdl.model.condition.number.NumberBasedCondition
2426

2527

2628
/**
 * Contract for translating a single DQDL rule into a Deequ `Check` together
 * with the metric mappings that describe what the check measures.
 *
 * Besides the abstract `convert`, the trait supplies two helpers shared by
 * implementations: one adapts a DQDL numeric condition to a Scala predicate,
 * the other picks the analyzer options to use depending on whether the rule
 * has a WHERE clause.
 */
trait DQDLRuleConverter {
29+
30+
/**
 * Analyzer options shared by converters: `NullBehavior.EmptyString` combined
 * with `FilteredRowOutcome.TRUE`.
 *
 * NOTE(review): presumably this makes NULLs behave like empty strings and
 * treats rows excluded by a filter as passing -- confirm against the Deequ
 * `AnalyzerOptions` documentation.
 */
val DEFAULT_ANALYZER_OPTION: Option[AnalyzerOptions] =
31+
Some(AnalyzerOptions(NullBehavior.EmptyString, FilteredRowOutcome.TRUE))
32+
2733
/**
 * Translates `rule` into a Deequ check.
 *
 * @param rule the DQDL rule to translate
 * @return `Right` with the check and its metric mappings, or `Left` with an
 *         error message string when the rule cannot be translated
 */
def convert(rule: DQRule): Either[String, (Check, Seq[DeequMetricMapping])]
2834

2935
/**
 * Adapts a DQDL numeric condition to a `Double => Boolean` assertion.
 *
 * @param dqRule the rule the condition belongs to; passed through to `e.evaluate`
 * @param e      the numeric condition to evaluate
 * @return a predicate that evaluates `e` against the supplied metric value
 *         using `DefaultOperandEvaluator`
 */
def assertionAsScala(dqRule: DQRule, e: NumberBasedCondition): Double => Boolean = {
3036
val evaluator = DefaultOperandEvaluator
3137
(d: Double) => e.evaluate(d, dqRule, evaluator)
3238
}
39+
40+
/**
 * Selects analyzer options based on whether the rule carries a WHERE clause.
 *
 * @param rule the DQDL rule being converted
 * @return `DEFAULT_ANALYZER_OPTION` when `isWhereClausePresent(rule)` is true,
 *         otherwise `None`
 */
protected def analyzerOptionsForWhereClause(rule: DQRule): Option[AnalyzerOptions] =
41+
if (isWhereClausePresent(rule)) DEFAULT_ANALYZER_OPTION else None
3342
}
3443

3544

src/main/scala/com/amazon/deequ/dqdl/translation/rules/ColumnLengthRule.scala

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,20 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
5959
val operands = rawOperands.map(_.getOperand.toDouble)
6060
val transformedColForSparkSql = if (requiresToBeQuoted(targetColumn)) s"`$targetColumn`" else targetColumn
6161

62-
def withMultipleConstraints(minAssertion: Double => Boolean, maxAssertion: Double => Boolean): Check = {
62+
val opts = DEFAULT_ANALYZER_OPTION
63+
64+
def withMultipleConstraints(minAssertion: Double => Boolean,
65+
maxAssertion: Double => Boolean): Check = {
6366
if (isWhereClausePresent(rule)) {
6467
check
65-
.hasMinLength(targetColumn, minAssertion).where(rule.getWhereClause)
66-
.hasMaxLength(targetColumn, maxAssertion).where(rule.getWhereClause)
68+
.hasMinLength(targetColumn, minAssertion, analyzerOptions = opts)
69+
.where(rule.getWhereClause)
70+
.hasMaxLength(targetColumn, maxAssertion, analyzerOptions = opts)
71+
.where(rule.getWhereClause)
6772
} else {
6873
check
69-
.hasMinLength(targetColumn, minAssertion)
70-
.hasMaxLength(targetColumn, maxAssertion)
74+
.hasMinLength(targetColumn, minAssertion, analyzerOptions = opts)
75+
.hasMaxLength(targetColumn, maxAssertion, analyzerOptions = opts)
7176
}
7277
}
7378

@@ -76,16 +81,20 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
7681
Right(withMultipleConstraints(_ > operands.head, _ < operands.last))
7782

7883
case GREATER_THAN_EQUAL_TO =>
79-
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ >= operands.head)))
84+
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ >= operands.head,
85+
analyzerOptions = opts)))
8086

8187
case GREATER_THAN =>
82-
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ > operands.head)))
88+
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ > operands.head,
89+
analyzerOptions = opts)))
8390

8491
case LESS_THAN_EQUAL_TO =>
85-
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ <= operands.head)))
92+
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ <= operands.head,
93+
analyzerOptions = opts)))
8694

8795
case LESS_THAN =>
88-
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ < operands.head)))
96+
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ < operands.head,
97+
analyzerOptions = opts)))
8998

9099
case EQUALS =>
91100
Right(withMultipleConstraints(_ == operands.head, _ == operands.head))
@@ -103,7 +112,7 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
103112
s"length($transformedColForSparkSql) in (${operands.mkString(",")})"
104113
}
105114
Right(addWhereClause(rule, check.satisfies(complianceCondition, check.description, _ == 1.0,
106-
columns = List(transformedColForSparkSql))))
115+
columns = List(transformedColForSparkSql), analyzerOptions = opts)))
107116

108117
case NOT_IN =>
109118
val complianceCondition =
@@ -115,7 +124,7 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
115124
s"length($transformedColForSparkSql) not in (${operands.mkString(",")})"
116125
}
117126
Right(addWhereClause(rule, check.satisfies(complianceCondition, check.description, _ == 1.0,
118-
columns = List(transformedColForSparkSql))))
127+
columns = List(transformedColForSparkSql), analyzerOptions = opts)))
119128

120129
case NOT_BETWEEN =>
121130
val notBetweenSparkSql = s"(length($transformedColForSparkSql) <= ${operands.head}) or " +
@@ -124,7 +133,7 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
124133
if (operands.contains(0.0)) s"$transformedColForSparkSql is not NULL or ($notBetweenSparkSql)"
125134
else s"$transformedColForSparkSql is NULL or ($notBetweenSparkSql)"
126135
Right(addWhereClause(rule, check.satisfies(complianceCondition, check.description, _ == 1.0,
127-
columns = List(transformedColForSparkSql))))
136+
columns = List(transformedColForSparkSql), analyzerOptions = opts)))
128137

129138
case _ => Left("Unsupported operator for ColumnLength rule.")
130139
}

src/main/scala/com/amazon/deequ/dqdl/translation/rules/ColumnValuesRule.scala

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.amazon.deequ.dqdl.translation.rules
1818

19+
import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome, NullBehavior}
1920
import com.amazon.deequ.checks.Check
2021
import com.amazon.deequ.checks.CheckLevel
2122
import com.amazon.deequ.dqdl.model.DeequMetricMapping
@@ -71,12 +72,17 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
7172
val hasNullOperand = rawOperands.exists(_.isInstanceOf[NullNumericOperand])
7273
val numericOperands = rawOperands.collect { case a: AtomicNumberOperand => a.getOperand.toDouble }
7374

75+
val opts = analyzerOptionsForWhereClause(rule)
76+
val nullFailOpts: Option[AnalyzerOptions] =
77+
Some(AnalyzerOptions(NullBehavior.Fail,
78+
opts.map(_.filteredRow).getOrElse(FilteredRowOutcome.TRUE)))
79+
7480
condition.getOperator match {
7581
case GREATER_THAN =>
7682
val resultCheck = if (isWhereClausePresent(rule)) {
7783
check
78-
.hasMin(targetColumn, _ > numericOperands.head).where(rule.getWhereClause)
79-
.isComplete(targetColumn).where(rule.getWhereClause)
84+
.hasMin(targetColumn, _ > numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
85+
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
8086
} else {
8187
check
8288
.hasMin(targetColumn, _ > numericOperands.head)
@@ -87,8 +93,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
8793
case GREATER_THAN_EQUAL_TO =>
8894
val resultCheck = if (isWhereClausePresent(rule)) {
8995
check
90-
.hasMin(targetColumn, _ >= numericOperands.head).where(rule.getWhereClause)
91-
.isComplete(targetColumn).where(rule.getWhereClause)
96+
.hasMin(targetColumn, _ >= numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
97+
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
9298
} else {
9399
check
94100
.hasMin(targetColumn, _ >= numericOperands.head)
@@ -99,8 +105,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
99105
case LESS_THAN =>
100106
val resultCheck = if (isWhereClausePresent(rule)) {
101107
check
102-
.hasMax(targetColumn, _ < numericOperands.head).where(rule.getWhereClause)
103-
.isComplete(targetColumn).where(rule.getWhereClause)
108+
.hasMax(targetColumn, _ < numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
109+
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
104110
} else {
105111
check
106112
.hasMax(targetColumn, _ < numericOperands.head)
@@ -111,8 +117,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
111117
case LESS_THAN_EQUAL_TO =>
112118
val resultCheck = if (isWhereClausePresent(rule)) {
113119
check
114-
.hasMax(targetColumn, _ <= numericOperands.head).where(rule.getWhereClause)
115-
.isComplete(targetColumn).where(rule.getWhereClause)
120+
.hasMax(targetColumn, _ <= numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
121+
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
116122
} else {
117123
check
118124
.hasMax(targetColumn, _ <= numericOperands.head)
@@ -127,7 +133,7 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
127133
val resultCheck = if (isWhereClausePresent(rule)) {
128134
check.isContainedIn(targetColumn, numericOperands.head, numericOperands.last,
129135
includeLowerBound = false, includeUpperBound = false).where(rule.getWhereClause)
130-
.isComplete(targetColumn).where(rule.getWhereClause)
136+
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
131137
} else {
132138
check.isContainedIn(targetColumn, numericOperands.head, numericOperands.last,
133139
includeLowerBound = false, includeUpperBound = false)
@@ -142,7 +148,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
142148
val sql = s"$transformedCol IS NOT NULL AND " +
143149
s"($transformedCol <= ${numericOperands.head} OR $transformedCol >= ${numericOperands.last})"
144150
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
145-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
151+
columns = List(transformedCol), analyzerOptions = opts)),
152+
complianceMetric(targetColumn, check.description, rule)))
146153

147154
case IN =>
148155
val nums = numericOperands.mkString(", ")
@@ -153,7 +160,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
153160
case _ => "FALSE"
154161
}
155162
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
156-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
163+
columns = List(transformedCol), analyzerOptions = opts)),
164+
complianceMetric(targetColumn, check.description, rule)))
157165

158166
case NOT_IN =>
159167
val nums = numericOperands.mkString(", ")
@@ -164,23 +172,28 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
164172
case _ => "TRUE"
165173
}
166174
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
167-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
175+
columns = List(transformedCol), analyzerOptions = opts)),
176+
complianceMetric(targetColumn, check.description, rule)))
168177

169178
case EQUALS =>
170179
if (hasNullOperand) {
171180
val sql = s"$transformedCol IS NULL"
172181
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
173-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
182+
columns = List(transformedCol), analyzerOptions = opts)),
183+
complianceMetric(targetColumn, check.description, rule)))
174184
} else {
175185
val resultCheck = if (isWhereClausePresent(rule)) {
176186
check
177-
.hasMin(targetColumn, _ == numericOperands.head).where(rule.getWhereClause)
178-
.hasMax(targetColumn, _ == numericOperands.head).where(rule.getWhereClause)
179-
.isComplete(targetColumn).where(rule.getWhereClause)
187+
.hasMin(targetColumn, _ == numericOperands.head,
188+
analyzerOptions = nullFailOpts).where(rule.getWhereClause)
189+
.hasMax(targetColumn, _ == numericOperands.head,
190+
analyzerOptions = nullFailOpts).where(rule.getWhereClause)
191+
.isComplete(targetColumn, analyzerOptions = opts)
192+
.where(rule.getWhereClause)
180193
} else {
181194
check
182-
.hasMin(targetColumn, _ == numericOperands.head)
183-
.hasMax(targetColumn, _ == numericOperands.head)
195+
.hasMin(targetColumn, _ == numericOperands.head, analyzerOptions = nullFailOpts)
196+
.hasMax(targetColumn, _ == numericOperands.head, analyzerOptions = nullFailOpts)
184197
.isComplete(targetColumn)
185198
}
186199
Right((resultCheck, minMetric(targetColumn, rule) ++ maxMetric(targetColumn, rule)))
@@ -190,11 +203,13 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
190203
if (hasNullOperand) {
191204
val sql = s"$transformedCol IS NOT NULL"
192205
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
193-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
206+
columns = List(transformedCol), analyzerOptions = opts)),
207+
complianceMetric(targetColumn, check.description, rule)))
194208
} else {
195209
val sql = s"$transformedCol IS NULL OR $transformedCol != ${numericOperands.head}"
196210
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
197-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
211+
columns = List(transformedCol), analyzerOptions = opts)),
212+
complianceMetric(targetColumn, check.description, rule)))
198213
}
199214

200215
case _ =>
@@ -209,24 +224,28 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
209224
case StringBasedConditionOperator.MATCHES =>
210225
val pattern = extractPattern(condition)
211226
val fullRegex = s"^${pattern}$$".r
212-
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex)),
227+
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex,
228+
analyzerOptions = analyzerOptionsForWhereClause(rule))),
213229
Seq(DeequMetricMapping("Column", targetColumn, "PatternMatch", "PatternMatch", None, rule = rule))))
214230

215231
case StringBasedConditionOperator.NOT_MATCHES =>
216232
val pattern = extractPattern(condition)
217233
val fullRegex = s"^(?!\\b${pattern}\\b).*$$".r
218-
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex)),
234+
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex,
235+
analyzerOptions = analyzerOptionsForWhereClause(rule))),
219236
Seq(DeequMetricMapping("Column", targetColumn, "PatternMatch", "PatternMatch", None, rule = rule))))
220237

221238
case StringBasedConditionOperator.IN | StringBasedConditionOperator.EQUALS =>
222239
val sql = constructComplianceCondition(transformedCol, condition, isNegated = false)
223240
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
224-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
241+
columns = List(transformedCol), analyzerOptions = analyzerOptionsForWhereClause(rule))),
242+
complianceMetric(targetColumn, check.description, rule)))
225243

226244
case StringBasedConditionOperator.NOT_IN | StringBasedConditionOperator.NOT_EQUALS =>
227245
val sql = constructComplianceCondition(transformedCol, condition, isNegated = true)
228246
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
229-
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
247+
columns = List(transformedCol), analyzerOptions = analyzerOptionsForWhereClause(rule))),
248+
complianceMetric(targetColumn, check.description, rule)))
230249

231250
case _ =>
232251
Left(s"Unsupported operator for ColumnValues string condition: ${condition.getOperator}")
@@ -247,9 +266,9 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
247266

248267
if (isNegated) {
249268
if (hasNull) conditions += s"$targetColumn IS NOT NULL"
250-
if (hasEmpty) conditions += s"$targetColumn != ''"
269+
if (hasEmpty) conditions += s"($targetColumn IS NULL OR $targetColumn != '')"
251270
if (hasWhitespacesOnly) {
252-
conditions += s"(LENGTH(TRIM($targetColumn)) > 0 OR LENGTH($targetColumn) = 0)"
271+
conditions += s"($targetColumn IS NULL OR LENGTH(TRIM($targetColumn)) > 0 OR LENGTH($targetColumn) = 0)"
253272
}
254273
if (quotedStrings.nonEmpty) {
255274
val valueList = quotedStrings.map(s => s"'${s.replace("'", "''")}'").mkString(", ")
@@ -258,9 +277,9 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
258277
if (conditions.isEmpty) "TRUE" else conditions.mkString(" AND ")
259278
} else {
260279
if (hasNull) conditions += s"$targetColumn IS NULL"
261-
if (hasEmpty) conditions += s"$targetColumn = ''"
280+
if (hasEmpty) conditions += s"($targetColumn IS NOT NULL AND $targetColumn = '')"
262281
if (hasWhitespacesOnly) {
263-
conditions += s"(LENGTH(TRIM($targetColumn)) = 0 AND LENGTH($targetColumn) > 0)"
282+
conditions += s"($targetColumn IS NOT NULL AND LENGTH(TRIM($targetColumn)) = 0 AND LENGTH($targetColumn) > 0)"
264283
}
265284
if (quotedStrings.nonEmpty) {
266285
val valueList = quotedStrings.map(s => s"'${s.replace("'", "''")}'").mkString(", ")

src/main/scala/com/amazon/deequ/dqdl/translation/rules/CompletenessRule.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ case class CompletenessRule() extends DQDLRuleConverter {
2929
override def convert(rule: DQRule): Either[String, (Check, Seq[DeequMetricMapping])] = {
3030
val col = rule.getParameters.asScala("TargetColumn")
3131
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString)
32-
.hasCompleteness(col, assertionAsScala(rule, rule.getCondition.asInstanceOf[NumberBasedCondition]), None, None)
32+
.hasCompleteness(col, assertionAsScala(rule, rule.getCondition.asInstanceOf[NumberBasedCondition]),
33+
None, analyzerOptionsForWhereClause(rule))
3334
Right(
3435
addWhereClause(rule, check),
3536
Seq(DeequMetricMapping("Column", col, "Completeness", "Completeness", None, rule = rule)))

src/main/scala/com/amazon/deequ/dqdl/translation/rules/EntropyRule.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package com.amazon.deequ.dqdl.translation.rules
1919
import com.amazon.deequ.checks.{Check, CheckLevel}
2020
import com.amazon.deequ.dqdl.model.DeequMetricMapping
2121
import com.amazon.deequ.dqdl.translation.DQDLRuleConverter
22-
import com.amazon.deequ.dqdl.util.DQDLUtility.addWhereClause
2322
import software.amazon.glue.dqdl.model.DQRule
2423
import software.amazon.glue.dqdl.model.condition.number.NumberBasedCondition
2524

@@ -31,7 +30,7 @@ case class EntropyRule() extends DQDLRuleConverter {
3130
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString)
3231
.hasEntropy(col, assertionAsScala(rule, rule.getCondition.asInstanceOf[NumberBasedCondition]))
3332
Right(
34-
addWhereClause(rule, check),
33+
check,
3534
Seq(DeequMetricMapping("Column", col, "Entropy", "Entropy", None, rule = rule)))
3635
}
3736
}

src/main/scala/com/amazon/deequ/dqdl/translation/rules/IsCompleteRule.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import scala.collection.JavaConverters._
2828
case class IsCompleteRule() extends DQDLRuleConverter {
2929
override def convert(rule: DQRule): Either[String, (Check, Seq[DeequMetricMapping])] = {
3030
val col = rule.getParameters.asScala("TargetColumn")
31-
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString).isComplete(col)
31+
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString)
32+
.isComplete(col, None, analyzerOptionsForWhereClause(rule))
3233
Right(
3334
addWhereClause(rule, check),
3435
Seq(DeequMetricMapping("Column", col, "Completeness", "Completeness", None, rule = rule)))

0 commit comments

Comments
 (0)