
Commit 3459732

#97 Add scala 2.13 support (#115)
* #97 Add scala 2.13 support
* #97 Bump scala 2.12 patch version to 18
* #97 Implement using shims
1 parent 813303b commit 3459732

13 files changed, +113 -56 lines changed


build.sbt

Lines changed: 19 additions & 10 deletions
@@ -16,16 +16,17 @@
 ThisBuild / organization := "za.co.absa"

 lazy val scala211 = "2.11.12"
-lazy val scala212 = "2.12.12"
-lazy val spark2 = "2.4.7"
-lazy val spark32 = "3.2.1"
-lazy val spark33 = "3.3.1"
+lazy val scala212 = "2.12.18"
+lazy val scala213 = "2.13.11"
+lazy val spark2 = "2.4.8"
+lazy val spark32 = "3.2.4"
+lazy val spark33 = "3.3.2"

 import Dependencies._
 import SparkVersionAxis._

 ThisBuild / scalaVersion := scala211
-ThisBuild / crossScalaVersions := Seq(scala211, scala212)
+ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213)

 ThisBuild / versionScheme := Some("early-semver")

@@ -55,15 +56,23 @@ lazy val parent = (project in file("."))
 lazy val `sparkCommons` = (projectMatrix in file("spark-commons"))
   .settings(commonSettings: _*)
   .sparkRow(SparkVersionAxis(spark2), scalaVersions = Seq(scala211, scala212))
-  .sparkRow(SparkVersionAxis(spark32), scalaVersions = Seq(scala212))
-  .sparkRow(SparkVersionAxis(spark33), scalaVersions = Seq(scala212))
+  .sparkRow(SparkVersionAxis(spark32), scalaVersions = Seq(scala212, scala213))
+  .sparkRow(SparkVersionAxis(spark33), scalaVersions = Seq(scala212, scala213))
   .dependsOn(sparkCommonsTest % "test")

 lazy val sparkCommonsTest = (projectMatrix in file("spark-commons-test"))
   .settings(
     commonSettings ++ Seq(
-      name := "spark-commons-test",
-      libraryDependencies ++= sparkDependencies(spark2)
+      name := "spark-commons-test",
+      libraryDependencies ++= sparkDependencies(if (scalaVersion.value == scala211) spark2 else spark32),
+      Compile / unmanagedSourceDirectories += {
+        val sourceDir = (Compile / sourceDirectory).value
+        if (scalaVersion.value.startsWith("2.13")) {
+          sourceDir / "scala_2.13+"
+        } else {
+          sourceDir / "scala_2.13-"
+        }
+      }
     ): _*
   )
-  .jvmPlatform(scalaVersions = Seq(scala211, scala212))
+  .jvmPlatform(scalaVersions = Seq(scala211, scala212, scala213))
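
The new unmanagedSourceDirectories block is what wires in the version-specific JavaConvertersWrapper shim sources added later in this commit (scala_2.13+ vs scala_2.13-). As a minimal sketch, an equivalent switch could also be written with sbt's CrossVersion.partialVersion instead of a string-prefix check; this variant is not part of the commit:

// Hypothetical alternative: select the shim directory from the parsed Scala version.
Compile / unmanagedSourceDirectories += {
  val sourceDir = (Compile / sourceDirectory).value
  CrossVersion.partialVersion(scalaVersion.value) match {
    case Some((2, minor)) if minor >= 13 => sourceDir / "scala_2.13+"
    case _                               => sourceDir / "scala_2.13-"
  }
}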

project/Dependencies.scala

Lines changed: 2 additions & 2 deletions
@@ -34,8 +34,8 @@ object Dependencies {
   def sparkCommonsDependencies(sparkVersion: String): Seq[ModuleID] = {
     Seq(
       "za.co.absa.commons" %% "commons" % "1.0.0",
-      "za.co.absa" %% "spark-hofs" % "0.4.0",
-      "za.co.absa" %% "spark-hats" % "0.2.2"
+      "za.co.absa" %% "spark-hofs" % "0.5.0",
+      "za.co.absa" %% "spark-hats" % "0.3.0"
     ) ++
       sparkDependencies(sparkVersion)
   }
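
The spark-hofs and spark-hats bumps matter for the cross-build because %% appends the Scala binary version to the artifact name, so the project can only compile for 2.13 against releases that publish _2.13 artifacts (presumably what 0.5.0 and 0.3.0 add). A rough illustration of the expansion, not taken from this commit:

// "za.co.absa" %% "spark-hofs" % "0.5.0" is shorthand for
"za.co.absa" % s"spark-hofs_${scalaBinaryVersion.value}" % "0.5.0"
// i.e. spark-hofs_2.11, spark-hofs_2.12 or spark-hofs_2.13, depending on the axis being built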

spark-commons-test/src/main/scala/za/co/absa/spark/commons/test/YarnSparkConfiguration.scala

Lines changed: 8 additions & 10 deletions
@@ -21,37 +21,34 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
-import za.co.absa.spark.commons.test.YarnSparkConfiguration._

-import scala.collection.JavaConverters.iterableAsScalaIterableConverter

 class YarnSparkConfiguration(confDir: String, distJarsDir: String) extends SparkTestConfig {
-
   override def master: String = "yarn"

   override def appName: String = super.appName + " - Yarn"

   override protected def builder: SparkSession.Builder = {
     super.builder
-      .config(new SparkConf().setAll(getHadoopConfigurationForSpark(confDir)))
+      .config(new SparkConf().setAll(YarnSparkConfiguration.getHadoopConfigurationForSpark(confDir)))
       .config("spark.yarn.jars", dependencies)
       .config("spark.deploy.mode", "client")
   }

   protected def dependencies: String = {
     //get a list of all dist jars
     val distJars = FileSystem
-      .get(getHadoopConfiguration(confDir))
+      .get(YarnSparkConfiguration.getHadoopConfiguration(confDir))
       .listStatus(new Path(distJarsDir))
       .map(_.getPath)
-    val localJars = getDepsFromClassPath("absa")
-    val currentJars = getCurrentProjectJars
+    val localJars = YarnSparkConfiguration.getDepsFromClassPath("absa")
+    val currentJars = YarnSparkConfiguration.getCurrentProjectJars
     (distJars ++ localJars ++currentJars).mkString(",")
   }

 }

-object YarnSparkConfiguration {
+object YarnSparkConfiguration extends JavaConvertersWrapper {

   /**
    * Gets a Hadoop configuration object from the specified hadoopConfDir parameter
@@ -73,7 +70,7 @@ object YarnSparkConfiguration {
    * @param hadoopConf Hadoop Configuration object to be converted into Spark configs
    */
   def hadoopConfToSparkMap(hadoopConf: Configuration): Map[String, String] = {
-    hadoopConf.asScala.map(entry => (s"spark.hadoop.${entry.getKey}", entry.getValue)).toMap
+    asScala(hadoopConf).map(entry => (s"spark.hadoop.${entry.getKey}", entry.getValue)).toMap
   }

   /**
@@ -88,7 +85,7 @@
    */
   def getDepsFromClassPath(inclPattern: String): Seq[String] = {
     val cl = this.getClass.getClassLoader
-    cl.asInstanceOf[java.net.URLClassLoader].getURLs.filter(c => c.toString.contains(inclPattern)).map(_.toString())
+    cl.asInstanceOf[java.net.URLClassLoader].getURLs.toSeq.filter(c => c.toString.contains(inclPattern)).map(_.toString())
   }

   /**
@@ -98,6 +95,7 @@
     val targetDir = new File(s"${System.getProperty("user.dir")}/target")
     targetDir
       .listFiles()
+      .toSeq
       .filter(f => f.getName.split("\\.").last.toLowerCase() == "jar" && f.getName.contains("original"))
       .map(_.getAbsolutePath)
   }

JavaConvertersWrapper.scala (scala_2.13+ shim, new file)

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.spark.commons.test
+
+import scala.jdk.CollectionConverters.IterableHasAsScala
+
+trait JavaConvertersWrapper {
+  def asScala[T](item: java.lang.Iterable[T]): Iterable[T] = {
+    item.asScala
+  }
+}

JavaConvertersWrapper.scala (scala_2.13- shim, new file)

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.spark.commons.test
+
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+
+trait JavaConvertersWrapper {
+  def asScala[T](item: java.lang.Iterable[T]): Iterable[T] = {
+    item.asScala
+  }
+}
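
Because sbt compiles exactly one of these two JavaConvertersWrapper sources per Scala version, a caller can mix the trait in and keep a single asScala call site across 2.11/2.12 and 2.13, which is how the YarnSparkConfiguration object above consumes it. A minimal hypothetical caller (ConfigAsMap is not part of this commit):

import org.apache.hadoop.conf.Configuration

object ConfigAsMap extends JavaConvertersWrapper {
  // Configuration is a java.lang.Iterable[java.util.Map.Entry[String, String]],
  // so the shim's asScala works on both Scala lines without importing either converter API.
  def toMap(hadoopConf: Configuration): Map[String, String] =
    asScala(hadoopConf).map(entry => (entry.getKey, entry.getValue)).toMap
}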

spark-commons/src/main/scala/za/co/absa/spark/commons/utils/ExplodeTools.scala

Lines changed: 3 additions & 3 deletions
@@ -229,7 +229,7 @@ object ExplodeTools {
   def processStruct(schema: StructType, path: Seq[String], parentCol: Option[Column]): Seq[Column] = {
     val currentField = path.head
     val isLeaf = path.lengthCompare(1) <= 0
-    val newFields = schema.fields.flatMap(field => {
+    val newFields = schema.fields.toSeq.flatMap(field => {
       if (field.name != currentField) {
         Seq(getFullFieldPath(parentCol, field.name).as(field.name))
       } else {
@@ -286,7 +286,7 @@
         }
       )
       val newFields2 = if (isColumnToFound) newFields else newFields :+ col(columnFrom).as(columnTo)
-      inputDf.select(newFields2: _*)
+      inputDf.select(newFields2.toSeq: _*)
     } else {
       putFieldIntoNestedStruct(inputDf, columnFrom, SchemaUtils.splitPath(columnTo), positionColumn)
     }
@@ -301,7 +301,7 @@
     val isLeaf = path.lengthCompare(1) <= 0
     var isFound = false

-    val newFields = schema.fields.flatMap(field => {
+    val newFields = schema.fields.toSeq.flatMap(field => {
       if (field.name == columnFrom) {
         // This removes the original column name (if any) and the transient column
         Nil
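
These .toSeq calls, here and in SchemaUtils below, are the typical source-compatibility fix for Scala 2.13: schema.fields is an Array, scala.Seq now aliases immutable.Seq, and the implicit Array-to-Seq widening that 2.12 relied on is deprecated on 2.13, so the conversion is made explicit. A minimal sketch of the pattern, assuming a trivial schema:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("a", StringType), StructField("b", StringType)))

// Array[StructField] -> Seq[String]: the explicit .toSeq compiles warning-free on 2.11, 2.12 and 2.13
val names: Seq[String] = schema.fields.toSeq.map(_.name)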

spark-commons/src/main/scala/za/co/absa/spark/commons/utils/JsonUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -58,7 +58,7 @@ object JsonUtils {
    */
   def getDataFrameFromJson(json: Seq[String])(implicit spark: SparkSession): DataFrame = {
     import spark.implicits._
-    spark.read.json(json.toDS)
+    spark.read.json(json.toDS())
   }

   /**
@@ -71,7 +71,7 @@
    */
   def getDataFrameFromJson(json: Seq[String], schema: StructType)(implicit spark: SparkSession): DataFrame = {
     import spark.implicits._
-    spark.read.schema(schema).json(json.toDS)
+    spark.read.schema(schema).json(json.toDS())
   }

   /**

spark-commons/src/main/scala/za/co/absa/spark/commons/utils/SchemaUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ object SchemaUtils {
   def getAllArraySubPaths(path: String, name: String, dt: DataType): Seq[String] = {
     val currPath = appendPath(path, name)
     dt match {
-      case s: StructType => s.fields.flatMap(f => getAllArraySubPaths(currPath, f.name, f.dataType))
+      case s: StructType => s.fields.toSeq.flatMap(f => getAllArraySubPaths(currPath, f.name, f.dataType))
       case _@ArrayType(elType, _) => getAllArraySubPaths(path, name, elType) :+ currPath
       case _ => Seq()
     }

spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandler/DataFrameErrorHandlerImplicitTest.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ class DataFrameErrorHandlerImplicitTest extends AnyFunSuite with SparkTestBase {

   private val col1Name = "id"
   private val col2Name = "name"
-  implicit private val df = Seq((1, "John"), (2, "Jane"), (3, "Alice")).toDF(col1Name, col2Name)
+  implicit private val df: DataFrame = Seq((1, "John"), (2, "Jane"), (3, "Alice")).toDF(col1Name, col2Name)

   private type ResultDfRecordType = (Option[Integer], String)

spark-commons/src/test/scala/za/co/absa/spark/commons/errorhandler/types/ColumnOrValueTest.scala

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ class ColumnOrValueTest extends AnyFunSuite {
   }

   test("Creation of column based on its definition") {
-    val myColumn = current_date
+    val myColumn = current_date()
     val expected = ColumnOrValueForm(myColumn, Set(), None)
     val result = ColumnOrValue(myColumn)
     expected assertTo result
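
The current_date() change, like the toDS() calls in JsonUtils above, addresses Scala 2.13's deprecation of auto-application: a method declared with an empty parameter list must now be called with explicit parentheses. A minimal sketch, not taken from the codebase:

class Greeter {
  def greeting(): String = "hello"  // declared with an empty parameter list
}

val g = new Greeter
g.greeting()  // fine on 2.11, 2.12 and 2.13
g.greeting    // deprecated auto-application on 2.13 (an error under fatal warnings)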
