
Commit 8e36e5f

#61 Add infinity support to Dates and Numerics

1 parent 0ee16c8 commit 8e36e5f

15 files changed (+375, -82 lines)

project/build.properties

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-sbt.version=1.6.2
+sbt.version=1.10.2

src/main/scala/za/co/absa/standardization/schema/MetadataKeys.scala

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,10 @@ object MetadataKeys {
   val DefaultValue = "default"
   // date & timestamp
   val DefaultTimeZone = "timezone"
+  val MinusInfinitySymbol = "minus_infinity_symbol"
+  val MinusInfinityValue = "minus_infinity_value"
+  val PlusInfinitySymbol = "plus_infinity_symbol"
+  val PlusInfinityValue = "plus_infinity_value"
   // date & timestamp & all numeric
   val Pattern = "pattern"
   // all numeric
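
Taken together, these four keys let a schema declare per-field stand-in symbols for minus and plus infinity, along with the concrete values they standardize to. A minimal sketch of a field using them (the field name, pattern, and replacement dates are illustrative, not taken from this commit):

import org.apache.spark.sql.types.{DateType, MetadataBuilder, StructField, StructType}
import za.co.absa.standardization.schema.MetadataKeys

// Hypothetical date field: raw "-inf"/"inf" strings are swapped for the
// boundary dates below before the value is parsed with the given pattern.
val schema = StructType(Seq(
  StructField("dateField", DateType, nullable = false,
    new MetadataBuilder()
      .putString(MetadataKeys.Pattern, "yyyy-MM-dd")
      .putString(MetadataKeys.MinusInfinitySymbol, "-inf")
      .putString(MetadataKeys.MinusInfinityValue, "1900-01-01")
      .putString(MetadataKeys.PlusInfinitySymbol, "inf")
      .putString(MetadataKeys.PlusInfinityValue, "9999-12-31")
      .build)
))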
src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.standardization.stages
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.{lit, when}
+import org.apache.spark.sql.types.DataType
+
+trait InfinitySupport {
+  protected def infMinusSymbol: Option[String]
+
+  protected def infMinusValue: Option[String]
+
+  protected def infPlusSymbol: Option[String]
+
+  protected def infPlusValue: Option[String]
+
+  protected val origType: DataType
+
+  def replaceInfinitySymbols(column: Column): Column = {
+    val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol =>
+      infMinusValue.map { minusValue =>
+        when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType)).otherwise(column)
+      }
+    }.getOrElse(column)
+
+    infPlusSymbol.flatMap { plusSymbol =>
+      infPlusValue.map { plusValue =>
+        when(columnWithNegativeInf === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType))
+          .otherwise(columnWithNegativeInf)
+      }
+    }.getOrElse(columnWithNegativeInf)
+  }
+}
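
replaceInfinitySymbols chains two when/otherwise rewrites: the minus-infinity symbol is replaced first, and the plus-infinity rule is then applied to that intermediate column, so values matching neither symbol pass through untouched. Each rewrite is only active when both its symbol and value are defined. A standalone usage sketch (the object and its literal values are illustrative, assuming a string source column):

import org.apache.spark.sql.types.{DataType, StringType}
import za.co.absa.standardization.stages.InfinitySupport

// Hypothetical mix-in: rewrites "-inf"/"inf" markers in a string column
// to concrete boundary dates; all other values are left as-is.
object StringInfinity extends InfinitySupport {
  override protected val infMinusSymbol: Option[String] = Some("-inf")
  override protected val infMinusValue: Option[String] = Some("0001-01-01")
  override protected val infPlusSymbol: Option[String] = Some("inf")
  override protected val infPlusValue: Option[String] = Some("9999-12-31")
  override protected val origType: DataType = StringType
}

// e.g. df.withColumn("c", StringInfinity.replaceInfinitySymbols(col("c")))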

src/main/scala/za/co/absa/standardization/stages/TypeParser.scala

Lines changed: 26 additions & 13 deletions
@@ -22,14 +22,15 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements
+import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.spark.commons.utils.SchemaUtils
 import za.co.absa.spark.hofs.transform
 import za.co.absa.standardization.{ErrorMessage, StandardizationErrorMessage}
 import za.co.absa.standardization.config.StandardizationConfig
 import za.co.absa.standardization.implicits.StdColumnImplicits.StdColumnEnhancements
 import za.co.absa.standardization.schema.StdSchemaUtils.FieldWithSource
-import za.co.absa.standardization.schema.{MetadataValues, StdSchemaUtils}
+import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchemaUtils}
 import za.co.absa.standardization.time.DateTimePattern
 import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike}
 import za.co.absa.standardization.types.TypedStructField._

@@ -127,6 +128,7 @@ object TypeParser {
   private val MicrosecondsPerSecond = 1000000
   private val NanosecondsPerSecond = 1000000000
   private val InfinityStr = "Infinity"
+  private val MinusInfinityStr = "-Infinity"
   private val nullColumn = lit(null) //scalastyle:ignore null

@@ -315,7 +317,13 @@ object TypeParser {
   }

   private abstract class NumericParser[N: TypeTag](override val field: NumericTypeStructField[N])
-                                                  (implicit defaults: TypeDefaults) extends ScalarParser[N] {
+                                                  (implicit defaults: TypeDefaults) extends ScalarParser[N] with InfinitySupport {
+    override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
+    override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
+    override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
+    override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
+    private val columnWithInfinityReplaced = replaceInfinitySymbols(column)
+
     override protected def standardizeAfterCheck(stdConfig: StandardizationConfig)(implicit logger: Logger): ParseOutput = {
       if (field.needsUdfParsing) {
         standardizeUsingUdf(stdConfig)

@@ -335,9 +343,9 @@
     val columnWithProperDecimalSymbols: Column = if (replacements.nonEmpty) {
       val from = replacements.keys.mkString
       val to = replacements.values.mkString
-      translate(column, from, to)
+      translate(columnWithInfinityReplaced, from, to)
     } else {
-      column
+      columnWithInfinityReplaced
     }

     val columnToCast = if (field.allowInfinity && (decimalSymbols.infinityValue != InfinityStr)) {

@@ -494,9 +502,14 @@
    * Date      | O                  | ->to_utc_timestamp->to_date
    * Other     | ->String->to_date  | ->String->to_timestamp->to_utc_timestamp->to_date
    */
-  private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] {
+  private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] with InfinitySupport {
     override val field: DateTimeTypeStructField[T]
     protected val pattern: DateTimePattern = field.pattern.get.get
+    override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
+    override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
+    override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
+    override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
+    private val columnWithInfinityReplaced: Column = replaceInfinitySymbols(column)

     override protected def assemblePrimitiveCastLogic: Column = {
       if (pattern.isEpoch) {

@@ -516,23 +529,23 @@

     private def castWithPattern(): Column = {
       // sadly with parquet support, incoming might not be all `plain`
-      // underlyingType match {
+      // underlyingType match {
       origType match {
         case _: NullType => nullColumn
-        case _: DateType => castDateColumn(column)
-        case _: TimestampType => castTimestampColumn(column)
-        case _: StringType => castStringColumn(column)
+        case _: DateType => castDateColumn(columnWithInfinityReplaced)
+        case _: TimestampType => castTimestampColumn(columnWithInfinityReplaced)
+        case _: StringType => castStringColumn(columnWithInfinityReplaced)
         case ot: DoubleType =>
           // this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
           patternNeeded(ot)
-          castFractionalColumn(column, ot)
+          castFractionalColumn(columnWithInfinityReplaced, ot)
         case ot: FloatType =>
           // this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
           patternNeeded(ot)
-          castFractionalColumn(column, ot)
+          castFractionalColumn(columnWithInfinityReplaced, ot)
         case ot =>
           patternNeeded(ot)
-          castNonStringColumn(column, ot)
+          castNonStringColumn(columnWithInfinityReplaced, ot)
       }
     }

@@ -554,7 +567,7 @@
     }

     protected def castEpoch(): Column = {
-      (column.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
+      (columnWithInfinityReplaced.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
     }

     protected def castStringColumn(stringColumn: Column): Column
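
With these changes, both NumericParser and DateTimeParser resolve the four metadata keys from the field and feed columnWithInfinityReplaced into every downstream branch (decimal-symbol translation, epoch division, pattern casts), so the symbol substitution always happens before any parsing. A hedged sketch of what this enables for a numeric field (the field name and values are illustrative, not from this commit):

import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField}
import za.co.absa.standardization.schema.MetadataKeys

// Hypothetical long field where the source writes "MAX" to mean "no upper
// bound"; standardization rewrites it to Long.MaxValue before the cast.
val boundedLong = StructField("amount", LongType, nullable = false,
  new MetadataBuilder()
    .putString(MetadataKeys.PlusInfinitySymbol, "MAX")
    .putString(MetadataKeys.PlusInfinityValue, Long.MaxValue.toString)
    .build)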

src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_DateSuite.scala

Lines changed: 34 additions & 17 deletions
@@ -26,6 +26,7 @@ import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults}
 import za.co.absa.standardization.udf.UDFLibrary
 import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
 import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
+import za.co.absa.standardization.schema.MetadataKeys

 class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {
   import spark.implicits._

@@ -53,7 +54,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "epoch").build)
+        new MetadataBuilder().putString(MetadataKeys.Pattern, "epoch").build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),

@@ -75,18 +76,28 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     val seq = Seq(
       0L,
       86400000,
+      -1,
       978307199999L,
-      1563288103123L
+      1563288103123L,
+      -2
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "epochmilli").build)
+        new MetadataBuilder()
+          .putString(MetadataKeys.Pattern, "epochmilli")
+          .putString(MetadataKeys.PlusInfinitySymbol, "-1")
+          .putString(MetadataKeys.PlusInfinityValue, "1563278222094")
+          .putString(MetadataKeys.MinusInfinitySymbol, "-2")
+          .putString(MetadataKeys.MinusInfinityValue, "0")
+          .build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),
       DateRow(Date.valueOf("1970-01-02")),
+      DateRow(Date.valueOf("2019-07-16")),
       DateRow(Date.valueOf("2000-12-31")),
-      DateRow(Date.valueOf("2019-07-16"))
+      DateRow(Date.valueOf("2019-07-16")),
+      DateRow(Date.valueOf("1970-01-01"))
     )

     val src = seq.toDF(fieldName)

@@ -106,7 +117,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "epochmicro").build)
+        new MetadataBuilder().putString(MetadataKeys.Pattern, "epochmicro").build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),

@@ -132,7 +143,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "epochnano").build)
+        new MetadataBuilder().putString(MetadataKeys.Pattern, "epochnano").build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),

@@ -150,25 +161,31 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
   }

   test("simple date pattern") {
-    val seq = Seq(
+    val seq: Seq[String] = Seq(
       "1970/01/01",
       "1970/02/01",
       "2000/31/12",
       "2019/16/07",
       "1970-02-02",
-      "crash"
+      "crash",
+      "Alfa"
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "yyyy/dd/MM").build)
+        new MetadataBuilder()
+          .putString(MetadataKeys.Pattern, "yyyy/dd/MM")
+          .putString(MetadataKeys.PlusInfinitySymbol, "Alfa")
+          .putString(MetadataKeys.PlusInfinityValue, "1970/02/01")
+          .build)
     ))
-    val exp = Seq(
+    val exp: Seq[DateRow] = Seq(
       DateRow(Date.valueOf("1970-01-01")),
       DateRow(Date.valueOf("1970-01-02")),
       DateRow(Date.valueOf("2000-12-31")),
       DateRow(Date.valueOf("2019-07-16")),
       DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "1970-02-02", "string", "date", Some("yyyy/dd/MM")))),
-      DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM"))))
+      DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM")))),
+      DateRow(Date.valueOf("1970-01-02"))
     )

     val src = seq.toDF(fieldName)

@@ -190,7 +207,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "HH-mm-ss dd.MM.yyyy ZZZ").build)
+        new MetadataBuilder().putString(MetadataKeys.Pattern, "HH-mm-ss dd.MM.yyyy ZZZ").build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),

@@ -221,7 +238,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build)
+        new MetadataBuilder().putString(MetadataKeys.Pattern, "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),

@@ -252,7 +269,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
         new MetadataBuilder()
-          .putString("pattern", "yyyy/dd/MM")
+          .putString(MetadataKeys.Pattern, "yyyy/dd/MM")
           .putString("timezone", "EST")
           .build)
     ))

@@ -286,7 +303,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
         new MetadataBuilder()
-          .putString("pattern", "yyyy/dd/MM")
+          .putString(MetadataKeys.Pattern, "yyyy/dd/MM")
           .putString("timezone", "Africa/Johannesburg")
           .build)
     ))

@@ -319,7 +336,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "MMMM d 'of' yyyy").build)
+        new MetadataBuilder().putString(MetadataKeys.Pattern, "MMMM d 'of' yyyy").build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),

@@ -351,7 +368,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase
     )
     val desiredSchema = StructType(Seq(
       StructField(fieldName, DateType, nullable = false,
-        new MetadataBuilder().putString("pattern", "yyyy/MM/dd 'insignificant' iiiiii").build)
+        new MetadataBuilder().putString(MetadataKeys.Pattern, "yyyy/MM/dd 'insignificant' iiiiii").build)
     ))
     val exp = Seq(
       DateRow(Date.valueOf("1970-01-01")),
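
Worth noting in the epochmilli test above: the infinity substitution runs before the epoch arithmetic, so the symbol -1 first becomes 1563278222094 and only then is scaled and cast, landing on 2019-07-16 (and -2 lands on 1970-01-01 via the replacement value 0). A quick plain-Scala check of that arithmetic:

import java.time.{Instant, ZoneOffset}

// 1563278222094 ms after the epoch falls on 2019-07-16 in UTC, matching
// the expected DateRow for the "-1" plus-infinity symbol.
val infDate = Instant.ofEpochMilli(1563278222094L).atZone(ZoneOffset.UTC).toLocalDate
// infDate == 2019-07-16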

src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala

Lines changed: 16 additions & 7 deletions
@@ -18,14 +18,13 @@ package za.co.absa.standardization.interpreter

 import org.apache.spark.sql.types._
 import org.scalatest.funsuite.AnyFunSuite
-import za.co.absa.standardization.ErrorMessage
+import za.co.absa.standardization.{ErrorMessage, LoggerTestBase, Standardization, StandardizationErrorMessage, interpreter}
 import za.co.absa.spark.commons.test.SparkTestBase
 import za.co.absa.standardization.RecordIdGeneration.IdType.NoId
 import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig, ErrorCodesConfig}
 import za.co.absa.standardization.schema.MetadataKeys
 import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults}
 import za.co.absa.standardization.udf.UDFLibrary
-import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
 import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

 class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {

@@ -57,9 +56,17 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkTestBase
   private val desiredSchemaWithInfinity = StructType(Seq(
     StructField("description", StringType, nullable = false),
     StructField("floatField", FloatType, nullable = false,
-      new MetadataBuilder().putString("allow_infinity", value = "true").build),
+      new MetadataBuilder()
+        .putString("allow_infinity", value = "true")
+        .putString(MetadataKeys.MinusInfinitySymbol, "FRRR")
+        .putString(MetadataKeys.MinusInfinityValue, "0")
+        .build),
     StructField("doubleField", DoubleType, nullable = true,
-      new MetadataBuilder().putString("allow_infinity", value = "true").build)
+      new MetadataBuilder()
+        .putString("allow_infinity", value = "true")
+        .putString(MetadataKeys.PlusInfinitySymbol, "MAXVALUE")
+        .putString(MetadataKeys.PlusInfinityValue, "")
+        .build)
   ))

   test("From String") {

@@ -168,14 +175,15 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkTestBase
       ("01-Euler", "2.71", "2.71"),
       ("02-Null", null, null),
       ("03-Long", Long.MaxValue.toString, Long.MinValue.toString),
-      ("04-infinity", "-∞", ""),
+      ("04-infinity", "-∞", "MAXVALUE"),
       ("05-Really big", "123456789123456791245678912324789123456789123456789.12",
         "-1234567891234567912456789123247891234567891234567891234567891234567891234567891234567891234567891234567891234"
         + "567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234678912345"
         + "678912345678912345678912345679124567891232478912345678912345678912345678912345678912345679124567891232478912"
         + "3456789123456789123456789123456789123456789123456789123456789.1"),
       ("06-Text", "foo", "bar"),
-      ("07-Exponential notation", "-1.23E4", "+9.8765E-3")
+      ("07-Exponential notation", "-1.23E4", "+9.8765E-3"),
+      ("08-Infinity", "FRRR", "MAXVALUE")
     )
     val src = seq.toDF("description","floatField", "doubleField")
     logDataFrameContent(src)

@@ -193,7 +201,8 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkTestBase
       FractionalRow("06-Text", Option(0), None, Seq(
         StandardizationErrorMessage.stdCastErr("floatField", "foo", "string", "float", None),
         StandardizationErrorMessage.stdCastErr("doubleField", "bar", "string", "double", None))),
-      FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765))
+      FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)),
+      FractionalRow("08-Infinity", Option(0f), Option(Double.PositiveInfinity))
     )

     assertResult(exp)(std.as[FractionalRow].collect().sortBy(_.description).toList)
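
Row 08 exercises both replacements at once: FRRR on the float side standardizes to 0 via its replacement value, while MAXVALUE on the double side ends up as Double.PositiveInfinity (row 08's expected result), apparently through the empty replacement value combined with allow_infinity. For reference, the double field's configuration in isolation (copied from the test schema above):

import org.apache.spark.sql.types.{DoubleType, MetadataBuilder, StructField}
import za.co.absa.standardization.schema.MetadataKeys

// Mirrors desiredSchemaWithInfinity: "MAXVALUE" is swapped in before
// parsing, and allow_infinity lets genuine "-∞"/"∞" values through.
val doubleField = StructField("doubleField", DoubleType, nullable = true,
  new MetadataBuilder()
    .putString("allow_infinity", "true")
    .putString(MetadataKeys.PlusInfinitySymbol, "MAXVALUE")
    .putString(MetadataKeys.PlusInfinityValue, "")
    .build)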
