Skip to content

Commit 3822238

Browse files
authored
Merge pull request #43 from target/BugFix-IncorrectErrorPct
BugFix - Incorrect calculation of Error % for ColumnBasedChecks
2 parents 6fed17c + 3f424d0 commit 3822238

File tree

8 files changed

+279
-53
lines changed

8 files changed

+279
-53
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,8 @@ This check sums a column in all rows. If the sum applied to the `column` doesn't
324324
| `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. |
325325
| `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. |
326326

327+
**Note:** If bounds are non-inclusive, and the actual sum is equal to one of the bounds, the relative error percentage will be undefined.
328+
327329
## Example Config
328330

329331
```yaml

src/main/scala/com/target/data_validator/JsonEncoders.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ object JsonEncoders extends LazyLogging {
4747
("count", Json.fromLong(vce.count)),
4848
("errorCount", Json.fromLong(vce.errorCount))
4949
)
50+
case cbvce: ColumnBasedValidatorCheckEvent => Json.obj(
51+
("type", Json.fromString("columnBasedCheckEvent")),
52+
("failed", Json.fromBoolean(cbvce.failed)),
53+
("message", Json.fromString(cbvce.msg)),
54+
("data", Json.fromFields(cbvce.data.map(x => (x._1, Json.fromString(x._2)))))
55+
)
5056
case qce: ValidatorQuickCheckError => Json.obj(
5157
("type", Json.fromString("quickCheckError")),
5258
("failed", Json.fromBoolean(qce.failed)),

src/main/scala/com/target/data_validator/ValidatorEvent.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ case class ValidatorCheckEvent(failure: Boolean, label: String, count: Long, err
3636
}
3737
}
3838

39+
case class ColumnBasedValidatorCheckEvent(
40+
failure: Boolean,
41+
data: Map[String, String],
42+
msg: String
43+
) extends ValidatorEvent {
44+
override def failed: Boolean = failure
45+
46+
override def toHTML: Text.all.Tag = {
47+
div(cls:="checkEvent")(failedHTML, s" - $msg")
48+
}
49+
}
50+
3951
class ValidatorTimer(val label: String) extends ValidatorEvent {
4052
var duration = 0L
4153

src/main/scala/com/target/data_validator/validator/ColumnBased.scala

Lines changed: 72 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.target.data_validator.validator
22

3-
import com.target.data_validator.{ValidatorCheckEvent, ValidatorCounter, ValidatorError, VarSubstitution}
3+
import com.target.data_validator.{ColumnBasedValidatorCheckEvent, ValidatorCounter, ValidatorError, VarSubstitution}
44
import com.target.data_validator.JsonEncoders.eventEncoder
55
import io.circe.Json
66
import io.circe.syntax._
@@ -10,11 +10,29 @@ import org.apache.spark.sql.catalyst.expressions.Expression
1010
import org.apache.spark.sql.catalyst.expressions.aggregate.Max
1111
import org.apache.spark.sql.types._
1212

13+
import scala.collection.immutable.ListMap
14+
import scala.math.abs
15+
1316
abstract class ColumnBased(column: String, condTest: Expression) extends CheapCheck {
1417
override def select(schema: StructType, dict: VarSubstitution): Expression = condTest
1518

1619
// ColumnBased checks don't have per row error details.
1720
def hasQuickErrorDetails: Boolean = false
21+
22+
// calculates and returns the pct error as a string
23+
def calculatePctError(expected: Double, actual: Double, formatStr: String = "%4.2f%%"): String = {
24+
25+
if (expected == actual) {
26+
formatStr.format(0.00) // if expected == actual, error % should be 0, even if expected is 0
27+
}
28+
else if (expected == 0.0) {
29+
"undefined"
30+
}
31+
else {
32+
val pct = abs(((expected - actual) * 100.0) / expected)
33+
formatStr.format(pct)
34+
}
35+
}
1836
}
1937

2038
case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0) {
@@ -36,8 +54,11 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0
3654

3755
override def quickCheck(row: Row, count: Long, idx: Int): Boolean = {
3856
failed = count < minNumRows
57+
val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00%"
3958
addEvent(ValidatorCounter("rowCount", count))
40-
addEvent(ValidatorCheckEvent(failed, s"MinNumRowCheck $minNumRows ", count, 1))
59+
val msg = s"MinNumRowsCheck Expected: $minNumRows Actual: $count Relative Error: $pctError"
60+
val data = ListMap("expected" -> minNumRows.toString, "actual" -> count.toString, "relative_error" -> pctError)
61+
addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg))
4162
failed
4263
}
4364

@@ -66,34 +87,58 @@ case class ColumnMaxCheck(column: String, value: Json)
6687
val dataType = row.schema(idx).dataType
6788
val rMax = row(idx)
6889
logger.info(s"rMax: $rMax colType: $dataType value: $value valueClass: ${value.getClass.getCanonicalName}")
69-
val num = value.asNumber
70-
failed = dataType match {
71-
case StringType => value.asString.exists(_ != row.getString(idx))
72-
case ByteType => num.map(_.toByte).exists(_.get != row.getByte(idx))
73-
case ShortType => num.map(_.toShort).exists(_.get != row.getShort(idx))
74-
case IntegerType =>
75-
val intNum = value.asNumber.map(_.toInt.get).getOrElse(-1)
76-
val rowInt = row.getInt(idx)
77-
logger.debug(s"intNum[${intNum.getClass.getCanonicalName}]: $intNum " +
78-
s"rowInt[${rowInt.getClass.getCanonicalName}]: $rowInt")
79-
num.map(_.toInt).exists(_.get != row.getInt(idx))
80-
case LongType => num.map(_.toLong).exists(_.get != row.getLong(idx))
81-
case FloatType => num.forall(_.toDouble != row.getFloat(idx))
82-
case DoubleType => num.forall(_.toDouble != row.getDouble(idx))
83-
case ut =>
84-
logger.error(s"quickCheck for type: $ut, Row: $row not Implemented! Please file this as a bug.")
85-
true // Fail check!
90+
91+
def resultForString: (ListMap[String, String], String) = {
92+
val (expected, actual) = (value.asString.getOrElse(""), row.getString(idx))
93+
94+
failed = expected != actual
95+
val data = ListMap("expected" -> expected, "actual" -> actual)
96+
val errorMsg = s"ColumnMaxCheck $column[StringType]: Expected: $expected Actual: $actual"
97+
98+
(data, errorMsg)
99+
}
100+
101+
def resultForNumeric: (ListMap[String, String], String) = {
102+
val num = value.asNumber.get
103+
var cmp_params = (0.0, 0.0) // (expected, actual)
104+
105+
dataType match {
106+
case ByteType => cmp_params = (num.toByte.getOrElse[Byte](-1), row.getByte(idx))
107+
case ShortType => cmp_params = (num.toShort.getOrElse[Short](-1), row.getShort(idx))
108+
case IntegerType => cmp_params = (num.toInt.getOrElse[Int](-1), row.getInt(idx))
109+
case LongType => cmp_params = (num.toLong.getOrElse[Long](-1), row.getLong(idx))
110+
case FloatType => cmp_params = (num.toDouble, row.getFloat(idx))
111+
case DoubleType => cmp_params = (num.toDouble, row.getDouble(idx))
112+
}
113+
114+
failed = cmp_params._1 != cmp_params._2
115+
val pctError = if (failed) calculatePctError(cmp_params._1, cmp_params._2) else "0.00%"
116+
val data = ListMap("expected" -> num.toString, "actual" -> rMax.toString, "relative_error" -> pctError)
117+
val errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num Actual: $rMax Relative Error: $pctError"
118+
119+
(data, errorMsg)
120+
}
121+
122+
def resultForOther: (ListMap[String, String], String) = {
123+
logger.error(
124+
s"""ColumnMaxCheck for type: $dataType, Row: $row not implemented!
125+
|Please open a bug report on the data-validator issue tracker.""".stripMargin
126+
)
127+
failed = true
128+
val errorMsg = s"ColumnMaxCheck is not supported for data type $dataType"
129+
130+
(ListMap.empty[String, String], errorMsg)
131+
}
132+
133+
val (data, errorMsg) = dataType match {
134+
case StringType => resultForString
135+
case _: NumericType => resultForNumeric
136+
case _ => resultForOther
86137
}
138+
87139
logger.debug(s"MaxValue compared Row: $row with value: $value failed: $failed")
88140
if (failed) {
89-
addEvent(
90-
ValidatorCheckEvent(
91-
failed,
92-
s"columnMaxCheck column[$dataType]: $column value: $value doesn't equal $rMax",
93-
count,
94-
1
95-
)
96-
)
141+
addEvent(ColumnBasedValidatorCheckEvent(failed, data, errorMsg))
97142
}
98143
failed
99144
}

src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package com.target.data_validator.validator
22

3-
import com.target.data_validator.{ValidatorCheckEvent, ValidatorError, VarSubstitution}
3+
import com.target.data_validator.{ColumnBasedValidatorCheckEvent, JsonEncoders, ValidatorError, VarSubstitution}
44
import io.circe._
55
import io.circe.generic.semiauto._
6+
import io.circe.syntax._
67
import org.apache.spark.sql.{DataFrame, Row}
78
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
89
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
910
import org.apache.spark.sql.types._
1011

12+
import scala.collection.immutable.ListMap
13+
1114
case class ColumnSumCheck(
1215
column: String,
1316
minValue: Option[Json] = None,
@@ -52,29 +55,64 @@ case class ColumnSumCheck(
5255

5356
override def quickCheck(r: Row, count: Long, idx: Int): Boolean = {
5457

58+
val dataType = r.schema(idx).dataType
59+
val isInclusive = inclusiveBounds.right.get
60+
val lowerBoundValue = lowerBound.right.get
61+
val upperBoundValue = upperBound.right.get
62+
5563
def evaluate(sum: Double): Boolean = {
56-
if (inclusiveBounds.right.get) { sum > upperBound.right.get || sum < lowerBound.right.get}
57-
else { sum >= upperBound.right.get || sum <= lowerBound.right.get}
64+
if (isInclusive) { sum > upperBoundValue || sum < lowerBoundValue}
65+
else { sum >= upperBoundValue || sum <= lowerBoundValue}
66+
}
67+
68+
def getPctError(sum: Double): String = {
69+
if (sum < lowerBoundValue) {
70+
calculatePctError(lowerBoundValue, sum)
71+
}
72+
else if (sum > upperBoundValue) {
73+
calculatePctError(upperBoundValue, sum)
74+
}
75+
else if (!isInclusive && (sum == upperBoundValue || sum == lowerBoundValue)) {
76+
"undefined"
77+
}
78+
else {
79+
"0.00%"
80+
}
5881
}
5982

60-
failed = r.schema(idx).dataType match {
61-
case ShortType => evaluate(r.getShort(idx))
62-
case IntegerType => evaluate(r.getInt(idx))
63-
case LongType => evaluate(r.getLong(idx))
64-
case FloatType => evaluate(r.getFloat(idx))
65-
case DoubleType => evaluate(r.getDouble(idx))
66-
case ByteType => evaluate(r.getByte(idx))
83+
def getData(pctError: String): ListMap[String, String] = {
84+
((minValue, maxValue) match {
85+
case (Some(x), Some(y)) =>
86+
ListMap("lower_bound" -> x.asNumber.get.toString, "upper_bound" -> y.asNumber.get.toString)
87+
case (None, Some(y)) => ListMap("upper_bound" -> y.asNumber.get.toString)
88+
case (Some(x), None) => ListMap("lower_bound" -> x.asNumber.get.toString)
89+
case (None, None) => throw new RuntimeException("Must define at least one of minValue or maxValue.")
90+
}) + ("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError)
91+
}
92+
93+
val actualSum: Double = dataType match {
94+
case ByteType => r.getByte(idx)
95+
case ShortType => r.getShort(idx)
96+
case IntegerType => r.getInt(idx)
97+
case LongType => r.getLong(idx)
98+
case FloatType => r.getFloat(idx)
99+
case DoubleType => r.getDouble(idx)
67100
case ut => throw new Exception(s"Unsupported type for $name found in schema: $ut")
68101
}
69102

70-
val bounds = minValue.getOrElse("") :: maxValue.getOrElse("") :: Nil
71-
val prettyBounds = if (inclusiveBounds.right.get) {
72-
r.get(idx) + " in " + bounds.mkString("[", " , ", "]")
103+
failed = evaluate(actualSum)
104+
val pctError = getPctError(actualSum)
105+
val data = getData(pctError)
106+
107+
val bounds = minValue.getOrElse(" ") :: maxValue.getOrElse("") :: Nil
108+
val prettyBounds = if (isInclusive) {
109+
bounds.mkString("[", ", ", "]")
73110
} else {
74-
r.get(idx) + " in " + bounds.mkString("(", " , ", ")")
111+
bounds.mkString("(", ", ", ")")
75112
}
76-
val errorValue = if (failed) 1 else 0
77-
addEvent(ValidatorCheckEvent(failed, s"$name on '$column': $prettyBounds", count, errorValue))
113+
114+
val msg = s"$name on $column[$dataType]: Expected Range: $prettyBounds Actual: ${r(idx)} Relative Error: $pctError"
115+
addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg))
78116
failed
79117
}
80118

@@ -119,9 +157,11 @@ case class ColumnSumCheck(
119157
}
120158

121159
override def toJson: Json = {
160+
import JsonEncoders.eventEncoder
122161
val additionalFieldsForReport = Json.fromFields(Set(
123162
"type" -> Json.fromString("columnSumCheck"),
124-
"failed" -> Json.fromBoolean(failed)
163+
"failed" -> Json.fromBoolean(failed),
164+
"events" -> getEvents.asJson
125165
))
126166

127167
val base = ColumnSumCheck.encoder(this)

src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
1010
import org.apache.spark.sql.types._
1111
import org.scalatest._
1212

13+
import scala.collection.immutable.ListMap
1314
import scala.util.Random
1415

1516
class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {
@@ -182,20 +183,32 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {
182183
it("quickCheck() should fail when rowCount < minNumRows") {
183184
val dict = new VarSubstitution
184185
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
185-
val config = mkConfig(df, List(MinNumRows(10))) //scalastyle:ignore
186+
val minNumRowsCheck = MinNumRows(10) // scalastyle:ignore magic.number
187+
val config = mkConfig(df, List(minNumRowsCheck))
186188
assert(config.quickChecks(spark, dict))
187189
assert(config.failed)
188190
assert(config.tables.head.failed)
191+
assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(
192+
failure = true,
193+
ListMap("expected" -> "10", "actual" -> "2", "relative_error" -> "80.00%"),
194+
"MinNumRowsCheck Expected: 10 Actual: 2 Relative Error: 80.00%"
195+
))
189196
}
190197

191198
it("quickCheck() should succeed when rowCount > minNumRows") {
192199
val dict = new VarSubstitution
193200
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
194-
val config = mkConfig(df, List(MinNumRows(1))) //scalastyle:ignore
201+
val minNumRowsCheck = MinNumRows(1)
202+
val config = mkConfig(df, List(minNumRowsCheck))
195203
assert(!config.configCheck(spark, dict))
196204
assert(!config.quickChecks(spark, dict))
197205
assert(!config.failed)
198206
assert(!config.tables.exists(_.failed))
207+
assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(
208+
failure = false,
209+
ListMap("expected" -> "1", "actual" -> "2", "relative_error" -> "0.00%"),
210+
"MinNumRowsCheck Expected: 1 Actual: 2 Relative Error: 0.00%"
211+
))
199212
}
200213

201214
}

0 commit comments

Comments
 (0)