Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,29 @@

package com.amazon.deequ.dqdl.translation

import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome, NullBehavior}
import com.amazon.deequ.checks.Check
import com.amazon.deequ.dqdl.execution.DefaultOperandEvaluator
import com.amazon.deequ.dqdl.model.DeequMetricMapping
import com.amazon.deequ.dqdl.util.DQDLUtility.isWhereClausePresent
import software.amazon.glue.dqdl.model.DQRule
import software.amazon.glue.dqdl.model.condition.number.NumberBasedCondition


/**
 * Common contract for converters that turn a single DQDL rule into a Deequ
 * [[Check]] together with the metric mappings that describe it.
 */
trait DQDLRuleConverter {

  /**
   * Analyzer options built from `NullBehavior.EmptyString` and
   * `FilteredRowOutcome.TRUE`; this is the value handed out by
   * `analyzerOptionsForWhereClause` when a rule has a where clause.
   */
  val DEFAULT_ANALYZER_OPTION: Option[AnalyzerOptions] = {
    val whereClauseOptions = AnalyzerOptions(NullBehavior.EmptyString, FilteredRowOutcome.TRUE)
    Some(whereClauseOptions)
  }

  /**
   * Translates the given rule.
   *
   * @param rule the DQDL rule to translate
   * @return `Right` with the resulting check and its metric mappings, or
   *         `Left` with a message (presumably the reason the rule could not
   *         be translated; confirm against implementations)
   */
  def convert(rule: DQRule): Either[String, (Check, Seq[DeequMetricMapping])]

  /**
   * Wraps a DQDL number-based condition as a Scala predicate over a metric value.
   *
   * @param dqRule the rule the condition belongs to; forwarded to `evaluate`
   * @param e      the condition to evaluate
   * @return a function that evaluates `e` against the supplied value using
   *         `DefaultOperandEvaluator`
   */
  def assertionAsScala(dqRule: DQRule, e: NumberBasedCondition): Double => Boolean = {
    // Resolve the evaluator once, outside the returned closure.
    val operandEvaluator = DefaultOperandEvaluator
    (metricValue: Double) => e.evaluate(metricValue, dqRule, operandEvaluator)
  }

  /**
   * Chooses the analyzer options for a rule based on whether it carries a where clause.
   *
   * @param rule the rule being translated
   * @return `DEFAULT_ANALYZER_OPTION` when `isWhereClausePresent(rule)` holds, otherwise `None`
   */
  protected def analyzerOptionsForWhereClause(rule: DQRule): Option[AnalyzerOptions] = {
    val hasWhereClause = isWhereClausePresent(rule)
    if (hasWhereClause) DEFAULT_ANALYZER_OPTION else None
  }
}


Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,20 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
val operands = rawOperands.map(_.getOperand.toDouble)
val transformedColForSparkSql = if (requiresToBeQuoted(targetColumn)) s"`$targetColumn`" else targetColumn

def withMultipleConstraints(minAssertion: Double => Boolean, maxAssertion: Double => Boolean): Check = {
val opts = DEFAULT_ANALYZER_OPTION

def withMultipleConstraints(minAssertion: Double => Boolean,
maxAssertion: Double => Boolean): Check = {
if (isWhereClausePresent(rule)) {
check
.hasMinLength(targetColumn, minAssertion).where(rule.getWhereClause)
.hasMaxLength(targetColumn, maxAssertion).where(rule.getWhereClause)
.hasMinLength(targetColumn, minAssertion, analyzerOptions = opts)
.where(rule.getWhereClause)
.hasMaxLength(targetColumn, maxAssertion, analyzerOptions = opts)
.where(rule.getWhereClause)
} else {
check
.hasMinLength(targetColumn, minAssertion)
.hasMaxLength(targetColumn, maxAssertion)
.hasMinLength(targetColumn, minAssertion, analyzerOptions = opts)
.hasMaxLength(targetColumn, maxAssertion, analyzerOptions = opts)
}
}

Expand All @@ -76,16 +81,20 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
Right(withMultipleConstraints(_ > operands.head, _ < operands.last))

case GREATER_THAN_EQUAL_TO =>
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ >= operands.head)))
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ >= operands.head,
analyzerOptions = opts)))

case GREATER_THAN =>
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ > operands.head)))
Right(addWhereClause(rule, check.hasMinLength(targetColumn, _ > operands.head,
analyzerOptions = opts)))

