Skip to content

Commit dd677f1

Browse files
authored
Merge pull request #62 from phpisciuneri/varsub-rowcount
Adds variable substitution for minNumRows
2 parents 2839ed2 + 2d603d4 commit dd677f1

File tree

6 files changed

+50
-29
lines changed

6 files changed

+50
-29
lines changed

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ A tool to validate data in HIVE tables.
77
Assemble fat jar: `sbt clean assembly`
88

99
```
10-
spark-submit --master local data-validator-assembly-0.10.0.jar --help
10+
spark-submit --master local data-validator-assembly-0.13.0.jar --help
1111
12-
data-validator v0.10.0
12+
data-validator v0.13.0
1313
Usage: data-validator [options]
1414
1515
--version
@@ -31,7 +31,7 @@ Usage: data-validator [options]
3131
spark-submit \
3232
--num-executors 10 \
3333
--executor-cores 2 \
34-
data-validator-assembly-0.10.0.jar \
34+
data-validator-assembly-0.13.0.jar \
3535
--config config.yaml \
3636
--jsonReport report.json
3737
```
@@ -302,7 +302,7 @@ The minimum number of rows a table must have to pass the validator.
302302

303303
| Arg | Type | Description |
304304
|-----|------|-------------|
305-
| `minNumRows` | Long | The minimum number of rows a table must have to pass. **Note:** Currently this cannot be a variable.
305+
| `minNumRows` | Long | The minimum number of rows a table must have to pass.
306306

307307
See Example Config file below to see how the checks are configured.
308308

