Skip to content

Commit afff248

Browse files
authored
Merge pull request #1283 from datastax/SPARKC-627-b2.5
SPARKC-627 add join throttling test
2 parents 8213f3f + fd2175d commit afff248

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package org.apache.spark.sql.cassandra.execution
2+
3+
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
4+
import com.datastax.spark.connector.cluster.DefaultCluster
5+
import com.datastax.spark.connector.cql.CassandraConnector
6+
import com.datastax.spark.connector.embedded.SparkTemplate
7+
import com.datastax.spark.connector.rdd.ReadConf
8+
import org.apache.spark.sql.cassandra.CassandraSourceRelation.DirectJoinSettingParam
9+
import org.apache.spark.sql.cassandra._
10+
import org.scalatest.concurrent.Eventually
11+
12+
import scala.concurrent.Future
13+
14+
class JoinThrottlingSpec extends SparkCassandraITFlatSpecBase with DefaultCluster with Eventually {
15+
16+
override lazy val conn = CassandraConnector(defaultConf)
17+
18+
private val rowsCount = 10000
19+
20+
override def beforeClass {
21+
spark.conf.set(DirectJoinSettingParam.name, "auto")
22+
conn.withSessionDo { session =>
23+
val executor = getExecutor(session)
24+
createKeyspace(session)
25+
awaitAll(
26+
Future {
27+
session.execute(s"CREATE TABLE IF NOT EXISTS $ks.kvtarget (k int PRIMARY KEY, v int, id int)")
28+
session.execute(s"CREATE TABLE IF NOT EXISTS $ks.kv (k int PRIMARY KEY, v int)")
29+
val ps = session.prepare(s"INSERT INTO $ks.kv (k,v) VALUES (?,?)")
30+
awaitAll {
31+
for (id <- 1 to rowsCount) yield {
32+
executor.executeAsync(ps.bind(id: java.lang.Integer, id: java.lang.Integer))
33+
}
34+
}
35+
}
36+
)
37+
executor.waitForCurrentlyExecutingTasks()
38+
}
39+
}
40+
41+
private def timed(measureUnit: => Unit): Long = {
42+
val startMillis = System.currentTimeMillis()
43+
measureUnit
44+
System.currentTimeMillis() - startMillis
45+
}
46+
47+
private def joinWithRowsPerSecondThrottle(rowsPerSecondPerCore: Int): Int = {
48+
import spark.implicits._
49+
val right = spark.range(1, rowsCount).map(_.intValue)
50+
.withColumnRenamed("value", "id")
51+
val left = spark.read.cassandraFormat("kv", ks.toLowerCase)
52+
.option(ReadConf.ReadsPerSecParam.name, rowsPerSecondPerCore)
53+
.load()
54+
val join = left.join(right, left("k") === right("id"))
55+
56+
val durationMillis = timed {
57+
join.write.format("org.apache.spark.sql.cassandra")
58+
.options(Map("keyspace" -> ks, "table" -> "kvtarget"))
59+
.mode("append")
60+
.save()
61+
}
62+
63+
val durationSeconds = durationMillis.toInt / 1000
64+
val minimalDurationSeconds = rowsCount / rowsPerSecondPerCore / SparkTemplate.DefaultParallelism
65+
withClue(s"The expected duration of this join operation should not be shorter then $minimalDurationSeconds " +
66+
s"for rowsPerSecondPerCore=$rowsPerSecondPerCore.") {
67+
durationSeconds should be >= minimalDurationSeconds
68+
}
69+
durationSeconds
70+
}
71+
72+
/* SPARKC-627 */
73+
it should "throttle join by rows per second" in {
74+
val slowJoinDuration = joinWithRowsPerSecondThrottle(800)
75+
val fastJoinDuration = joinWithRowsPerSecondThrottle(1600)
76+
77+
withClue("Increasing rows per second throttle parameter should result in lowering the execution time") {
78+
fastJoinDuration should be < slowJoinDuration
79+
}
80+
}
81+
}

connector/src/test/scala/com/datastax/spark/connector/embedded/SparkTemplate.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import org.apache.spark.SparkConf
77

88
object SparkTemplate {
99

10+
val DefaultParallelism = 2
11+
1012
val HiveMetastoreConfig: Map[String, String] = Map (
1113
"spark.hadoop.hive.metastore.rawstore.impl" -> "com.datastax.bdp.hadoop.hive.metastore.CassandraHiveMetaStore",
1214
"spark.hadoop.cassandra.autoCreateHiveSchema" -> "true",
@@ -25,7 +27,7 @@ object SparkTemplate {
2527
.set("spark.ui.enabled", "false")
2628
.set("spark.cleaner.ttl", "3600")
2729
.set("spark.sql.extensions","com.datastax.spark.connector.CassandraSparkExtensions")
28-
.setMaster(sys.env.getOrElse("IT_TEST_SPARK_MASTER", "local[2]"))
30+
.setMaster(sys.env.getOrElse("IT_TEST_SPARK_MASTER", s"local[$DefaultParallelism]"))
2931
.setAppName("Test")
3032
.setAll(HiveMetastoreConfig)
3133

0 commit comments

Comments
 (0)