
Commit 6807238

Zejnilovic and benedeki authored
#47 Add Scala 2.13 support
* #47 Add Scala 2.13 support
* Add Scala 2.13 to the build matrix
* Add `log4j2.properties` to the test folder

Co-authored-by: David Benedeki <[email protected]>
1 parent 50b8270 commit 6807238

30 files changed, +158 -66 lines

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        scala: [2.11.12, 2.12.12]
+        scala: [2.11.12, 2.12.18, 2.13.11]
     name: Scala ${{matrix.scala}}
     steps:
       - name: Checkout code

.github/workflows/jacoco_check.yml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ jobs:
     strategy:
       matrix:
         include:
-          # The project supports Scala 2.11, 2.12
+          # The project supports Scala 2.11, 2.12, 2.13
           # The CI runs all tests suites for all supported Scala versions at build.yml
           # The codebase for all Scala versions is the same, so the coverage is calculated only once
           # Scala 2.12 is chosen since it is supported by the most wide range of Spark versions and

build.sbt

Lines changed: 3 additions & 2 deletions
@@ -21,9 +21,10 @@ ThisBuild / name := "spark-data-standardization"
 ThisBuild / organization := "za.co.absa"

 lazy val scala211 = "2.11.12"
-lazy val scala212 = "2.12.12"
+lazy val scala212 = "2.12.18"
+lazy val scala213 = "2.13.11"

-ThisBuild / crossScalaVersions := Seq(scala211, scala212)
+ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213)
 ThisBuild / scalaVersion := scala211

 ThisBuild / versionScheme := Some("early-semver")
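With scala213 added to crossScalaVersions, the standard sbt cross-building commands pick up the new version as well; a quick sketch using the versions from the diff above (the default session still runs on scala211, per scalaVersion):

    sbt +test            # runs the tests for 2.11.12, 2.12.18 and 2.13.11 in turn
    sbt ++2.13.11 test   # runs only against Scala 2.13.11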

project/Dependencies.scala

Lines changed: 4 additions & 4 deletions
@@ -33,11 +33,11 @@ object Dependencies {
     List(
       "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
       "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
-      "za.co.absa" %% s"spark-commons-spark$sparkVersionUpToMinor" % "0.5.0" % Provided,
-      "za.co.absa" %% "spark-commons-test" % "0.4.0" % Test,
-      "com.typesafe" % "config" % "1.4.1",
+      "za.co.absa" %% s"spark-commons-spark$sparkVersionUpToMinor" % "0.6.1" % Provided,
+      "za.co.absa" %% "spark-commons-test" % "0.6.1" % Test,
+      "com.typesafe" % "config" % "1.4.2",
       "com.github.mrpowers" %% "spark-fast-tests" % sparkFastTestsVersion(scalaVersion) % Test,
-      "org.scalatest" %% "scalatest" % "3.2.2" % Test
+      "org.scalatest" %% "scalatest" % "3.2.15" % Test
     )
   }
 }
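A side note on why one dependency list serves all three Scala versions: the %% operator makes sbt append the current Scala binary suffix to the artifact name, so each axis of the cross build resolves its own artifacts. A minimal illustration, with coordinates taken from the diff above:

    // With Scala 2.13.11 active, "%%" resolves the artifact "scalatest_2.13";
    // with 2.12.18 it resolves "scalatest_2.12", and so on.
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.15" % Test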
src/main/scala/za/co/absa/standardization/ErrorMessage.scala

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.standardization
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import za.co.absa.standardization.ErrorMessage.Mapping
+
+/**
+ * Case class to represent an error message
+ *
+ * @param errType - Type or source of the error
+ * @param errCode - Internal error code
+ * @param errMsg - Textual description of the error
+ * @param errCol - The name of the column where the error occurred
+ * @param rawValues - Sequence of raw values (which are the potential culprits of the error)
+ * @param mappings - Sequence of Mappings i.e Mapping Table Column -> Equivalent Mapped Dataset column
+ */
+case class ErrorMessage(
+  errType: String,
+  errCode: String,
+  errMsg: String,
+  errCol: String,
+  rawValues: Seq[String],
+  mappings: Seq[Mapping] = Seq()
+)
+
+object ErrorMessage {
+  case class Mapping(
+    mappingTableColumn: String,
+    mappedDatasetColumn: String
+  )
+
+  val errorColumnName = "errCol"
+  def errorColSchema(implicit spark: SparkSession): StructType = {
+    import spark.implicits._
+    spark.emptyDataset[ErrorMessage].schema
+  }
+}
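A brief usage sketch of the new class; only the field names come from the case class above, the error type, code, messages and column names are illustrative:

    import org.apache.spark.sql.SparkSession
    import za.co.absa.standardization.ErrorMessage
    import za.co.absa.standardization.ErrorMessage.Mapping

    // Illustrative error record for a failed cast on column "price".
    val castError = ErrorMessage(
      errType   = "stdCastError",              // hypothetical error type
      errCode   = "E00000",                    // hypothetical error code
      errMsg    = "Type cast failed",
      errCol    = "price",
      rawValues = Seq("not-a-number"),
      mappings  = Seq(Mapping("mapping_table_col", "price"))
    )

    // The schema of the error column is derived from an empty Dataset[ErrorMessage].
    implicit val spark: SparkSession =
      SparkSession.builder().appName("error-message-demo").master("local[*]").getOrCreate()
    val errColSchema = ErrorMessage.errorColSchema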

src/main/scala/za/co/absa/standardization/SchemaValidator.scala

Lines changed: 3 additions & 3 deletions
@@ -18,7 +18,7 @@ package za.co.absa.standardization

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types._
-import za.co.absa.spark.commons.errorhandling.ErrorMessage
+import za.co.absa.standardization.ErrorMessage
 import za.co.absa.standardization.types.{TypeDefaults, TypedStructField}
 import za.co.absa.standardization.validation.field.FieldValidationIssue

@@ -116,7 +116,7 @@ object SchemaValidator {
         fields += prefixedField
       }
     }
-    fields
+    fields.toSeq
   }

   def flattenArray(field: StructField, arr: ArrayType, structPath: String): Seq[FlatField] = {
@@ -128,7 +128,7 @@ object SchemaValidator {
       val prefixedField = FlatField(structPath, field)
      arrayFields += prefixedField
     }
-    arrayFields
+    arrayFields.toSeq
   }

   flattenStruct(schema, "")
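The two .toSeq calls are the typical Scala 2.13 source fix: the unqualified Seq alias now points at scala.collection.immutable.Seq, so a mutable builder no longer satisfies a Seq[...] result type on its own. A minimal sketch of the pattern, assuming the buffers above are mutable ListBuffers:

    import scala.collection.mutable.ListBuffer

    // Hypothetical reduction of the flatten* methods above.
    def collect(names: Iterable[String]): Seq[String] = {
      val buffer = new ListBuffer[String]
      names.foreach(n => buffer += n)
      buffer.toSeq  // required on 2.13 (immutable Seq expected); compiles unchanged on 2.11/2.12
    }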

src/main/scala/za/co/absa/standardization/Standardization.scala

Lines changed: 3 additions & 6 deletions
@@ -16,12 +16,10 @@

 package za.co.absa.standardization

-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
-import za.co.absa.spark.commons.errorhandling.ErrorMessage
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.standardization.config.{DefaultStandardizationConfig, StandardizationConfig}
 import za.co.absa.standardization.stages.{SchemaChecker, TypeParser}
@@ -30,7 +28,6 @@ import za.co.absa.standardization.udf.{UDFLibrary, UDFNames}

 object Standardization {
   private val logger: Logger = LoggerFactory.getLogger(this.getClass)
-  final val DefaultColumnNameOfCorruptRecord = "_corrupt_record"

   final val ColumnNameOfCorruptRecordConf = "spark.sql.columnNameOfCorruptRecord"

@@ -39,7 +36,7 @@
                   standardizationConfig: StandardizationConfig = DefaultStandardizationConfig)
                  (implicit sparkSession: SparkSession): DataFrame = {
     implicit val udfLib: UDFLibrary = new UDFLibrary(standardizationConfig)
-    implicit val hadoopConf: Configuration = sparkSession.sparkContext.hadoopConfiguration
+    udfLib.register(sparkSession)
     implicit val defaults: TypeDefaults = standardizationConfig.typeDefaults

     logger.info(s"Step 1: Schema validation")
@@ -75,7 +72,7 @@
   }

   private def standardizeDataset(df: DataFrame, expSchema: StructType, stdConfig: StandardizationConfig)
-                                (implicit spark: SparkSession, udfLib: UDFLibrary, defaults: TypeDefaults): DataFrame = {
+                                (implicit spark: SparkSession, defaults: TypeDefaults): DataFrame = {

     val rowErrors: List[Column] = gatherRowErrors(df.schema)
     val (stdCols, errorCols, oldErrorColumn) = expSchema.fields.foldLeft(List.empty[Column], rowErrors, None: Option[Column]) {
@@ -97,7 +94,7 @@
   }

   private def cleanTheFinalErrorColumn(dataFrame: DataFrame)
-                                      (implicit spark: SparkSession, udfLib: UDFLibrary): DataFrame = {
+                                      (implicit spark: SparkSession): DataFrame = {
     ArrayTransformations.flattenArrays(dataFrame, ErrorMessage.errorColumnName)
       .withColumn(ErrorMessage.errorColumnName, callUDF(UDFNames.cleanErrCol, col(ErrorMessage.errorColumnName)))
   }
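For orientation, a hedged sketch of how the entry point is typically called after this change. The method name standardize and the leading DataFrame/StructType parameters are assumptions; only the trailing standardizationConfig parameter and the implicit SparkSession are visible in this hunk. What the diff does show is that the UDFLibrary is now created and registered inside the method, so callers no longer supply it implicitly:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.types.StructType
    import za.co.absa.standardization.Standardization
    import za.co.absa.standardization.config.DefaultStandardizationConfig

    // Assumed call shape; parameter names are illustrative.
    def standardizeInput(input: DataFrame, desiredSchema: StructType)
                        (implicit spark: SparkSession): DataFrame =
      Standardization.standardize(input, desiredSchema, DefaultStandardizationConfig)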

src/main/scala/za/co/absa/standardization/StandardizationErrorMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@

 package za.co.absa.standardization

-import za.co.absa.spark.commons.errorhandling.ErrorMessage
+import za.co.absa.standardization.ErrorMessage
 import za.co.absa.standardization.config.{ErrorCodesConfig}

 object StandardizationErrorMessage {

src/main/scala/za/co/absa/standardization/ValidationIssue.scala

Lines changed: 3 additions & 1 deletion
@@ -16,7 +16,9 @@

 package za.co.absa.standardization

-sealed abstract class ValidationIssue
+sealed abstract class ValidationIssue {
+  val msg: String
+}

 case class ValidationWarning(msg: String) extends ValidationIssue
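Pulling msg up into the sealed base lets callers read the message without pattern matching on the concrete subclass; a small sketch (the warning text is illustrative):

    // Any ValidationIssue now exposes `msg` directly:
    def render(issue: ValidationIssue): String = s"Validation issue: ${issue.msg}"

    render(ValidationWarning("column 'id' has an unsupported default value"))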

src/main/scala/za/co/absa/standardization/stages/TypeParser.scala

Lines changed: 4 additions & 4 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.slf4j.{Logger, LoggerFactory}
-import za.co.absa.spark.commons.errorhandling.ErrorMessage
+import za.co.absa.standardization.ErrorMessage
 import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements
 import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
 import za.co.absa.spark.commons.utils.SchemaUtils
@@ -39,7 +39,7 @@ import za.co.absa.standardization.time.DateTimePattern
 import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike}
 import za.co.absa.standardization.types.TypedStructField._
 import za.co.absa.standardization.types.{ParseOutput, TypeDefaults, TypedStructField}
-import za.co.absa.standardization.udf.{UDFBuilder, UDFLibrary, UDFNames}
+import za.co.absa.standardization.udf.{UDFBuilder, UDFNames}

 import scala.reflect.runtime.universe._
 import scala.util.{Random, Try}
@@ -136,7 +136,7 @@ object TypeParser {
                     origSchema: StructType,
                     stdConfig: StandardizationConfig,
                     failOnInputNotPerSchema: Boolean = true)
-                    (implicit udfLib: UDFLibrary, defaults: TypeDefaults): ParseOutput = {
+                    (implicit defaults: TypeDefaults): ParseOutput = {
     // udfLib implicit is present for error column UDF implementation
     val sourceName = SchemaUtils.appendPath(path, field.sourceName)
     val origField = origSchema.getField(sourceName)
@@ -261,7 +261,7 @@
     }
   }

-  private abstract class PrimitiveParser[T](implicit defaults: TypeDefaults) extends TypeParser[T] {
+  private abstract class PrimitiveParser[T] extends TypeParser[T] {
     override protected def standardizeAfterCheck(stdConfig: StandardizationConfig)(implicit logger: Logger): ParseOutput = {
       val castedCol: Column = assemblePrimitiveCastLogic
       val castHasError: Column = assemblePrimitiveCastErrorLogic(castedCol)