Commit 20e9c97

Removed abstract members and modified the replaceInfinitySymbols method to handle validation
1 parent c9d99c9 commit 20e9c97
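
Note: per the signature change in the diff below, callers now supply the SparkSession and TypeDefaults at the call site instead of wiring them into the trait as abstract members. A minimal sketch of the implied call-site change (the handler, df, and column names here are hypothetical, not from this repository):

    // Before this commit, spark and defaults were abstract members of InfinitySupport:
    // val col = handler.replaceInfinitySymbols(df("timestampCol"))

    // After this commit, they are passed explicitly:
    // val col = handler.replaceInfinitySymbols(df("timestampCol"), spark, defaults)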

File tree: 1 file changed (+38 −25 lines)


src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala

Lines changed: 38 additions & 25 deletions
@@ -16,11 +16,10 @@
 
 package za.co.absa.standardization.stages
 
-import org.apache.spark.sql.{Column, SparkSession, Row, functions => F}
-import org.apache.spark.sql.types.{DataType, DateType, StringType, TimestampType, StructType, StructField}
+import org.apache.spark.sql.functions.{to_timestamp, lit, when, coalesce, to_date}
+import org.apache.spark.sql.{Column, Row, SparkSession}
+import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
 import za.co.absa.standardization.types.{TypeDefaults, TypedStructField}
-import za.co.absa.standardization.types.parsers.DateTimeParser
-import za.co.absa.standardization.time.DateTimePattern
 import za.co.absa.standardization.types.TypedStructField.DateTimeTypeStructField
 import java.sql.Timestamp
 import scala.collection.JavaConverters._
@@ -35,9 +34,7 @@ trait InfinitySupport {
   protected def infPlusSymbol: Option[String]
   protected def infPlusValue: Option[String]
   protected val origType: DataType
-  protected def defaults: TypeDefaults
   protected def field: TypedStructField
-  protected def spark: SparkSession
 
 
   private def sanitizeInput(s: String): String = {
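
With defaults and spark removed, only data-describing members remain abstract. A hedged sketch of what an implementer of the trait might now look like (the class name and sample values are hypothetical; the member names come from this diff):

    import org.apache.spark.sql.types.{DataType, TimestampType}
    import za.co.absa.standardization.types.TypedStructField

    class TimestampInfinityHandler extends InfinitySupport {
      override protected def infMinusSymbol: Option[String] = Some("-inf")
      override protected def infMinusValue: Option[String] = Some("1900-01-01 00:00:00")
      override protected def infPlusSymbol: Option[String]  = Some("+inf")
      override protected def infPlusValue: Option[String]   = Some("9999-12-31 23:59:59")
      override protected val origType: DataType = TimestampType
      override protected def field: TypedStructField = ??? // project-specific wiring, elided here
    }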
@@ -59,16 +56,16 @@
     }
   }
 
-  private def validateAndConvertInfinityValue(value: String, dataType: DataType, patternOpt: Option[String]): String = {
+  private def validateAndConvertInfinityValue(value: String, dataType: DataType, patternOpt: Option[String], spark: SparkSession): String = {
     val sanitizedValue = sanitizeInput(value)
     val schema = StructType(Seq(StructField("value", StringType, nullable = false)))
     val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(sanitizedValue))), schema)
 
     val parsedWithPattern = patternOpt.flatMap { pattern =>
       val parsedCol = dataType match {
-        case TimestampType => F.to_timestamp(F.col("value"), pattern)
-        case DateType => F.to_date(F.col("value"), pattern)
-        case _ => F.col("value").cast(dataType)
+        case TimestampType => to_timestamp(df.col("value"), pattern)
+        case DateType => to_date(df.col("value"), pattern)
+        case _ => df.col("value").cast(dataType)
       }
       val result = df.select(parsedCol.alias("parsed")).first().get(0)
       if (result != null) Some(sanitizedValue) else None
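
The validation trick in validateAndConvertInfinityValue is to round-trip the candidate string through a one-row DataFrame and treat a null parse result as "does not match the pattern". A standalone sketch of the same idea, assuming a local SparkSession (the object and app names are illustrative, not part of the library):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.functions.to_timestamp
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object ValidationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("validate").getOrCreate()
        val schema = StructType(Seq(StructField("value", StringType, nullable = false)))
        val df = spark.createDataFrame(
          spark.sparkContext.parallelize(Seq(Row("9999-12-31 23:59:59"))), schema)
        // to_timestamp yields null when the string does not match the pattern
        val parsed = df.select(to_timestamp(df.col("value"), "yyyy-MM-dd HH:mm:ss").alias("parsed"))
          .first().get(0)
        println(if (parsed != null) "valid" else "invalid") // prints: valid
        spark.stop()
      }
    }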
@@ -83,8 +80,8 @@
       case _ => ""
     }
     val parsedWithISO = dataType match {
-      case TimestampType => df.select(F.to_timestamp(F.col("value"), isoPattern)).alias("parsed").first().getAs[Timestamp](0)
-      case DateType => df.select(F.to_date(F.col("value"), isoPattern)).alias("parsed").first().getAs[Date](0)
+      case TimestampType => df.select(to_timestamp(df.col("value"), isoPattern)).alias("parsed").first().getAs[Timestamp](0)
+      case DateType => df.select(to_date(df.col("value"), isoPattern)).alias("parsed").first().getAs[Date](0)
       case _ => null
     }
     if (parsedWithISO != null) {
@@ -101,30 +98,46 @@
 
   protected val validatedInfMinusValue: Option[String] = if (origType == DateType || origType == TimestampType) {
     infMinusValue.map { v =>
-      validateAndConvertInfinityValue(v, origType, getPattern(origType))
+      // validateAndConvertInfinityValue(v, origType, getPattern(origType))
+      v
     }
   } else {
     infMinusValue.map(sanitizeInput)
   }
 
   protected val validatedInfPlusValue: Option[String] = if (origType == DateType || origType == TimestampType) {
     infPlusValue.map { v =>
-      validateAndConvertInfinityValue(v, origType, getPattern(origType))
+      // validateAndConvertInfinityValue(v, origType, getPattern(origType))
+      v
     }
   } else {
     infPlusValue.map(sanitizeInput)
   }
 
-  def replaceInfinitySymbols(column: Column): Column = {
+  def replaceInfinitySymbols(column: Column, spark: SparkSession, defaults: TypeDefaults): Column = {
     var resultCol = column.cast(StringType)
-    validatedInfMinusValue.foreach { v =>
+
+    val validatedMinus = if (origType == DateType || origType == TimestampType) {
+      infMinusValue.map(v => validateAndConvertInfinityValue(v, origType, getPattern(origType), spark))
+    } else {
+      infMinusValue.map(sanitizeInput)
+    }
+
+    val validatedPlus = if (origType == DateType || origType == TimestampType) {
+      infPlusValue.map(v => validateAndConvertInfinityValue(v, origType, getPattern(origType), spark))
+    } else {
+      infPlusValue.map(sanitizeInput)
+    }
+
+    validatedMinus.foreach { v =>
       infMinusSymbol.foreach { s =>
-        resultCol = F.when(resultCol === F.lit(s), F.lit(v)).otherwise(resultCol)
+        resultCol = when(resultCol === lit(s), lit(v)).otherwise(resultCol)
       }
     }
-    validatedInfPlusValue.foreach { v =>
+
+    validatedPlus.foreach { v =>
       infPlusSymbol.foreach { s =>
-        resultCol = F.when(resultCol === F.lit(s), F.lit(v)).otherwise(resultCol)
+        resultCol = when(resultCol === lit(s), lit(v)).otherwise(resultCol)
      }
     }
 
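The replacement step is a pair of when/otherwise rewrites: any cell equal to an infinity symbol becomes the configured replacement value, and every other string passes through unchanged. A condensed sketch of that pattern (the function and argument names are illustrative):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{lit, when}

    def replaceSymbol(col: Column, symbol: String, value: String): Column =
      when(col === lit(symbol), lit(value)).otherwise(col)

    // e.g. applied twice, once per symbol:
    // val replaced = replaceSymbol(replaceSymbol(strCol, "-inf", minValue), "+inf", maxValue)
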
@@ -133,17 +146,17 @@
       val pattern = getPattern(origType).getOrElse(
         defaults.defaultTimestampTimeZone.map(_ => "yyyy-MM-dd'T'HH:mm:ss.SSSSSS").getOrElse("yyyy-MM-dd HH:mm:ss")
       )
-      F.coalesce(
-        F.to_timestamp(resultCol, pattern),
-        F.to_timestamp(resultCol, "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
+      coalesce(
+        to_timestamp(resultCol, pattern),
+        to_timestamp(resultCol, "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
       ).cast(origType)
     case DateType =>
       val pattern = getPattern(origType).getOrElse(
         defaults.defaultDateTimeZone.map(_ => "yyyy-MM-dd").getOrElse("yyyy-MM-dd")
       )
-      F.coalesce(
-        F.to_date(resultCol, pattern),
-        F.to_date(resultCol, "yyyy-MM-dd")
+      coalesce(
+        to_date(resultCol, pattern),
+        to_date(resultCol, "yyyy-MM-dd")
       ).cast(origType)
     case _ =>
       resultCol.cast(origType)
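
The final cast uses coalesce as a two-pattern fallback: parse with the configured pattern first, then with a fixed fallback pattern, keeping the first non-null result. An isolated sketch of the timestamp branch (the pattern literal mirrors the diff; the function name is illustrative):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{coalesce, to_timestamp}
    import org.apache.spark.sql.types.TimestampType

    def parseWithFallback(col: Column, pattern: String): Column =
      coalesce(
        to_timestamp(col, pattern),                          // configured pattern
        to_timestamp(col, "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")    // ISO-style fallback
      ).cast(TimestampType)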
