Skip to content

Commit 47477e5

Browse files
authored
4.x: Add ZeroTokenNodesIT (#402)
* CcmBridge methods adjustments Adds `startWithArgs` methods that allow avoiding the default arguments like "--wait-other-notice". This wait causes timeout in multi-dc test utilizing `join_ring: false` so it needs to be avoided. Adds `addWithoutStart(n, dc)`. As the name suggests it's an add that does not immediately start the node. Adds `updateNodeConfig` in order to allow adding new nodes with different configurations. * Add ZeroTokenNodesIT Adds a test class verifying the behavior of the driver when dealing with zero-token Scylla nodes. Adds a configurable option which controls whether to allow zero-token nodes as valid peers. Until now such nodes were considered invalid. Default value of this option does not change the driver behavior.
1 parent 4f7e2ae commit 47477e5

File tree

9 files changed

+306
-15
lines changed

9 files changed

+306
-15
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,12 @@ public enum DefaultDriverOption implements DriverOption {
588588
*/
589589
HEARTBEAT_TIMEOUT("advanced.heartbeat.timeout"),
590590

591+
/**
592+
* Whether zero token peers should be considered valid.
593+
*
594+
* <p>Value-type: boolean
595+
*/
596+
METADATA_ALLOW_ZERO_TOKEN_PEERS("advanced.metadata.allow-zero-token-peers"),
591597
/**
592598
* How long the driver waits to propagate a Topology event.
593599
*

core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
345345
map.put(TypedDriverOption.SOCKET_TCP_NODELAY, true);
346346
map.put(TypedDriverOption.HEARTBEAT_INTERVAL, Duration.ofSeconds(30));
347347
map.put(TypedDriverOption.HEARTBEAT_TIMEOUT, initQueryTimeout);
348+
map.put(TypedDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS, false);
348349
map.put(TypedDriverOption.METADATA_TOPOLOGY_WINDOW, Duration.ofSeconds(1));
349350
map.put(TypedDriverOption.METADATA_TOPOLOGY_MAX_EVENTS, 20);
350351
map.put(TypedDriverOption.METADATA_SCHEMA_ENABLED, true);

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,10 @@ public String toString() {
519519
/** How long the driver waits for the response to a heartbeat. */
520520
public static final TypedDriverOption<Duration> HEARTBEAT_TIMEOUT =
521521
new TypedDriverOption<>(DefaultDriverOption.HEARTBEAT_TIMEOUT, GenericType.DURATION);
522+
/** Whether zero token peers are allowed */
523+
public static final TypedDriverOption<Boolean> METADATA_ALLOW_ZERO_TOKEN_PEERS =
524+
new TypedDriverOption<>(
525+
DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS, GenericType.BOOLEAN);
522526
/** How long the driver waits to propagate a Topology event. */
523527
public static final TypedDriverOption<Duration> METADATA_TOPOLOGY_WINDOW =
524528
new TypedDriverOption<>(DefaultDriverOption.METADATA_TOPOLOGY_WINDOW, GenericType.DURATION);

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,12 @@ protected InetSocketAddress getBroadcastRpcAddress(
541541
* node's broadcast RPC address and host ID; otherwise the driver may not work properly.
542542
*/
543543
protected boolean isPeerValid(AdminRow peerRow) {
544-
if (PeerRowValidator.isValid(peerRow)) {
544+
if (PeerRowValidator.isValid(
545+
peerRow,
546+
context
547+
.getConfig()
548+
.getDefaultProfile()
549+
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS))) {
545550
return true;
546551
} else {
547552
LOG.warn(

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/PeerRowValidator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
public class PeerRowValidator {
2727

2828
/** Returns {@code true} if the given peer row is valid, and {@code false} otherwise. */
29-
public static boolean isValid(@NonNull AdminRow peerRow) {
29+
public static boolean isValid(@NonNull AdminRow peerRow, boolean allowZeroTokenPeers) {
3030

3131
boolean hasPeersRpcAddress = !peerRow.isNull("rpc_address");
3232
boolean hasPeersV2RpcAddress =
@@ -37,7 +37,11 @@ public static boolean isValid(@NonNull AdminRow peerRow) {
3737
&& !peerRow.isNull("host_id")
3838
&& !peerRow.isNull("data_center")
3939
&& !peerRow.isNull("rack")
40-
&& !peerRow.isNull("tokens")
40+
&& (allowZeroTokenPeers || !peerRow.isNull("tokens"))
4141
&& !peerRow.isNull("schema_version");
4242
}
43+
44+
public static boolean isValid(@NonNull AdminRow peerRow) {
45+
return isValid(peerRow, false);
46+
}
4347
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,12 @@ protected CompletionStage<AdminResult> query(String queryString) {
190190
}
191191

192192
protected boolean isPeerValid(AdminRow peerRow, Map<UUID, Node> nodes) {
193-
if (PeerRowValidator.isValid(peerRow)) {
193+
if (PeerRowValidator.isValid(
194+
peerRow,
195+
context
196+
.getConfig()
197+
.getDefaultProfile()
198+
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS))) {
194199
UUID hostId = peerRow.getUuid("host_id");
195200
Node node = nodes.get(hostId);
196201
if (node == null) {

core/src/main/resources/reference.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,16 @@ datastax-java-driver {
19391939
}
19401940

19411941
advanced.metadata {
1942+
# Driver learns about other nodes through system.peers table on the node ControlConnection
1943+
# has reached. By default, the nodes represented by rows that have their `tokens` column empty
1944+
# are considered invalid. This option allows you to modify that since it is possible to
1945+
# have valid zero-token nodes in the cluster.
1946+
#
1947+
# Required: yes
1948+
# Modifiable at runtime: yes
1949+
# Overridable in a profile: no
1950+
allow-zero-token-peers = false
1951+
19421952
# Topology events are external signals that inform the driver of the state of Cassandra nodes
19431953
# (by default, they correspond to gossip events received on the control connection).
19441954
# The debouncer helps smoothen out oscillations if conflicting events are sent out in short
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package com.datastax.oss.driver.core.metadata;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.Assume.assumeTrue;
5+
6+
import com.datastax.oss.driver.api.core.CqlSession;
7+
import com.datastax.oss.driver.api.core.Version;
8+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
9+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
10+
import com.datastax.oss.driver.api.core.cql.ResultSet;
11+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
12+
import com.datastax.oss.driver.api.core.metadata.Node;
13+
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
14+
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
15+
import java.net.InetAddress;
16+
import java.net.InetSocketAddress;
17+
import java.util.Collection;
18+
import java.util.HashSet;
19+
import java.util.Objects;
20+
import java.util.Set;
21+
import java.util.stream.Collectors;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
25+
public class ZeroTokenNodesIT {
26+
27+
@Before
28+
public void checkScyllaVersion() {
29+
// minOSS = "6.2.0",
30+
// minEnterprise = "2024.2.2",
31+
// Zero-token nodes introduced in scylladb/scylladb#19684
32+
assumeTrue(CcmBridge.SCYLLA_ENABLEMENT);
33+
if (CcmBridge.SCYLLA_ENTERPRISE) {
34+
assumeTrue(
35+
CcmBridge.VERSION.compareTo(Objects.requireNonNull(Version.parse("2024.2.2"))) >= 0);
36+
} else {
37+
assumeTrue(CcmBridge.VERSION.compareTo(Objects.requireNonNull(Version.parse("6.2.0"))) >= 0);
38+
}
39+
}
40+
41+
@Test
42+
public void should_not_ignore_zero_token_peer_when_option_is_enabled() {
43+
CqlSession session = null;
44+
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
45+
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(3).withIpPrefix("127.0.1.").build()) {
46+
ccmBridge.create();
47+
ccmBridge.startWithArgs("--wait-for-binary-proto");
48+
ccmBridge.addWithoutStart(4, "dc1");
49+
ccmBridge.updateNodeConfig(4, "join_ring", false);
50+
ccmBridge.startWithArgs(4, "--wait-for-binary-proto");
51+
DriverConfigLoader loader =
52+
SessionUtils.configLoaderBuilder()
53+
.withBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS, true)
54+
.build();
55+
session =
56+
CqlSession.builder()
57+
.withConfigLoader(loader)
58+
.addContactPoint(new InetSocketAddress(ccmBridge.getNodeIpAddress(1), 9042))
59+
.build();
60+
61+
Collection<Node> nodes = session.getMetadata().getNodes().values();
62+
Set<String> toStrings =
63+
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
64+
assertThat(toStrings)
65+
.containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042");
66+
} finally {
67+
if (session != null) session.close();
68+
}
69+
}
70+
71+
@Test
72+
public void should_not_discover_zero_token_DC_when_option_is_disabled() {
73+
CqlSession session = null;
74+
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
75+
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).withIpPrefix("127.0.1.").build()) {
76+
ccmBridge.create();
77+
ccmBridge.updateNodeConfig(3, "join_ring", false);
78+
ccmBridge.updateNodeConfig(4, "join_ring", false);
79+
ccmBridge.startWithArgs("--wait-for-binary-proto");
80+
81+
// Not adding .withBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS, false)
82+
// It is implicitly false
83+
DriverConfigLoader loader = SessionUtils.configLoaderBuilder().build();
84+
session =
85+
CqlSession.builder()
86+
.withLocalDatacenter("dc1")
87+
.withConfigLoader(loader)
88+
.addContactPoint(new InetSocketAddress(ccmBridge.getNodeIpAddress(1), 9042))
89+
.build();
90+
91+
Set<String> landedOn = new HashSet<>();
92+
for (int i = 0; i < 30; i++) {
93+
ResultSet rs = session.execute("SELECT * FROM system.local");
94+
InetAddress broadcastRpcInetAddress =
95+
Objects.requireNonNull(rs.one()).getInetAddress("rpc_address");
96+
landedOn.add(Objects.requireNonNull(broadcastRpcInetAddress).toString());
97+
}
98+
assertThat(landedOn).containsOnly("/127.0.1.1", "/127.0.1.2");
99+
Collection<Node> nodes = session.getMetadata().getNodes().values();
100+
Set<String> toStrings =
101+
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
102+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
103+
} finally {
104+
if (session != null) session.close();
105+
}
106+
}
107+
108+
@Test
109+
public void should_discover_zero_token_DC_when_option_is_enabled() {
110+
CqlSession session = null;
111+
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
112+
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).withIpPrefix("127.0.1.").build()) {
113+
ccmBridge.create();
114+
ccmBridge.updateNodeConfig(3, "join_ring", false);
115+
ccmBridge.updateNodeConfig(4, "join_ring", false);
116+
ccmBridge.startWithArgs("--wait-for-binary-proto");
117+
118+
DriverConfigLoader loader =
119+
SessionUtils.configLoaderBuilder()
120+
.withBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS, true)
121+
.build();
122+
session =
123+
CqlSession.builder()
124+
.withLocalDatacenter("dc1")
125+
.withConfigLoader(loader)
126+
.addContactPoint(new InetSocketAddress(ccmBridge.getNodeIpAddress(1), 9042))
127+
.build();
128+
129+
Set<String> landedOn = new HashSet<>();
130+
for (int i = 0; i < 30; i++) {
131+
ResultSet rs = session.execute("SELECT * FROM system.local");
132+
InetAddress broadcastRpcInetAddress =
133+
Objects.requireNonNull(rs.one()).getInetAddress("rpc_address");
134+
landedOn.add(Objects.requireNonNull(broadcastRpcInetAddress).toString());
135+
}
136+
// LBP should still target local datacenter:
137+
assertThat(landedOn).containsOnly("/127.0.1.1", "/127.0.1.2");
138+
Collection<Node> nodes = session.getMetadata().getNodes().values();
139+
Set<String> toStrings =
140+
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
141+
// Metadata should have all nodes:
142+
assertThat(toStrings)
143+
.containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042");
144+
} finally {
145+
if (session != null) session.close();
146+
}
147+
}
148+
149+
@Test
150+
public void should_connect_to_zero_token_contact_point() {
151+
CqlSession session = null;
152+
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
153+
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2).withIpPrefix("127.0.1.").build()) {
154+
ccmBridge.create();
155+
ccmBridge.startWithArgs("--wait-for-binary-proto");
156+
ccmBridge.addWithoutStart(3, "dc1");
157+
ccmBridge.updateNodeConfig(3, "join_ring", false);
158+
ccmBridge.startWithArgs(3, "--wait-for-binary-proto");
159+
// DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS is implicitly false
160+
DriverConfigLoader loader = SessionUtils.configLoaderBuilder().build();
161+
session =
162+
CqlSession.builder()
163+
.withConfigLoader(loader)
164+
.addContactPoint(new InetSocketAddress(ccmBridge.getNodeIpAddress(3), 9042))
165+
.build();
166+
167+
Collection<Node> nodes = session.getMetadata().getNodes().values();
168+
Set<String> toStrings =
169+
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
170+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
171+
} finally {
172+
if (session != null) session.close();
173+
}
174+
}
175+
176+
@Test
177+
public void should_connect_to_zero_token_DC() {
178+
// This test is similar but not exactly the same as should_connect_to_zero_token_contact_point.
179+
// In the future we may want to have different behavior for arbiter DCs and adjust this test
180+
// method.
181+
CqlSession session = null;
182+
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
183+
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).withIpPrefix("127.0.1.").build()) {
184+
ccmBridge.create();
185+
ccmBridge.updateNodeConfig(3, "join_ring", false);
186+
ccmBridge.updateNodeConfig(4, "join_ring", false);
187+
ccmBridge.startWithArgs("--wait-for-binary-proto");
188+
189+
DriverConfigLoader loader =
190+
SessionUtils.configLoaderBuilder()
191+
.withBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS, false)
192+
.build();
193+
session =
194+
CqlSession.builder()
195+
.withLocalDatacenter("dc1")
196+
.withConfigLoader(loader)
197+
.addContactPoint(new InetSocketAddress(ccmBridge.getNodeIpAddress(3), 9042))
198+
.build();
199+
200+
Set<String> landedOn = new HashSet<>();
201+
for (int i = 0; i < 30; i++) {
202+
ResultSet rs = session.execute("SELECT * FROM system.local");
203+
InetAddress broadcastRpcInetAddress =
204+
Objects.requireNonNull(rs.one()).getInetAddress("rpc_address");
205+
landedOn.add(Objects.requireNonNull(broadcastRpcInetAddress).toString());
206+
}
207+
// LBP should still target local datacenter:
208+
assertThat(landedOn).containsOnly("/127.0.1.1", "/127.0.1.2");
209+
Collection<Node> nodes = session.getMetadata().getNodes().values();
210+
Set<String> toStrings =
211+
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
212+
// Metadata should have valid ordinary peers plus zero-token contact point:
213+
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
214+
} finally {
215+
if (session != null) session.close();
216+
}
217+
}
218+
}

0 commit comments

Comments
 (0)