Skip to content

Commit 31722ed

Browse files
Algorithm logic followed: canParseInfValue is used to decide between (A) and (B) with the methods isocast & origcast, and NumericParser compatibility is retained
1 parent 42e1af1 commit 31722ed

File tree

10 files changed

+189
-445
lines changed

10 files changed

+189
-445
lines changed

src/main/scala/za/co/absa/standardization/adr/001-infinity-support-iso-pattern-defaults/InfinitySupport-ISO-fallback.drawio

Lines changed: 89 additions & 0 deletions
Large diffs are not rendered by default.
419 KB
Loading

src/main/scala/za/co/absa/standardization/schema/MetadataKeys.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,6 @@ object MetadataKeys {
4242
val StrictParsing = "strict_parsing"
4343
// For nonstandard data inputs like the Mainframe's century pattern
4444
val IsNonStandard = "is_non_standard"
45-
// For allowing separate infinity patterns
46-
val PlusInfinityPattern = "plus_infinity_pattern"
47-
val MinusInfinityPattern = "minus_infinity_pattern"
48-
4945
}
5046

5147
object MetadataValues {

src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala

Lines changed: 13 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -16,101 +16,30 @@
1616

1717
package za.co.absa.standardization.stages
1818

19-
import org.apache.spark.sql.functions.{lit, when}
2019
import org.apache.spark.sql.Column
21-
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
22-
import za.co.absa.standardization.types.parsers.DateTimeParser
23-
import za.co.absa.standardization.time.{DateTimePattern, InfinityConfig}
24-
25-
import java.sql.{Date, Timestamp}
26-
import java.text.SimpleDateFormat
27-
import java.util.Locale
28-
import scala.util.Try
29-
30-
20+
import org.apache.spark.sql.functions.{lit, when}
21+
import org.apache.spark.sql.types.DataType
3122

3223
trait InfinitySupport {
3324
protected def infMinusSymbol: Option[String]
34-
3525
protected def infMinusValue: Option[String]
36-
3726
protected def infPlusSymbol: Option[String]
38-
3927
protected def infPlusValue: Option[String]
40-
protected def infMinusPattern: Option[String]
41-
protected def infPlusPattern: Option[String]
28+
protected def canParseInfValue(value: String): Boolean
4229
protected val origType: DataType
43-
protected val targetType: DataType
4430

4531
def replaceInfinitySymbols(column: Column): Column = {
46-
targetType match {
47-
case DateType =>
48-
val defaultDatePattern = "yyyy-MM-dd"
49-
val minusDate = infMinusValue.flatMap { value =>
50-
infMinusSymbol.map { symbol =>
51-
when(
52-
column === lit(symbol).cast(origType),
53-
lit(parseInfinityValue(value, infMinusPattern.getOrElse(defaultDatePattern)).getTime)
54-
.cast(TimestampType)
55-
.cast(DateType)
56-
)
57-
}
58-
}.getOrElse(column)
59-
60-
infPlusValue.flatMap { value =>
61-
infPlusSymbol.map { symbol =>
62-
when(
63-
minusDate === lit(symbol).cast(origType),
64-
lit(parseInfinityValue(value, infPlusPattern.getOrElse(defaultDatePattern)).getTime)
65-
.cast(TimestampType)
66-
.cast(DateType)
67-
).otherwise(minusDate)
68-
}
69-
}.getOrElse(minusDate)
70-
71-
case TimestampType =>
72-
val defaultTimestampPattern = "yyyy-MM-dd HH:mm:ss"
73-
val minusTimestamp = infMinusValue.flatMap { value =>
74-
infMinusSymbol.map { symbol =>
75-
when(
76-
column === lit(symbol).cast(origType),
77-
lit(parseInfinityValue(value, infMinusPattern.getOrElse(defaultTimestampPattern)).getTime)
78-
.cast(TimestampType)
79-
)
80-
}
81-
}.getOrElse(column)
82-
83-
infPlusValue.flatMap { value =>
84-
infPlusSymbol.map { symbol =>
85-
when(
86-
minusTimestamp === lit(symbol).cast(origType),
87-
lit(parseInfinityValue(value, infPlusPattern.getOrElse(defaultTimestampPattern)).getTime)
88-
.cast(TimestampType)
89-
).otherwise(minusTimestamp)
90-
}
91-
}.getOrElse(minusTimestamp)
92-
93-
case _ =>
94-
val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol =>
95-
infMinusValue.map { minusValue =>
96-
when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType)).otherwise(column)
97-
}
98-
}.getOrElse(column)
99-
100-
infPlusSymbol.flatMap { plusSymbol =>
101-
infPlusValue.map { plusValue =>
102-
when(columnWithNegativeInf === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType))
103-
.otherwise(columnWithNegativeInf)
104-
}
105-
}.getOrElse(columnWithNegativeInf)
32+
val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol =>
33+
infMinusValue.map { minusValue =>
34+
when(column === lit(minusSymbol), lit(minusValue)).otherwise(column)
10635
}
107-
}
36+
}.getOrElse(column)
10837

