11 changes: 9 additions & 2 deletions .github/workflows/main.yml
@@ -14,8 +14,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
scala: [2.12.10]
db-version: [6.8.13, 5.1.24, 3.11.10, 4.0-rc2]
scala: [2.12.11, 2.13.11]
db-version: [3.11.10, 4.0-rc2, 6.8.13]

steps:
- uses: actions/checkout@v2
@@ -36,3 +36,10 @@ jobs:
CCM_CASSANDRA_VERSION: ${{ matrix.db-version }}
PUBLISH_VERSION: test
run: sbt/sbt ++${{ matrix.scala }} test it:test

- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
if: always()
with:
report_paths: '**/target/test-reports/*.xml'
annotate_only: true
15 changes: 11 additions & 4 deletions build.sbt
@@ -4,7 +4,8 @@ import sbt.{Compile, moduleFilter, _}
import sbtassembly.AssemblyPlugin.autoImport.assembly

lazy val scala212 = "2.12.11"
lazy val supportedScalaVersions = List(scala212)
lazy val scala213 = "2.13.11"
lazy val supportedScalaVersions = List(scala212, scala213)

// factor out common settings
ThisBuild / scalaVersion := scala212
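
For context on how this pair of versions is consumed: `crossScalaVersions` (set per project below) feeds sbt's stock cross-building, where a `+` prefix runs a command once per listed version and `++<version>` pins a single version for one invocation. The CI matrix above uses the latter form (`sbt/sbt ++${{ matrix.scala }} test it:test`). A minimal sketch of the workflow (note: this PR sets `crossScalaVersions` per project rather than build-wide, as shown further down):

```scala
// build.sbt (sketch): the two versions declared above feed sbt's
// standard cross-building support.
lazy val scala212 = "2.12.11"
lazy val scala213 = "2.13.11"
lazy val supportedScalaVersions = List(scala212, scala213)

ThisBuild / scalaVersion := scala212 // default for a plain `sbt test`
ThisBuild / crossScalaVersions := supportedScalaVersions

// From the sbt shell:
//   ++2.13.11 test   -- pin one version for this invocation (what CI does)
//   +test            -- run `test` once per version in crossScalaVersions
```
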
@@ -60,16 +61,18 @@ lazy val commonSettings = Seq(
fork := true,
parallelExecution := true,
testForkedParallel := false,
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v")
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
)


val annotationProcessor = Seq(
"-processor", "com.datastax.oss.driver.internal.mapper.processor.MapperProcessor"
)

def scalacVersionDependantOptions(scalaBinary: String): Seq[String] = scalaBinary match {
case "2.11" => Seq()
case "2.12" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
case "2.13" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
}
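
How a helper like this gets applied is outside the visible hunks, so the wiring below is an assumption, but the conventional pattern is to key it on the binary version of whatever Scala the build is currently using:

```scala
// Assumed wiring (not shown in this diff): feed the current binary version in.
scalacOptions ++= scalacVersionDependantOptions(scalaBinaryVersion.value)
```

One design consequence worth noting: the match is exhaustive only for the versions the build knows about, so an unlisted binary version (say, "3") would fail fast with a `MatchError` rather than silently compiling without the workaround flag.
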

lazy val root = (project in file("."))
@@ -81,6 +84,7 @@ lazy val root = (project in file("."))
publish / skip := true
)


lazy val connector = (project in file("connector"))
.configs(IntegrationTest)
.settings(Defaults.itSettings: _*) //This and above enables the "it" suite
@@ -105,6 +109,7 @@ lazy val connector = (project in file("connector"))
Global / concurrentRestrictions := Seq(Tags.limitAll(Testing.parallelTasks)),

libraryDependencies ++= Dependencies.Spark.dependencies
++ Dependencies.Compatibility.dependencies(scalaVersion.value)
++ Dependencies.TestConnector.dependencies
++ Dependencies.Jetty.dependencies,
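
`Dependencies.Compatibility.dependencies(scalaVersion.value)` is defined in the project's build sources, which this diff does not touch. As a hypothetical sketch of what such a group usually contains: the standard shim for code that cross-compiles against both collection libraries is scala-collection-compat, which backfills 2.13 API such as `scala.jdk.CollectionConverters` and `IterableOnce` on 2.12 and is a no-op on 2.13. The object layout, signature, and version below are assumptions, not the project's actual definition:

```scala
// Hypothetical sketch of a build-source file -- not part of this diff.
import sbt._

object Dependencies {
  object Compatibility {
    // Taking the full Scala version as a parameter leaves room to vary
    // the list per version if a shim is ever needed for only one of them.
    def dependencies(scalaVersion: String): Seq[ModuleID] = Seq(
      "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" // illustrative version
    )
  }
}
```
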

@@ -121,7 +126,8 @@ lazy val testSupport = (project in file("test-support"))
.settings(
crossScalaVersions := supportedScalaVersions,
name := "spark-cassandra-connector-test-support",
libraryDependencies ++= Dependencies.TestSupport.dependencies
libraryDependencies ++= Dependencies.Compatibility.dependencies(scalaVersion.value)
++ Dependencies.TestSupport.dependencies
)

lazy val driver = (project in file("driver"))
@@ -131,7 +137,8 @@ lazy val driver = (project in file("driver"))
crossScalaVersions := supportedScalaVersions,
name := "spark-cassandra-connector-driver",
assembly / test := {},
libraryDependencies ++= Dependencies.Driver.dependencies
libraryDependencies ++= Dependencies.Compatibility.dependencies(scalaVersion.value)
++ Dependencies.Driver.dependencies
++ Dependencies.TestDriver.dependencies
:+ ("org.scala-lang" % "scala-reflect" % scalaVersion.value)
)
@@ -3,7 +3,7 @@ package com.datastax.spark.connector
import com.datastax.spark.connector.ccm.CcmBridge
import com.datastax.spark.connector.cluster.DefaultCluster

import scala.collection.JavaConversions._
import scala.jdk.CollectionConverters._
import scala.concurrent.Future
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.cql.CassandraConnector
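
The import swap above is the reason every call site in the rest of this file changes: `scala.collection.JavaConversions` supplied implicit Java-to-Scala conversions and was removed in Scala 2.13, while `scala.jdk.CollectionConverters` (available on 2.12 through scala-collection-compat) only offers explicit `.asScala`/`.asJava` decorators. A minimal standalone illustration:

```scala
import scala.jdk.CollectionConverters._

val rows: java.util.List[String] = java.util.Arrays.asList("one", "two", "three")

// With the old JavaConversions import, `rows.exists(...)` compiled directly
// via an implicit view. Now the conversion is spelled out, which is why the
// assertions below all gain `.asScala`.
println(rows.asScala.exists(_ == "one")) // true
```
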
@@ -117,9 +117,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}


@@ -140,9 +140,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to save beans with transient fields to Cassandra" in {
@@ -162,9 +162,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to save beans with inherited fields to Cassandra" in {
@@ -184,7 +184,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
val rows = results.all()

rows should have size 3
rows.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
rows.asScala.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
("one", 1, "a"),
("two", 2, "b"),
("three", 3, "c")
@@ -210,9 +210,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to read rows as Tuple1" in {
@@ -222,7 +222,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple1(
1: Integer
)
@@ -237,7 +237,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple2(
1: Integer,
"2"
@@ -254,7 +254,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple3(
1: Integer,
"2",
@@ -273,7 +273,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple4(
1: Integer,
"2",
@@ -294,7 +294,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple5(
1: Integer,
"2",
@@ -317,7 +317,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple6(
1: Integer,
"2",
@@ -342,7 +342,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple7(
1: Integer,
"2",
@@ -369,7 +369,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple8(
1: Integer,
"2",
@@ -398,7 +398,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple9(
1: Integer,
"2",
@@ -429,7 +429,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple10(
1: Integer,
"2",
@@ -462,7 +462,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple11(
1: Integer,
"2",
@@ -497,7 +497,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple12(
1: Integer,
"2",
@@ -534,7 +534,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple13(
1: Integer,
"2",
@@ -573,7 +573,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple14(
1: Integer,
"2",
@@ -614,7 +614,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple15(
1: Integer,
"2",
@@ -657,7 +657,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple16(
1: Integer,
"2",
@@ -702,7 +702,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple17(
1: Integer,
"2",
@@ -749,7 +749,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple18(
1: Integer,
"2",
@@ -798,7 +798,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple19(
1: Integer,
"2",
@@ -849,7 +849,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple20(
1: Integer,
"2",
@@ -902,7 +902,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple21(
1: Integer,
"2",
@@ -957,7 +957,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21", "c22"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple22(
1: Integer,
"2",
@@ -26,7 +26,7 @@ import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.compat._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try
@@ -200,8 +201,8 @@ trait SparkCassandraITSpecBase
Await.result(Future.sequence(units), Duration.Inf)
}

def awaitAll[T](units: TraversableOnce[Future[T]]): TraversableOnce[T] = {
Await.result(Future.sequence(units), Duration.Inf)
def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] = {
Await.result(Future.sequence(units.iterator), Duration.Inf)
}
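
`TraversableOnce` no longer exists in 2.13, so the signature moves to `IterableOnce`; on 2.12 the `scala.collection.compat._` import added above maps `IterableOnce` back onto `TraversableOnce`, so one source tree compiles on both. Converting to `units.iterator` before `Future.sequence` avoids depending on collection-builder implicits that differ between the two standard libraries. A small self-contained sketch of the new shape (assuming the same compat import as the diff):

```scala
import scala.collection.compat._ // no-op on 2.13; backfills IterableOnce on 2.12
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] =
  Await.result(Future.sequence(units.iterator), Duration.Inf)

// The same call compiles on 2.12 and 2.13:
val xs = awaitAll(List(Future(1), Future(2), Future(3))).iterator.toList // List(1, 2, 3)
```
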

def keyspaceCql(name: String = ks) =
@@ -4,7 +4,7 @@ import com.datastax.spark.connector.ccm.CcmBridge
import com.datastax.spark.connector.util.{Logging, SerialShutdownHooks}
import org.apache.commons.lang3.ClassUtils

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable

/** Source of [[Cluster]]s for all tests executed within a single test group (single process/JVM).