Skip to content

Commit c2ccc68

Browse files
authored
Merge pull request #1270 from datastax/SPARKC-606-2.5
SPARKC-606 honor ContinuousPaging properties
2 parents 4319de2 + 9fd1446 commit c2ccc68

File tree

4 files changed

+101
-44
lines changed

4 files changed

+101
-44
lines changed

connector/src/it/scala/com/datastax/spark/connector/cql/ContinuousPagingScannerSpec.scala

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@
77
package com.datastax.spark.connector.cql
88

99
import com.datastax.bdp.spark.ContinuousPagingScanner
10+
import com.datastax.dse.driver.api.core.config.DseDriverOption
11+
import com.datastax.oss.driver.api.core.cql.Statement
1012
import com.datastax.spark.connector._
1113
import com.datastax.spark.connector.cluster.DefaultCluster
1214
import com.datastax.spark.connector.rdd.ReadConf
15+
import org.mockito.ArgumentCaptor
16+
import org.mockito.Mockito._
1317
import org.scalatest.concurrent.Eventually
1418

1519
import scala.concurrent.Future
@@ -48,6 +52,23 @@ class ContinuousPagingScannerSpec extends SparkCassandraITFlatSpecBase with Defa
4852
}
4953
}
5054

55+
private def executeContinuousPagingScan(readConf: ReadConf): Statement[_] = {
56+
// we don't want to use the session from CC as mockito is unable to spy on a Proxy
57+
val cqlSession = conn.conf.connectionFactory.createSession(conn.conf)
58+
try {
59+
val sessionSpy = spy(cqlSession)
60+
val scanner = ContinuousPagingScanner(readConf, conn.conf, IndexedSeq.empty, sessionSpy)
61+
val stmt = sessionSpy.prepare(s"SELECT * FROM $ks.test1").bind()
62+
val statementCaptor = ArgumentCaptor.forClass(classOf[Statement[_]])
63+
64+
scanner.scan(stmt)
65+
verify(sessionSpy).executeContinuously(statementCaptor.capture())
66+
statementCaptor.getValue
67+
} finally {
68+
cqlSession.close()
69+
}
70+
}
71+
5172
"ContinuousPagingScanner" should "re-use a session in the same thread" in {
5273
val sessions = for (x <- 1 to 10) yield {
5374
val cps = ContinuousPagingScanner(ReadConf(), conn.conf, IndexedSeq.empty)
@@ -65,7 +86,9 @@ class ContinuousPagingScannerSpec extends SparkCassandraITFlatSpecBase with Defa
6586

6687
it should "use a single CP session for all threads" in {
6788
CassandraConnector.evictCache()
68-
eventually {CassandraConnector.sessionCache.cache.isEmpty}
89+
eventually {
90+
CassandraConnector.sessionCache.cache.isEmpty
91+
}
6992
val rdd = sc.cassandraTable(ks, table).withReadConf(ReadConf(splitCount = Some(400)))
7093
rdd.partitions.length should be > 100 //Sanity check that we will have to reuse sessions
7194
rdd.count
@@ -76,6 +99,39 @@ class ContinuousPagingScannerSpec extends SparkCassandraITFlatSpecBase with Defa
7699

77100
withClue(sessions.map(_.toString).mkString("\n"))(sessions.size should be(1))
78101
}
79-
}
80102

103+
it should "apply MB/s throughput limit" in skipIfNotDSE(conn) {
104+
val readConf = ReadConf(throughputMiBPS = Some(32.0))
105+
val executedStmt = executeContinuousPagingScan(readConf)
106+
107+
executedStmt.getExecutionProfile.getBoolean(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE_BYTES) should be(true)
108+
executedStmt.getExecutionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND) should be(1000)
109+
executedStmt.getExecutionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE) should be(33554) // 32MB/s
110+
}
111+
112+
it should "apply reads/s throughput limit" in skipIfNotDSE(conn) {
113+
val readConf = ReadConf(fetchSizeInRows = 999, readsPerSec = Some(5))
114+
val executedStmt = executeContinuousPagingScan(readConf)
81115

116+
executedStmt.getExecutionProfile.getBoolean(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE_BYTES) should be(false)
117+
executedStmt.getExecutionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND) should be(5)
118+
executedStmt.getExecutionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE) should be(999)
119+
}
120+
121+
it should "throw a meaningful exception when pages per second does not fall into (0, Int.MaxValue)" in skipIfNotDSE(conn) {
122+
val readConfs = Seq(
123+
ReadConf(throughputMiBPS = Some(1.0 + Int.MaxValue), readsPerSec = Some(1)),
124+
ReadConf(throughputMiBPS = Some(-1)),
125+
ReadConf(throughputMiBPS = Some(0)))
126+
127+
for (readConf <- readConfs) {
128+
withClue(s"Expected IllegalArgumentException for invalid throughput argument: ${readConf.throughputMiBPS}.") {
129+
val exc = intercept[IllegalArgumentException] {
130+
executeContinuousPagingScan(readConf)
131+
}
132+
exc.getMessage should include(s"This number must be positive, non-zero and smaller than ${Int.MaxValue}")
133+
}
134+
}
135+
}
136+
137+
}

