Skip to content

Commit f4de8e7

Browse files
authored
Make peer tokens check configurable (#400)
Adds an option that controls if `system.peers` rows without tokens should be considered valid. Enabling this option will allow discovering zero-token peers with extended_peer_check enabled (it is enabled by default). Adds ZeroTokenNodesIT that verifies the driver behavior. It also contains some test methods relating to zero-token DCs. The special driver behavior for arbiter DCs is not yet fully decided so those methods can change in the future.
1 parent 05c4824 commit f4de8e7

File tree

3 files changed

+308
-1
lines changed

3 files changed

+308
-1
lines changed

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1008,7 +1008,11 @@ private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
10081008
&& peerRow.getColumnDefinitions().contains("rack")
10091009
&& !peerRow.isNull("rack")
10101010
&& peerRow.getColumnDefinitions().contains("tokens")
1011-
&& !peerRow.isNull("tokens");
1011+
&& (!peerRow.isNull("tokens")
1012+
|| cluster
1013+
.configuration
1014+
.getQueryOptions()
1015+
.shouldConsiderZeroTokenNodesValidPeers());
10121016
}
10131017
if (!isValid && logIfInvalid)
10141018
logger.warn(

driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class QueryOptions {
7373
private volatile boolean schemaQueriesPaged = true;
7474

7575
private volatile boolean addOriginalContactsToReconnectionPlan = false;
76+
private volatile boolean considerZeroTokenNodesValidPeers = false;
7677

7778
/**
7879
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
@@ -521,6 +522,21 @@ public boolean shouldAddOriginalContactsToReconnectionPlan() {
521522
return this.addOriginalContactsToReconnectionPlan;
522523
}
523524

525+
/**
526+
* Recently introduced in Scylla zero-token nodes have null value in "tokens" column in their
527+
* system.peers rows. By default extended peer check considers such rows as invalid. Enabling this
528+
* option will exclude this field from the check, and allow such rows from system.peers queries to
529+
* be used when refreshing metadata.
530+
*/
531+
public QueryOptions setConsiderZeroTokenNodesValidPeers(boolean enabled) {
532+
this.considerZeroTokenNodesValidPeers = enabled;
533+
return this;
534+
}
535+
536+
public boolean shouldConsiderZeroTokenNodesValidPeers() {
537+
return this.considerZeroTokenNodesValidPeers;
538+
}
539+
524540
@Override
525541
public boolean equals(Object that) {
526542
if (that == null || !(that instanceof QueryOptions)) {
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
package com.datastax.driver.core;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
6+
import com.datastax.driver.core.policies.LoadBalancingPolicy;
7+
import com.datastax.driver.core.policies.RoundRobinPolicy;
8+
import com.datastax.driver.core.policies.TokenAwarePolicy;
9+
import com.datastax.driver.core.utils.ScyllaVersion;
10+
import java.net.InetSocketAddress;
11+
import java.util.HashSet;
12+
import java.util.Set;
13+
import java.util.stream.Collectors;
14+
import org.testng.annotations.DataProvider;
15+
import org.testng.annotations.Test;
16+
17+
public class ZeroTokenNodesIT {
18+
19+
@DataProvider(name = "loadBalancingPolicies")
20+
public static Object[][] loadBalancingPolicies() {
21+
return new Object[][] {
22+
{DCAwareRoundRobinPolicy.builder().build()},
23+
{new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())},
24+
{new TokenAwarePolicy(new RoundRobinPolicy())}
25+
};
26+
}
27+
28+
@Test(groups = "short")
29+
@ScyllaVersion(
30+
minOSS = "6.2.0",
31+
minEnterprise = "2024.2.2",
32+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
33+
public void should_not_ignore_zero_token_peer_when_option_is_enabled() {
34+
Cluster cluster = null;
35+
Session session = null;
36+
CCMBridge ccmBridge = null;
37+
try {
38+
ccmBridge =
39+
CCMBridge.builder().withNodes(3).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
40+
ccmBridge.start();
41+
ccmBridge.add(4);
42+
ccmBridge.updateNodeConfig(4, "join_ring", false);
43+
ccmBridge.start(4);
44+
ccmBridge.waitForUp(4);
45+
46+
cluster =
47+
Cluster.builder()
48+
.withQueryOptions(new QueryOptions().setConsiderZeroTokenNodesValidPeers(true))
49+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
50+
.withPort(9042)
51+
.withoutAdvancedShardAwareness()
52+
.build();
53+
session = cluster.connect();
54+
55+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
56+
.isEqualTo(ccmBridge.addressOfNode(1));
57+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
58+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
59+
assertThat(toStrings)
60+
.containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042");
61+
} finally {
62+
if (ccmBridge != null) ccmBridge.close();
63+
if (session != null) session.close();
64+
if (cluster != null) cluster.close();
65+
}
66+
}
67+
68+
@Test(groups = "short")
69+
@ScyllaVersion(
70+
minOSS = "6.2.0",
71+
minEnterprise = "2024.2.2",
72+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
73+
public void should_not_discover_zero_token_DC_when_option_is_disabled() {
74+
Cluster cluster = null;
75+
Session session = null;
76+
CCMBridge ccmBridge = null;
77+
try {
78+
ccmBridge =
79+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
80+
ccmBridge.start();
81+
ccmBridge.add(2, 3);
82+
ccmBridge.updateNodeConfig(3, "join_ring", false);
83+
ccmBridge.start(3);
84+
ccmBridge.add(2, 4);
85+
ccmBridge.updateNodeConfig(4, "join_ring", false);
86+
ccmBridge.start(4);
87+
ccmBridge.waitForUp(3);
88+
ccmBridge.waitForUp(4);
89+
90+
cluster =
91+
Cluster.builder()
92+
.withQueryOptions(new QueryOptions().setConsiderZeroTokenNodesValidPeers(false))
93+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
94+
.withPort(9042)
95+
.withoutAdvancedShardAwareness()
96+
.build();
97+
session = cluster.connect();
98+
99+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
100+
.isEqualTo(ccmBridge.addressOfNode(1));
101+
102+
// Queries should not land on any of the zero-token DC nodes
103+
session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
104+
session.execute(
105+
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
106+
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
107+
for (int i = 0; i < 30; i++) {
108+
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
109+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
110+
.isNotIn(ccmBridge.addressOfNode(3), ccmBridge.addressOfNode(4));
111+
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
112+
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
113+
}
114+
115+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
116+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
117+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
118+
} finally {
119+
if (ccmBridge != null) ccmBridge.close();
120+
if (session != null) session.close();
121+
if (cluster != null) cluster.close();
122+
}
123+
}
124+
125+
@Test(groups = "short", dataProvider = "loadBalancingPolicies")
126+
@ScyllaVersion(
127+
minOSS = "6.2.0",
128+
minEnterprise = "2024.2.2",
129+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
130+
public void should_discover_zero_token_DC_when_option_is_enabled(
131+
LoadBalancingPolicy loadBalancingPolicy) {
132+
// Makes sure that with QueryOptions.setConsiderZeroTokenNodesValidPeers(true)
133+
// the zero-token peers will be discovered and added to metadata
134+
Cluster cluster = null;
135+
Session session = null;
136+
CCMBridge ccmBridge = null;
137+
try {
138+
ccmBridge =
139+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
140+
ccmBridge.start();
141+
ccmBridge.add(2, 3);
142+
ccmBridge.updateNodeConfig(3, "join_ring", false);
143+
ccmBridge.start(3);
144+
ccmBridge.add(2, 4);
145+
ccmBridge.updateNodeConfig(4, "join_ring", false);
146+
ccmBridge.start(4);
147+
ccmBridge.waitForUp(3);
148+
ccmBridge.waitForUp(4);
149+
150+
cluster =
151+
Cluster.builder()
152+
.withQueryOptions(new QueryOptions().setConsiderZeroTokenNodesValidPeers(true))
153+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
154+
.withPort(9042)
155+
.withLoadBalancingPolicy(loadBalancingPolicy)
156+
.withoutAdvancedShardAwareness()
157+
.build();
158+
session = cluster.connect();
159+
160+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
161+
.isEqualTo(ccmBridge.addressOfNode(1));
162+
163+
// Currently zero-token nodes are treated as normal nodes if allowed.
164+
// Later on we may want to adjust the LBP behavior to be more sophisticated.
165+
session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
166+
session.execute(
167+
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
168+
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
169+
Set<InetSocketAddress> queriedNodes = new HashSet<>();
170+
for (int i = 0; i < 30; i++) {
171+
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
172+
queriedNodes.add(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve());
173+
}
174+
assertThat(queriedNodes)
175+
.containsExactly(
176+
ccmBridge.addressOfNode(1),
177+
ccmBridge.addressOfNode(2),
178+
ccmBridge.addressOfNode(3),
179+
ccmBridge.addressOfNode(4));
180+
181+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
182+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
183+
assertThat(toStrings)
184+
.containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042");
185+
} finally {
186+
if (ccmBridge != null) ccmBridge.close();
187+
if (session != null) session.close();
188+
if (cluster != null) cluster.close();
189+
}
190+
}
191+
192+
@Test(groups = "short")
193+
@ScyllaVersion(
194+
minOSS = "6.2.0",
195+
minEnterprise = "2024.2.2",
196+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
197+
public void should_connect_to_zero_token_contact_point() {
198+
Cluster cluster = null;
199+
Session session = null;
200+
CCMBridge ccmBridge = null;
201+
202+
try {
203+
ccmBridge =
204+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
205+
ccmBridge.start();
206+
ccmBridge.add(3);
207+
ccmBridge.updateNodeConfig(3, "join_ring", false);
208+
ccmBridge.start(3);
209+
ccmBridge.waitForUp(3);
210+
211+
cluster =
212+
Cluster.builder()
213+
.withQueryOptions(
214+
new QueryOptions()
215+
// Already implicitly false, but making sure
216+
.setConsiderZeroTokenNodesValidPeers(false))
217+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
218+
.withPort(9042)
219+
.withoutAdvancedShardAwareness()
220+
.build();
221+
session = cluster.connect();
222+
223+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
224+
.isEqualTo(ccmBridge.addressOfNode(3));
225+
226+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
227+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
228+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
229+
} finally {
230+
if (ccmBridge != null) ccmBridge.close();
231+
if (session != null) session.close();
232+
if (cluster != null) cluster.close();
233+
}
234+
}
235+
236+
@Test(groups = "short")
237+
@ScyllaVersion(
238+
minOSS = "6.2.0",
239+
minEnterprise = "2024.2.2",
240+
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
241+
public void should_connect_to_zero_token_DC() {
242+
// This test is similar but not exactly the same as should_connect_to_zero_token_contact_point.
243+
// In the future we may want to have different behavior for arbiter DCs and adjust this test
244+
// method.
245+
Cluster cluster = null;
246+
Session session = null;
247+
CCMBridge ccmBridge = null;
248+
249+
try {
250+
ccmBridge =
251+
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
252+
ccmBridge.start();
253+
ccmBridge.add(2, 3);
254+
ccmBridge.updateNodeConfig(3, "join_ring", false);
255+
ccmBridge.start(3);
256+
ccmBridge.add(2, 4);
257+
ccmBridge.updateNodeConfig(4, "join_ring", false);
258+
ccmBridge.start(4);
259+
ccmBridge.waitForUp(3);
260+
ccmBridge.waitForUp(4);
261+
262+
cluster =
263+
Cluster.builder()
264+
.withQueryOptions(new QueryOptions().setConsiderZeroTokenNodesValidPeers(false))
265+
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
266+
.withPort(9042)
267+
.withoutAdvancedShardAwareness()
268+
.build();
269+
session = cluster.connect();
270+
271+
assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
272+
.isEqualTo(ccmBridge.addressOfNode(3));
273+
274+
Set<Host> hosts = cluster.getMetadata().getAllHosts();
275+
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
276+
assertThat(toStrings)
277+
// node 3 will be discovered because it's a contact point. Node 4 will not because it's a
278+
// peer
279+
// and the setting is disabled
280+
.containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
281+
} finally {
282+
if (ccmBridge != null) ccmBridge.close();
283+
if (session != null) session.close();
284+
if (cluster != null) cluster.close();
285+
}
286+
}
287+
}

0 commit comments

Comments
 (0)