Skip to content

Commit 9ab28e1

Browse files
committed
SPARKC-642 Fix retrieving the full list of local DC nodes by selecting non-IGNORED nodes
Local DC nodes are assigned LOCAL or REMOTE distance (i.e. non-IGNORED) by LocalNodeFirstLoadBalancingPolicy. The code doing the determination of local DC nodes is put back in method dataCenterNodes(), which throws a ConnectorConfigurationException if no node is found.
1 parent 08bf6eb commit 9ab28e1

File tree

1 file changed

+10
-17
lines changed

1 file changed

+10
-17
lines changed

connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@ import java.net.{InetAddress, InetSocketAddress}
55

66
import com.datastax.oss.driver.api.core.CqlSession
77
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance
8-
import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf
98
import com.datastax.spark.connector.types.TypeAdapters.{ValueByNameAdapter, ValuesSeqAdapter}
109
import com.datastax.spark.connector.types.{NullableTypeConverter, TypeConverter}
1110
import com.datastax.spark.connector.util.ConfigCheck.ConnectorConfigurationException
1211
import com.datastax.spark.connector.util.DriverUtil.toAddress
13-
import com.datastax.spark.connector.util.{DriverUtil, Logging, SerialShutdownHooks}
12+
import com.datastax.spark.connector.util.{Logging, SerialShutdownHooks}
1413
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
15-
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
14+
import org.apache.spark.{SparkConf, SparkContext}
1615

1716
import scala.collection.JavaConverters._
1817
import scala.language.reflectiveCalls
@@ -65,14 +64,8 @@ class CassandraConnector(val conf: CassandraConnectorConf)
6564
def hosts: Set[InetSocketAddress] =
6665
// wrapped in a session, so we get full lists of hosts,
6766
// not only those explicitly passed in the conf
68-
withSessionDo {
69-
_.getMetadata
70-
.getNodes
71-
.values
72-
.asScala
73-
.filter(_.getDistance == NodeDistance.LOCAL)
74-
.flatMap(DriverUtil.toAddress)
75-
.toSet
67+
withSessionDo { session =>
68+
dataCenterNodes(_config, session)
7669
}
7770

7871
private[connector] def hostAddresses: Set[InetAddress] = hosts.map(_.getAddress)
@@ -187,14 +180,14 @@ object CassandraConnector extends Logging {
187180
logInfo(s"Disconnected from Cassandra cluster.")
188181
}
189182

190-
private def dataCenterNodes(conf: CassandraConnectorConf, ipConf: IpBasedContactInfo, session: CqlSession): Set[InetSocketAddress] = {
183+
// LocalNodeFirstLoadBalancingPolicy assigns LOCAL or REMOTE (i.e. non-IGNORED) distance to local DC nodes
184+
private def dataCenterNodes(conf: CassandraConnectorConf, session: CqlSession): Set[InetSocketAddress] = {
191185
val allNodes = session.getMetadata.getNodes.asScala.values.toSet
192-
val dcToUse = conf.localDC.getOrElse(LocalNodeFirstLoadBalancingPolicy.determineDataCenter(ipConf.hosts, allNodes))
193186
val nodes = allNodes
194-
.collect { case n if n.getDatacenter == dcToUse => toAddress(n) }
195-
.flatten
187+
.filter(_.getDistance != NodeDistance.IGNORED)
188+
.flatMap(toAddress)
196189
if (nodes.isEmpty) {
197-
throw new ConnectorConfigurationException(s"Could not determine suitable nodes for DC: $dcToUse and known nodes: " +
190+
throw new ConnectorConfigurationException(s"Could not determine suitable nodes in local DC for known nodes: " +
198191
s"${allNodes.map(n => (n.getHostId, toAddress(n))).mkString(", ")}")
199192
}
200193
nodes
@@ -204,7 +197,7 @@ object CassandraConnector extends Logging {
204197
private def alternativeConnectionConfigs(conf: CassandraConnectorConf, session: CqlSession): Set[CassandraConnectorConf] = {
205198
conf.contactInfo match {
206199
case ipConf: IpBasedContactInfo =>
207-
val nodes = dataCenterNodes(conf, ipConf, session)
200+
val nodes = dataCenterNodes(conf, session)
208201
nodes.map(n => conf.copy(contactInfo = ipConf.copy(hosts = Set(n)))) + conf.copy(contactInfo = ipConf.copy(hosts = nodes))
209202
case _ => Set.empty
210203
}

0 commit comments

Comments
 (0)