connector/src/main/scala/com/datastax/bdp/spark/ContinuousPagingScanner.scala

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ package com.datastax.bdp.spark
77

88
import java.io.IOException
99

10-
import scala.collection.JavaConverters._
11-
import com.datastax.dse.driver.api.core.cql.continuous.{ContinuousResultSet, ContinuousSession}
10+
import com.datastax.dse.driver.api.core.config.DseDriverOption
11+
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousResultSet
1212
import com.datastax.oss.driver.api.core.CqlSession
1313
import com.datastax.oss.driver.api.core.`type`.codec.TypeCodec
1414
import com.datastax.oss.driver.api.core.cql.{BoundStatement, Statement}
@@ -19,62 +19,52 @@ import com.datastax.spark.connector.rdd.ReadConf
1919
import com.datastax.spark.connector.util.DriverUtil.toName
2020
import com.datastax.spark.connector.util._
2121

22+
import scala.collection.JavaConverters._
23+
2224
case class ContinuousPagingScanner(
2325
readConf: ReadConf,
2426
connConf: CassandraConnectorConf,
25-
columnNames: IndexedSeq[String]) extends Scanner with Logging {
27+
columnNames: IndexedSeq[String],
28+
cqlSession: CqlSession) extends Scanner with Logging {
2629

2730
val TARGET_PAGE_SIZE_IN_BYTES: Int = 5000 * 50 // 5000 rows * 50 bytes per row
2831
val MIN_PAGES_PER_SECOND = 1000
2932

30-
//TODO This must be moved to session initilization? We can no longer pass options at execution time without deriving a new profile
31-
// I think the right thing to do, to support old configurations as well as new is to create a new profile based on options as
32-
// Set using derivied profiles, but this probably can't happen here
33-
/**
34-
private val cpOptions = readConf.throughputMiBPS match {
33+
private lazy val cpProfile = readConf.throughputMiBPS match {
3534
case Some(throughput) =>
36-
val bytesPerSecond = (throughput * 1024 * 1024).toInt
35+
val bytesPerSecond = (throughput * 1024 * 1024).toLong
3736
val fallBackPagesPerSecond = math.max(MIN_PAGES_PER_SECOND, bytesPerSecond / TARGET_PAGE_SIZE_IN_BYTES)
38-
val pagesPerSecond: Int = readConf.readsPerSec.getOrElse(fallBackPagesPerSecond)
37+
val pagesPerSecond = readConf.readsPerSec.map(_.toLong).getOrElse(fallBackPagesPerSecond)
3938
if (readConf.readsPerSec.isEmpty) {
4039
logInfo(s"Using a pages per second of $pagesPerSecond since " +
4140
s"${ReadConf.ReadsPerSecParam.name} is not set")
4241
}
43-
val bytesPerPage = (bytesPerSecond / pagesPerSecond ).toInt
42+
val bytesPerPage = bytesPerSecond / pagesPerSecond
4443

45-
if (bytesPerPage <= 0) {
44+
if (bytesPerPage <= 0 || bytesPerPage > Int.MaxValue) {
4645
throw new IllegalArgumentException(
4746
s"""Read Throttling set to $throughput MBPS, but with the current
4847
| ${ReadConf.ReadsPerSecParam.name} value of $pagesPerSecond that equates to
49-
| $bytesPerPage bytes per page. This number must be positive and non-zero.
48+
| $bytesPerPage bytes per page.
49+
| This number must be positive, non-zero and smaller than ${Int.MaxValue}.
5050
""".stripMargin)
5151
}
5252

5353
logDebug(s"Read Throttling set to $throughput mbps. Pages of $bytesPerPage with ${readConf.readsPerSec} max" +
5454
s"pages per second. ${ReadConf.FetchSizeInRowsParam.name} will be ignored.")
55-
ContinuousPagingOptions
56-
.builder()
57-
.withPageSize(bytesPerPage, ContinuousPagingOptions.PageUnit.BYTES)
58-
.withMaxPagesPerSecond(pagesPerSecond)
59-
.build()
55+
cqlSession.getContext.getConfig.getDefaultProfile
56+
.withBoolean(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE_BYTES, true)
57+
.withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, bytesPerPage.toInt)
58+
.withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, pagesPerSecond.toInt)
6059

6160
case None =>
62-
ContinuousPagingOptions
63-
.builder()
64-
.withPageSize(readConf.fetchSizeInRows, ContinuousPagingOptions.PageUnit.ROWS)
65-
.withMaxPagesPerSecond(readConf.readsPerSec.getOrElse(Integer.MAX_VALUE))
66-
.build()
61+
cqlSession.getContext.getConfig.getDefaultProfile
62+
.withBoolean(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE_BYTES, false)
63+
.withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, readConf.fetchSizeInRows)
64+
.withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, readConf.readsPerSec.getOrElse(0))
6765
}
68-
**/
69-
70-
/**
71-
* Attempts to get or create a session for this execution thread.
72-
*/
73-
private val cpSession = new CassandraConnector(connConf)
74-
.openSession()
75-
.asInstanceOf[CqlSession with ContinuousSession]
7666

77-
private val codecRegistry = cpSession.getContext.getCodecRegistry
67+
private val codecRegistry = cqlSession.getContext.getCodecRegistry
7868

7969
private def asBoundStatement(statement: Statement[_]): Option[BoundStatement] = {
8070
statement match {
@@ -91,22 +81,22 @@ case class ContinuousPagingScanner(
9181
* Calls SessionProxy Close which issues a deferred close request on the session if no
9282
* references are requested to it in the next keep_alive ms
9383
*/
94-
override def close(): Unit = cpSession.close
84+
override def close(): Unit = cqlSession.close()
9585

96-
override def getSession(): CqlSession = cpSession
86+
override def getSession(): CqlSession = cqlSession
9787

9888
override def scan[StatementT <: Statement[StatementT]](statement: StatementT): ScanResult = {
99-
val authStatement = maybeExecutingAs(statement, readConf.executeAs)
89+
val authStatement = maybeExecutingAs(statement, readConf.executeAs)
10090

10191
if (isSolr(authStatement)) {
10292
logDebug("Continuous Paging doesn't work with Search, Falling back to default paging")
103-
val regularResult = cpSession.execute(authStatement)
93+
val regularResult = cqlSession.execute(authStatement)
10494
val regularIterator = regularResult.iterator().asScala
10595
ScanResult(regularIterator, CassandraRowMetadata.fromResultSet(columnNames, regularResult, codecRegistry))
10696

10797
} else {
10898
try {
109-
val cpResult = cpSession.executeContinuously(authStatement)
99+
val cpResult = cqlSession.executeContinuously(authStatement.setExecutionProfile(cpProfile))
110100
val cpIterator = cpResult.iterator().asScala
111101
ScanResult(cpIterator, getMetaData(cpResult))
112102
} catch {
@@ -120,7 +110,7 @@ case class ContinuousPagingScanner(
120110
}
121111
}
122112

123-
private def getMetaData(result: ContinuousResultSet) = {
113+
private def getMetaData(result: ContinuousResultSet): CassandraRowMetadata = {
124114
import scala.collection.JavaConverters._
125115
val columnDefs = result.getColumnDefinitions.asScala
126116
val rsColumnNames = columnDefs.map(c => toName(c.getName))
@@ -131,3 +121,16 @@ case class ContinuousPagingScanner(
131121
CassandraRowMetadata(columnNames, Some(rsColumnNames.toIndexedSeq), codecs.toIndexedSeq)
132122
}
133123
}
124+
125+
object ContinuousPagingScanner {
126+
def apply(
127+
readConf: ReadConf,
128+
connConf: CassandraConnectorConf,
129+
columnNames: IndexedSeq[String]): ContinuousPagingScanner = {
130+
/**
131+
* Attempts to get or create a session for this execution thread.
132+
*/
133+
val cqlSession = new CassandraConnector(connConf).openSession()
134+
new ContinuousPagingScanner(readConf, connConf, columnNames, cqlSession)
135+
}
136+
}

connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ object DefaultConnectionFactory extends CassandraConnectionFactory {
187187

188188
if (isContinuousPagingEnabled) {
189189
logger.debug("Using ContinousPagingScanner")
190-
new ContinuousPagingScanner(readConf, connConf, columnNames)
190+
ContinuousPagingScanner(readConf, connConf, columnNames)
191191
} else {
192192
logger.debug("Not Connected to DSE 5.1 or Greater Falling back to Non-Continuous Paging")
193193
new DefaultScanner(readConf, connConf, columnNames)

connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,14 +331,12 @@ class CassandraTableScanRDD[R] private[connector](
331331
range: CqlTokenRange[_, _],
332332
inputMetricsUpdater: InputMetricsUpdater): Iterator[R] = {
333333

334-
val session = scanner.getSession()
335-
336334
val (cql, values) = tokenRangeToCqlQuery(range)
337335
logDebug(
338336
s"Fetching data for range ${range.cql(partitionKeyStr)} " +
339337
s"with $cql " +
340338
s"with params ${values.mkString("[", ",", "]")}")
341-
val stmt = createStatement(session, cql, values: _*)
339+
val stmt = createStatement(scanner.getSession(), cql, values: _*)
342340
.setRoutingToken(range.range.startNativeToken())
343341

344342
try {

0 commit comments

Comments
 (0)