Commit 17148a7

#156 Add sum of truncated values as measure
1 parent 9609081 commit 17148a7

File tree: 6 files changed (+58 -12 lines)

.github/workflows/jacoco_check.yml
Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ jobs:
         run: sbt ++${{matrix.scala}} jacoco
       - name: Add coverage to PR
         id: jacoco
-        uses: madrapps/jacoco-report@v1.3
+        uses: madrapps/jacoco-report@v1.7.1
         with:
           paths: >
             ${{ github.workspace }}/atum/target/scala-${{ matrix.scala_short }}/jacoco/report/jacoco.xml,

README.md
Lines changed: 9 additions & 7 deletions

@@ -419,13 +419,15 @@ The summary of common control framework routines you can use as Spark and Dataframe
 The control measurement of a column is a hash sum. It can be calculated differently depending on the column's data type and
 on business requirements. This table represents all currently supported measurement types:
 
-| Type                           | Description                                            |
-| ------------------------------ |:----------------------------------------------------- |
-| controlType.Count              | Calculates the number of rows in the dataset           |
-| controlType.distinctCount      | Calculates DISTINCT(COUNT(()) of the specified column  |
-| controlType.aggregatedTotal    | Calculates SUM() of the specified column               |
-| controlType.absAggregatedTotal | Calculates SUM(ABS()) of the specified column          |
-| controlType.HashCrc32          | Calculates SUM(CRC32()) of the specified column        |
+| Type                                | Description                                           |
+| ----------------------------------- |:----------------------------------------------------- |
+| controlType.Count                   | Calculates the number of rows in the dataset           |
+| controlType.distinctCount           | Calculates COUNT(DISTINCT()) of the specified column   |
+| controlType.aggregatedTotal         | Calculates SUM() of the specified column               |
+| controlType.absAggregatedTotal      | Calculates SUM(ABS()) of the specified column          |
+| controlType.HashCrc32               | Calculates SUM(CRC32()) of the specified column        |
+| controlType.aggregatedTruncTotal    | Calculates SUM(TRUNC()) of the specified column        |
+| controlType.absAggregatedTruncTotal | Calculates SUM(TRUNC(ABS())) of the specified column   |
 
 ## How to generate Code coverage report
 ```sbt
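The two added rows are truncate-then-sum measures, which is not the same as truncating the final sum. A quick plain-Scala sketch of the difference, using the sample prices from the test added later in this commit:

```scala
// REPL-style sketch. Truncation here means dropping the fractional part
// (toward zero), which is exactly what Double.toLong does.
val prices = Seq(-1000.000001, 1000.9, 999.999999)

val truncThenSum = prices.map(_.toLong).sum                // 999  -> aggregatedTruncTotal
val absTruncSum  = prices.map(p => math.abs(p).toLong).sum // 2999 -> absAggregatedTruncTotal
val sumThenTrunc = prices.sum.toLong                       // 1000 -- not what these measures compute
```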

atum/src/main/scala/za/co/absa/atum/core/ControlType.scala
Lines changed: 4 additions & 1 deletion

@@ -22,9 +22,12 @@ object ControlType {
   case object DistinctCount extends ControlType("distinctCount", false)
   case object AggregatedTotal extends ControlType("aggregatedTotal", true)
   case object AbsAggregatedTotal extends ControlType("absAggregatedTotal", true)
+  case object AggregatedTruncTotal extends ControlType("aggregatedTruncTotal", true)
+  case object AbsAggregatedTruncTotal extends ControlType("absAggregatedTruncTotal", true)
   case object HashCrc32 extends ControlType("hashCrc32", false)
 
-  val values: Seq[ControlType] = Seq(Count, DistinctCount, AggregatedTotal, AbsAggregatedTotal, HashCrc32)
+  val values: Seq[ControlType] = Seq(Count, DistinctCount, AggregatedTotal, AbsAggregatedTotal,
+    AggregatedTruncTotal, AbsAggregatedTruncTotal, HashCrc32)
   val valueNames: Seq[String] = values.map(_.value)
 
   def getNormalizedValueName(input: String): String = {
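The pattern here is a sealed enumeration: each measure is a case object carrying its serialized name, and `values` drives name-based lookups, which is why the commit extends `values` alongside adding the case objects. A self-contained miniature of the same pattern; the `onlyForNumeric` flag name and the `fromName` helper are assumptions for illustration, not shown in the diff:

```scala
// Miniature of the sealed-enumeration pattern; names other than `value` and
// `values` are hypothetical, since the hunk shows only a fragment of the object.
sealed abstract class MeasureKind(val value: String, val onlyForNumeric: Boolean)

object MeasureKind {
  case object Count                   extends MeasureKind("count", onlyForNumeric = false)
  case object AggregatedTruncTotal    extends MeasureKind("aggregatedTruncTotal", onlyForNumeric = true)
  case object AbsAggregatedTruncTotal extends MeasureKind("absAggregatedTruncTotal", onlyForNumeric = true)

  // Every member must be listed here; a case object missing from `values`
  // would be invisible to any name-based lookup.
  val values: Seq[MeasureKind] = Seq(Count, AggregatedTruncTotal, AbsAggregatedTruncTotal)

  def fromName(name: String): Option[MeasureKind] =
    values.find(_.value.equalsIgnoreCase(name))
}
```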

atum/src/main/scala/za/co/absa/atum/core/MeasurementProcessor.scala
Lines changed: 10 additions & 0 deletions

@@ -61,6 +61,16 @@ object MeasurementProcessor {
           .agg(sum(col(aggColName))).collect()(0)(0)
         if (v == null) "" else v.toString
       }
+      case AggregatedTruncTotal =>
+        (ds: Dataset[Row]) => {
+          val aggCol = sum(col(valueColumnName).cast(LongType))
+          aggregateColumn(ds, controlCol, aggCol)
+        }
+      case AbsAggregatedTruncTotal =>
+        (ds: Dataset[Row]) => {
+          val aggCol = sum(abs(col(valueColumnName).cast(LongType)))
+          aggregateColumn(ds, controlCol, aggCol)
+        }
     }
   }
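Both new branches implement TRUNC as a cast to LongType, which in Spark truncates toward zero rather than rounding. A minimal runnable sketch of that behaviour; the local session setup is illustrative and not part of the commit:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{abs, col, sum}
import org.apache.spark.sql.types.LongType

object CastTruncationDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("cast-trunc").getOrCreate()
  import spark.implicits._

  val df = Seq(-1000.000001, 1000.9, 999.999999).toDF("price")

  // Casting DoubleType to LongType drops the fractional part: 1000.9 -> 1000,
  // -1000.000001 -> -1000. No rounding takes place.
  df.select(col("price"), col("price").cast(LongType).as("truncated")).show()

  // The same expressions the new cases build: SUM(TRUNC()) and SUM(TRUNC(ABS())).
  df.agg(
    sum(col("price").cast(LongType)).as("aggregatedTruncTotal"),        // 999
    sum(abs(col("price").cast(LongType))).as("absAggregatedTruncTotal") // 2999
  ).show()

  spark.stop()
}
```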

atum/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
Lines changed: 1 addition & 0 deletions

@@ -22,4 +22,5 @@ trait SparkLocalMaster {
   // in order to run SampleMeasurements as tests, otherwise
   // java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200... is thrown
   System.getProperties.setProperty("spark.testing.memory", (1024*1024*1024).toString) // 1g
+  System.getProperties.setProperty("spark.app.name", "unit-test")
 }
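Both properties work because SparkConf picks up every `spark.*` JVM system property when a session is created, so setting them in the trait's constructor, before any test builds a session, is enough. A hedged sketch of that ordering; the suite name and builder call are illustrative, not part of the commit:

```scala
import org.apache.spark.sql.SparkSession

// Stand-in for the trait above: the properties must be set before the first
// SparkSession is built, or spark.testing.memory is never consulted.
trait LocalSparkProps {
  System.getProperties.setProperty("spark.testing.memory", (1024 * 1024 * 1024).toString) // 1g
  System.getProperties.setProperty("spark.app.name", "unit-test")
}

object ExampleSuite extends LocalSparkProps with App {
  // Trait constructors run before the App body, so the builder sees both properties.
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  println(spark.conf.get("spark.app.name")) // unit-test
  spark.stop()
}
```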

atum/src/test/scala/za/co/absa/atum/ControlMeasurementsSpec.scala
Lines changed: 33 additions & 3 deletions

@@ -37,7 +37,7 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBase
     )
   ))
 
-  val measurementsIntOverflow = List(
+  val measurementsIntOverflow: Seq[Measurement] = List(
     Measurement(
       controlName = "RecordCount",
       controlType = ControlType.Count.value,
@@ -112,7 +112,7 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBase
     assert(newMeasurements == measurementsIntOverflow)
   }
 
-  val measurementsAggregation = List(
+  val measurementsAggregation: Seq[Measurement] = List(
     Measurement(
       controlName = "RecordCount",
       controlType = ControlType.Count.value,
@@ -304,7 +304,7 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBase
     assert(newMeasurements == measurements3)
   }
 
-  val measurementsWithHash = List(
+  val measurementsWithHash: Seq[Measurement] = List(
     Measurement(
       controlName = "RecordCount",
       controlType = ControlType.Count.value,
@@ -394,4 +394,34 @@ class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBase
     assert(newMeasurements == measurementsAggregationShort)
   }
 
+  val measurementsAggregatedTruncTotal: Seq[Measurement] = List(
+    Measurement(
+      controlName = "aggregatedTruncTotal",
+      controlType = "aggregatedTruncTotal",
+      controlCol = "price",
+      controlValue = "999"
+    ),
+    Measurement(
+      controlName = "absAggregatedTruncTotal",
+      controlType = "absAggregatedTruncTotal",
+      controlCol = "price",
+      controlValue = "2999"
+    )
+  )
+
+  "aggregatedTruncTotal types" should "return truncated sum of values" in {
+    val inputDataJson = spark.sparkContext.parallelize(
+      s"""{"id": ${Long.MaxValue}, "price": -1000.000001, "order": { "orderid": 1, "items": 1 } } """ ::
+      s"""{"id": ${Long.MinValue}, "price": 1000.9, "order": { "orderid": -1, "items": -1 } } """ ::
+      s"""{"id": ${Long.MinValue}, "price": 999.999999, "order": { "orderid": -1, "items": -1 } } """ :: Nil)
+    val df = spark.read
+      .schema(schema)
+      .json(inputDataJson.toDS)
+
+    val processor = new MeasurementProcessor(measurementsAggregatedTruncTotal)
+    val newMeasurements = processor.measureDataset(df)
+
+    assert(newMeasurements == measurementsAggregatedTruncTotal)
+  }
+
 }
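The expected control values follow from truncation toward zero: -1000.000001, 1000.9 and 999.999999 truncate to -1000, 1000 and 999, which sum to 999; applying ABS() before truncation gives 1000 + 1000 + 999 = 2999. A plain SUM() of the raw prices would be 1000.899998, so the test also distinguishes the new measures from aggregatedTotal and from any rounding behaviour.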
