
Commit 4049f2e

Marquis Wong authored and committed
SPARKC-686 Port to Scala 2.13
Major changes:
* Migrate from scala.collection.JavaConversions to scala.jdk.CollectionConverters
* Migrate some uses of Seq to immutable.Seq
1 parent 9eb9e81 · commit 4049f2e
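
The crux of the port: Scala 2.13 drops scala.collection.JavaConversions, whose implicit conversions let Java collections masquerade as Scala ones. The replacement, scala.jdk.CollectionConverters, makes every crossing explicit via .asScala/.asJava. A minimal before/after sketch (object and variable names are illustrative, not from this commit):

import java.util.{List => JList}
import scala.jdk.CollectionConverters._  // 2.13 replacement for JavaConversions

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    val javaRows: JList[String] = java.util.Arrays.asList("one", "two", "three")

    // Scala 2.12 + JavaConversions: javaRows.exists(...) compiled through an
    // implicit wrapper. In 2.13 the conversion must be spelled out:
    val hasOne = javaRows.asScala.exists(_ == "one")
    println(hasOne) // true

    // The converters also go the other way, Scala -> Java:
    val backToJava: JList[String] = Seq("a", "b").asJava
    println(backToJava) // [a, b]
  }
}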

File tree: 51 files changed, +218 -196 lines


.github/workflows/main.yml (1 addition, 1 deletion)

@@ -14,7 +14,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        scala: [2.12.10]
+        scala: [2.12.11, 2.13.8]
         db-version: [6.8.13, 5.1.24, 3.11.10, 4.0-rc2]
 
     steps:

build.sbt (4 additions, 2 deletions)

@@ -4,10 +4,11 @@ import sbt.{Compile, moduleFilter, _}
 import sbtassembly.AssemblyPlugin.autoImport.assembly
 
 lazy val scala212 = "2.12.11"
-lazy val supportedScalaVersions = List(scala212)
+lazy val scala213 = "2.13.8"
+lazy val supportedScalaVersions = List(scala212, scala213)
 
 // factor out common settings
-ThisBuild / scalaVersion := scala212
+ThisBuild / scalaVersion := scala213
 ThisBuild / scalacOptions ++= Seq("-target:jvm-1.8")
 
 // Publishing Info
@@ -70,6 +71,7 @@ val annotationProcessor = Seq(
 def scalacVersionDependantOptions(scalaBinary: String): Seq[String] = scalaBinary match {
   case "2.11" => Seq()
   case "2.12" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
+  case "2.13" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
 }
 
 lazy val root = (project in file("."))
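
The build changes follow the standard sbt cross-build recipe: declare both Scala versions, default to 2.13, and branch compiler flags on the binary version. A self-contained sketch of the pattern (project name hypothetical; in the real build, supportedScalaVersions is presumably wired into each subproject's crossScalaVersions):

// Hypothetical standalone build.sbt showing the cross-build pattern
lazy val scala212 = "2.12.11"
lazy val scala213 = "2.13.8"
lazy val supportedScalaVersions = List(scala212, scala213)

ThisBuild / scalaVersion := scala213

lazy val example = (project in file("."))
  .settings(
    name := "cross-build-example",
    // sbt +compile / +test now run once per listed Scala version
    crossScalaVersions := supportedScalaVersions,
    // Version-specific flags, mirroring scalacVersionDependantOptions above
    scalacOptions ++= (scalaBinaryVersion.value match {
      case "2.12" | "2.13" => Seq("-no-java-comments")
      case _               => Seq()
    })
  )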

connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala (36 additions, 36 deletions)

@@ -3,7 +3,7 @@ package com.datastax.spark.connector
 import com.datastax.spark.connector.ccm.CcmBridge
 import com.datastax.spark.connector.cluster.DefaultCluster
 
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
 import scala.concurrent.Future
 import org.apache.spark.rdd.RDD
 import com.datastax.spark.connector.cql.CassandraConnector
@@ -117,9 +117,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
 
@@ -140,9 +140,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
   it should "allow to save beans with transient fields to Cassandra" in {
@@ -162,9 +162,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
   it should "allow to save beans with inherited fields to Cassandra" in {
@@ -184,7 +184,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     val rows = results.all()
 
     rows should have size 3
-    rows.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
+    rows.asScala.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
       ("one", 1, "a"),
       ("two", 2, "b"),
       ("three", 3, "c")
@@ -210,9 +210,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
 
     val rows = results.all()
     assert(rows.size() == 3)
-    assert(rows.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
-    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
-    assert(rows.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
+    assert(rows.asScala.exists(row => row.getString("value") == "one" && row.getInt("key") == 1))
+    assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+    assert(rows.asScala.exists(row => row.getString("value") == "three" && row.getInt("key") == 3))
   }
 
   it should "allow to read rows as Tuple1" in {
@@ -222,7 +222,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple1(
       1: Integer
     )
@@ -237,7 +237,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple2(
      1: Integer,
      "2"
@@ -254,7 +254,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple3(
       1: Integer,
       "2",
@@ -273,7 +273,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple4(
       1: Integer,
       "2",
@@ -294,7 +294,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple5(
       1: Integer,
       "2",
@@ -317,7 +317,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple6(
       1: Integer,
       "2",
@@ -342,7 +342,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple7(
       1: Integer,
       "2",
@@ -369,7 +369,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple8(
       1: Integer,
       "2",
@@ -398,7 +398,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple9(
       1: Integer,
       "2",
@@ -429,7 +429,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple10(
       1: Integer,
       "2",
@@ -462,7 +462,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple11(
       1: Integer,
       "2",
@@ -497,7 +497,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple12(
       1: Integer,
       "2",
@@ -534,7 +534,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple13(
       1: Integer,
       "2",
@@ -573,7 +573,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple14(
       1: Integer,
       "2",
@@ -614,7 +614,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple15(
       1: Integer,
       "2",
@@ -657,7 +657,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple16(
       1: Integer,
       "2",
@@ -702,7 +702,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple17(
       1: Integer,
       "2",
@@ -749,7 +749,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple18(
       1: Integer,
       "2",
@@ -798,7 +798,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple19(
       1: Integer,
       "2",
@@ -849,7 +849,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple20(
       1: Integer,
       "2",
@@ -902,7 +902,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple21(
       1: Integer,
       "2",
@@ -957,7 +957,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
     )).select(
       "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21", "c22"
     )
-      .collect().head
+      .collect().asScala.head
     tuple shouldBe Tuple22(
       1: Integer,
       "2",

connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala (2 additions, 2 deletions)

@@ -200,8 +200,8 @@ trait SparkCassandraITSpecBase
     Await.result(Future.sequence(units), Duration.Inf)
   }
 
-  def awaitAll[T](units: TraversableOnce[Future[T]]): TraversableOnce[T] = {
-    Await.result(Future.sequence(units), Duration.Inf)
+  def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] = {
+    Await.result(Future.sequence(units.iterator.toList), Duration.Inf)
   }
 
   def keyspaceCql(name: String = ks) =
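
TraversableOnce no longer exists in 2.13; IterableOnce replaces it but guarantees little beyond iterator, and Future.sequence needs a collection with a BuildFrom instance, hence the materializing units.iterator.toList. A runnable sketch of the new signature (standalone, outside the test trait):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object AwaitAllSketch {
  // 2.13 version of the helper above: IterableOnce replaces TraversableOnce.
  // Materializing to a List gives Future.sequence the BuildFrom it requires.
  def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] =
    Await.result(Future.sequence(units.iterator.toList), Duration.Inf)

  def main(args: Array[String]): Unit = {
    val results = awaitAll(Seq(Future(1), Future(2), Future(3)))
    println(results.iterator.toList) // List(1, 2, 3)
  }
}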

connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala (1 addition, 1 deletion)

@@ -66,7 +66,7 @@ class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase w
       "spark.cassandra.auth.password" -> "cassandra",
       "keyspace" -> ks, "table" -> "authtest")
 
-    personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append")save()
+    personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append").save()
     val personDF2 = spark.read.format("org.apache.spark.sql.cassandra").options(options).load()
 
     personDF2.count should be(4)

connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala (9 additions, 7 deletions)

@@ -8,8 +8,8 @@ import com.datastax.spark.connector.cql.CassandraConnector
 import com.datastax.spark.connector.japi.CassandraJavaUtil._
 import org.apache.spark.api.java.function.{Function2, Function => JFunction}
 
-import scala.collection.JavaConversions._
 import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
 
 case class SimpleClass(value: Integer)
 
@@ -103,15 +103,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default
         "key")
       .spanBy(f, classOf[Integer])
       .collect()
+      .asScala
       .toMap
 
     results should have size 2
     results should contain key 10
     results should contain key 20
-    results(10).size should be(3)
-    results(10).map(_._2).toSeq should be(Seq(10, 11, 12))
-    results(20).size should be(3)
-    results(20).map(_._2).toSeq should be(Seq(20, 21, 22))
+    results(10).asScala.size should be(3)
+    results(10).asScala.map(_._2).toSeq should be(Seq(10, 11, 12))
+    results(20).asScala.size should be(3)
+    results(20).asScala.map(_._2).toSeq should be(Seq(20, 21, 22))
   }
 
   it should "allow to use spanByKey method" in {
@@ -129,15 +130,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default
         "key")
      .spanByKey()
      .collect()
+      .asScala
      .toMap
 
     results should have size 2
     results should contain key 10
     results should contain key 20
     results(10).size should be(3)
-    results(10).toSeq should be(Seq(10, 11, 12))
+    results(10).asScala.toSeq should be(Seq(10, 11, 12))
     results(20).size should be(3)
-    results(20).toSeq should be(Seq(20, 21, 22))
+    results(20).asScala.toSeq should be(Seq(20, 21, 22))
   }
 
   it should "allow to use of keyByAndApplyPartitioner" in {
