Skip to content

Commit c96f87b

Browse files
committed
Adjust for zero token nodes
Adds ZeroTokenNodesIT, which checks the behaviour of the driver when zero-token nodes are involved. Changes the behaviour of the driver with regard to zero-token nodes: such nodes are ignored if encountered in the `system.peers` table. If a zero-token node is provided as a contact point, the driver will still attempt to connect to it but (when `QueryOptions.setSkipZeroTokenNodes(true)` is set) will not populate metadata with it. As a result, zero-token nodes are not included in query planning. Note that in the `isValidPeer` method the null check on the `tokens` column was moved from the extended peer check to the regular check.
1 parent 204be3a commit c96f87b

File tree

4 files changed

+323
-9
lines changed

4 files changed

+323
-9
lines changed

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,12 @@ CloseFuture closeAsync() {
131131

132132
Host connectedHost() {
133133
Connection current = connectionRef.get();
134-
return (current == null) ? null : cluster.metadata.getHost(current.endPoint);
134+
if (current == null) return null;
135+
Host host = cluster.metadata.getHost(current.endPoint);
136+
// If the host is not in metadata, it may be a zero-token contact point
137+
if (host == null && cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes())
138+
host = cluster.metadata.getContactPoint(current.endPoint);
139+
return host;
135140
}
136141

137142
void triggerReconnect() {
@@ -392,6 +397,11 @@ static void refreshSchema(
392397
throws ConnectionException, BusyConnectionException, ExecutionException,
393398
InterruptedException {
394399
Host host = cluster.metadata.getHost(connection.endPoint);
400+
// Host may have been deliberately not added to metadata, because it's a zero-token node
401+
// Try checking contact points if it's null:
402+
if (host == null && cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes()) {
403+
host = cluster.metadata.getContactPoint(connection.endPoint);
404+
}
395405
// Neither host, nor its version should be null. But instead of dying if there is a race or
396406
// something, we can kind of try to infer
397407
// a Cassandra version from the protocol version (this is not foolproof, we can have the
@@ -826,7 +836,14 @@ private void refreshNodeListAndTokenMap(
826836
}
827837
}
828838
if (isInitialConnection) {
829-
cluster.metadata.addIfAbsent(controlHost);
839+
if (localRow.isNull("tokens")
840+
&& cluster.configuration.getQueryOptions().shouldSkipZeroTokenNodes()) {
841+
logger.warn(
842+
"Control host ({}) is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.",
843+
connection.endPoint);
844+
} else {
845+
cluster.metadata.addIfAbsent(controlHost);
846+
}
830847
}
831848
}
832849

@@ -984,7 +1001,10 @@ private static Set<Token> toTokens(Token.Factory factory, Set<String> tokensStr)
9841001

9851002
private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
9861003
boolean isValid =
987-
peerRow.getColumnDefinitions().contains("host_id") && !peerRow.isNull("host_id");
1004+
peerRow.getColumnDefinitions().contains("host_id")
1005+
&& !peerRow.isNull("host_id")
1006+
&& peerRow.getColumnDefinitions().contains("tokens")
1007+
&& !peerRow.isNull("tokens");
9881008

9891009
if (isPeersV2) {
9901010
isValid &=
@@ -1006,14 +1026,12 @@ private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
10061026
peerRow.getColumnDefinitions().contains("data_center")
10071027
&& !peerRow.isNull("data_center")
10081028
&& peerRow.getColumnDefinitions().contains("rack")
1009-
&& !peerRow.isNull("rack")
1010-
&& peerRow.getColumnDefinitions().contains("tokens")
1011-
&& !peerRow.isNull("tokens");
1029+
&& !peerRow.isNull("rack");
10121030
}
10131031
if (!isValid && logIfInvalid)
10141032
logger.warn(
10151033
"Found invalid row in system.peers: {}. "
1016-
+ "This is likely a gossip or snitch issue, this host will be ignored.",
1034+
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
10171035
formatInvalidPeer(peerRow));
10181036
return isValid;
10191037
}

driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class QueryOptions {
7474

7575
private volatile boolean addOriginalContactsToReconnectionPlan = false;
7676

77+
private volatile boolean skipZeroTokenNodes = false;
78+
7779
/**
7880
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
7981
* {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}.
@@ -521,6 +523,20 @@ public boolean shouldAddOriginalContactsToReconnectionPlan() {
521523
return this.addOriginalContactsToReconnectionPlan;
522524
}
523525

526+
/**
527+
* Sets whether the driver skips adding zero-token nodes to the metadata's hosts set. This mostly
528+
* makes a difference for zero-token contact points, as the driver won't reach other zero-token
529+
* nodes since their records in system.peers are incomplete, thus considered invalid and omitted.
530+
*/
531+
public QueryOptions setSkipZeroTokenNodes(boolean enabled) {
532+
this.skipZeroTokenNodes = enabled;
533+
return this;
534+
}
535+
536+
public boolean shouldSkipZeroTokenNodes() {
537+
return this.skipZeroTokenNodes;
538+
}
539+
524540
@Override
525541
public boolean equals(Object that) {
526542
if (that == null || !(that instanceof QueryOptions)) {

driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,13 @@ static void run_with_null_peer_info(String columns, boolean expectPeer2, boolean
343343
expectedError =
344344
String.format(
345345
"Found invalid row in system.peers: [peer=%s, %s]. "
346-
+ "This is likely a gossip or snitch issue, this host will be ignored.",
346+
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
347347
node2Address, columnData);
348348
} else {
349349
expectedError =
350350
String.format(
351351
"Found invalid row in system.peers: [peer=%s, %s%s%s%s]. "
352-
+ "This is likely a gossip or snitch issue, this host will be ignored.",
352+
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
353353
node2Address,
354354
!splitColumnsSet.contains("native_transport_address")
355355
? "missing native_transport_address, "
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
package com.datastax.driver.core;
2+
3+
import static org.apache.log4j.Level.WARN;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
6+
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
7+
import com.datastax.driver.core.policies.LoadBalancingPolicy;
8+
import com.datastax.driver.core.policies.RoundRobinPolicy;
9+
import com.datastax.driver.core.policies.TokenAwarePolicy;
10+
import com.datastax.driver.core.utils.ScyllaVersion;
11+
import java.net.InetSocketAddress;
12+
import java.util.Set;
13+
import java.util.stream.Collectors;
14+
import org.apache.log4j.Level;
15+
import org.apache.log4j.Logger;
16+
import org.testng.annotations.AfterMethod;
17+
import org.testng.annotations.BeforeMethod;
18+
import org.testng.annotations.DataProvider;
19+
import org.testng.annotations.Test;
20+
21+
public class ZeroTokenNodesIT {
22+
private Logger logger = Logger.getLogger(ControlConnection.class);
23+
private MemoryAppender appender;
24+
private Level originalLevel;
25+
26+
@DataProvider(name = "loadBalancingPolicies")
27+
public static Object[][] loadBalancingPolicies() {
28+
return new Object[][] {
29+
{DCAwareRoundRobinPolicy.builder().build()},
30+
{new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())},
31+
{new TokenAwarePolicy(new RoundRobinPolicy())}
32+
};
33+
}
34+
35+
@BeforeMethod(groups = "short")
36+
public void startCapturingLogs() {
37+
originalLevel = logger.getLevel();
38+
logger.setLevel(WARN);
39+
logger.addAppender(appender = new MemoryAppender());
40+
}
41+
42+
@AfterMethod(groups = "short")
43+
public void stopCapturingLogs() {
44+
logger.setLevel(originalLevel);
45+
logger.removeAppender(appender);
46+
}
47+
48+
@Test(groups = "short")
49+
@ScyllaVersion(
50+
minOSS = "6.2.0",
51+
minEnterprise = "2024.2.2",
52+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
53+
public void should_ignore_zero_token_peer() {
54+
// Given a 4-node cluster with 1 zero-token node and a normal contact point,
55+
// make sure that the zero-token node is not included in the metadata.
56+
// By extension, it won't be included in the query planning.
57+
Cluster cluster = null;
58+
Session session = null;
59+
CCMBridge ccmBridge = null;
60+
try {
61+
ccmBridge =
62+
CCMBridge.builder().withNodes(3).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
63+
ccmBridge.start();
64+
ccmBridge.add(4);
65+
ccmBridge.updateNodeConfig(4, "join_ring", false);
66+
ccmBridge.start(4);
67+
ccmBridge.waitForUp(4);
68+
69+
cluster =
70+
Cluster.builder()
71+
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
72+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
73+
.withPort(9042)
74+
.withoutAdvancedShardAwareness()
75+
.build();
76+
session = cluster.connect();
77+
78+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
79+
.isEqualTo(ccmBridge.addressOfNode(1));
80+
String line = null;
81+
try {
82+
line = appender.waitAndGet(5000);
83+
} catch (InterruptedException e) {
84+
throw new RuntimeException(e);
85+
}
86+
assertThat(line).contains("Found invalid row in system.peers");
87+
assertThat(line).contains("tokens=null");
88+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
89+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
90+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
91+
} finally {
92+
if (ccmBridge != null) ccmBridge.close();
93+
if (session != null) session.close();
94+
if (cluster != null) cluster.close();
95+
}
96+
}
97+
98+
@Test(groups = "short")
99+
@ScyllaVersion(
100+
minOSS = "6.2.0",
101+
minEnterprise = "2024.2.2",
102+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
103+
public void should_ignore_zero_token_DC() {
104+
Cluster cluster = null;
105+
Session session = null;
106+
CCMBridge ccmBridge = null;
107+
try {
108+
ccmBridge =
109+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
110+
ccmBridge.start();
111+
ccmBridge.add(2, 3);
112+
ccmBridge.updateNodeConfig(3, "join_ring", false);
113+
ccmBridge.start(3);
114+
ccmBridge.add(2, 4);
115+
ccmBridge.updateNodeConfig(4, "join_ring", false);
116+
ccmBridge.start(4);
117+
ccmBridge.waitForUp(3);
118+
ccmBridge.waitForUp(4);
119+
120+
cluster =
121+
Cluster.builder()
122+
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
123+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
124+
.withPort(9042)
125+
.withoutAdvancedShardAwareness()
126+
.build();
127+
session = cluster.connect();
128+
129+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
130+
.isEqualTo(ccmBridge.addressOfNode(1));
131+
for (int i = 0; i < 2; i++) {
132+
String line = null;
133+
try {
134+
line = appender.waitAndGet(5000);
135+
} catch (InterruptedException e) {
136+
throw new RuntimeException(e);
137+
}
138+
assertThat(line).contains("Found invalid row in system.peers");
139+
assertThat(line).contains("tokens=null");
140+
}
141+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
142+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
143+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
144+
} finally {
145+
if (ccmBridge != null) ccmBridge.close();
146+
if (session != null) session.close();
147+
if (cluster != null) cluster.close();
148+
}
149+
}
150+
151+
@Test(groups = "short", dataProvider = "loadBalancingPolicies")
152+
@ScyllaVersion(
153+
minOSS = "6.2.0",
154+
minEnterprise = "2024.2.2",
155+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
156+
public void should_connect_to_zero_token_contact_point(LoadBalancingPolicy loadBalancingPolicy) {
157+
Cluster cluster = null;
158+
Session session = null;
159+
CCMBridge ccmBridge = null;
160+
161+
try {
162+
ccmBridge =
163+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
164+
ccmBridge.start();
165+
ccmBridge.add(3);
166+
ccmBridge.updateNodeConfig(3, "join_ring", false);
167+
ccmBridge.start(3);
168+
ccmBridge.waitForUp(3);
169+
170+
cluster =
171+
Cluster.builder()
172+
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
173+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
174+
.withPort(9042)
175+
.withLoadBalancingPolicy(loadBalancingPolicy)
176+
.withoutAdvancedShardAwareness()
177+
.build();
178+
session = cluster.connect();
179+
180+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
181+
.isEqualTo(ccmBridge.addressOfNode(3));
182+
String line = null;
183+
try {
184+
line = appender.waitAndGet(5000);
185+
} catch (InterruptedException e) {
186+
throw new RuntimeException(e);
187+
}
188+
assertThat(line)
189+
.contains(
190+
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
191+
assertThat(line).contains(ccmBridge.ipOfNode(3));
192+
193+
session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
194+
session.execute(
195+
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
196+
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
197+
for (int i = 0; i < 30; i++) {
198+
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
199+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
200+
.isNotEqualTo(ccmBridge.addressOfNode(3));
201+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
202+
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
203+
}
204+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
205+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
206+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
207+
} finally {
208+
if (ccmBridge != null) ccmBridge.close();
209+
if (session != null) session.close();
210+
if (cluster != null) cluster.close();
211+
}
212+
}
213+
214+
@Test(groups = "short", dataProvider = "loadBalancingPolicies")
215+
@ScyllaVersion(
216+
minOSS = "6.2.0",
217+
minEnterprise = "2024.2.2",
218+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
219+
public void should_connect_to_zero_token_DC(LoadBalancingPolicy loadBalancingPolicy) {
220+
Cluster cluster = null;
221+
Session session = null;
222+
CCMBridge ccmBridge = null;
223+
224+
try {
225+
ccmBridge =
226+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
227+
ccmBridge.start();
228+
ccmBridge.add(2, 3);
229+
ccmBridge.updateNodeConfig(3, "join_ring", false);
230+
ccmBridge.start(3);
231+
ccmBridge.add(2, 4);
232+
ccmBridge.updateNodeConfig(4, "join_ring", false);
233+
ccmBridge.start(4);
234+
ccmBridge.waitForUp(3);
235+
ccmBridge.waitForUp(4);
236+
237+
cluster =
238+
Cluster.builder()
239+
.withQueryOptions(new QueryOptions().setSkipZeroTokenNodes(true))
240+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
241+
.withPort(9042)
242+
.withLoadBalancingPolicy(loadBalancingPolicy)
243+
.withoutAdvancedShardAwareness()
244+
.build();
245+
session = cluster.connect();
246+
247+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
248+
.isEqualTo(ccmBridge.addressOfNode(3));
249+
String line = null;
250+
try {
251+
line = appender.waitAndGet(5000);
252+
} catch (InterruptedException e) {
253+
throw new RuntimeException(e);
254+
}
255+
assertThat(line)
256+
.contains(
257+
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
258+
assertThat(line).contains(ccmBridge.ipOfNode(3));
259+
260+
session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
261+
session.execute(
262+
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
263+
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
264+
for (int i = 0; i < 30; i++) {
265+
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
266+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
267+
.isNotEqualTo(ccmBridge.addressOfNode(3));
268+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
269+
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
270+
}
271+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
272+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
273+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
274+
} finally {
275+
if (ccmBridge != null) ccmBridge.close();
276+
if (session != null) session.close();
277+
if (cluster != null) cluster.close();
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)