@@ -463,7 +463,7 @@ Example oozie wf snippet:
463463
<argument>${principal}</argument>
464464
<argument>--files</argument>
465465
<argument>config.yaml</argument>
466-
<argument>data-validator-assembly-0.10.0.jar</argument>
466+
<argument>data-validator-assembly-0.13.0.jar</argument>
467467
<argument>--config</argument>
468468
<argument>config.yaml</argument>
469469
<argument>--exitErrorOnFail</argument>
@@ -511,7 +511,7 @@ Example oozie wf snippet:
511511
<argument>${keytab}</argument>
512512
<argument>--principal</argument>
513513
<argument>${principal}</argument>
514-
<argument>data-validator-assembly-0.10.0.jar</argument>
514+
<argument>data-validator-assembly-0.13.0.jar</argument>
515515
<argument>--config</argument>
516516
<argument>config.yaml</argument>
517517
<argument>--exitErrorOnFail</argument>
@@ -554,7 +554,7 @@ A tool is provided to generate a sample `.orc` file for use in local development
554554
```sh
555555
spark-submit \
556556
--master "local[*]" \
557-
data-validator-assembly-0.10.0.jar \
557+
data-validator-assembly-0.13.0.jar \
558558
--config local_validators.yaml \
559559
--jsonReport report.json \
560560
--htmlReport report.html

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name := "data-validator"
22
organization := "com.target"
33

4-
scalaVersion := "2.11.10"
4+
scalaVersion := "2.11.12"
55

66
val sparkVersion = "2.3.1"
77

src/main/scala/com/target/data_validator/validator/ColumnBased.scala

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,26 +35,40 @@ abstract class ColumnBased(column: String, condTest: Expression) extends CheapCh
3535
}
3636
}
3737

38-
case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0) {
38+
case class MinNumRows(minNumRows: Json) extends ColumnBased("", ValidatorBase.L0) {
3939
override def name: String = "MinNumRows"
4040

41-
override def substituteVariables(dict: VarSubstitution): ValidatorBase = this
41+
override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
42+
val ret = MinNumRows(getVarSubJson(minNumRows, "minNumRows", dict))
43+
getEvents.foreach(ret.addEvent)
44+
ret
45+
}
4246

4347
override def configCheck(df: DataFrame): Boolean = {
44-
if (minNumRows <= 0) {
45-
val msg = s"MinNumRows: $minNumRows <= 0"
48+
49+
def notNaturalNumber(): Unit = {
50+
val msg = "minNumRows must be a natural number"
4651
logger.error(msg)
4752
addEvent(ValidatorError(msg))
48-
failed = true
49-
true
50-
} else {
51-
false
5253
}
54+
55+
minNumRows.asNumber match {
56+
case Some(jsonNumber) => jsonNumber.toLong match {
57+
case Some(x) if x > 0 =>
58+
case _ => notNaturalNumber()
59+
}
60+
case _ => notNaturalNumber()
61+
}
62+
failed
5363
}
5464

5565
override def quickCheck(row: Row, count: Long, idx: Int): Boolean = {
56-
failed = count < minNumRows
57-
val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00%"
66+
// Convert to `JsonNumber` then to `Long`
67+
// safe because already handled in `configCheck`
68+
val minNumRowsLong = minNumRows.asNumber.get.toLong.get
69+
70+
failed = count < minNumRowsLong
71+
val pctError = if (failed) calculatePctError(minNumRowsLong, count) else "0.00%"
5872
addEvent(ValidatorCounter("rowCount", count))
5973
val msg = s"MinNumRowsCheck Expected: $minNumRows Actual: $count Relative Error: $pctError"
6074
val data = ListMap("expected" -> minNumRows.toString, "actual" -> count.toString, "relative_error" -> pctError)
@@ -64,7 +78,7 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0
6478

6579
override def toJson: Json = Json.obj(
6680
("type", Json.fromString("rowCount")),
67-
("minNumRows", Json.fromLong(minNumRows)),
81+
("minNumRows", minNumRows),
6882
("failed", Json.fromBoolean(failed)),
6983
("events", this.getEvents.asJson)
7084
)

src/test/scala/com/target/data_validator/ConfigParserSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ConfigParserSpec extends FunSpec with BeforeAndAfterAll {
2828
"bar",
2929
Some(List("one", "two")),
3030
None,
31-
List(MinNumRows(10294), NullCheck("mdse_item_i", None)) // scalastyle:ignore magic.number
31+
List(MinNumRows(Json.fromInt(10294)), NullCheck("mdse_item_i", None)) // scalastyle:ignore magic.number
3232
),
3333
ValidatorOrcFile("LocalFile.orc", None, Some("foo < 10"), List(NullCheck("start_d", None))),
3434
ValidatorParquetFile("LocFile.parquet", None, Some("bar < 10"), List(NullCheck("end_d", None)))

src/test/scala/com/target/data_validator/ConfigVarSubSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ class ConfigVarSubSpec extends FunSpec with Matchers with TestingSparkSession {
6262

6363
describe("MinNumRows") {
6464

65-
it("MinNumRows doesn't support substitutions so it should be equal, no changes.") {
66-
val sut = MinNumRows(100) // scalastyle:ignore
67-
assert(sut.substituteVariables(dict) == sut)
65+
it("should substitute variables properly") {
66+
val sut = MinNumRows(Json.fromString("$one"))
67+
assert(sut.substituteVariables(dict) == MinNumRows(Json.fromInt(1)))
6868
}
6969

7070
}

src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,19 +171,27 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {
171171

172172
describe("ValidatorMinNumRows") {
173173

174+
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
175+
176+
it("configCheck() should fail for minNumRows as non-numeric") {
177+
val dict = new VarSubstitution
178+
val config = mkConfig(df, List(MinNumRows(Json.fromString("badinput"))))
179+
assert(config.configCheck(spark, dict))
180+
assert(config.failed)
181+
assert(config.tables.head.failed)
182+
}
183+
174184
it("configCheck() should fail for negative minNumRows") {
175185
val dict = new VarSubstitution
176-
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
177-
val config = mkConfig(df, List(MinNumRows(-10))) //scalastyle:ignore
186+
val config = mkConfig(df, List(MinNumRows(Json.fromLong(-10)))) // scalastyle:ignore magic.number
178187
assert(config.configCheck(spark, dict))
179188
assert(config.failed)
180189
assert(config.tables.head.failed)
181190
}
182191

183192
it("quickCheck() should fail when rowCount < minNumRows") {
184193
val dict = new VarSubstitution
185-
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
186-
val minNumRowsCheck = MinNumRows(10) // scalastyle:ignore magic.number
194+
val minNumRowsCheck = MinNumRows(Json.fromLong(10)) // scalastyle:ignore magic.number
187195
val config = mkConfig(df, List(minNumRowsCheck))
188196
assert(config.quickChecks(spark, dict))
189197
assert(config.failed)
@@ -197,8 +205,7 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {
197205

198206
it("quickCheck() should succeed when rowCount > minNumRows") {
199207
val dict = new VarSubstitution
200-
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
201-
val minNumRowsCheck = MinNumRows(1)
208+
val minNumRowsCheck = MinNumRows(Json.fromInt(1))
202209
val config = mkConfig(df, List(minNumRowsCheck))
203210
assert(!config.configCheck(spark, dict))
204211
assert(!config.quickChecks(spark, dict))

0 commit comments

Comments
 (0)