
Commit 5ff1ec3

Merge pull request #1302 from yohannrub/SPARKC-642-b2.5
SPARKC-642 repartitionByCassandraReplica relocates data to the local node only
2 parents 7ca460e + 1d7f0a1 commit 5ff1ec3
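
For context on the API this commit fixes: repartitionByCassandraReplica shuffles an RDD of partition-key tuples so that each Spark partition is co-located with a Cassandra replica that owns those keys, letting a following joinWithCassandraTable read from a local node. A minimal sketch of the call pattern, mirroring the integration test added below (the demo keyspace, the key_value table and the SparkContext sc are assumptions for the sketch, not part of this commit):

import com.datastax.spark.connector._

// Keys to look up, wrapped as 1-tuples matching the table's partition key.
val keysRdd = sc.parallelize(0 to 200).map(Tuple1(_))

// Group the keys onto partitions owned by their replicas; 10 Spark partitions per Cassandra host.
val repart = keysRdd.repartitionByCassandraReplica("demo", "key_value", partitionsPerHost = 10)

// The join can now be served by the replica that already holds each partition's keys.
val someCass = repart.joinWithCassandraTable("demo", "key_value")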

File tree: 4 files changed, +104 -42 lines changed


connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala

Lines changed: 9 additions & 0 deletions

@@ -187,6 +187,15 @@ trait TwoClustersWithOneNode extends Fixture {
   override def cluster(c: Int): Cluster = ClusterHolder.get(this)(c)
 }
 
+/** A fixture that bootstraps on single cluster with two nodes. */
+trait TwoNodeCluster extends SingleClusterFixture {
+
+  private[cluster] final override val configs: Seq[CcmConfig] = Seq(defaultConfig.copy(nodes = Seq(1, 2)))
+
+  private[cluster] override def connectionParameters(address: InetSocketAddress): Map[String, String] =
+    DefaultCluster.defaultConnectionParameters(address)
+}
+
 trait CETCluster extends DefaultCluster
 
 trait CSTCluster extends DefaultCluster
Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
+package com.datastax.spark.connector.rdd
+
+import java.lang.{Long => JLong}
+
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.cluster.TwoNodeCluster
+import com.datastax.spark.connector.cql.CassandraConnector
+import com.datastax.spark.connector.rdd.partitioner.EndpointPartition
+
+import scala.concurrent.Future
+
+class ReplicaRepartitionedCassandraRDDSpec extends SparkCassandraITFlatSpecBase with TwoNodeCluster {
+
+  override lazy val conn = CassandraConnector(defaultConf)
+  val tableName = "key_value"
+  val keys = 0 to 200
+  val total = 0 to 10000
+
+  override def beforeClass {
+    conn.withSessionDo { session =>
+      createKeyspace(session)
+      val startTime = System.currentTimeMillis()
+
+      val executor = getExecutor(session)
+
+      awaitAll(
+        Future {
+          session.execute(
+            s"""
+               |CREATE TABLE $ks.$tableName (
+               |  key INT,
+               |  group BIGINT,
+               |  value TEXT,
+               |  PRIMARY KEY (key, group)
+               |)""".stripMargin)
+          val ps = session
+            .prepare(s"""INSERT INTO $ks.$tableName (key, group, value) VALUES (?, ?, ?)""")
+          awaitAll((for (value <- total) yield
+            executor.executeAsync(ps.bind(value: Integer, (value * 100).toLong: JLong, value.toString))): _*)
+        }
+      )
+      executor.waitForCurrentlyExecutingTasks()
+      println(s"Took ${(System.currentTimeMillis() - startTime) / 1000.0} Seconds to setup Suite Data")
+    }
+  }
+
+  def checkArrayCassandraRow[T](result: Array[(T, CassandraRow)]) = {
+    markup("Checking RightSide Join Results")
+    result.length should be(keys.length)
+    for (key <- keys) {
+      val sorted_result = result.map(_._2).sortBy(_.getInt(0))
+      sorted_result(key).getInt("key") should be(key)
+      sorted_result(key).getLong("group") should be(key * 100)
+      sorted_result(key).getString("value") should be(key.toString)
+    }
+  }
+
+  "A Tuple RDD specifying partition keys" should "be repartitionable" in {
+    val source = sc.parallelize(keys).map(Tuple1(_))
+    val repart = source.repartitionByCassandraReplica(ks, tableName, 10)
+    repart.partitions.length should be(conn.hosts.size * 10)
+    conn.hosts.size should be(2)
+    conn.hosts should be(cluster.addresses.toSet)
+    val someCass = repart.joinWithCassandraTable(ks, tableName)
+    someCass.partitions.foreach {
+      case e: EndpointPartition =>
+        conn.hostAddresses should contain(e.endpoints.head)
+      case _ =>
+        fail("Unable to get endpoints on repartitioned RDD, This means preferred locations will be broken")
+    }
+    val result = someCass.collect
+    checkArrayCassandraRow(result)
+  }
+
+  it should "be deterministically repartitionable" in {
+    val source = sc.parallelize(keys).map(Tuple1(_))
+    val repartRDDs = (1 to 10).map(_ =>
+      source
+        .repartitionByCassandraReplica(ks, tableName, 10)
+        .mapPartitionsWithIndex((index, it) => it.map((_, index))))
+    val first = repartRDDs(1).collect
+    repartRDDs.foreach(rdd => rdd.collect should be(first))
+  }
+
+}
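
One way to observe what this suite asserts (the repartitioned join advertising replica hosts to the Spark scheduler) is to print each partition's preferred locations. A small sketch, assuming someCass is an RDD returned by joinWithCassandraTable as in the spec above:

someCass.partitions.foreach { p =>
  // preferredLocations is what Spark's scheduler consults when placing the task for this partition
  println(s"partition ${p.index} -> ${someCass.preferredLocations(p).mkString(", ")}")
}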

connector/src/main/scala/com/datastax/spark/connector/RDDFunctions.scala

Lines changed: 0 additions & 25 deletions

@@ -237,31 +237,6 @@ class RDDFunctions[T](rdd: RDD[T]) extends WritableToCassandra[T] with Serializable
       currentType: ClassTag[T],
       rwf: RowWriterFactory[T]): CassandraPartitionedRDD[T] = {
 
-    val replicaLocator = ReplicaLocator[T](connector, keyspaceName, tableName, partitionKeyMapper)
-    rdd.repartitionByCassandraReplica(
-      replicaLocator,
-      keyspaceName,
-      tableName,
-      partitionsPerHost,
-      partitionKeyMapper)
-  }
-
-
-  /**
-    * A Serializable version of repartitionByCassandraReplica which removes
-    * the implicit RowWriterFactory Dependency
-    */
-  private[connector] def repartitionByCassandraReplica(
-      replicaLocator: ReplicaLocator[T],
-      keyspaceName: String,
-      tableName: String,
-      partitionsPerHost: Int,
-      partitionKeyMapper: ColumnSelector)(
-      implicit
-      connector: CassandraConnector,
-      currentType: ClassTag[T],
-      rwf: RowWriterFactory[T]): CassandraPartitionedRDD[T] = {
-
     val partitioner = new ReplicaPartitioner[T](
       tableName,
       keyspaceName,
connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala

Lines changed: 10 additions & 17 deletions

@@ -5,14 +5,13 @@ import java.net.{InetAddress, InetSocketAddress}
 
 import com.datastax.oss.driver.api.core.CqlSession
 import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance
-import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf
 import com.datastax.spark.connector.types.TypeAdapters.{ValueByNameAdapter, ValuesSeqAdapter}
 import com.datastax.spark.connector.types.{NullableTypeConverter, TypeConverter}
 import com.datastax.spark.connector.util.ConfigCheck.ConnectorConfigurationException
 import com.datastax.spark.connector.util.DriverUtil.toAddress
-import com.datastax.spark.connector.util.{DriverUtil, Logging, SerialShutdownHooks}
+import com.datastax.spark.connector.util.{Logging, SerialShutdownHooks}
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
+import org.apache.spark.{SparkConf, SparkContext}
 
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls

@@ -65,14 +64,8 @@
   def hosts: Set[InetSocketAddress] =
     // wrapped in a session, so we get full lists of hosts,
     // not only those explicitly passed in the conf
-    withSessionDo {
-      _.getMetadata
-        .getNodes
-        .values
-        .asScala
-        .filter(_.getDistance == NodeDistance.LOCAL)
-        .flatMap(DriverUtil.toAddress)
-        .toSet
+    withSessionDo { session =>
+      dataCenterNodes(session)
     }
 
   private[connector] def hostAddresses: Set[InetAddress] = hosts.map(_.getAddress)

@@ -187,14 +180,14 @@
     logInfo(s"Disconnected from Cassandra cluster.")
   }
 
-  private def dataCenterNodes(conf: CassandraConnectorConf, ipConf: IpBasedContactInfo, session: CqlSession): Set[InetSocketAddress] = {
+  // LocalNodeFirstLoadBalancingPolicy assigns LOCAL or REMOTE (i.e. non-IGNORED) distance to local DC nodes
+  private def dataCenterNodes(session: CqlSession): Set[InetSocketAddress] = {
     val allNodes = session.getMetadata.getNodes.asScala.values.toSet
-    val dcToUse = conf.localDC.getOrElse(LocalNodeFirstLoadBalancingPolicy.determineDataCenter(ipConf.hosts, allNodes))
     val nodes = allNodes
-      .collect { case n if n.getDatacenter == dcToUse => toAddress(n) }
-      .flatten
+      .filter(_.getDistance != NodeDistance.IGNORED)
+      .flatMap(toAddress)
     if (nodes.isEmpty) {
-      throw new ConnectorConfigurationException(s"Could not determine suitable nodes for DC: $dcToUse and known nodes: " +
+      throw new ConnectorConfigurationException(s"Could not determine suitable nodes in local DC for known nodes: " +
         s"${allNodes.map(n => (n.getHostId, toAddress(n))).mkString(", ")}")
     }
     nodes

@@ -204,7 +197,7 @@
   private def alternativeConnectionConfigs(conf: CassandraConnectorConf, session: CqlSession): Set[CassandraConnectorConf] = {
     conf.contactInfo match {
       case ipConf: IpBasedContactInfo =>
-        val nodes = dataCenterNodes(conf, ipConf, session)
+        val nodes = dataCenterNodes(session)
         nodes.map(n => conf.copy(contactInfo = ipConf.copy(hosts = Set(n)))) + conf.copy(contactInfo = ipConf.copy(hosts = nodes))
       case _ => Set.empty
     }
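
The crux of the fix is in the last two hunks: hosts/dataCenterNodes no longer keeps only nodes at NodeDistance.LOCAL (which, with the connector's LocalNodeFirstLoadBalancingPolicy, can shrink to the single closest node, hence the "local node only" behaviour in the ticket) and no longer compares datacenter names; it now keeps every node the policy has not marked IGNORED, i.e. all nodes of the local DC. A standalone sketch of that selection rule against the Java driver 4.x metadata API; the helper name localDcNodes and the endpoint resolution are illustrative, the connector itself goes through DriverUtil.toAddress:

import java.net.InetSocketAddress

import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance

import scala.collection.JavaConverters._

// Keep every node the load balancing policy considers usable (LOCAL or REMOTE);
// IGNORED nodes, i.e. nodes outside the local DC, are dropped.
def localDcNodes(session: CqlSession): Set[InetSocketAddress] =
  session.getMetadata.getNodes.asScala.values
    .filter(_.getDistance != NodeDistance.IGNORED)
    .flatMap(node => node.getEndPoint.resolve() match {
      case a: InetSocketAddress => Some(a)
      case _ => None
    })
    .toSet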
