Skip to content

Commit 8be426b

Browse files
committed
Fix it:test when no SparkContext is set
1 parent fe78246 commit 8be426b

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

connector/src/it/scala/com/datastax/spark/connector/datasource/CassandraCatalogTableReadSpec.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,17 @@ class CassandraCatalogTableReadSpec extends CassandraCatalogSpecBase {
6060
it should "handle count pushdowns" in {
6161
setupBasicTable()
6262
val request = spark.sql(s"""SELECT COUNT(*) from $defaultKs.$testTable""")
63-
val reader = request
63+
var factory = request
6464
.queryExecution
6565
.executedPlan
6666
.collectFirst {
67-
case batchScanExec: BatchScanExec=> batchScanExec.readerFactory.createReader(EmptyInputPartition)
68-
case adaptiveSparkPlanExec: AdaptiveSparkPlanExec => adaptiveSparkPlanExec.executedPlan.collectLeaves().collectFirst{
69-
case batchScanExec: BatchScanExec=> batchScanExec.readerFactory.createReader(EmptyInputPartition)
70-
}.get
67+
case batchScanExec: BatchScanExec=> batchScanExec.readerFactory
68+
case adaptiveSparkPlanExec: AdaptiveSparkPlanExec => adaptiveSparkPlanExec.executedPlan.collectLeaves().collectFirst{
69+
case batchScanExec: BatchScanExec=> batchScanExec.readerFactory
70+
}.get
7171
}
7272

73-
reader.get.isInstanceOf[CassandraCountPartitionReader] should be (true)
73+
factory.get.asInstanceOf[CassandraScanPartitionReaderFactory].isCountQuery should be (true)
7474
request.collect()(0).get(0) should be (101)
7575
}
7676

connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraScanPartitionReaderFactory.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ case class CassandraScanPartitionReaderFactory(
2222
readConf: ReadConf,
2323
queryParts: CqlQueryParts) extends PartitionReaderFactory {
2424

25+
def isCountQuery: Boolean = queryParts.selectedColumnRefs.contains(RowCountRef)
26+
2527
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
2628

2729
val cassandraPartition = partition.asInstanceOf[CassandraPartition[Any, _ <: Token[Any]]]
28-
if (queryParts.selectedColumnRefs.contains(RowCountRef)) {
30+
if (isCountQuery) {
2931
//Count Pushdown
3032
CassandraCountPartitionReader(
3133
connector,

0 commit comments

Comments
 (0)