case LESS_THAN_EQUAL_TO =>
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ <= operands.head)))
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ <= operands.head,
analyzerOptions = opts)))

case LESS_THAN =>
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ < operands.head)))
Right(addWhereClause(rule, check.hasMaxLength(targetColumn, _ < operands.head,
analyzerOptions = opts)))

case EQUALS =>
Right(withMultipleConstraints(_ == operands.head, _ == operands.head))
Expand All @@ -103,7 +112,7 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
s"length($transformedColForSparkSql) in (${operands.mkString(",")})"
}
Right(addWhereClause(rule, check.satisfies(complianceCondition, check.description, _ == 1.0,
columns = List(transformedColForSparkSql))))
columns = List(transformedColForSparkSql), analyzerOptions = opts)))

case NOT_IN =>
val complianceCondition =
Expand All @@ -115,7 +124,7 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
s"length($transformedColForSparkSql) not in (${operands.mkString(",")})"
}
Right(addWhereClause(rule, check.satisfies(complianceCondition, check.description, _ == 1.0,
columns = List(transformedColForSparkSql))))
columns = List(transformedColForSparkSql), analyzerOptions = opts)))

case NOT_BETWEEN =>
val notBetweenSparkSql = s"(length($transformedColForSparkSql) <= ${operands.head}) or " +
Expand All @@ -124,7 +133,7 @@ case class ColumnLengthRule() extends DQDLRuleConverter {
if (operands.contains(0.0)) s"$transformedColForSparkSql is not NULL or ($notBetweenSparkSql)"
else s"$transformedColForSparkSql is NULL or ($notBetweenSparkSql)"
Right(addWhereClause(rule, check.satisfies(complianceCondition, check.description, _ == 1.0,
columns = List(transformedColForSparkSql))))
columns = List(transformedColForSparkSql), analyzerOptions = opts)))

case _ => Left("Unsupported operator for ColumnLength rule.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.amazon.deequ.dqdl.translation.rules

import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome, NullBehavior}
import com.amazon.deequ.checks.Check
import com.amazon.deequ.checks.CheckLevel
import com.amazon.deequ.dqdl.model.DeequMetricMapping
Expand Down Expand Up @@ -71,12 +72,17 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
val hasNullOperand = rawOperands.exists(_.isInstanceOf[NullNumericOperand])
val numericOperands = rawOperands.collect { case a: AtomicNumberOperand => a.getOperand.toDouble }

val opts = analyzerOptionsForWhereClause(rule)
val nullFailOpts: Option[AnalyzerOptions] =
Some(AnalyzerOptions(NullBehavior.Fail,
opts.map(_.filteredRow).getOrElse(FilteredRowOutcome.TRUE)))

condition.getOperator match {
case GREATER_THAN =>
val resultCheck = if (isWhereClausePresent(rule)) {
check
.hasMin(targetColumn, _ > numericOperands.head).where(rule.getWhereClause)
.isComplete(targetColumn).where(rule.getWhereClause)
.hasMin(targetColumn, _ > numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
} else {
check
.hasMin(targetColumn, _ > numericOperands.head)
Expand All @@ -87,8 +93,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
case GREATER_THAN_EQUAL_TO =>
val resultCheck = if (isWhereClausePresent(rule)) {
check
.hasMin(targetColumn, _ >= numericOperands.head).where(rule.getWhereClause)
.isComplete(targetColumn).where(rule.getWhereClause)
.hasMin(targetColumn, _ >= numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
} else {
check
.hasMin(targetColumn, _ >= numericOperands.head)
Expand All @@ -99,8 +105,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
case LESS_THAN =>
val resultCheck = if (isWhereClausePresent(rule)) {
check
.hasMax(targetColumn, _ < numericOperands.head).where(rule.getWhereClause)
.isComplete(targetColumn).where(rule.getWhereClause)
.hasMax(targetColumn, _ < numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
} else {
check
.hasMax(targetColumn, _ < numericOperands.head)
Expand All @@ -111,8 +117,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
case LESS_THAN_EQUAL_TO =>
val resultCheck = if (isWhereClausePresent(rule)) {
check
.hasMax(targetColumn, _ <= numericOperands.head).where(rule.getWhereClause)
.isComplete(targetColumn).where(rule.getWhereClause)
.hasMax(targetColumn, _ <= numericOperands.head, analyzerOptions = opts).where(rule.getWhereClause)
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
} else {
check
.hasMax(targetColumn, _ <= numericOperands.head)
Expand All @@ -127,7 +133,7 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
val resultCheck = if (isWhereClausePresent(rule)) {
check.isContainedIn(targetColumn, numericOperands.head, numericOperands.last,
includeLowerBound = false, includeUpperBound = false).where(rule.getWhereClause)
.isComplete(targetColumn).where(rule.getWhereClause)
.isComplete(targetColumn, None, opts).where(rule.getWhereClause)
} else {
check.isContainedIn(targetColumn, numericOperands.head, numericOperands.last,
includeLowerBound = false, includeUpperBound = false)
Expand All @@ -142,7 +148,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
val sql = s"$transformedCol IS NOT NULL AND " +
s"($transformedCol <= ${numericOperands.head} OR $transformedCol >= ${numericOperands.last})"
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = opts)),
complianceMetric(targetColumn, check.description, rule)))

case IN =>
val nums = numericOperands.mkString(", ")
Expand All @@ -153,7 +160,8 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
case _ => "FALSE"
}
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = opts)),
complianceMetric(targetColumn, check.description, rule)))