109-
private def parseInfinityValue(value: String, pattern: String): Date = {
110-
val dateFormat = new SimpleDateFormat(pattern, Locale.US)
111-
dateFormat.setLenient(false)
112-
new Date(dateFormat.parse(value).getTime)
38+
infPlusSymbol.flatMap { plusSymbol =>
39+
infPlusValue.map { plusValue =>
40+
when(columnWithNegativeInf === lit(plusSymbol), lit(plusValue))
41+
.otherwise(columnWithNegativeInf)
42+
}
43+
}.getOrElse(columnWithNegativeInf)
11344
}
11445
}
115-
116-

src/main/scala/za/co/absa/standardization/stages/TypeParser.scala

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchem
3434
import za.co.absa.standardization.time.DateTimePattern
3535
import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike}
3636
import za.co.absa.standardization.types.TypedStructField._
37-
import za.co.absa.standardization.types.parsers.DateTimeParser
37+
import za.co.absa.standardization.types.parsers.{DateTimeParser => DateTimeParserImpl}
3838
import za.co.absa.standardization.types.{ParseOutput, TypeDefaults, TypedStructField}
3939
import za.co.absa.standardization.udf.{UDFBuilder, UDFNames}
4040

@@ -323,10 +323,8 @@ object TypeParser {
323323
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
324324
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
325325
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
326-
override protected val infMinusPattern : Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityPattern)
327-
override protected val infPlusPattern : Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityPattern)
328-
override protected val targetType: DataType = field.dataType
329-
private val columnWithInfinityReplaced = replaceInfinitySymbols(column)
326+
override protected def canParseInfValue(value: String): Boolean = false
327+
private val columnWithInfinityReplaced = replaceInfinitySymbols(column).cast(origType)
330328

