Skip to content

Commit 6d3187b

Browse files
committed
SPARKC-621 adjust SchemaSpec to new push down rules
Pushdowns on frozen collections are now possible (not only SAI).
1 parent 4145f39 commit 6d3187b

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

connector/src/it/scala/com/datastax/spark/connector/cql/SchemaSpec.scala

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import com.datastax.spark.connector.SparkCassandraITWordSpecBase
44
import com.datastax.spark.connector.cluster.DefaultCluster
55
import com.datastax.spark.connector.types._
66
import com.datastax.spark.connector.util.schemaFromCassandra
7+
import org.apache.spark.sql.cassandra._
8+
import org.apache.spark.sql.functions.col
79
import org.scalatest.Inspectors._
810

911
class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
@@ -31,7 +33,7 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
3133
| d7_int int,
3234
| d8_list list<int>,
3335
| d9_map map<int, varchar>,
34-
| d10_set set<int>,
36+
| d10_set frozen<set<int>>,
3537
| d11_timestamp timestamp,
3638
| d12_uuid uuid,
3739
| d13_timeuuid timeuuid,
@@ -43,8 +45,15 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
4345
""".stripMargin)
4446
session.execute(
4547
s"""CREATE INDEX test_d9_map_idx ON $ks.test (keys(d9_map))""")
48+
session.execute(
49+
s"""CREATE INDEX test_d9_m23423ap_idx ON $ks.test (full(d10_set))""")
4650
session.execute(
4751
s"""CREATE INDEX test_d7_int_idx ON $ks.test (d7_int)""")
52+
53+
for (i <- 0 to 9) {
54+
session.execute(s"insert into $ks.test (k1,k2,k3,c1,c2,c3,d10_set) " +
55+
s"values ($i, 'text$i', $i, $i, 'text$i', 123e4567-e89b-12d3-a456-42661417400$i, {$i, ${i*10}})")
56+
}
4857
}
4958

5059
val schema = schemaFromCassandra(conn)
@@ -120,7 +129,7 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
120129
table.columnByName("d7_int").columnType shouldBe IntType
121130
table.columnByName("d8_list").columnType shouldBe ListType(IntType)
122131
table.columnByName("d9_map").columnType shouldBe MapType(IntType, VarCharType)
123-
table.columnByName("d10_set").columnType shouldBe SetType(IntType)
132+
table.columnByName("d10_set").columnType shouldBe SetType(IntType, true)
124133
table.columnByName("d11_timestamp").columnType shouldBe TimestampType
125134
table.columnByName("d12_uuid").columnType shouldBe UUIDType
126135
table.columnByName("d13_timeuuid").columnType shouldBe TimeUUIDType
@@ -135,13 +144,19 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
135144
udt.columnTypes shouldBe Seq(VarCharType, VarCharType, IntType)
136145
}
137146

138-
"should not recognize column with collection index as indexed" in {
139-
table.indexedColumns.size shouldBe 1
140-
table.indexedColumns.head.columnName shouldBe "d7_int"
147+
"should recognize column with collection index as indexed" in {
148+
table.indexedColumns.size shouldBe 3
149+
table.indexedColumns.map(_.columnName).toSet shouldBe Set("d7_int", "d9_map", "d10_set")
150+
}
151+
152+
"allow for pushdown on frozen indexed collection" in {
153+
val value1 = spark.read.cassandraFormat(table.tableName, ks).load().where(col("d10_set").equalTo(Array(3, 30)))
154+
value1.explain()
155+
value1.collect().size shouldBe 1
141156
}
142157

143158
"should hold all indices retrieved from cassandra" in {
144-
table.indexes.size shouldBe 2
159+
table.indexes.size shouldBe 3
145160
}
146161
}
147162

connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase with DefaultCl
8181
s"""
8282
|CREATE TABLE $ks.timeuuidtable (k INT, v TIMEUUID, PRIMARY KEY (k))
8383
|""".stripMargin)
84-
},
84+
}
8585
)
8686
executor.waitForCurrentlyExecutingTasks()
8787
}

0 commit comments

Comments (0)