case NOT_IN =>
val nums = numericOperands.mkString(", ")
Expand All @@ -164,23 +172,28 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
case _ => "TRUE"
}
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = opts)),
complianceMetric(targetColumn, check.description, rule)))

case EQUALS =>
if (hasNullOperand) {
val sql = s"$transformedCol IS NULL"
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = opts)),
complianceMetric(targetColumn, check.description, rule)))
} else {
val resultCheck = if (isWhereClausePresent(rule)) {
check
.hasMin(targetColumn, _ == numericOperands.head).where(rule.getWhereClause)
.hasMax(targetColumn, _ == numericOperands.head).where(rule.getWhereClause)
.isComplete(targetColumn).where(rule.getWhereClause)
.hasMin(targetColumn, _ == numericOperands.head,
analyzerOptions = nullFailOpts).where(rule.getWhereClause)
.hasMax(targetColumn, _ == numericOperands.head,
analyzerOptions = nullFailOpts).where(rule.getWhereClause)
.isComplete(targetColumn, analyzerOptions = opts)
.where(rule.getWhereClause)
} else {
check
.hasMin(targetColumn, _ == numericOperands.head)
.hasMax(targetColumn, _ == numericOperands.head)
.hasMin(targetColumn, _ == numericOperands.head, analyzerOptions = nullFailOpts)
.hasMax(targetColumn, _ == numericOperands.head, analyzerOptions = nullFailOpts)
.isComplete(targetColumn)
}
Right((resultCheck, minMetric(targetColumn, rule) ++ maxMetric(targetColumn, rule)))
Expand All @@ -190,11 +203,13 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
if (hasNullOperand) {
val sql = s"$transformedCol IS NOT NULL"
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = opts)),
complianceMetric(targetColumn, check.description, rule)))
} else {
val sql = s"$transformedCol IS NULL OR $transformedCol != ${numericOperands.head}"
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = opts)),
complianceMetric(targetColumn, check.description, rule)))
}

case _ =>
Expand All @@ -209,24 +224,28 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
case StringBasedConditionOperator.MATCHES =>
val pattern = extractPattern(condition)
val fullRegex = s"^${pattern}$$".r
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex)),
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex,
analyzerOptions = analyzerOptionsForWhereClause(rule))),
Seq(DeequMetricMapping("Column", targetColumn, "PatternMatch", "PatternMatch", None, rule = rule))))

case StringBasedConditionOperator.NOT_MATCHES =>
val pattern = extractPattern(condition)
val fullRegex = s"^(?!\\b${pattern}\\b).*$$".r
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex)),
Right((addWhereClause(rule, check.hasPattern(targetColumn, fullRegex,
analyzerOptions = analyzerOptionsForWhereClause(rule))),
Seq(DeequMetricMapping("Column", targetColumn, "PatternMatch", "PatternMatch", None, rule = rule))))

case StringBasedConditionOperator.IN | StringBasedConditionOperator.EQUALS =>
val sql = constructComplianceCondition(transformedCol, condition, isNegated = false)
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = analyzerOptionsForWhereClause(rule))),
complianceMetric(targetColumn, check.description, rule)))

