Skip to content

Commit 7d14c73

Browse files
committed
* Fixed a bug
* Added tests to `InfinitySupportIso` * Introduced `TimeZoneNormalizer` to tests to increase test compatibility * Added a badge to README.md to show the Java version the project is built on
1 parent 193744f commit 7d14c73

File tree

12 files changed

+293
-110
lines changed

12 files changed

+293
-110
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
[![License](http://img.shields.io/:license-apache-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0.html)
44
[![Release](https://github.com/AbsaOSS/spark-data-standardization/actions/workflows/release.yml/badge.svg)](https://github.com/AbsaOSS/spark-data-standardization/actions/workflows/release.yml)
5+
![Java 8](https://img.shields.io/badge/Java_1.8-ED8B00?style=flat&logo=openjdk&logoColor=black)
56

67
- Dataframe in
78
- Standardized Dataframe out

src/main/scala/za/co/absa/standardization/implicits/StringImplicits.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.security.InvalidParameterException
2121
import scala.annotation.tailrec
2222

2323
object StringImplicits {
24-
implicit class StringEnhancements(string: String) {
24+
implicit class StringEnhancements(val string: String) extends AnyVal {
2525

2626
/**
2727
* Replaces all occurrences of the provided characters with their mapped values

src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,32 @@ class InfinitySupport(
2727
val infPlusValue: Option[String],
2828
val origType: DataType
2929
) {
30-
private val hasInfinityDefined: Boolean = (infMinusSymbol.isDefined && infMinusValue.isDefined) ||
31-
(infPlusSymbol.isDefined && infPlusValue.isDefined)
32-
33-
protected def replaceSymbol(column: Column,
34-
possibleSymbol: Option[String],
35-
possibleValue: Option[String],
36-
valueToColumn: String => Column
37-
): Column = {
38-
possibleSymbol.flatMap { symbol =>
39-
possibleValue.map { value =>
40-
when(column === lit(symbol).cast(origType), valueToColumn(value))
41-
}
42-
}.getOrElse(column)
43-
}
44-
45-
protected def defaultInfinityValueInjection(value: String): Column = lit(value).cast(origType)
46-
47-
protected def executeReplacement(column: Column, conversion: Column => Column): Column = {
48-
val columnWithNegativeInf = replaceSymbol(column, infMinusSymbol, infMinusValue, defaultInfinityValueInjection)
49-
val columnWithPositiveInf = replaceSymbol(columnWithNegativeInf, infPlusSymbol, infPlusValue, defaultInfinityValueInjection)
50-
conversion(columnWithPositiveInf.otherwise(column))
30+
protected def executeReplacement(
31+
column: Column,
32+
conversion: Column => Column,
33+
useMinusSymbol: Option[String],
34+
useMinusValue: Option[String],
35+
usePlusSymbol: Option[String],
36+
usePlusValue: Option[String],
37+
): Column = {
38+
val replacement = (useMinusSymbol, useMinusValue, usePlusSymbol, usePlusValue) match {
39+
case (Some(minusSymbol), Some(minusValue), Some(plusSymbol), Some(plusValue)) =>
40+
when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType))
41+
.when(column === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType))
42+
.otherwise(column)
43+
case (Some(minusSymbol), Some(minusValue), _, _) =>
44+
when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType))
45+
.otherwise(column)
46+
case (_, _, Some(plusSymbol), Some(plusValue)) =>
47+
when(column === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType))
48+
.otherwise(column)
49+
case _ => column
50+
}
51+
conversion(replacement)
5152
}
5253

5354
def replaceInfinitySymbols(column: Column, conversion: Column => Column = c => c): Column = {
54-
if (hasInfinityDefined) {
55-
executeReplacement(column, conversion)
56-
} else {
57-
conversion(column)
58-
}
55+
executeReplacement(column, conversion, infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue)
5956
}
6057
}
6158

src/main/scala/za/co/absa/standardization/stages/InfinitySupportIso.scala

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 ABSA Group Limited
2+
* Copyright 2021 ABSA Group Limited
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,9 +20,8 @@ import org.apache.spark.sql.Column
2020
import org.apache.spark.sql.functions._
2121
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
2222

23-
import java.text.SimpleDateFormat
2423
import java.time.OffsetDateTime
25-
import java.time.format.DateTimeFormatter.{ISO_DATE, ISO_DATE_TIME}
24+
import java.time.format.DateTimeFormatter.ISO_DATE
2625
import scala.util.Try
2726

2827
abstract class InfinitySupportIso(
@@ -38,21 +37,41 @@ abstract class InfinitySupportIso(
3837
def useIsoForInfMinus: Boolean
3938
def useIsoForInfPlus: Boolean
4039

41-
def chooseInjectionFunction(isIso: Boolean, conversion: Column => Column, value: String): Column = {
42-
if (isIso) {
43-
isoCast(value)
44-
} else {
45-
conversion(defaultInfinityValueInjection(value))
40+
private def executeIsoReplacement(
41+
column: Column,
42+
otherwiseColumn: Column,
43+
useMinusSymbol: Option[String],
44+
useMinusValue: Option[String],
45+
usePlusSymbol: Option[String],
46+
usePlusValue: Option[String],
47+
): Column = {
48+
(useMinusSymbol, useMinusValue, usePlusSymbol, usePlusValue) match {
49+
case (Some(minusSymbol), Some(minusValue), Some(plusSymbol), Some(plusValue)) =>
50+
when(column === lit(minusSymbol).cast(origType), isoCast(minusValue))
51+
.when(column === lit(plusSymbol).cast(origType), isoCast(plusValue))
52+
.otherwise(otherwiseColumn)
53+
case (Some(minusSymbol), Some(minusValue), _, _) =>
54+
when(column === lit(minusSymbol).cast(origType), isoCast(minusValue))
55+
.otherwise(otherwiseColumn)
56+
case (_, _, Some(plusSymbol), Some(plusValue)) =>
57+
when(column === lit(plusSymbol).cast(origType), isoCast(plusValue))
58+
.otherwise(otherwiseColumn)
59+
case _ => otherwiseColumn
4660
}
4761
}
4862

49-
override def executeReplacement(column: Column, conversion: Column => Column): Column = {
50-
if (useIsoForInfMinus || useIsoForInfPlus) {
51-
val columnWithNegativeInf = replaceSymbol(column, infMinusSymbol, infMinusValue, chooseInjectionFunction(useIsoForInfMinus, conversion, _))
52-
val columnWithPositiveInf = replaceSymbol(columnWithNegativeInf, infPlusSymbol, infPlusValue, chooseInjectionFunction(useIsoForInfPlus, conversion, _))
53-
columnWithPositiveInf.otherwise(conversion(column))
54-
} else {
55-
super.executeReplacement(column, conversion)
63+
override def replaceInfinitySymbols(column: Column, conversion: Column => Column = c => c): Column = {
64+
(useIsoForInfMinus, useIsoForInfPlus) match {
65+
case (true, true) =>
66+
val otherwise = super.executeReplacement(column, conversion, None, None, None, None) // no replacements, just conversion
67+
executeIsoReplacement(column, otherwise, infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue)
68+
case (true, false) =>
69+
val otherwise = super.executeReplacement(column, conversion, None, None, infPlusSymbol, infPlusValue) // no minus replacement, just conversion and plus replacement
70+
executeIsoReplacement(column, otherwise, infMinusSymbol, infMinusValue, None, None)
71+
case (false, true) =>
72+
val otherwise = super.executeReplacement(column, conversion, infMinusSymbol, infMinusValue, None, None) // no plus replacement, just conversion and minus replacement
73+
executeIsoReplacement(column, otherwise, None, None, infPlusSymbol, infPlusValue)
74+
case (false, false) => super.replaceInfinitySymbols(column, conversion)
5675
}
5776
}
5877
}

src/test/scala/za/co/absa/standardization/TypeParserSuite.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,26 +47,25 @@ class TypeParserSuite extends AnyFunSuite with SparkTestBase {
4747
StructField("id",IntegerType, nullable = false),
4848
StructField("date",DateType, nullable = true, Metadata.fromJson("""{"pattern":"yyyy-MM-dd","minus_infinity_symbol":"-INF","minus_infinity_value":"1000-01-01","plus_infinity_symbol":"INF","plus_infinity_value":"9999-12-31"}""")),
4949
StructField("timestamp",TimestampType, nullable = true, Metadata.fromJson("""{"pattern":"yyyy-MM-dd HH:mm:ss","minus_infinity_symbol":"-INF","minus_infinity_value":"1000-01-01 00:00:00","plus_infinity_symbol":"INF","plus_infinity_value":"9999-12-31 23:59:59"}""")),
50-
StructField("custom_date",DateType, nullable = true, Metadata.fromJson("""{"pattern":"yyMMdd","minus_infinity_symbol":"-INF","minus_infinity_value":"1000-01-01","plus_infinity_symbol":"INF","plus_infinity_value":"9999-12-31"}""")),
50+
StructField("custom_date",DateType, nullable = true, Metadata.fromJson("""{"pattern":"yyMMdd","minus_infinity_symbol":"-INF","minus_infinity_value":"1000-01-01","plus_infinity_symbol":"INF","plus_infinity_value":"9999-12-31"}"""))
5151
))
5252

5353
val stdDF = Standardization.standardize(testData,schema,stdConfig).cache()
5454

5555
val results = stdDF.select("id","date", "timestamp", "custom_date","errCol").collect()
5656

57-
5857
assert(results(0).getAs[Date](1) == Date.valueOf("2025-08-05"))
59-
assert(results(0).getAs[Timestamp](2) == Date.valueOf("2025-08-05 12:34:56"))
58+
assert(results(0).getAs[Timestamp](2) == Timestamp.valueOf("2025-08-05 12:34:56"))
6059
assert(results(0).getAs[Date](3) == Date.valueOf("2025-08-05"))
6160
assert(results(0).getAs[Seq[ErrorMessage]]("errCol").isEmpty)
6261

6362
assert(results(1).getAs[Date](1) == Date.valueOf("1000-01-01"))
64-
assert(results(1).getAs[Timestamp](2) == Date.valueOf("1000-01-01 00:00:00"))
63+
assert(results(1).getAs[Timestamp](2) == Timestamp.valueOf("1000-01-01 00:00:00"))
6564
assert(results(1).getAs[Date](3) == Date.valueOf("1000-01-01"))
6665
assert(results(1).getAs[Seq[ErrorMessage]]("errCol").isEmpty)
6766

6867
assert(results(2).getAs[Date](1) == Date.valueOf("9999-12-31"))
69-
assert(results(2).getAs[Timestamp](2) == Date.valueOf("9999-12-31 23:59:59"))
68+
assert(results(2).getAs[Timestamp](2) == Timestamp.valueOf("9999-12-31 23:59:59"))
7069
assert(results(2).getAs[Date](3) == Date.valueOf("9999-12-31"))
7170
assert(results(2).getAs[Seq[ErrorMessage]]("errCol").isEmpty)
7271

src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_DateSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ import za.co.absa.standardization.udf.UDFLibrary
2727
import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
2828
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
2929
import za.co.absa.standardization.schema.MetadataKeys
30+
import za.co.absa.standardization.testing.TimeZoneNormalizer
3031

3132
class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {
3233
import spark.implicits._
34+
TimeZoneNormalizer.normalizeAll
3335

3436
private val stdConfig = BasicStandardizationConfig
3537
.fromDefault()

src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_TimestampSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import za.co.absa.spark.commons.test.SparkTestBase
2323
import za.co.absa.standardization.RecordIdGeneration.IdType.NoId
2424
import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig, ErrorCodesConfig}
2525
import za.co.absa.standardization.schema.MetadataKeys
26+
import za.co.absa.standardization.testing.TimeZoneNormalizer
2627
import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults}
2728
import za.co.absa.standardization.udf.UDFLibrary
2829
import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
@@ -31,6 +32,7 @@ import java.sql.Timestamp
3132

3233
class StandardizationInterpreter_TimestampSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {
3334
import spark.implicits._
35+
TimeZoneNormalizer.normalizeAll
3436

3537
private val stdConfig = BasicStandardizationConfig
3638
.fromDefault()

src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParserSuiteTemplate.scala

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,6 @@ trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase {
8686
testTemplate(floatField, schema, path)
8787
}
8888

89-
protected def doTestIntoFloatWithInf(input: Input): Unit = {
90-
import input._
91-
val floatField = StructField("floatField", FloatType, nullable = false,
92-
new MetadataBuilder()
93-
.putString("sourcecolumn", sourceFieldName)
94-
.putString(MetadataKeys.PlusInfinityValue, Float.PositiveInfinity.toString)
95-
.putString(MetadataKeys.PlusInfinitySymbol, "inf")
96-
.putString(MetadataKeys.MinusInfinityValue, Float.NegativeInfinity.toString)
97-
.putString(MetadataKeys.MinusInfinitySymbol, "-inf")
98-
.build)
99-
val schema = buildSchema(Array(sourceField(baseType), floatField), path)
100-
testTemplate(floatField, schema, path)
101-
}
102-
10389
protected def doTestIntoIntegerField(input: Input): Unit = {
10490
import input._
10591
val integerField = StructField("integerField", IntegerType, nullable = true,
@@ -198,35 +184,6 @@ trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase {
198184
testTemplate(timestampField, schema, path, timestampPattern, Option(fixedTimezone))
199185
}
200186

201-
protected def doTestIntoTimestampWithPlusInfinity(input: Input): Unit = {
202-
import input._
203-
val timestampField = StructField("timestampField", TimestampType, nullable = false,
204-
new MetadataBuilder()
205-
.putString("sourcecolumn", sourceFieldName)
206-
.putString("pattern", timestampPattern)
207-
.putString(MetadataKeys.PlusInfinityValue, "99991231")
208-
.putString(MetadataKeys.PlusInfinitySymbol, "inf")
209-
.putString(MetadataKeys.MinusInfinityValue, "00010101")
210-
.putString(MetadataKeys.MinusInfinitySymbol, "-inf")
211-
.build)
212-
val schema = buildSchema(Array(sourceField(baseType), timestampField), path)
213-
testTemplate(timestampField, schema, path, timestampPattern)
214-
}
215-
216-
protected def doTestIntoDateFieldWithInf(input: Input): Unit = {
217-
import input._
218-
val timestampField = StructField("dateField", DateType, nullable = false,
219-
new MetadataBuilder()
220-
.putString("sourcecolumn", sourceFieldName)
221-
.putString(MetadataKeys.PlusInfinityValue, "99991231")
222-
.putString(MetadataKeys.PlusInfinitySymbol, "inf")
223-
.putString(MetadataKeys.MinusInfinityValue, "00010101")
224-
.putString(MetadataKeys.MinusInfinitySymbol, "-inf")
225-
.build)
226-
val schema = buildSchema(Array(sourceField(baseType), timestampField), path)
227-
testTemplate(timestampField, schema, path, "yyyy-MM-dd")
228-
}
229-
230187
protected def doTestIntoDateFieldWithEpochPattern(input: Input): Unit = {
231188
import input._
232189
val dateField = StructField("dateField", DateType, nullable = false,
@@ -300,7 +257,7 @@ trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase {
300257
val dateFormatter = new SimpleDateFormat("yyyy-MM-dd")
301258
dateFormatter.format(date)
302259
}
303-
s"DATE '${dateString}'"
260+
s"DATE '$dateString'"
304261
}
305262

306263
def timeStampComponentShow(date: Timestamp): String = {

src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromStringTypeSuite.scala

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,4 @@ class TypeParser_FromStringTypeSuite extends TypeParserSuiteTemplate {
141141
test("Into timestamp field with epoch pattern") {
142142
doTestIntoTimestampFieldWithEpochPattern(input)
143143
}
144-
145-
test("Into float field with inf"){
146-
doTestIntoFloatWithInf(input)
147-
}
148-
149-
test("Into timestamp field with inf") {
150-
doTestIntoTimestampWithPlusInfinity(input)
151-
}
152-
153-
test("Into date field with inf"){
154-
doTestIntoDateFieldWithInf(input)
155-
}
156144
}

src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromTimestampTypeSuite.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,4 @@ class TypeParser_FromTimestampTypeSuite extends TypeParserSuiteTemplate {
133133
test("Into timestamp field with epoch pattern") {
134134
doTestIntoTimestampFieldWithEpochPattern(input)
135135
}
136-
137-
test("Into timestamp field with inf") {
138-
doTestIntoTimestampWithPlusInfinity(input)
139-
}
140136
}

0 commit comments

Comments
 (0)