Skip to content

Commit 4807526

Browse files
authored
* InfinitySupport changed from mix-in trait to class (#71)
* Created `InfinitySupportIso` to support the ISO pattern fallback in date and timestamp parsing
* Integrated the new class into `TypeParser` instead of using it as a trait
* Removed some compilation warnings
1 parent 31722ed commit 4807526

File tree

7 files changed

+232
-99
lines changed

7 files changed

+232
-99
lines changed

src/main/scala/za/co/absa/standardization/SchemaValidator.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package za.co.absa.standardization
1818

1919
import org.apache.spark.sql.SparkSession
2020
import org.apache.spark.sql.types._
21-
import za.co.absa.standardization.ErrorMessage
2221
import za.co.absa.standardization.types.{TypeDefaults, TypedStructField}
2322
import za.co.absa.standardization.validation.field.FieldValidationIssue
2423

@@ -116,7 +115,7 @@ object SchemaValidator {
116115
fields += prefixedField
117116
}
118117
}
119-
fields.toSeq
118+
fields
120119
}
121120

122121
def flattenArray(field: StructField, arr: ArrayType, structPath: String): Seq[FlatField] = {
@@ -128,7 +127,7 @@ object SchemaValidator {
128127
val prefixedField = FlatField(structPath, field)
129128
arrayFields += prefixedField
130129
}
131-
arrayFields.toSeq
130+
arrayFields
132131
}
133132

134133
flattenStruct(schema, "")

src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,51 @@ import org.apache.spark.sql.Column
2020
import org.apache.spark.sql.functions.{lit, when}
2121
import org.apache.spark.sql.types.DataType
2222

