
Commit 5a25f7f

SPARKC-686 scala 2.13 support (#1361)
Adds Scala 2.13 support while keeping compatibility with Scala 2.12:
- Applied the automated Scala collection migration via scalafix ([scala-collection-compat](https://github.com/scala/scala-collection-compat))
- Conditionally added dependencies on `org.scala-lang.modules.scala-collection-compat` and `org.scala-lang.modules.scala-parallel-collections` for Scala 2.13
- Added Scala version specific implementations of `SparkILoop`, `ParIterable`, `GenericJavaRowReaderFactory` and `CanBuildFrom`
1 parent: 519f998
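
For context, the scalafix rules shipped with scala-collection-compat produce small, mechanical rewrites. A minimal sketch of the two patterns that dominate this commit (illustrative code, not taken from the diff):

    import scala.collection.compat._        // shims the 2.13 collection API on 2.12; a no-op on 2.13
    import scala.jdk.CollectionConverters._ // 2.13 home of the Java converters (backported to 2.12 by scala-collection-compat)

    val xs = List(1, 2, 3)
    val v: Vector[Int] = xs.to(Vector)      // was xs.to[Vector]; the bracketed form is gone in 2.13

    val jList = java.util.Arrays.asList("a", "b")
    val first = jList.asScala.headOption    // was implicit under the removed scala.collection.JavaConversions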

95 files changed: 550 additions, 344 deletions

Some content is hidden: large commits have some content hidden by default, so only a subset of the 95 changed files appears below.

.github/workflows/main.yml

Lines changed: 9 additions & 2 deletions
@@ -14,8 +14,8 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        scala: [2.12.10]
-        db-version: [6.8.13, 5.1.24, 3.11.10, 4.0-rc2]
+        scala: [2.12.11, 2.13.11]
+        db-version: [3.11.10, 4.0-rc2, 6.8.13]
 
     steps:
       - uses: actions/checkout@v2
@@ -36,3 +36,10 @@ jobs:
           CCM_CASSANDRA_VERSION: ${{ matrix.db-version }}
           PUBLISH_VERSION: test
         run: sbt/sbt ++${{ matrix.scala }} test it:test
+
+      - name: Publish Test Report
+        uses: mikepenz/action-junit-report@v3
+        if: always()
+        with:
+          report_paths: '**/target/test-reports/*.xml'
+          annotate_only: true

build.sbt

Lines changed: 11 additions & 4 deletions
@@ -4,7 +4,8 @@ import sbt.{Compile, moduleFilter, _}
 import sbtassembly.AssemblyPlugin.autoImport.assembly
 
 lazy val scala212 = "2.12.11"
-lazy val supportedScalaVersions = List(scala212)
+lazy val scala213 = "2.13.11"
+lazy val supportedScalaVersions = List(scala212, scala213)
 
 // factor out common settings
 ThisBuild / scalaVersion := scala212
@@ -60,16 +61,18 @@ lazy val commonSettings = Seq(
   fork := true,
   parallelExecution := true,
   testForkedParallel := false,
-  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v")
+  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
 )
 
+
 val annotationProcessor = Seq(
   "-processor", "com.datastax.oss.driver.internal.mapper.processor.MapperProcessor"
 )
 
 def scalacVersionDependantOptions(scalaBinary: String): Seq[String] = scalaBinary match {
   case "2.11" => Seq()
   case "2.12" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
+  case "2.13" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
 }
 
 lazy val root = (project in file("."))
@@ -81,6 +84,7 @@ lazy val root = (project in file("."))
     publish / skip := true
   )
 
+
 lazy val connector = (project in file("connector"))
   .configs(IntegrationTest)
   .settings(Defaults.itSettings: _*) //This and above enables the "it" suite
@@ -105,6 +109,7 @@ lazy val connector = (project in file("connector"))
     Global / concurrentRestrictions := Seq(Tags.limitAll(Testing.parallelTasks)),
 
     libraryDependencies ++= Dependencies.Spark.dependencies
+      ++ Dependencies.Compatibility.dependencies(scalaVersion.value)
       ++ Dependencies.TestConnector.dependencies
      ++ Dependencies.Jetty.dependencies,
 
@@ -121,7 +126,8 @@ lazy val testSupport = (project in file("test-support"))
   .settings(
     crossScalaVersions := supportedScalaVersions,
     name := "spark-cassandra-connector-test-support",
-    libraryDependencies ++= Dependencies.TestSupport.dependencies
+    libraryDependencies ++= Dependencies.Compatibility.dependencies(scalaVersion.value)
+      ++ Dependencies.TestSupport.dependencies
   )
 
 lazy val driver = (project in file("driver"))
@@ -131,7 +137,8 @@ lazy val driver = (project in file("driver"))
     crossScalaVersions := supportedScalaVersions,
     name := "spark-cassandra-connector-driver",
     assembly / test := {},
-    libraryDependencies ++= Dependencies.Driver.dependencies
+    libraryDependencies ++= Dependencies.Compatibility.dependencies(scalaVersion.value)
+      ++ Dependencies.Driver.dependencies
       ++ Dependencies.TestDriver.dependencies
       :+ ("org.scala-lang" % "scala-reflect" % scalaVersion.value)
   )
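
`Dependencies.Compatibility` itself lives in `project/Dependencies.scala`, which is not part of this excerpt. A hypothetical sketch of its shape, going by the commit message (module names are the real ones named there; version numbers and the exact per-version split are assumptions):

    import sbt._

    // Hypothetical reconstruction of Dependencies.Compatibility, for illustration only.
    object Compatibility {
      def dependencies(scalaVersion: String): Seq[ModuleID] = {
        // scala-collection-compat backports the 2.13 collection API to 2.12;
        // scala-parallel-collections replaces the parallel collections moved
        // out of the standard library in 2.13.
        val compat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" // version illustrative
        CrossVersion.partialVersion(scalaVersion) match {
          case Some((2, n)) if n >= 13 =>
            Seq(compat, "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4") // version illustrative
          case _ =>
            Seq(compat)
        }
      }
    }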

connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala

Lines changed: 36 additions & 36 deletions
@@ -3,7 +3,7 @@ package com.datastax.spark.connector
 import com.datastax.spark.connector.ccm.CcmBridge
 import com.datastax.spark.connector.cluster.DefaultCluster
 
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
 import scala.concurrent.Future
 import org.apache.spark.rdd.RDD
 import com.datastax.spark.connector.cql.CassandraConnector
@@ -117,9 +117,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
 
@@ -140,9 +140,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
   it should "allow to save beans with transient fields to Cassandra" in {
@@ -162,9 +162,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
   it should "allow to save beans with inherited fields to Cassandra" in {
@@ -184,7 +184,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     val rows = results.all()
 
     rows should have size 3
-    rows.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
+    rows.asScala.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
       ("one", 1, "a"),
       ("two", 2, "b"),
       ("three", 3, "c")
@@ -210,9 +210,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
   it should "allow to read rows as Tuple1" in {
@@ -222,7 +222,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple1(
       1: Integer
    )
@@ -237,7 +237,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple2(
       1: Integer,
      "2"
@@ -254,7 +254,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple3(
       1: Integer,
       "2",
@@ -273,7 +273,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple4(
       1: Integer,
       "2",
@@ -294,7 +294,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple5(
       1: Integer,
       "2",
@@ -317,7 +317,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple6(
       1: Integer,
       "2",
@@ -342,7 +342,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple7(
       1: Integer,
       "2",
@@ -369,7 +369,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple8(
       1: Integer,
       "2",
@@ -398,7 +398,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple9(
       1: Integer,
       "2",
@@ -429,7 +429,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple10(
       1: Integer,
       "2",
@@ -462,7 +462,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple11(
       1: Integer,
       "2",
@@ -497,7 +497,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple12(
       1: Integer,
       "2",
@@ -534,7 +534,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple13(
       1: Integer,
       "2",
@@ -573,7 +573,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple14(
       1: Integer,
       "2",
@@ -614,7 +614,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple15(
       1: Integer,
       "2",
@@ -657,7 +657,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple16(
       1: Integer,
       "2",
@@ -702,7 +702,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple17(
       1: Integer,
       "2",
@@ -749,7 +749,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple18(
       1: Integer,
       "2",
@@ -798,7 +798,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple19(
       1: Integer,
       "2",
@@ -849,7 +849,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple20(
       1: Integer,
       "2",
@@ -902,7 +902,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple21(
       1: Integer,
       "2",
@@ -957,7 +957,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21", "c22"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple22(
       1: Integer,
       "2",

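Every change in this file is the same mechanical rewrite: `scala.collection.JavaConversions` (deprecated in 2.12, removed in 2.13) wrapped Java collections implicitly, so methods like `exists` appeared to exist on a `java.util.List`; with `scala.jdk.CollectionConverters` the wrap is requested explicitly via `.asScala`. A minimal illustration of the before/after, using a plain `java.util.List` as a stand-in for the driver's result set:

    import java.util.{Arrays => JArrays, List => JList}
    import scala.jdk.CollectionConverters._

    val rows: JList[String] = JArrays.asList("one", "two", "three")

    // 2.12-era: `import scala.collection.JavaConversions._` let you write
    // rows.exists(...) directly through an implicit conversion.
    // 2.13-era: the conversion is explicit.
    val found: Boolean = rows.asScala.exists(_ == "two")
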
connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala

Lines changed: 4 additions & 3 deletions
@@ -26,7 +26,8 @@ import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.{Seconds, Span}
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
+import scala.collection.compat._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Try
@@ -200,8 +201,8 @@ trait SparkCassandraITSpecBase
     Await.result(Future.sequence(units), Duration.Inf)
   }
 
-  def awaitAll[T](units: TraversableOnce[Future[T]]): TraversableOnce[T] = {
-    Await.result(Future.sequence(units), Duration.Inf)
+  def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] = {
+    Await.result(Future.sequence(units.iterator), Duration.Inf)
   }
 
   def keyspaceCql(name: String = ks) =
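
`TraversableOnce` survives in 2.13 only as a deprecated alias of `IterableOnce`, while on 2.12 the `scala.collection.compat._` import supplies the `IterableOnce` name and the `.iterator` extension that the new `awaitAll` relies on. A small sketch of the same cross-building pattern (the helper is illustrative, not from the commit):

    import scala.collection.compat._ // IterableOnce alias and extensions on 2.12; a no-op on 2.13

    // Accept any one-pass collection and drain it through .iterator, which
    // compiles on 2.12 (via the compat extension) and 2.13 (where it is native).
    def totalLength(parts: IterableOnce[String]): Int =
      parts.iterator.map(_.length).sum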

connector/src/it/scala/com/datastax/spark/connector/cluster/ClusterHolder.scala

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ import com.datastax.spark.connector.ccm.CcmBridge
 import com.datastax.spark.connector.util.{Logging, SerialShutdownHooks}
 import org.apache.commons.lang3.ClassUtils
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
 /** Source of [[Cluster]]s for all tests executed within a single test group (single process/JVM).
