Skip to content

Commit a557147

Browse files
committed
Adjust for zero token nodes
Adds ZeroTokenNodesIT, which checks the behavior of the driver when zero-token nodes are involved. Changes the behavior of the driver with regard to zero-token nodes: such nodes are ignored when encountered in the `system.peers` table. If a zero-token node is provided as a contact point, the driver will still attempt to connect to it, but will not add it to the metadata. As a result, zero-token nodes are not included in query planning.
1 parent 204be3a commit a557147

File tree

3 files changed

+301
-9
lines changed

3 files changed

+301
-9
lines changed

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,11 @@ CloseFuture closeAsync() {
131131

132132
Host connectedHost() {
133133
Connection current = connectionRef.get();
134-
return (current == null) ? null : cluster.metadata.getHost(current.endPoint);
134+
if (current == null) return null;
135+
Host host = cluster.metadata.getHost(current.endPoint);
136+
// If the host is not in metadata, then it may be a zero-token contact point
137+
if (host == null) host = cluster.metadata.getContactPoint(current.endPoint);
138+
return host;
135139
}
136140

137141
void triggerReconnect() {
@@ -392,6 +396,11 @@ static void refreshSchema(
392396
throws ConnectionException, BusyConnectionException, ExecutionException,
393397
InterruptedException {
394398
Host host = cluster.metadata.getHost(connection.endPoint);
399+
// Host may have been deliberately not added to metadata, because it's a zero-token node
400+
// Try checking contact points if it's null:
401+
if (host == null) {
402+
host = cluster.metadata.getContactPoint(connection.endPoint);
403+
}
395404
// Neither host, nor its version should be null. But instead of dying if there is a race or
396405
// something, we can kind of try to infer
397406
// a Cassandra version from the protocol version (this is not foolproof, we can have the
@@ -826,7 +835,13 @@ private void refreshNodeListAndTokenMap(
826835
}
827836
}
828837
if (isInitialConnection) {
829-
cluster.metadata.addIfAbsent(controlHost);
838+
if (localRow.isNull("tokens")) {
839+
logger.warn(
840+
"Control host ({}) is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.",
841+
connection.endPoint);
842+
} else {
843+
cluster.metadata.addIfAbsent(controlHost);
844+
}
830845
}
831846
}
832847

@@ -984,7 +999,10 @@ private static Set<Token> toTokens(Token.Factory factory, Set<String> tokensStr)
984999

9851000
private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
9861001
boolean isValid =
987-
peerRow.getColumnDefinitions().contains("host_id") && !peerRow.isNull("host_id");
1002+
peerRow.getColumnDefinitions().contains("host_id")
1003+
&& !peerRow.isNull("host_id")
1004+
&& peerRow.getColumnDefinitions().contains("tokens")
1005+
&& !peerRow.isNull("tokens");
9881006

9891007
if (isPeersV2) {
9901008
isValid &=
@@ -1006,14 +1024,12 @@ private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
10061024
peerRow.getColumnDefinitions().contains("data_center")
10071025
&& !peerRow.isNull("data_center")
10081026
&& peerRow.getColumnDefinitions().contains("rack")
1009-
&& !peerRow.isNull("rack")
1010-
&& peerRow.getColumnDefinitions().contains("tokens")
1011-
&& !peerRow.isNull("tokens");
1027+
&& !peerRow.isNull("rack");
10121028
}
10131029
if (!isValid && logIfInvalid)
10141030
logger.warn(
10151031
"Found invalid row in system.peers: {}. "
1016-
+ "This is likely a gossip or snitch issue, this host will be ignored.",
1032+
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
10171033
formatInvalidPeer(peerRow));
10181034
return isValid;
10191035
}

driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,13 @@ static void run_with_null_peer_info(String columns, boolean expectPeer2, boolean
343343
expectedError =
344344
String.format(
345345
"Found invalid row in system.peers: [peer=%s, %s]. "
346-
+ "This is likely a gossip or snitch issue, this host will be ignored.",
346+
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
347347
node2Address, columnData);
348348
} else {
349349
expectedError =
350350
String.format(
351351
"Found invalid row in system.peers: [peer=%s, %s%s%s%s]. "
352-
+ "This is likely a gossip or snitch issue, this host will be ignored.",
352+
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
353353
node2Address,
354354
!splitColumnsSet.contains("native_transport_address")
355355
? "missing native_transport_address, "
Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
package com.datastax.driver.core;
2+
3+
import static org.apache.log4j.Level.WARN;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
6+
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
7+
import com.datastax.driver.core.policies.LoadBalancingPolicy;
8+
import com.datastax.driver.core.policies.RoundRobinPolicy;
9+
import com.datastax.driver.core.policies.TokenAwarePolicy;
10+
import com.datastax.driver.core.utils.ScyllaVersion;
11+
import java.net.InetSocketAddress;
12+
import java.util.Set;
13+
import java.util.stream.Collectors;
14+
import org.apache.log4j.Level;
15+
import org.apache.log4j.Logger;
16+
import org.testng.annotations.AfterMethod;
17+
import org.testng.annotations.BeforeMethod;
18+
import org.testng.annotations.DataProvider;
19+
import org.testng.annotations.Test;
20+
21+
public class ZeroTokenNodesIT {
22+
private Logger logger = Logger.getLogger(ControlConnection.class);
23+
private MemoryAppender appender;
24+
private Level originalLevel;
25+
26+
@DataProvider(name = "loadBalancingPolicies")
27+
public static Object[][] loadBalancingPolicies() {
28+
return new Object[][] {
29+
{DCAwareRoundRobinPolicy.builder().build()},
30+
{new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())},
31+
{new TokenAwarePolicy(new RoundRobinPolicy())}
32+
};
33+
}
34+
35+
@BeforeMethod(groups = "short")
36+
public void startCapturingLogs() {
37+
originalLevel = logger.getLevel();
38+
logger.setLevel(WARN);
39+
logger.addAppender(appender = new MemoryAppender());
40+
}
41+
42+
@AfterMethod(groups = "short")
43+
public void stopCapturingLogs() {
44+
logger.setLevel(originalLevel);
45+
logger.removeAppender(appender);
46+
}
47+
48+
@Test(groups = "short")
49+
@ScyllaVersion(
50+
minOSS = "6.2.0",
51+
minEnterprise = "2024.2.2",
52+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
53+
public void should_ignore_zero_token_peer() {
54+
// Given a 4-node cluster with 1 zero-token node and a normal contact point,
55+
// make sure that the zero-token node is not included in the metadata.
56+
// By extension, it won't be included in the query planning.
57+
Cluster cluster = null;
58+
Session session = null;
59+
CCMBridge ccmBridge = null;
60+
try {
61+
ccmBridge =
62+
CCMBridge.builder().withNodes(3).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
63+
ccmBridge.start();
64+
ccmBridge.add(4);
65+
ccmBridge.updateNodeConfig(4, "join_ring", false);
66+
ccmBridge.start(4);
67+
ccmBridge.waitForUp(4);
68+
69+
cluster =
70+
Cluster.builder()
71+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
72+
.withPort(9042)
73+
.withoutAdvancedShardAwareness()
74+
.build();
75+
session = cluster.connect();
76+
77+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
78+
.isEqualTo(ccmBridge.addressOfNode(1));
79+
String line = null;
80+
try {
81+
line = appender.waitAndGet(5000);
82+
} catch (InterruptedException e) {
83+
throw new RuntimeException(e);
84+
}
85+
assertThat(line).contains("Found invalid row in system.peers");
86+
assertThat(line).contains("tokens=null");
87+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
88+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
89+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
90+
} finally {
91+
if (ccmBridge != null) ccmBridge.close();
92+
if (session != null) session.close();
93+
if (cluster != null) cluster.close();
94+
}
95+
}
96+
97+
@Test(groups = "short")
98+
@ScyllaVersion(
99+
minOSS = "6.2.0",
100+
minEnterprise = "2024.2.2",
101+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
102+
public void should_ignore_zero_token_DC() {
103+
Cluster cluster = null;
104+
Session session = null;
105+
CCMBridge ccmBridge = null;
106+
try {
107+
ccmBridge =
108+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
109+
ccmBridge.start();
110+
ccmBridge.add(2, 3);
111+
ccmBridge.updateNodeConfig(3, "join_ring", false);
112+
ccmBridge.start(3);
113+
ccmBridge.add(2, 4);
114+
ccmBridge.updateNodeConfig(4, "join_ring", false);
115+
ccmBridge.start(4);
116+
ccmBridge.waitForUp(3);
117+
ccmBridge.waitForUp(4);
118+
119+
cluster =
120+
Cluster.builder()
121+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
122+
.withPort(9042)
123+
.withoutAdvancedShardAwareness()
124+
.build();
125+
session = cluster.connect();
126+
127+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
128+
.isEqualTo(ccmBridge.addressOfNode(1));
129+
for (int i = 0; i < 2; i++) {
130+
String line = null;
131+
try {
132+
line = appender.waitAndGet(5000);
133+
} catch (InterruptedException e) {
134+
throw new RuntimeException(e);
135+
}
136+
assertThat(line).contains("Found invalid row in system.peers");
137+
assertThat(line).contains("tokens=null");
138+
}
139+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
140+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
141+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
142+
} finally {
143+
if (ccmBridge != null) ccmBridge.close();
144+
if (session != null) session.close();
145+
if (cluster != null) cluster.close();
146+
}
147+
}
148+
149+
@Test(groups = "short", dataProvider = "loadBalancingPolicies")
150+
@ScyllaVersion(
151+
minOSS = "6.2.0",
152+
minEnterprise = "2024.2.2",
153+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
154+
public void should_connect_to_zero_token_contact_point(LoadBalancingPolicy loadBalancingPolicy) {
155+
Cluster cluster = null;
156+
Session session = null;
157+
CCMBridge ccmBridge = null;
158+
159+
try {
160+
ccmBridge =
161+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
162+
ccmBridge.start();
163+
ccmBridge.add(3);
164+
ccmBridge.updateNodeConfig(3, "join_ring", false);
165+
ccmBridge.start(3);
166+
ccmBridge.waitForUp(3);
167+
168+
cluster =
169+
Cluster.builder()
170+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
171+
.withPort(9042)
172+
.withLoadBalancingPolicy(loadBalancingPolicy)
173+
.withoutAdvancedShardAwareness()
174+
.build();
175+
session = cluster.connect();
176+
177+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
178+
.isEqualTo(ccmBridge.addressOfNode(3));
179+
String line = null;
180+
try {
181+
line = appender.waitAndGet(5000);
182+
} catch (InterruptedException e) {
183+
throw new RuntimeException(e);
184+
}
185+
assertThat(line)
186+
.contains(
187+
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
188+
assertThat(line).contains(ccmBridge.ipOfNode(3));
189+
190+
session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
191+
session.execute(
192+
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
193+
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
194+
for (int i = 0; i < 30; i++) {
195+
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
196+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
197+
.isNotEqualTo(ccmBridge.addressOfNode(3));
198+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
199+
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
200+
}
201+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
202+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
203+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
204+
} finally {
205+
if (ccmBridge != null) ccmBridge.close();
206+
if (session != null) session.close();
207+
if (cluster != null) cluster.close();
208+
}
209+
}
210+
211+
@Test(groups = "short", dataProvider = "loadBalancingPolicies")
212+
@ScyllaVersion(
213+
minOSS = "6.2.0",
214+
minEnterprise = "2024.2.2",
215+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
216+
public void should_connect_to_zero_token_DC(LoadBalancingPolicy loadBalancingPolicy) {
217+
Cluster cluster = null;
218+
Session session = null;
219+
CCMBridge ccmBridge = null;
220+
221+
try {
222+
ccmBridge =
223+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
224+
ccmBridge.start();
225+
ccmBridge.add(2, 3);
226+
ccmBridge.updateNodeConfig(3, "join_ring", false);
227+
ccmBridge.start(3);
228+
ccmBridge.add(2, 4);
229+
ccmBridge.updateNodeConfig(4, "join_ring", false);
230+
ccmBridge.start(4);
231+
ccmBridge.waitForUp(3);
232+
ccmBridge.waitForUp(4);
233+
234+
cluster =
235+
Cluster.builder()
236+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
237+
.withPort(9042)
238+
.withLoadBalancingPolicy(loadBalancingPolicy)
239+
.withoutAdvancedShardAwareness()
240+
.build();
241+
session = cluster.connect();
242+
243+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
244+
.isEqualTo(ccmBridge.addressOfNode(3));
245+
String line = null;
246+
try {
247+
line = appender.waitAndGet(5000);
248+
} catch (InterruptedException e) {
249+
throw new RuntimeException(e);
250+
}
251+
assertThat(line)
252+
.contains(
253+
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
254+
assertThat(line).contains(ccmBridge.ipOfNode(3));
255+
256+
session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
257+
session.execute(
258+
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
259+
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
260+
for (int i = 0; i < 30; i++) {
261+
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
262+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
263+
.isNotEqualTo(ccmBridge.addressOfNode(3));
264+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
265+
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
266+
}
267+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
268+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
269+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
270+
} finally {
271+
if (ccmBridge != null) ccmBridge.close();
272+
if (session != null) session.close();
273+
if (cluster != null) cluster.close();
274+
}
275+
}
276+
}

0 commit comments

Comments
 (0)