
Commit 4145f39

SPARKC-621 move timeuuid non-equal predicates check at the end
Before this commit the check was executed in BasicCassandraPredicatePushDown at the beginning of the push-down evaluation. This interrupted the evaluation without giving the other predicate rules a chance to push the predicate down. Since, thanks to SAI, timeuuid predicates may now be pushed down, this commit moves the check to the very end of the push-down evaluation.
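
For context, the new ordering can be pictured with a small, self-contained sketch. The Predicates and Rule stand-ins, the indexRule, and the column/predicate strings below are illustrative assumptions only, not the connector's API; the real chain uses CassandraPredicateRules, AnalyzedPredicates, TableDef and SparkConf, as the diffs below show.

// Sketch: the timeuuid check is now just the LAST rule folded over the predicates,
// so any earlier rule (e.g. an SAI/search rule) may push the predicate down first.
object TimeUuidOrderingSketch extends App {

  // Stand-in for AnalyzedPredicates: what Cassandra handles vs. what Spark must evaluate.
  case class Predicates(handledByCassandra: Set[String], handledBySpark: Set[String])

  // Stand-in for CassandraPredicateRules: each rule may move predicates between the sets.
  type Rule = Predicates => Predicates

  // Hypothetical index-aware rule: pushes timeuuid range predicates down to Cassandra.
  val indexRule: Rule = p => Predicates(
    handledByCassandra = p.handledByCassandra ++ p.handledBySpark.filter(_.contains("timeuuid")),
    handledBySpark = p.handledBySpark.filterNot(_.contains("timeuuid")))

  // The non-equality timeuuid check, appended at the very end of the chain:
  // it only fails for timeuuid range predicates that are still left to Spark.
  val timeUuidCheck: Rule = p => {
    require(!p.handledBySpark.exists(_.contains("timeuuid")),
      "non-equality comparison on a TIMEUUID column would be compared lexically by Spark")
    p
  }

  // Before this commit the check ran before the other rules; now it runs last.
  val rules: Seq[Rule] = Seq(indexRule) :+ timeUuidCheck

  val initial = Predicates(
    handledByCassandra = Set.empty,
    handledBySpark = Set("timeuuid_col < 61129592-FBE4-11E3-A3AC-0800200C9A66"))

  println(rules.foldLeft(initial)((p, rule) => rule(p)))
}

With indexRule in the chain the predicate ends up handled by Cassandra and the check passes; drop indexRule and the require throws IllegalArgumentException, which mirrors what the new integration test below asserts for a table without an index.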
1 parent f824323 commit 4145f39

6 files changed: 88 additions, 40 deletions

connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala

Lines changed: 6 additions & 0 deletions
@@ -187,6 +187,12 @@ trait SparkCassandraITSpecBase
     else report(s"Skipped because not DSE")
   }
 
+  /** Skips the given test if the cluster is not Cassandra */
+  def cassandraOnly(f: => Unit): Unit = {
+    if (isDse(conn)) report(s"Skipped because not Cassandra")
+    else f
+  }
+
   /** Skips the given test if the Cluster Version is lower or equal to the given version or the cluster is not DSE */
   def dseFrom(version: Version)(f: => Any): Unit = {
     dseOnly {

connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameSpec.scala

Lines changed: 26 additions & 1 deletion
@@ -74,7 +74,14 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase with DefaultCl
         session.execute(s"create table $ks.date_test2 (key int primary key, dd date)")
         session.execute(s"insert into $ks.date_test (key, dd) values (1, '1930-05-31')")
       }
-    }
+    },
+
+    Future {
+      session.execute(
+        s"""
+           |CREATE TABLE $ks.timeuuidtable (k INT, v TIMEUUID, PRIMARY KEY (k))
+           |""".stripMargin)
+    },
   )
   executor.waitForCurrentlyExecutingTasks()
 }
@@ -356,5 +363,23 @@ class CassandraDataFrameSpec extends SparkCassandraITFlatSpecBase with DefaultCl
 
   }
 
+  it should "complain when non-equality predicate on timeuuid is detected" in cassandraOnly {
+    val df = sparkSession
+      .read
+      .format("org.apache.spark.sql.cassandra")
+      .options(
+        Map(
+          "table" -> "timeuuidtable",
+          "keyspace" -> ks
+        )
+      )
+      .load()
+      .where(col("v") < "61129592-FBE4-11E3-A3AC-0800200C9A66")
+
+
+    intercept[IllegalArgumentException] {
+      df.count()
+    }
+  }
 
 }

connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala

Lines changed: 0 additions & 30 deletions
@@ -69,20 +69,6 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps](
   private def firstNonEmptySet[T](sets: Set[T]*): Set[T] =
     sets.find(_.nonEmpty).getOrElse(Set.empty[T])
 
-
-  /** All non-equal predicates on a TimeUUID column are going to fail and fail
-    * in silent way. The basic issue here is that when you use a comparison on
-    * a time UUID column in C* it compares based on the Time portion of the UUID. When
-    * Spark executes this filter (unhandled behavior) it will compare lexically, this
-    * will lead to results being incorrectly filtered out of the set. As long as the
-    * range predicate is handled completely by the connector the correct result
-    * will be obtained.
-    */
-  val timeUUIDNonEqual = {
-    val timeUUIDCols = table.columns.filter(x => x.columnType == TimeUUIDType)
-    timeUUIDCols.flatMap(col => rangePredicatesByName.get(col.columnName)).flatten
-  }
-
   /** Evaluates set of columns used in equality predicates and set of columns use in 'in' predicates.
     * Evaluation is based on protocol version. */
   private def partitionKeyPredicateColumns(pv: ProtocolVersion): (Seq[String], Seq[String]) = {
@@ -197,20 +183,4 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps](
   val predicatesToPreserve: Set[Predicate] =
     predicates -- predicatesToPushDown
 
-
-  val unhandledTimeUUIDNonEqual = {
-    timeUUIDNonEqual.toSet -- predicatesToPushDown
-  }
-
-  require(unhandledTimeUUIDNonEqual.isEmpty,
-    s"""
-       | You are attempting to do a non-equality comparison on a TimeUUID column in Spark.
-       | Spark can only compare TimeUUIDs Lexically which means that the comparison will be
-       | different than the comparison done in C* which is done based on the Time Portion of
-       | TimeUUID. This will in almost all cases lead to incorrect results. If possible restrict
-       | doing a TimeUUID comparison only to columns which can be pushed down to Cassandra.
-       | https://datastax-oss.atlassian.net/browse/SPARKC-405.
-       |
-       | $unhandledTimeUUIDNonEqual
-     """.stripMargin)
 }

connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala

Lines changed: 2 additions & 1 deletion
@@ -279,7 +279,8 @@ case class CassandraSourceRelation(
       DsePredicateRules,
       InClausePredicateRules) ++
       solrPredicateRules ++
-      additionalRules
+      additionalRules :+
+      TimeUUIDPredicateRules
 
     /** Apply non-basic rules **/
     val finalPushdown = predicatePushDownRules.foldLeft(basicPushdown)(

connector/src/main/scala/org/apache/spark/sql/cassandra/TimeUUIDPredicateRules.scala

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+
+package org.apache.spark.sql.cassandra
+
+import com.datastax.spark.connector.cql.TableDef
+import com.datastax.spark.connector.types.TimeUUIDType
+import com.datastax.spark.connector.util.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.cassandra.PredicateOps.FilterOps
+import org.apache.spark.sql.sources.Filter
+
+/** All non-equal predicates on a TimeUUID column are going to fail and fail
+  * in silent way. The basic issue here is that when you use a comparison on
+  * a time UUID column in C* it compares based on the Time portion of the UUID. When
+  * Spark executes this filter (unhandled behavior) it will compare lexically, this
+  * will lead to results being incorrectly filtered out of the set. As long as the
+  * range predicate is handled completely by the connector the correct result
+  * will be obtained.
+  */
+object TimeUUIDPredicateRules extends CassandraPredicateRules with Logging {
+
+  private def isTimeUUIDNonEqualPredicate(tableDef: TableDef, predicate: Filter): Boolean = {
+    if (FilterOps.isSingleColumnPredicate(predicate)) {
+      // extract column name only from single column predicates, otherwise an exception is thrown
+      val columnName = FilterOps.columnName(predicate)
+      val isRange = FilterOps.isRangePredicate(predicate)
+      val isTimeUUID = tableDef.columnByName.get(columnName).exists(_.columnType == TimeUUIDType)
+      isRange && isTimeUUID
+    } else {
+      false
+    }
+  }
+
+  override def apply(predicates: AnalyzedPredicates, tableDef: TableDef, conf: SparkConf): AnalyzedPredicates = {
+    val unhandledTimeUUIDNonEqual = predicates.handledBySpark.filter(isTimeUUIDNonEqualPredicate(tableDef, _))
+    require(unhandledTimeUUIDNonEqual.isEmpty,
+      s"""
+         | You are attempting to do a non-equality comparison on a TimeUUID column in Spark.
+         | Spark can only compare TimeUUIDs Lexically which means that the comparison will be
+         | different than the comparison done in C* which is done based on the Time Portion of
+         | TimeUUID. This will in almost all cases lead to incorrect results. If possible restrict
+         | doing a TimeUUID comparison only to columns which can be pushed down to Cassandra.
+         | https://datastax-oss.atlassian.net/browse/SPARKC-405.
+         |
+         | $unhandledTimeUUIDNonEqual
+       """.stripMargin)
+
+    predicates
+  }
+}

connector/src/test/scala/org/apache/spark/sql/cassandra/PredicatePushDownSpec.scala

Lines changed: 0 additions & 8 deletions
@@ -82,14 +82,6 @@ class PredicatePushDownSpec extends FlatSpec with Matchers {
     ppd.predicatesToPreserve shouldBe empty
   }
 
-  it should " break if the user tries to use a TimeUUID on a fully unhandled predicate" in {
-    val f1 = GtFilter("t1")
-
-    val ex = intercept[IllegalArgumentException] {
-      val ppd = new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)
-    }
-  }
-
   it should " work if the user tries to use a TimeUUID on a fully handled predicate" in {
     val f1 = GtFilter("c1")
     val ppd = new BasicCassandraPredicatePushDown(Set[Filter](f1), timeUUIDTable)