case StringBasedConditionOperator.NOT_IN | StringBasedConditionOperator.NOT_EQUALS =>
val sql = constructComplianceCondition(transformedCol, condition, isNegated = true)
Right((addWhereClause(rule, check.satisfies(sql, check.description, _ == 1.0,
columns = List(transformedCol))), complianceMetric(targetColumn, check.description, rule)))
columns = List(transformedCol), analyzerOptions = analyzerOptionsForWhereClause(rule))),
complianceMetric(targetColumn, check.description, rule)))

case _ =>
Left(s"Unsupported operator for ColumnValues string condition: ${condition.getOperator}")
Expand All @@ -247,9 +266,9 @@ case class ColumnValuesRule() extends DQDLRuleConverter {

if (isNegated) {
if (hasNull) conditions += s"$targetColumn IS NOT NULL"
if (hasEmpty) conditions += s"$targetColumn != ''"
if (hasEmpty) conditions += s"($targetColumn IS NULL OR $targetColumn != '')"
if (hasWhitespacesOnly) {
conditions += s"(LENGTH(TRIM($targetColumn)) > 0 OR LENGTH($targetColumn) = 0)"
conditions += s"($targetColumn IS NULL OR LENGTH(TRIM($targetColumn)) > 0 OR LENGTH($targetColumn) = 0)"
}
if (quotedStrings.nonEmpty) {
val valueList = quotedStrings.map(s => s"'${s.replace("'", "''")}'").mkString(", ")
Expand All @@ -258,9 +277,9 @@ case class ColumnValuesRule() extends DQDLRuleConverter {
if (conditions.isEmpty) "TRUE" else conditions.mkString(" AND ")
} else {
if (hasNull) conditions += s"$targetColumn IS NULL"
if (hasEmpty) conditions += s"$targetColumn = ''"
if (hasEmpty) conditions += s"($targetColumn IS NOT NULL AND $targetColumn = '')"
if (hasWhitespacesOnly) {
conditions += s"(LENGTH(TRIM($targetColumn)) = 0 AND LENGTH($targetColumn) > 0)"
conditions += s"($targetColumn IS NOT NULL AND LENGTH(TRIM($targetColumn)) = 0 AND LENGTH($targetColumn) > 0)"
}
if (quotedStrings.nonEmpty) {
val valueList = quotedStrings.map(s => s"'${s.replace("'", "''")}'").mkString(", ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ case class CompletenessRule() extends DQDLRuleConverter {
override def convert(rule: DQRule): Either[String, (Check, Seq[DeequMetricMapping])] = {
val col = rule.getParameters.asScala("TargetColumn")
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString)
.hasCompleteness(col, assertionAsScala(rule, rule.getCondition.asInstanceOf[NumberBasedCondition]), None, None)
.hasCompleteness(col, assertionAsScala(rule, rule.getCondition.asInstanceOf[NumberBasedCondition]),
None, analyzerOptionsForWhereClause(rule))
Right(
addWhereClause(rule, check),
Seq(DeequMetricMapping("Column", col, "Completeness", "Completeness", None, rule = rule)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package com.amazon.deequ.dqdl.translation.rules
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.dqdl.model.DeequMetricMapping
import com.amazon.deequ.dqdl.translation.DQDLRuleConverter
import com.amazon.deequ.dqdl.util.DQDLUtility.addWhereClause
import software.amazon.glue.dqdl.model.DQRule
import software.amazon.glue.dqdl.model.condition.number.NumberBasedCondition

Expand All @@ -31,7 +30,7 @@ case class EntropyRule() extends DQDLRuleConverter {
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString)
.hasEntropy(col, assertionAsScala(rule, rule.getCondition.asInstanceOf[NumberBasedCondition]))
Right(
addWhereClause(rule, check),
check,
Seq(DeequMetricMapping("Column", col, "Entropy", "Entropy", None, rule = rule)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import scala.collection.JavaConverters._
case class IsCompleteRule() extends DQDLRuleConverter {
override def convert(rule: DQRule): Either[String, (Check, Seq[DeequMetricMapping])] = {
val col = rule.getParameters.asScala("TargetColumn")
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString).isComplete(col)
val check = Check(CheckLevel.Error, java.util.UUID.randomUUID.toString)
.isComplete(col, None, analyzerOptionsForWhereClause(rule))
Right(
addWhereClause(rule, check),
Seq(DeequMetricMapping("Column", col, "Completeness", "Completeness", None, rule = rule)))
Expand Down
Loading
Loading