23-
trait InfinitySupport {
24-
protected def infMinusSymbol: Option[String]
25-
protected def infMinusValue: Option[String]
26-
protected def infPlusSymbol: Option[String]
27-
protected def infPlusValue: Option[String]
28-
protected def canParseInfValue(value: String): Boolean
29-
protected val origType: DataType
30-
31-
def replaceInfinitySymbols(column: Column): Column = {
32-
val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol =>
33-
infMinusValue.map { minusValue =>
34-
when(column === lit(minusSymbol), lit(minusValue)).otherwise(column)
23+
class InfinitySupport(
24+
val infMinusSymbol: Option[String],
25+
val infMinusValue: Option[String],
26+
val infPlusSymbol: Option[String],
27+
val infPlusValue: Option[String],
28+
val origType: DataType
29+
) {
30+
private val hasInfinityDefined: Boolean = (infMinusSymbol.isDefined && infMinusValue.isDefined) ||
31+
(infPlusSymbol.isDefined && infPlusValue.isDefined)
32+
33+
protected def replaceSymbol(column: Column,
34+
possibleSymbol: Option[String],
35+
possibleValue: Option[String],
36+
valueToColumn: String => Column
37+
): Column = {
38+
possibleSymbol.flatMap { symbol =>
39+
possibleValue.map { value =>
40+
when(column === lit(symbol).cast(origType), valueToColumn(value))
3541
}
3642
}.getOrElse(column)
43+
}
3744

38-
infPlusSymbol.flatMap { plusSymbol =>
39-
infPlusValue.map { plusValue =>
40-
when(columnWithNegativeInf === lit(plusSymbol), lit(plusValue))
41-
.otherwise(columnWithNegativeInf)
42-
}
43-
}.getOrElse(columnWithNegativeInf)
45+
protected def defaultInfinityValueInjection(value: String): Column = lit(value).cast(origType)
46+
47+
protected def executeReplacement(column: Column, conversion: Column => Column): Column = {
48+
val columnWithNegativeInf = replaceSymbol(column, infMinusSymbol, infMinusValue, defaultInfinityValueInjection)
49+
val columnWithPositiveInf = replaceSymbol(columnWithNegativeInf, infPlusSymbol, infPlusValue, defaultInfinityValueInjection)
50+
conversion(columnWithPositiveInf.otherwise(column))
51+
}
52+
53+
def replaceInfinitySymbols(column: Column, conversion: Column => Column = c => c): Column = {
54+
if (hasInfinityDefined) {
55+
executeReplacement(column, conversion)
56+
} else {
57+
conversion(column)
58+
}
59+
}
60+
}
61+
62+
object InfinitySupport {
63+
def apply(infMinusSymbol: Option[String],
64+
infMinusValue: Option[String],
65+
infPlusSymbol: Option[String],
66+
infPlusValue: Option[String],
67+
origType: DataType): InfinitySupport = {
68+
new InfinitySupport(infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue, origType)
4469
}
4570
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2025 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.standardization.stages
18+
19+
import org.apache.spark.sql.Column
20+
import org.apache.spark.sql.functions._
21+
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
22+
23+
import java.text.SimpleDateFormat
24+
import java.time.OffsetDateTime
25+
import java.time.format.DateTimeFormatter.{ISO_DATE, ISO_DATE_TIME}
26+
import scala.util.Try
27+
28+
abstract class InfinitySupportIso(
29+
infMinusSymbol: Option[String],
30+
infMinusValue: Option[String],
31+
infPlusSymbol: Option[String],
32+
infPlusValue: Option[String],
33+
origType: DataType
34+
) extends InfinitySupport(infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue, origType) {
35+
36+
def isoCast(value: String): Column
37+
38+
def useIsoForInfMinus: Boolean
39+
def useIsoForInfPlus: Boolean
40+
41+
def chooseInjectionFunction(isIso: Boolean, conversion: Column => Column, value: String): Column = {
42+
if (isIso) {
43+
isoCast(value)
44+
} else {
45+
conversion(defaultInfinityValueInjection(value))
46+
}
47+
}
48+
49+
override def executeReplacement(column: Column, conversion: Column => Column): Column = {
50+
if (useIsoForInfMinus || useIsoForInfPlus) {
51+
val columnWithNegativeInf = replaceSymbol(column, infMinusSymbol, infMinusValue, chooseInjectionFunction(useIsoForInfMinus, conversion, _))
52+
val columnWithPositiveInf = replaceSymbol(columnWithNegativeInf, infPlusSymbol, infPlusValue, chooseInjectionFunction(useIsoForInfPlus, conversion, _))
53+
columnWithPositiveInf.otherwise(conversion(column))
54+
} else {
55+
super.executeReplacement(column, conversion)
56+
}
57+
}
58+
}
59+
60+
object InfinitySupportIso {
61+
def apply(
62+
infMinusSymbol: Option[String],
63+
infMinusValue: Option[String],
64+
infPlusSymbol: Option[String],
65+
infPlusValue: Option[String],
66+
origType: DataType,
67+
targetType: DataType
68+
): InfinitySupportIso = {
69+
targetType match {
70+
case DateType => new InfinitySupportIsoDate(infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue, origType)
71+
case TimestampType => new InfinitySupportIsoTimestamp(infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue, origType)
72+
case _ => throw new IllegalArgumentException(s"InfinitySupportIso does not support target type $targetType")
73+
}
74+
}
75+
76+
def isOfISODateFormat(dateValue: Option[String]): Boolean = {
77+
dateValue.exists(value =>
78+
Try {
79+
ISO_DATE.parse(value)
80+
}.isSuccess)
81+
}
82+
83+
def isOfISOTimestampFormat(timestampValue: Option[String]): Boolean = {
84+
timestampValue.exists(value =>
85+
Try {
86+
OffsetDateTime.parse(value)
87+
}.isSuccess)
88+
}
89+
90+
class InfinitySupportIsoDate(
91+
infMinusSymbol: Option[String],
92+
infMinusValue: Option[String],
93+
infPlusSymbol: Option[String],
94+
infPlusValue: Option[String],
95+
origType: DataType
96+
) extends InfinitySupportIso(infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue, origType) {
97+
override def isoCast(value: String): Column = to_date(lit(value))
98+
val useIsoForInfMinus: Boolean = isOfISODateFormat(infMinusValue)
99+
val useIsoForInfPlus: Boolean = isOfISODateFormat(infPlusValue)
100+
}
101+
102+
class InfinitySupportIsoTimestamp(
103+
infMinusSymbol: Option[String],
104+
infMinusValue: Option[String],
105+
infPlusSymbol: Option[String],
106+
infPlusValue: Option[String],
107+
origType: DataType
108+
) extends InfinitySupportIso(infMinusSymbol, infMinusValue, infPlusSymbol, infPlusValue, origType) {
109+
override def isoCast(value: String): Column = to_timestamp(lit(value))
110+
val useIsoForInfMinus: Boolean = isOfISOTimestampFormat(infMinusValue)
111+
val useIsoForInfPlus: Boolean = isOfISOTimestampFormat(infPlusValue)
112+
}
113+
114+
}
115+
116+

src/main/scala/za/co/absa/standardization/stages/TypeParser.scala

Lines changed: 26 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchem
3434
import za.co.absa.standardization.time.DateTimePattern
3535
import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike}
3636
import za.co.absa.standardization.types.TypedStructField._
37-
import za.co.absa.standardization.types.parsers.{DateTimeParser => DateTimeParserImpl}
37+
import za.co.absa.standardization.types.parsers.DateTimeParser
3838
import za.co.absa.standardization.types.{ParseOutput, TypeDefaults, TypedStructField}
3939
import za.co.absa.standardization.udf.{UDFBuilder, UDFNames}
4040

@@ -318,13 +318,13 @@ object TypeParser {
318318
}
319319

320320
private abstract class NumericParser[N: TypeTag](override val field: NumericTypeStructField[N])
321-
(implicit defaults: TypeDefaults) extends ScalarParser[N] with InfinitySupport {
322-
override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
323-
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
324-
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
325-
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
326-
override protected def canParseInfValue(value: String): Boolean = false
327-
private val columnWithInfinityReplaced = replaceInfinitySymbols(column).cast(origType)
321+
(implicit defaults: TypeDefaults) extends ScalarParser[N] {
322+
private val infinitySupport = InfinitySupport(
323+
metadata.getOptString(MetadataKeys.MinusInfinitySymbol),
324+
metadata.getOptString(MetadataKeys.MinusInfinityValue),
325+
metadata.getOptString(MetadataKeys.PlusInfinitySymbol),
326+
metadata.getOptString(MetadataKeys.PlusInfinityValue),
327+
origType)
328328

329329
override protected def standardizeAfterCheck(stdConfig: StandardizationConfig)(implicit logger: Logger): ParseOutput = {
330330
if (field.needsUdfParsing) {
@@ -345,9 +345,9 @@ object TypeParser {
345345
val columnWithProperDecimalSymbols: Column = if (replacements.nonEmpty) {
346346
val from = replacements.keys.mkString
347347
val to = replacements.values.mkString
348-
translate(columnWithInfinityReplaced, from, to)
348+
infinitySupport.replaceInfinitySymbols(column, translate(_, from, to))
349349
} else {
350-
columnWithInfinityReplaced
350+
infinitySupport.replaceInfinitySymbols(column)
351351
}
352352

353353
val columnToCast = if (field.allowInfinity && (decimalSymbols.infinityValue != InfinityStr)) {
@@ -504,28 +504,17 @@ object TypeParser {
504504
* Date | O | ->to_utc_timestamp->to_date
505505
* Other | ->String->to_date | ->String->to_timestamp->to_utc_timestamp->to_date
506506
*/
507-
private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] with InfinitySupport {
507+
private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] {
508508
override val field: DateTimeTypeStructField[T]
509509
protected val pattern: DateTimePattern = field.pattern.get.get
510-
override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
511-
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
512-
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
513-
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
514-
515-
516-
private val IsoDatePattern = "yyyy-MM-dd"
517-
private val IsoTimestampPattern = "yyyy-MM-dd HH:mm:ss"
518-
519-
private lazy val dateTimeParser: DateTimeParserImpl = field.parser.get
520-
521-
override protected def canParseInfValue(value: String): Boolean = {
522-
Try{
523-
field.dataType match{
524-
case DateType => dateTimeParser.parseDate(value)
525-
case TimestampType => dateTimeParser.parseTimestamp(value)
526-
}
527-
}.isSuccess
528-
}
510+
private val infinitySupport = InfinitySupportIso(
511+
metadata.getOptString(MetadataKeys.MinusInfinitySymbol),
512+
metadata.getOptString(MetadataKeys.MinusInfinityValue),
513+
metadata.getOptString(MetadataKeys.PlusInfinitySymbol),
514+
metadata.getOptString(MetadataKeys.PlusInfinityValue),
515+
origType,
516+
field.dataType
517+
)
529518

530519
protected val replaceCenturyUDF: UserDefinedFunction = udf((inputDate: String, centuryPattern: String) => {
531520
val centuryIndex = centuryPattern.indexOf(DateTimePattern.patternCenturyChar)
@@ -538,45 +527,6 @@ object TypeParser {
538527
pendedInput.substring(0, centuryIndex) + modifiedChar + pendedInput.substring(centuryIndex + 1)
539528
})
540529

541-
private val columnWithInfinityReplaced: Column = {
542-
val replaced = replaceInfinitySymbols(column)
543-
544-
val originalCastFunc: Column => Column = if (pattern.isEpoch) {
545-
col => (col.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
546-
} else {
547-
col => castStringColumn(col)
548-
}
549-
550-
val isoPattern = field.dataType match {
551-
case DateType => IsoDatePattern
552-
case TimestampType => IsoTimestampPattern
553-
}
554-
val isoCastFunc: Column => Column = col => field.dataType match{
555-
case DateType => to_date(col,isoPattern)
556-
case TimestampType => to_timestamp(col, isoPattern)
557-
}
558-
559-
infMinusSymbol.flatMap { minusSymbol =>
560-
infMinusValue.map { minusValue =>
561-
if (canParseInfValue(minusValue)){
562-
originalCastFunc(replaced)
563-
} else {
564-
when(replaced === lit(minusValue), isoCastFunc(lit(minusValue))).otherwise(originalCastFunc(replaced))
565-
}
566-
}
567-
}.getOrElse{
568-
infPlusSymbol.flatMap{ plusSymbol =>
569-
infPlusValue.map { plusValue=>
570-
if (canParseInfValue(plusValue)){
571-
originalCastFunc(replaced)
572-
} else{
573-
when(replaced === lit(plusValue), isoCastFunc(lit(plusValue))).otherwise(originalCastFunc(replaced))
574-
}
575-
}
576-
}.getOrElse(originalCastFunc(replaced))
577-
}
578-
}
579-
580530
override protected def assemblePrimitiveCastLogic: Column = {
581531
if (pattern.isEpoch) {
582532
castEpoch()
@@ -598,20 +548,20 @@ object TypeParser {
598548
// underlyingType match {
599549
origType match {
600550
case _: NullType => nullColumn
601-
case _: DateType => castDateColumn(columnWithInfinityReplaced)
602-
case _: TimestampType => castTimestampColumn(columnWithInfinityReplaced)
603-
case _: StringType => castStringColumn(columnWithInfinityReplaced)
551+
case _: DateType => infinitySupport.replaceInfinitySymbols(column, castDateColumn)
552+
case _: TimestampType => infinitySupport.replaceInfinitySymbols(column, castTimestampColumn)
553+
case _: StringType => infinitySupport.replaceInfinitySymbols(column, castStringColumn)
604554
case ot: DoubleType =>
605555
// this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
606556
patternNeeded(ot)
607-
castFractionalColumn(columnWithInfinityReplaced, ot)
557+
infinitySupport.replaceInfinitySymbols(column, castFractionalColumn(_, ot))
608558
case ot: FloatType =>
609559
// this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
610560
patternNeeded(ot)
611-
castFractionalColumn(columnWithInfinityReplaced, ot)
561+
infinitySupport.replaceInfinitySymbols(column, castFractionalColumn(_, ot))
612562
case ot =>
613563
patternNeeded(ot)
614-
castNonStringColumn(columnWithInfinityReplaced, ot)
564+
infinitySupport.replaceInfinitySymbols(column, castNonStringColumn(_, ot))
615565
}
616566
}
617567

@@ -633,7 +583,7 @@ object TypeParser {
633583
}
634584

635585
protected def castEpoch(): Column = {
636-
(columnWithInfinityReplaced.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
586+
infinitySupport.replaceInfinitySymbols(column, c => (c.cast(decimalType) / pattern.epochFactor).cast(TimestampType))
637587
}
638588

639589
protected def castStringColumn(stringColumn: Column): Column

src/test/scala/za/co/absa/standardization/StandardizationCsvSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package za.co.absa.standardization
1919
import org.apache.spark.sql.functions._
2020
import org.apache.spark.sql.types._
2121
import org.scalatest.funsuite.AnyFunSuite
22-
import za.co.absa.standardization.ErrorMessage
2322
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
2423
import za.co.absa.spark.commons.test.SparkTestBase
2524
import za.co.absa.standardization.RecordIdGeneration.IdType.NoId

src/test/scala/za/co/absa/standardization/TestSamples.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package za.co.absa.standardization
1818

19-
import za.co.absa.standardization.ErrorMessage
2019
import za.co.absa.standardization.RecordIdGeneration.IdType.NoId
2120
import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig, ErrorCodesConfig}
2221

0 commit comments

Comments
 (0)