
Commit 1d7f0a1

SPARKC-642 Add integration tests for repartitionByCassandraReplica, using a cluster with two nodes
1 parent 61c029c
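
For context, repartitionByCassandraReplica redistributes a key-bearing RDD so that each element lands on a Spark partition co-located with a Cassandra replica that owns the corresponding partition key, letting a later joinWithCassandraTable read with node-local preferred locations. A minimal sketch of the API these tests exercise (the SparkContext sc, keyspace "test", and table "key_value" are illustrative; the calls mirror the new spec below):

    import com.datastax.spark.connector._

    // Wrap raw keys in Tuple1 so they map onto the table's partition key column.
    val keysRdd = sc.parallelize(0 to 200).map(Tuple1(_))

    // Place each key on a Spark partition local to one of its replicas,
    // creating 10 partitions per Cassandra host.
    val localKeys = keysRdd.repartitionByCassandraReplica("test", "key_value", 10)

    // The join can now be served by co-located replicas.
    val joined = localKeys.joinWithCassandraTable("test", "key_value")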

File tree

2 files changed (+94, -0 lines)

connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala

Lines changed: 9 additions & 0 deletions

@@ -187,6 +187,15 @@ trait TwoClustersWithOneNode extends Fixture {
   override def cluster(c: Int): Cluster = ClusterHolder.get(this)(c)
 }
 
+/** A fixture that bootstraps a single cluster with two nodes. */
+trait TwoNodeCluster extends SingleClusterFixture {
+
+  private[cluster] final override val configs: Seq[CcmConfig] = Seq(defaultConfig.copy(nodes = Seq(1, 2)))
+
+  private[cluster] override def connectionParameters(address: InetSocketAddress): Map[String, String] =
+    DefaultCluster.defaultConnectionParameters(address)
+}
+
 trait CETCluster extends DefaultCluster
 
 trait CSTCluster extends DefaultCluster
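
The new fixture drives topology entirely through CcmConfig: defaultConfig.copy(nodes = Seq(1, 2)) asks CCM to bootstrap nodes 1 and 2 in one cluster, and the connection parameters are simply reused from DefaultCluster. As an illustration of the pattern (a hypothetical variant, not part of this commit, assuming the same surrounding Fixtures.scala context), a three-node fixture would differ only in the node list:

    // Hypothetical three-node fixture following the TwoNodeCluster pattern above;
    // not part of this commit.
    trait ThreeNodeCluster extends SingleClusterFixture {

      private[cluster] final override val configs: Seq[CcmConfig] =
        Seq(defaultConfig.copy(nodes = Seq(1, 2, 3)))

      private[cluster] override def connectionParameters(address: InetSocketAddress): Map[String, String] =
        DefaultCluster.defaultConnectionParameters(address)
    }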
connector/src/it/scala/com/datastax/spark/connector/rdd/ReplicaRepartitionedCassandraRDDSpec.scala

Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
+package com.datastax.spark.connector.rdd
+
+import java.lang.{Long => JLong}
+
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cluster.TwoNodeCluster
+import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.rdd.partitioner.EndpointPartition
+
+import scala.concurrent.Future
+
+class ReplicaRepartitionedCassandraRDDSpec extends SparkCassandraITFlatSpecBase with TwoNodeCluster {
+
+  override lazy val conn = CassandraConnector(defaultConf)
+  val tableName = "key_value"
+  val keys = 0 to 200
+  val total = 0 to 10000
+
+  override def beforeClass {
+    conn.withSessionDo { session =>
+      createKeyspace(session)
+      val startTime = System.currentTimeMillis()
+
+      val executor = getExecutor(session)
+
+      awaitAll(
+        Future {
+          session.execute(
+            s"""
+               |CREATE TABLE $ks.$tableName (
+               |  key INT,
+               |  group BIGINT,
+               |  value TEXT,
+               |  PRIMARY KEY (key, group)
+               |)""".stripMargin)
+          val ps = session
+            .prepare(s"""INSERT INTO $ks.$tableName (key, group, value) VALUES (?, ?, ?)""")
+          awaitAll((for (value <- total) yield
+            executor.executeAsync(ps.bind(value: Integer, (value * 100).toLong: JLong, value.toString))): _*)
+        }
+      )
+      executor.waitForCurrentlyExecutingTasks()
+      println(s"Took ${(System.currentTimeMillis() - startTime) / 1000.0} seconds to set up suite data")
+    }
+  }
+
+  def checkArrayCassandraRow[T](result: Array[(T, CassandraRow)]) = {
+    markup("Checking RightSide Join Results")
+    result.length should be(keys.length)
+    for (key <- keys) {
+      val sortedResult = result.map(_._2).sortBy(_.getInt(0))
+      sortedResult(key).getInt("key") should be(key)
+      sortedResult(key).getLong("group") should be(key * 100)
+      sortedResult(key).getString("value") should be(key.toString)
+    }
+  }
+
+  "A Tuple RDD specifying partition keys" should "be repartitionable" in {
+    val source = sc.parallelize(keys).map(Tuple1(_))
+    val repart = source.repartitionByCassandraReplica(ks, tableName, 10)
+    repart.partitions.length should be(conn.hosts.size * 10)
+    conn.hosts.size should be(2)
+    conn.hosts should be(cluster.addresses.toSet)
+    val someCass = repart.joinWithCassandraTable(ks, tableName)
+    someCass.partitions.foreach {
+      case e: EndpointPartition =>
+        conn.hostAddresses should contain(e.endpoints.head)
+      case _ =>
+        fail("Unable to get endpoints on the repartitioned RDD; this means preferred locations will be broken")
+    }
+    val result = someCass.collect
+    checkArrayCassandraRow(result)
+  }
+
+  it should "be deterministically repartitionable" in {
+    val source = sc.parallelize(keys).map(Tuple1(_))
+    val repartRDDs = (1 to 10).map(_ =>
+      source
+        .repartitionByCassandraReplica(ks, tableName, 10)
+        .mapPartitionsWithIndex((index, it) => it.map((_, index))))
+    val first = repartRDDs(1).collect
+    repartRDDs.foreach(rdd => rdd.collect should be(first))
+  }
+
+}
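
Taken together, the first test pins the partition count to conn.hosts.size * 10 (two hosts × 10 partitions per host = 20 partitions) and requires every EndpointPartition of the joined RDD to point at a known cluster host, while the second test checks that ten repeated repartitions assign every key to the same partition index. To run only this suite, something like the following should work (a sketch; the exact sbt task for the project's integration-test configuration may differ):

    sbt 'connector/IntegrationTest/testOnly com.datastax.spark.connector.rdd.ReplicaRepartitionedCassandraRDDSpec'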
