
Commit 2b77a7b

Merge pull request #1284 from datastax/SPARKC-612-2.5
SPARKC-612 throw a meaningful exception when reading a table view
2 parents afff248 + c9b83a7 commit 2b77a7b

File tree: 4 files changed (+36, -5 lines)

connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala (18 additions, 4 deletions)

@@ -158,7 +158,17 @@ trait SparkCassandraITSpecBase
 
   def skipIfProtocolVersionGTE(protocolVersion: ProtocolVersion)(f: => Unit): Unit = {
     if (!(pv.getCode >= protocolVersion.getCode)) f
-    else report(s"Skipped Because ProtcolVersion $pv >= $protocolVersion")
+    else report(s"Skipped Because ProtocolVersion $pv >= $protocolVersion")
+  }
+
+  /** Skips the given test if the Cluster Version is lower or equal to the given `cassandra` Version or `dse` Version
+    * (if this is a DSE cluster) */
+  def skipIfLT(cassandra: Version, dse: Version)(f: => Unit): Unit = {
+    if (isDse(conn)) {
+      skipIfCassandraLT(dse)(f)
+    } else {
+      skipIfCassandraLT(cassandra)(f)
+    }
   }
 
   val Cass36: Version = Version.parse("3.6.0")

@@ -167,7 +177,7 @@ trait SparkCassandraITSpecBase
     val verOrd = implicitly[Ordering[Version]]
     import verOrd._
     if (cluster.getCassandraVersion >= cassandraVersion) f
-    else report(s"Skipped because Cassandra Version ${cluster.getCassandraVersion} < $cassandraVersion")
+    else report(s"Skipped because cluster Version ${cluster.getCassandraVersion} < $cassandraVersion")
   }
 
   def skipIfProtocolVersionLT(protocolVersion: ProtocolVersion)(f: => Unit): Unit = {

@@ -176,11 +186,15 @@ trait SparkCassandraITSpecBase
   }
 
   def skipIfNotDSE(connector: CassandraConnector)(f: => Unit): Unit = {
-    val firstNodeExtras = connector.withSessionDo(_.getMetadata.getNodes.values().asScala.head.getExtras)
-    if (firstNodeExtras.containsKey(DseNodeProperties.DSE_VERSION)) f
+    if (isDse(connector)) f
     else report(s"Skipped because not DSE")
   }
 
+  private def isDse(connector: CassandraConnector): Boolean = {
+    val firstNodeExtras = connector.withSessionDo(_.getMetadata.getNodes.values().asScala.head.getExtras)
+    firstNodeExtras.containsKey(DseNodeProperties.DSE_VERSION)
+  }
+
   implicit val ec = SparkCassandraITSpecBase.ec
 
   def await[T](unit: Future[T]): T = {
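For context, a minimal sketch of how the new skipIfLT helper is meant to be used from a spec that mixes in this trait. The test name and empty body are illustrative only; V4_0_0 and V6_7_0 are the CcmConfig constants added later in this commit:

  import com.datastax.spark.connector.ccm.CcmConfig.{V4_0_0, V6_7_0}

  it should "only run against Cassandra 4.0+ or DSE 6.7+" in skipIfLT(cassandra = V4_0_0, dse = V6_7_0) {
    // The body runs only when the connected cluster meets the minimum version;
    // otherwise skipIfCassandraLT reports the test as skipped instead of failing it.
  }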

connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala (12 additions, 1 deletion)

@@ -6,9 +6,10 @@ import java.util.Date
 
 import com.datastax.oss.driver.api.core.DefaultProtocolVersion._
 import com.datastax.oss.driver.api.core.config.DefaultDriverOption
-import com.datastax.oss.driver.api.core.cql.{BoundStatement, SimpleStatement}
+import com.datastax.oss.driver.api.core.cql.SimpleStatement
 import com.datastax.oss.driver.api.core.cql.SimpleStatement._
 import com.datastax.spark.connector._
+import com.datastax.spark.connector.ccm.CcmConfig.{V4_0_0, V6_7_0}
 import com.datastax.spark.connector.cluster.DefaultCluster
 import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf}
 import com.datastax.spark.connector.mapper.{DefaultColumnMapper, JavaBeanColumnMapper, JavaTestBean, JavaTestUDTBean}

@@ -1107,6 +1108,16 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster
     )
   }
 
+  it should "throw a meaningful exception when reading a table view" in skipIfLT(cassandra = V4_0_0, dse = V6_7_0) {
+    import org.apache.spark.sql.cassandra._
+
+    val ex = intercept[IllegalArgumentException] {
+      val data = spark.read.cassandraFormat("sstable_tasks", "system_views").load()
+      data.show()
+    }
+    ex.getMessage should contain("Table views are not supported")
+  }
+
   it should "throw an exception when trying to write to a Materialized View" in skipIfProtocolVersionLT(V4){
     intercept[IllegalArgumentException] {
       sc.parallelize(Seq(("US", 1, "John", "DOE", "jdoe"))).saveToCassandra(ks, "user_by_country")
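Outside the test suite, application code that points the connector at a table view hits the same IllegalArgumentException. A minimal, hypothetical sketch of reacting to it, assuming a SparkSession named spark and reusing the system_views.sstable_tasks table from the test above:

  import org.apache.spark.sql.cassandra._

  try {
    spark.read.cassandraFormat("sstable_tasks", "system_views").load().show()
  } catch {
    case e: IllegalArgumentException if e.getMessage.contains("Table views are not supported") =>
      // The connector cannot build token-range partitions for table views;
      // such tables can still be queried directly over CQL with a driver session.
      println(s"Skipping unsupported source: ${e.getMessage}")
  }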

connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala (4 additions, 0 deletions)

@@ -79,6 +79,10 @@ private[connector] class CassandraPartitionGenerator[V, T <: Token[V]](
   def partitions: Seq[CassandraPartition[V, T]] = {
     val tokenRanges = describeRing
     val endpointCount = tokenRanges.map(_.replicas).reduce(_ ++ _).size
+    if (endpointCount == 0)
+      throw new IllegalArgumentException(s"Could not retrieve endpoints for the given table " +
+        s"(${keyspaceName}.${tableDef.name}), are you trying to read a table view? Table views are not supported, " +
+        s"see SPARKC-612.")
     val maxGroupSize = tokenRanges.size / endpointCount
 
     val splitter = createTokenRangeSplitter
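The new guard sits directly above an integer division by endpointCount, so its purpose is to replace an opaque arithmetic failure with an actionable message. A small sketch of the failure mode it prevents, using illustrative values in place of real driver metadata:

  // Per the new error message, reading a table view can leave token ranges with no
  // resolvable replicas, so the union of replica sets is empty and endpointCount is 0.
  val replicasPerRange: Seq[Set[String]] = Seq(Set.empty, Set.empty)
  val endpointCount = replicasPerRange.reduce(_ ++ _).size   // 0
  val tokenRangeCount = replicasPerRange.size                // 2

  // Before this commit the code fell through to the division with endpointCount == 0:
  //   tokenRangeCount / endpointCount   // java.lang.ArithmeticException: / by zero
  // After this commit the IllegalArgumentException above is thrown first, naming the
  // table and pointing at SPARKC-612.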

test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala (2 additions, 0 deletions)

@@ -116,6 +116,8 @@ object CcmConfig {
   val DEFAULT_SERVER_LOCALHOST_KEYSTORE_PATH: String = "/server_localhost.keystore"
 
   // major DSE versions
+  val V6_8_0: Version = Version.parse("6.8.0")
+  val V6_7_0: Version = Version.parse("6.7.0")
   val V6_0_0: Version = Version.parse("6.0.0")
   val V5_1_0: Version = Version.parse("5.1.0")
   val V5_0_0: Version = Version.parse("5.0.0")
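These constants are compared against the running cluster by skipIfCassandraLT through the implicit Ordering shown in the first file. A self-contained sketch of that comparison, assuming Version here is the java-driver's com.datastax.oss.driver.api.core.Version and using example version numbers:

  import com.datastax.oss.driver.api.core.Version

  val V6_7_0: Version = Version.parse("6.7.0")
  val clusterVersion: Version = Version.parse("6.8.0")

  // Same pattern as skipIfCassandraLT: bring the Ordering's operators into scope
  // so >= works directly on Version values.
  val verOrd = implicitly[Ordering[Version]]
  import verOrd._
  assert(clusterVersion >= V6_7_0)   // true, so the guarded test body would run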
