
Commit 092de5d

Merge pull request #35 from target/gh34-sumofnumericcolumncheck
Adds a ColumnSumCheck
2 parents 3bb80d8 + 29908eb commit 092de5d

6 files changed: +456 -12 lines changed


README.md

Lines changed: 11 additions & 0 deletions
@@ -313,6 +313,17 @@ This is a costly check and requires an additional pass through the table.
|-----|------|-------------|
| `columns` | Array[String] | Each set of values in these columns must be unique.

#### `columnSumCheck`

This check sums a column across all rows. If the sum of `column` does not fall within the range specified by (`minValue`, `maxValue`), the check fails. An example entry is sketched below.

| Arg | Type | Description |
|-------------|-------------|------------------------------------------------------------------------|
| `column` | String | The column to be checked. |
| `minValue` | NumericType | The lower bound of the sum. Type depends on the type of the `column`. |
| `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. |
| `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. |
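
For illustration, a single check entry could look like the sketch below (not part of this commit; the column name `total_amount` and the bounds are made-up values):

```yaml
- type: columnSumCheck
  column: total_amount
  minValue: 0
  maxValue: 50000
  inclusive: true
```

`minValue` and `maxValue` may also be given individually, but at least one of the two must be present.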

## Example Config

```yaml
Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
package com.target.data_validator.validator

import com.target.data_validator.{ValidatorCheckEvent, ValidatorError, VarSubstitution}
import io.circe._
import io.circe.generic.semiauto._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
import org.apache.spark.sql.types._

case class ColumnSumCheck(
  column: String,
  minValue: Option[Json] = None,
  maxValue: Option[Json] = None,
  inclusive: Option[Json] = None
) extends ColumnBased(column, Sum(UnresolvedAttribute(column)).toAggregateExpression()) {

  private val minOrMax: Either[String, Unit] = if (minValue.isEmpty && maxValue.isEmpty) {
    Left("'minValue' or 'maxValue' or both must be defined")
  } else {
    Right()
  }

  private val lowerBound: Either[String, Double] = minValue match {
    case Some(json) =>
      if (json.isNumber) { Right(json.asNumber.get.toDouble) }
      else { Left(s"'minValue' defined but type is not a Number, is: ${json.name}") }
    case None => Right(Double.MinValue)
  }

  private val upperBound: Either[String, Double] = maxValue match {
    case Some(json) =>
      if (json.isNumber) { Right(json.asNumber.get.toDouble) }
      else { Left(s"'maxValue' defined but type is not a Number, is: ${json.name}") }
    case None => Right(Double.MaxValue)
  }

  private val minLessThanMax: Either[String, Unit] = (lowerBound, upperBound) match {
    case (Right(lower), Right(upper)) if lower >= upper =>
      Left(s"'minValue': $lower must be less than 'maxValue': $upper")
    case _ => Right()
  }

  private val inclusiveBounds: Either[String, Boolean] = inclusive match {
    case Some(json) =>
      if (json.isBoolean) { Right(json.asBoolean.get) }
      else { Left(s"'inclusive' defined but type is not Boolean, is: ${json.name}") }
    case None => Right(false)
  }

  override def name: String = "columnSumCheck"

  override def quickCheck(r: Row, count: Long, idx: Int): Boolean = {

    def evaluate(sum: Double): Boolean = {
      if (inclusiveBounds.right.get) { sum > upperBound.right.get || sum < lowerBound.right.get }
      else { sum >= upperBound.right.get || sum <= lowerBound.right.get }
    }

    failed = r.schema(idx).dataType match {
      case ShortType => evaluate(r.getShort(idx))
      case IntegerType => evaluate(r.getInt(idx))
      case LongType => evaluate(r.getLong(idx))
      case FloatType => evaluate(r.getFloat(idx))
      case DoubleType => evaluate(r.getDouble(idx))
      case ut => throw new Exception(s"Unsupported type for $name found in schema: $ut")
    }

    val bounds = minValue.getOrElse("") :: maxValue.getOrElse("") :: Nil
    val prettyBounds = if (inclusiveBounds.right.get) {
      r.get(idx) + " in " + bounds.mkString("[", " , ", "]")
    } else {
      r.get(idx) + " in " + bounds.mkString("(", " , ", ")")
    }
    val errorValue = if (failed) 1 else 0
    addEvent(ValidatorCheckEvent(failed, s"$name on '$column': $prettyBounds", count, errorValue))
    failed
  }

  override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
    val ret = copy(
      column = getVarSub(column, "column", dict),
      minValue = minValue.map(getVarSubJson(_, "minValue", dict)),
      maxValue = maxValue.map(getVarSubJson(_, "maxValue", dict)),
      inclusive = inclusive.map(getVarSubJson(_, "inclusive", dict))
    )
    this.getEvents.foreach(ret.addEvent)
    ret
  }

  override def configCheck(df: DataFrame): Boolean = {
    logger.debug(s"Full check config: ${this.toString}")
    Seq(
      minOrMax,
      lowerBound,
      upperBound,
      minLessThanMax,
      inclusiveBounds
    ).foreach {
      case Left(msg) =>
        logger.error(msg)
        addEvent(ValidatorError(msg))
      case _ =>
    }

    findColumnInDataFrame(df, column) match {
      case Some(ft) if ft.dataType.isInstanceOf[NumericType] =>
      case Some(ft) =>
        val msg = s"Column: $column found, but not of numericType type: ${ft.dataType}"
        logger.error(msg)
        addEvent(ValidatorError(msg))
      case None =>
        val msg = s"Column: $column not found in schema"
        logger.error(msg)
        addEvent(ValidatorError(msg))
    }
    failed
  }

  override def toJson: Json = {
    val additionalFieldsForReport = Json.fromFields(Set(
      "type" -> Json.fromString("columnSumCheck"),
      "failed" -> Json.fromBoolean(failed)
    ))

    val base = ColumnSumCheck.encoder(this)
    base.deepMerge(additionalFieldsForReport)
  }
}

object ColumnSumCheck {
  val encoder: Encoder[ColumnSumCheck] = deriveEncoder[ColumnSumCheck]
  val decoder: Decoder[ColumnSumCheck] = deriveDecoder[ColumnSumCheck]
  def fromJson(c: HCursor): Either[DecodingFailure, ValidatorBase] = decoder.apply(c)
}
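
As a usage sketch (not part of the commit; the JSON fragment and the column name `total_amount` are made up), a parsed config entry can be turned into this check through the companion's `fromJson`:

```scala
import io.circe.parser.parse

import com.target.data_validator.validator.ColumnSumCheck

// A config fragment as circe sees it once the YAML/JSON is parsed; field names match the case class.
val entry = """{ "column": "total_amount", "minValue": 0, "maxValue": 50000, "inclusive": true }"""

// fromJson delegates to the semiauto-derived Decoder[ColumnSumCheck].
val check = parse(entry).flatMap(json => ColumnSumCheck.fromJson(json.hcursor))
// check: Either[io.circe.Error, ValidatorBase]
```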

src/main/scala/com/target/data_validator/validator/JsonDecoders.scala

Lines changed: 28 additions & 12 deletions
@@ -2,23 +2,39 @@ package com.target.data_validator.validator
 
 import cats.syntax.either._
 import com.typesafe.scalalogging.LazyLogging
-import io.circe.{Decoder, HCursor}
+import io.circe.{Decoder, DecodingFailure, HCursor}
 import io.circe.generic.auto._
 
 object JsonDecoders extends LazyLogging {
 
   implicit val decodeChecks: Decoder[ValidatorBase] = new Decoder[ValidatorBase] {
-    final def apply(c: HCursor): Decoder.Result[ValidatorBase] = c.downField("type").as[String].flatMap {
-      case "rowCount" => c.as[MinNumRows]
-      case "nullCheck" => NullCheck.fromJson(c)
-      case "negativeCheck" => NegativeCheck.fromJson(c)
-      case "columnMaxCheck" => c.as[ColumnMaxCheck]
-      case "rangeCheck" => RangeCheck.fromJson(c)
-      case "uniqueCheck" => UniqueCheck.fromJson(c)
-      case "stringLengthCheck" => StringLengthCheck.fromJson(c)
-      case "stringRegexCheck" => StringRegexCheck.fromJson(c)
-      case x => logger.error(s"Unknown Check `$x` in config!")
-        throw new RuntimeException(s"Unknown Check in config `$x`")
+    // FIXME: specifying this Function here instead of Decoder[ValidatorBase] is a smell that these checks
+    // ought to have proper decoder objects instead of a method.
+    // I.e., we're not using the Circe Decoder API as intended.
+    private lazy val decoders = Map[String, HCursor => Either[DecodingFailure, ValidatorBase]](
+      "rowCount" -> { _.as[MinNumRows] },
+      "nullCheck" -> NullCheck.fromJson,
+      "negativeCheck" -> NegativeCheck.fromJson,
+      "columnMaxCheck" -> { _.as[ColumnMaxCheck] },
+      "rangeCheck" -> RangeCheck.fromJson,
+      "uniqueCheck" -> UniqueCheck.fromJson,
+      "stringLengthCheck" -> StringLengthCheck.fromJson,
+      "stringRegexCheck" -> StringRegexCheck.fromJson,
+      "columnSumCheck" -> ColumnSumCheck.fromJson
+    )
+
+    final def apply(c: HCursor): Decoder.Result[ValidatorBase] = c.downField("type").as[String].flatMap(getDecoder(c))
+
+    private def getDecoder(cursor: HCursor)(checkType: String) = {
+      decoders
+        .get(checkType)
+        .map(_(cursor))
+      match {
+        case Some(x) => x
+        case None =>
+          logger.error(s"Unknown Check `$checkType` in config! Choose one of: ${decoders.keys.mkString(", ")}.")
+          throw new RuntimeException(s"Unknown Check in config `$checkType`")
+      }
     }
   }
 }
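
To see the dispatch end to end (a sketch, not from this commit; the config fragment and values are invented):

```scala
import io.circe.parser.decode

import com.target.data_validator.validator.{JsonDecoders, ValidatorBase}
import JsonDecoders.decodeChecks // brings the implicit Decoder[ValidatorBase] into scope

val entry = """{ "type": "columnSumCheck", "column": "total_amount", "minValue": 0, "maxValue": 50000 }"""

// decode parses the string and applies decodeChecks; an unknown "type" hits the RuntimeException above.
val check: Either[io.circe.Error, ValidatorBase] = decode[ValidatorBase](entry)
```

Registering a new check is then just one more entry in the `decoders` map, which is exactly what this commit does for `columnSumCheck`.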

src/main/scala/com/target/data_validator/validator/ValidatorBase.scala

Lines changed: 8 additions & 0 deletions
@@ -249,6 +249,14 @@ object ValidatorBase extends LazyLogging {
 trait CheapCheck extends ValidatorBase {
   def select(schema: StructType, dict: VarSubstitution): Expression
 
+  /**
+   * Run a check on a particular column of a row.
+   *
+   * @param r the row under inspection
+   * @param count the total number of rows
+   * @param idx the index of the column under inspection
+   * @return true if the check fails, false if it passes
+   */
   def quickCheck(r: Row, count: Long, idx: Int): Boolean
 }
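
To illustrate the contract (a hypothetical sketch, not from this commit: the schema, row values, and bounds are invented, and Spark's internal `GenericRowWithSchema` is used only to fabricate an aggregated result row):

```scala
import io.circe.Json
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{LongType, StructField, StructType}

import com.target.data_validator.validator.ColumnSumCheck

// Pretend the aggregation pass produced sum(total_amount) = 42000 over 1000 rows.
val schema = StructType(Seq(StructField("sum(total_amount)", LongType)))
val row = new GenericRowWithSchema(Array[Any](42000L), schema)

val check = ColumnSumCheck(
  column = "total_amount",
  minValue = Some(Json.fromInt(0)),
  maxValue = Some(Json.fromInt(50000))
)

// quickCheck returns true when the check FAILS; 42000 lies inside (0, 50000), so this is false.
val failed = check.quickCheck(row, count = 1000L, idx = 0)
```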

src/test/scala/com/target/data_validator/TestHelpers.scala

Lines changed: 2 additions & 0 deletions
@@ -19,10 +19,12 @@ object TestHelpers {
   }
 
   def guessType(v: Any): DataType = v.getClass.getCanonicalName match {
+    case "java.lang.Short" => ShortType
     case "java.lang.String" => StringType
     case "java.lang.Integer" => IntegerType
     case "java.lang.Double" => DoubleType
     case "java.lang.Boolean" => BooleanType
+    case "java.lang.Long" => LongType
     case _ => throw new IllegalArgumentException(s"Unknown type '${v.getClass.getCanonicalName}'")
   }
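
A quick illustrative use of the new mappings (values are arbitrary):

```scala
import com.target.data_validator.TestHelpers.guessType
import org.apache.spark.sql.types.{LongType, ShortType}

assert(guessType(7.toShort) == ShortType) // boxes to java.lang.Short
assert(guessType(42L) == LongType)        // boxes to java.lang.Long
```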
