Skip to content

Commit 4ca7300

Browse files
authored
Merge pull request #1286 from datastax/SPARKC-621-2.5
SPARKC-621 SAI support
2 parents 4ea8c18 + 433da8e commit 4ca7300

30 files changed: +1084 additions, −163 deletions

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ env:
3232
- CCM_CASSANDRA_VERSION=3.0.20
3333
- CCM_CASSANDRA_VERSION=2.1.21
3434
- CCM_CASSANDRA_VERSION=6.0.11 CCM_IS_DSE=true DSE_VERSION=6.0.11 CCM_SERVER_TYPE=dse
35-
- CCM_CASSANDRA_VERSION=6.7.7 CCM_IS_DSE=true DSE_VERSION=6.7.7 CCM_SERVER_TYPE=dse
35+
- CCM_CASSANDRA_VERSION=6.8.5 CCM_IS_DSE=true DSE_VERSION=6.8.5 CCM_SERVER_TYPE=dse
3636

3737
install:
3838
- pip install --user ccm

connector/src/it/scala/com/datastax/bdp/spark/search/SearchAnalyticsIntegrationSpec.scala

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
4242
val typeIndex: String = s"$ks.$typesTable"
4343
val weirdIndex: String = s"$ks.$weirdTable"
4444

45-
skipIfNotDSE(conn) {
45+
dseOnly {
4646
conn.withSessionDo { session =>
4747

4848
val executor = getExecutor(session)
@@ -159,7 +159,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
159159
}
160160

161161

162-
"SearchAnalytics" should "be able to use solr_query" in skipIfNotDSE(conn) {
162+
"SearchAnalytics" should "be able to use solr_query" in dseOnly {
163163
val df = spark.sql(s"""SELECT key,a,b,c FROM $table WHERE solr_query = '{"q": "*:*", "fq":["key:[500 TO *]", "a:1"]}' """)
164164

165165
//Check that the correct data got through
@@ -174,7 +174,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
174174
results.size should be(expected.size)
175175
}
176176

177-
it should "be able to use solr optimizations " in skipIfNotDSE(conn) {
177+
it should "be able to use solr optimizations " in dseOnly {
178178
val df = spark.sql(s"SELECT key,a,b,c FROM $table WHERE key >= 500 AND a == 1")
179179

180180
//Check that pushdown happened
@@ -196,7 +196,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
196196
results.size should be(expected.size)
197197
}
198198

199-
it should "not use solr optimizations if there is a manual optimization" in skipIfNotDSE(conn) {
199+
it should "not use solr optimizations if there is a manual optimization" in dseOnly {
200200
val df = spark.sql(s"SELECT key,a,b,c FROM $table WHERE solr_query='key:[500 TO *]' AND a == 1")
201201
val whereClause = df.getUnderlyingCqlWhereClause()
202202
whereClause.predicates.head should include("solr_query")
@@ -217,7 +217,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
217217

218218
}
219219

220-
it should "should choose solr clauses over clustering key clauses " in skipIfNotDSE(conn) {
220+
it should "should choose solr clauses over clustering key clauses " in dseOnly {
221221
val df = spark.sql(s"SELECT key,a,b,c FROM $table WHERE a > 2")
222222

223223
//Check that pushdown happened
@@ -236,7 +236,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
236236
results.size should be(expected.size)
237237
}
238238

239-
it should "should be able to turn off solr optimization " in skipIfNotDSE(conn) {
239+
it should "should be able to turn off solr optimization " in dseOnly {
240240
val df = spark.sql(s"SELECT key,a,b,c FROM $tableNoSolr WHERE a > 2")
241241

242242
//Check that solr pushdown didn't happened
@@ -247,7 +247,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
247247
results.size should be(3000)
248248
}
249249

250-
it should "should correctly do negation filters " in skipIfNotDSE(conn) {
250+
it should "should correctly do negation filters " in dseOnly {
251251
val df = spark.sql(
252252
s"""SELECT key,a,b,c FROM $table WHERE
253253
|key > 998 AND
@@ -270,7 +270,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
270270
results should contain theSameElementsAs (expected)
271271
}
272272

273-
it should "handle OR conjunctions when possible" in skipIfNotDSE(conn) {
273+
it should "handle OR conjunctions when possible" in dseOnly {
274274
val df = spark.sql(
275275
s"""SELECT key,a,b,c FROM $table
276276
|WHERE key < 10
@@ -295,7 +295,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
295295
results should contain theSameElementsAs (expected)
296296
}
297297

298-
it should "handle conjunctions with LIKE clauses" in skipIfNotDSE(conn) {
298+
it should "handle conjunctions with LIKE clauses" in dseOnly {
299299
val df = spark.sql(
300300
s"""SELECT key,a,b,c FROM $table
301301
|WHERE c LIKE '%100%'
@@ -322,7 +322,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
322322
results should contain theSameElementsAs (expected)
323323
}
324324

325-
it should "handle IN clauses" in skipIfNotDSE(conn) {
325+
it should "handle IN clauses" in dseOnly {
326326
val df = spark.sql(
327327
s"""SELECT key,a,b,c FROM $table
328328
|WHERE key IN (4, 8, 15, 16, 23, 42)""".stripMargin)
@@ -342,7 +342,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
342342
results should contain theSameElementsAs (expected)
343343
}
344344

345-
it should "handle IsNotNull on a partition key" in skipIfNotDSE(conn) {
345+
it should "handle IsNotNull on a partition key" in dseOnly {
346346
val df = spark.sql(
347347
s"""SELECT key,a,b,c FROM $table WHERE key IS NOT NULL""")
348348

@@ -358,7 +358,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
358358
results should contain theSameElementsAs (expected)
359359
}
360360

361-
it should "handle mixed partition key and solr restrictions" in skipIfNotDSE(conn) {
361+
it should "handle mixed partition key and solr restrictions" in dseOnly {
362362
val df = spark.sql(
363363
s"""SELECT key,a,b,c FROM $table WHERE key = 4 AND b = 5""")
364364

@@ -378,7 +378,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
378378
results should contain theSameElementsAs (expected)
379379
}
380380

381-
it should "handle isNotNull all by itself" in skipIfNotDSE(conn) {
381+
it should "handle isNotNull all by itself" in dseOnly {
382382
val df = spark.sql(
383383
s"""SELECT key,a,b,c FROM $table WHERE c IS NOT NULL""")
384384

@@ -390,7 +390,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
390390
df.collect()
391391
}
392392

393-
it should "handle only partition key restrictions" in skipIfNotDSE(conn) {
393+
it should "handle only partition key restrictions" in dseOnly {
394394
val df = spark.sql(
395395
s"""SELECT key,a,b,c FROM $table WHERE key = 4""")
396396

@@ -407,7 +407,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
407407
results should contain theSameElementsAs (expected)
408408
}
409409

410-
it should "handle primary key restrictions" in skipIfNotDSE(conn) {
410+
it should "handle primary key restrictions" in dseOnly {
411411
val df = spark.sql(
412412
s"""SELECT key,a,b,c FROM $table WHERE key = 4 AND a > 2""")
413413

@@ -425,7 +425,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
425425
results should contain theSameElementsAs (expected)
426426
}
427427

428-
it should "handle Primary key restrictions and solr queries at the same time" in skipIfNotDSE(conn) {
428+
it should "handle Primary key restrictions and solr queries at the same time" in dseOnly {
429429
val df = spark.sql(
430430
s"""SELECT key,a,b,c FROM $table WHERE key = 4 AND a > 2 AND b < 25""")
431431
//Check that pushdown happened
@@ -443,7 +443,7 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
443443
results should contain theSameElementsAs (expected)
444444
}
445445

446-
it should "optimize a count(*) without any predicates in " in skipIfNotDSE(conn) {
446+
it should "optimize a count(*) without any predicates in " in dseOnly {
447447
val df = spark.sql(s"SELECT COUNT(*) from $table")
448448
val whereClause = df.getUnderlyingCqlWhereClause()
449449
val predicates = whereClause.predicates.head
@@ -461,63 +461,63 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
461461
df.collect() should have size 1
462462
}
463463

464-
it should "work with ascii" in skipIfNotDSE(conn) {
464+
it should "work with ascii" in dseOnly {
465465
testType("""a = 'one:\".'""", "a:one")
466466
}
467467

468-
it should "work with solr reserved words in ascii" in skipIfNotDSE(conn) {
468+
it should "work with solr reserved words in ascii" in dseOnly {
469469
testType("a = 'OR'", "a:\\\\OR")
470470
}
471471

472-
it should "work with bigint" in skipIfNotDSE(conn) {
472+
it should "work with bigint" in dseOnly {
473473
testType("b = 1", "b:1")
474474
}
475475

476-
it should "work with date" in skipIfNotDSE(conn) {
476+
it should "work with date" in dseOnly {
477477
testType("d = cast('2017-2-7' as date)", """d:2017\\-02\\-07""")
478478
}
479479

480-
it should "work with decimal" in skipIfNotDSE(conn) {
480+
it should "work with decimal" in dseOnly {
481481
testType("e > 1.0", "1.000000000000000000 TO *")
482482
}
483483

484-
it should "work with double" in skipIfNotDSE(conn) {
484+
it should "work with double" in dseOnly {
485485
testType("f = 2.0", "f:2.0")
486486
}
487487

488-
it should "work with inet" in skipIfNotDSE(conn) {
488+
it should "work with inet" in dseOnly {
489489
testType("g = '8.8.8.8'", "g:8.8.8.8")
490490
}
491491

492-
it should "work with int" in skipIfNotDSE(conn) {
492+
it should "work with int" in dseOnly {
493493
testType("h = 1", "h:1")
494494
}
495495

496-
it should "work with text" in skipIfNotDSE(conn) {
496+
it should "work with text" in dseOnly {
497497
testType("i = 'one'", "i:one")
498498
}
499499

500-
it should "work with solr reserved words in text" in skipIfNotDSE(conn) {
500+
it should "work with solr reserved words in text" in dseOnly {
501501
testType("i = 'OR'", "i:\\\\OR")
502502
}
503503

504-
it should "work with timestamp" in skipIfNotDSE(conn) {
504+
it should "work with timestamp" in dseOnly {
505505
testType("j < cast( \"2000-01-01T00:08:20.000Z\" as timestamp)", """{"q":"*:*", "fq":["j:[* TO 2000\\-01\\-01T00\\:08\\:20Z}"]}""")
506506
}
507507

508-
it should "work with varchar" in skipIfNotDSE(conn) {
508+
it should "work with varchar" in dseOnly {
509509
testType("k = 'one'", "k:one")
510510
}
511511

512-
it should "work with varint" in skipIfNotDSE(conn) {
512+
it should "work with varint" in dseOnly {
513513
testType("l = 1", "l:1")
514514
}
515515

516-
it should "work with uuid" in skipIfNotDSE(conn) {
516+
it should "work with uuid" in dseOnly {
517517
testType("m = '10000000-0000-0000-0000-000000000000'", """"m:10000000\\-0000\\-0000\\-0000\\-000000000000"""")
518518
}
519519

520-
it should "work with weird column names" in skipIfNotDSE(conn) {
520+
it should "work with weird column names" in dseOnly {
521521
val df: DataFrame = spark
522522
.read
523523
.cassandraFormat(weirdTable, ks)
@@ -538,14 +538,14 @@ class SearchAnalyticsIntegrationSpec extends SparkCassandraITFlatSpecBase with D
538538
rows.head.getAs[String]("MixEdCol") should be("world")
539539
}
540540

541-
"Automatic Solr Optimization" should "occur when the selected amount of data is less than the threshold" in skipIfNotDSE(conn) {
541+
"Automatic Solr Optimization" should "occur when the selected amount of data is less than the threshold" in dseOnly {
542542
val df = spark.sql(s"SELECT COUNT(*) from $tableAutoSolr where key < 5")
543543
val whereClause = df.getUnderlyingCqlWhereClause()
544544
val predicates = whereClause.predicates.head
545545
predicates should include("""solr_query""")
546546
}
547547

548-
it should "not occur if the selected amount of data is greater than the threshold" in skipIfNotDSE(conn) {
548+
it should "not occur if the selected amount of data is greater than the threshold" in dseOnly {
549549
val df = spark.sql(s"SELECT COUNT(*) from $tableAutoSolr where key < 500")
550550
val whereClause = df.getUnderlyingCqlWhereClause()
551551
val predicates = whereClause.predicates

connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import scala.concurrent.{Await, ExecutionContext, Future}
2828
import scala.util.Try
2929

3030
trait SparkCassandraITFlatSpecBase extends FlatSpec with SparkCassandraITSpecBase {
31-
override def report(message: String): Unit = info
3231
}
3332

3433
trait SparkCassandraITWordSpecBase extends WordSpec with SparkCassandraITSpecBase {
@@ -42,7 +41,8 @@ trait SparkCassandraITSpecBase
4241
with Matchers
4342
with BeforeAndAfterAll
4443
with ClusterProvider
45-
with Logging {
44+
with Logging
45+
with Alerting {
4646

4747
final def defaultConf: SparkConf = {
4848
SparkTemplate.defaultConf
@@ -150,9 +150,9 @@ trait SparkCassandraITSpecBase
150150
}
151151
}
152152

153-
def pv = conn.withSessionDo(_.getContext.getProtocolVersion)
153+
def pv: ProtocolVersion = conn.withSessionDo(_.getContext.getProtocolVersion)
154154

155-
def report(message: String): Unit = {}
155+
def report(message: String): Unit = alert(message)
156156

157157
val ks = getKsName
158158

@@ -161,33 +161,50 @@ trait SparkCassandraITSpecBase
161161
else report(s"Skipped Because ProtocolVersion $pv >= $protocolVersion")
162162
}
163163

164+
def skipIfProtocolVersionLT(protocolVersion: ProtocolVersion)(f: => Unit): Unit = {
165+
if (!(pv.getCode < protocolVersion.getCode)) f
166+
else report(s"Skipped Because ProtocolVersion $pv < $protocolVersion")
167+
}
168+
164169
/** Skips the given test if the Cluster Version is lower or equal to the given `cassandra` Version or `dse` Version
165170
* (if this is a DSE cluster) */
166-
def skipIfLT(cassandra: Version, dse: Version)(f: => Unit): Unit = {
171+
def from(cassandra: Version, dse: Version)(f: => Unit): Unit = {
167172
if (isDse(conn)) {
168-
skipIfCassandraLT(dse)(f)
173+
from(dse)(f)
169174
} else {
170-
skipIfCassandraLT(cassandra)(f)
175+
from(cassandra)(f)
171176
}
172177
}
173178

174-
val Cass36: Version = Version.parse("3.6.0")
179+
/** Skips the given test if the Cluster Version is lower or equal to the given version */
180+
def from(version: Version)(f: => Unit): Unit = {
181+
skip(cluster.getCassandraVersion, version) { f }
182+
}
175183

176-
def skipIfCassandraLT(cassandraVersion: Version)(f: => Unit): Unit = {
177-
val verOrd = implicitly[Ordering[Version]]
178-
import verOrd._
179-
if (cluster.getCassandraVersion >= cassandraVersion) f
180-
else report(s"Skipped because cluster Version ${cluster.getCassandraVersion} < $cassandraVersion")
184+
/** Skips the given test if the cluster is not DSE */
185+
def dseOnly(f: => Unit): Unit = {
186+
if (isDse(conn)) f
187+
else report(s"Skipped because not DSE")
181188
}
182189

183-
def skipIfProtocolVersionLT(protocolVersion: ProtocolVersion)(f: => Unit): Unit = {
184-
if (!(pv.getCode < protocolVersion.getCode)) f
185-
else report(s"Skipped Because ProtocolVersion $pv < $protocolVersion")
190+
/** Skips the given test if the cluster is not Cassandra */
191+
def cassandraOnly(f: => Unit): Unit = {
192+
if (isDse(conn)) report(s"Skipped because not Cassandra")
193+
else f
186194
}
187195

188-
def skipIfNotDSE(connector: CassandraConnector)(f: => Unit): Unit = {
189-
if (isDse(connector)) f
190-
else report(s"Skipped because not DSE")
196+
/** Skips the given test if the Cluster Version is lower or equal to the given version or the cluster is not DSE */
197+
def dseFrom(version: Version)(f: => Any): Unit = {
198+
dseOnly {
199+
skip(cluster.getDseVersion.get, version) { f }
200+
}
201+
}
202+
203+
private def skip(clusterVersion: Version, minVersion: Version)(f: => Unit): Unit = {
204+
val verOrd = implicitly[Ordering[Version]]
205+
import verOrd._
206+
if (clusterVersion >= minVersion) f
207+
else report(s"Skipped because cluster Version ${cluster.getCassandraVersion} < $minVersion")
191208
}
192209

193210
private def isDse(connector: CassandraConnector): Boolean = {

connector/src/it/scala/com/datastax/spark/connector/cql/ContinuousPagingScannerSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class ContinuousPagingScannerSpec extends SparkCassandraITFlatSpecBase with Defa
100100
withClue(sessions.map(_.toString).mkString("\n"))(sessions.size should be(1))
101101
}
102102

103-
it should "apply MB/s throughput limit" in skipIfNotDSE(conn) {
103+
it should "apply MB/s throughput limit" in dseOnly {
104104
val readConf = ReadConf(throughputMiBPS = Some(32.0))
105105
val executedStmt = executeContinuousPagingScan(readConf)
106106

@@ -109,7 +109,7 @@ class ContinuousPagingScannerSpec extends SparkCassandraITFlatSpecBase with Defa
109109
executedStmt.getExecutionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE) should be(33554) // 32MB/s
110110
}
111111

112-
it should "apply reads/s throughput limit" in skipIfNotDSE(conn) {
112+
it should "apply reads/s throughput limit" in dseOnly {
113113
val readConf = ReadConf(fetchSizeInRows = 999, readsPerSec = Some(5))
114114
val executedStmt = executeContinuousPagingScan(readConf)
115115

@@ -118,7 +118,7 @@ class ContinuousPagingScannerSpec extends SparkCassandraITFlatSpecBase with Defa
118118
executedStmt.getExecutionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE) should be(999)
119119
}
120120

121-
it should "throw a meaningful exception when pages per second does not fall int (0, Int.MaxValue)" in skipIfNotDSE(conn) {
121+
it should "throw a meaningful exception when pages per second does not fall int (0, Int.MaxValue)" in dseOnly {
122122
val readConfs = Seq(
123123
ReadConf(throughputMiBPS = Some(1.0 + Int.MaxValue), readsPerSec = Some(1)),
124124
ReadConf(throughputMiBPS = Some(-1)),

0 commit comments

Comments (0)