331329
override protected def standardizeAfterCheck(stdConfig: StandardizationConfig)(implicit logger: Logger): ParseOutput = {
332330
if (field.needsUdfParsing) {
@@ -513,10 +511,21 @@ object TypeParser {
513511
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
514512
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
515513
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
516-
override protected val infMinusPattern : Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityPattern)
517-
override protected val infPlusPattern : Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityPattern)
518-
override protected val targetType: DataType = field.dataType
519-
private val columnWithInfinityReplaced: Column = replaceInfinitySymbols(column)
514+
515+
516+
private val IsoDatePattern = "yyyy-MM-dd"
517+
private val IsoTimestampPattern = "yyyy-MM-dd HH:mm:ss"
518+
519+
private lazy val dateTimeParser: DateTimeParserImpl = field.parser.get
520+
521+
override protected def canParseInfValue(value: String): Boolean = {
522+
Try{
523+
field.dataType match{
524+
case DateType => dateTimeParser.parseDate(value)
525+
case TimestampType => dateTimeParser.parseTimestamp(value)
526+
}
527+
}.isSuccess
528+
}
520529

521530
protected val replaceCenturyUDF: UserDefinedFunction = udf((inputDate: String, centuryPattern: String) => {
522531
val centuryIndex = centuryPattern.indexOf(DateTimePattern.patternCenturyChar)
@@ -529,6 +538,45 @@ object TypeParser {
529538
pendedInput.substring(0, centuryIndex) + modifiedChar + pendedInput.substring(centuryIndex + 1)
530539
})
531540

541+
private val columnWithInfinityReplaced: Column = {
542+
val replaced = replaceInfinitySymbols(column)
543+
544+
val originalCastFunc: Column => Column = if (pattern.isEpoch) {
545+
col => (col.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
546+
} else {
547+
col => castStringColumn(col)
548+
}
549+
550+
val isoPattern = field.dataType match {
551+
case DateType => IsoDatePattern
552+
case TimestampType => IsoTimestampPattern
553+
}
554+
val isoCastFunc: Column => Column = col => field.dataType match{
555+
case DateType => to_date(col,isoPattern)
556+
case TimestampType => to_timestamp(col, isoPattern)
557+
}
558+
559+
infMinusSymbol.flatMap { minusSymbol =>
560+
infMinusValue.map { minusValue =>
561+
if (canParseInfValue(minusValue)){
562+
originalCastFunc(replaced)
563+
} else {
564+
when(replaced === lit(minusValue), isoCastFunc(lit(minusValue))).otherwise(originalCastFunc(replaced))
565+
}
566+
}
567+
}.getOrElse{
568+
infPlusSymbol.flatMap{ plusSymbol =>
569+
infPlusValue.map { plusValue=>
570+
if (canParseInfValue(plusValue)){
571+
originalCastFunc(replaced)
572+
} else{
573+
when(replaced === lit(plusValue), isoCastFunc(lit(plusValue))).otherwise(originalCastFunc(replaced))
574+
}
575+
}
576+
}.getOrElse(originalCastFunc(replaced))
577+
}
578+
}
579+
532580
override protected def assemblePrimitiveCastLogic: Column = {
533581
if (pattern.isEpoch) {
534582
castEpoch()

src/main/scala/za/co/absa/standardization/time/DateTimePattern.scala

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ import za.co.absa.standardization.types.{Section, TypePattern}
2626
* @param pattern actual pattern to format the type conversion
2727
* @param isDefault marks if the pattern is actually an assigned value or taken for global defaults
2828
*/
29-
30-
abstract sealed class DateTimePattern(pattern: String, isDefault: Boolean = false,
31-
val infinityConfig:Option[ InfinityConfig] = None)
29+
abstract sealed class DateTimePattern(pattern: String, isDefault: Boolean = false)
3230
extends TypePattern(pattern, isDefault){
3331

3432
val isEpoch: Boolean
@@ -83,9 +81,8 @@ object DateTimePattern {
8381
// scalastyle:on magic.number
8482

8583
private final case class EpochDTPattern(override val pattern: String,
86-
override val isDefault: Boolean = false,
87-
override val infinityConfig: Option[InfinityConfig] = None)
88-
extends DateTimePattern(pattern, isDefault, infinityConfig) {
84+
override val isDefault: Boolean = false)
85+
extends DateTimePattern(pattern, isDefault) {
8986

9087
override val isEpoch: Boolean = true
9188
override val isCentury: Boolean = false
@@ -118,15 +115,12 @@ object DateTimePattern {
118115
case _ => Seq.empty
119116
}
120117
override val patternWithoutSecondFractions: String = EpochKeyword
121-
122-
123118
}
124119

125120
private abstract class StandardDTPatternBase(override val pattern: String,
126121
assignedDefaultTimeZone: Option[String],
127-
override val isDefault: Boolean = false,
128-
override val infinityConfig: Option[InfinityConfig] = None)
129-
extends DateTimePattern(pattern, isDefault,infinityConfig) {
122+
override val isDefault: Boolean = false)
123+
extends DateTimePattern(pattern, isDefault) {
130124

131125
override val isEpoch: Boolean = false
132126
override val epochFactor: Long = 0
@@ -157,9 +151,8 @@ object DateTimePattern {
157151

158152
private final case class StandardDTPattern(override val pattern: String,
159153
assignedDefaultTimeZone: Option[String] = None,
160-
override val isDefault: Boolean = false,
161-
override val infinityConfig: Option[InfinityConfig] = None)
162-
extends StandardDTPatternBase(pattern, assignedDefaultTimeZone, isDefault, infinityConfig) {
154+
override val isDefault: Boolean = false)
155+
extends StandardDTPatternBase(pattern, assignedDefaultTimeZone, isDefault) {
163156

164157
override val isCentury: Boolean = false
165158
override val originalPattern: Option[String] = None
@@ -168,39 +161,35 @@ object DateTimePattern {
168161
private final case class CenturyDTPattern(override val pattern: String,
169162
override val originalPattern: Option[String],
170163
assignedDefaultTimeZone: Option[String] = None,
171-
override val isDefault: Boolean = false,
172-
override val infinityConfig: Option[InfinityConfig] = None)
173-
extends StandardDTPatternBase(pattern, assignedDefaultTimeZone, isDefault, infinityConfig) {
164+
override val isDefault: Boolean = false)
165+
extends StandardDTPatternBase(pattern, assignedDefaultTimeZone, isDefault) {
174166

175167
override val isCentury: Boolean = true
176168
}
177169

178170
private def create(pattern: String,
179171
assignedDefaultTimeZone: Option[String],
180172
isCenturyPattern: Boolean,
181-
isDefault: Boolean,
182-
infinityConfig: Option[InfinityConfig]): DateTimePattern = {
173+
isDefault: Boolean): DateTimePattern = {
183174
if (isEpoch(pattern)) {
184-
EpochDTPattern(pattern, isDefault,infinityConfig)
175+
EpochDTPattern(pattern, isDefault)
185176
} else if (isCenturyPattern && isCentury(pattern)) {
186177
val patternWithoutCentury = pattern.replaceAll(patternCenturyChar, "yy")
187-
CenturyDTPattern(patternWithoutCentury, Some(pattern), assignedDefaultTimeZone, isDefault, infinityConfig)
178+
CenturyDTPattern(patternWithoutCentury, Some(pattern), assignedDefaultTimeZone, isDefault)
188179
} else {
189-
StandardDTPattern(pattern, assignedDefaultTimeZone, isDefault, infinityConfig)
180+
StandardDTPattern(pattern, assignedDefaultTimeZone, isDefault)
190181
}
191182
}
192183

193184
def apply(pattern: String,
194185
assignedDefaultTimeZone: Option[String] = None,
195-
isCenturyPattern: Boolean = false,
196-
infinityConfig: Option[InfinityConfig] = None): DateTimePattern = {
197-
create(pattern, assignedDefaultTimeZone, isCenturyPattern, isDefault = false , infinityConfig)
186+
isCenturyPattern: Boolean = false): DateTimePattern = {
187+
create(pattern, assignedDefaultTimeZone, isCenturyPattern, isDefault = false)
198188
}
199189

200190
def asDefault(pattern: String,
201-
assignedDefaultTimeZone: Option[String] = None,
202-
infinityConfig: Option[InfinityConfig] = None): DateTimePattern = {
203-
create(pattern, assignedDefaultTimeZone, isCenturyPattern = false, isDefault = true, infinityConfig)
191+
assignedDefaultTimeZone: Option[String] = None): DateTimePattern = {
192+
create(pattern, assignedDefaultTimeZone, isCenturyPattern = false, isDefault = true)
204193
}
205194

206195
def isEpoch(pattern: String): Boolean = {

src/main/scala/za/co/absa/standardization/time/InfinityConfig.scala

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)