Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ private void sendQueries() {
} else {
CompletionStage<AdminResult> localQuery =
query("SELECT schema_version FROM system.local WHERE key='local'");
CompletionStage<AdminResult> peersQuery = query("SELECT * FROM system.peers");

// `tokens` column is excluded, it is served from
// context.getMetadataManager().getMetadata().getTokenMap()
CompletionStage<AdminResult> peersQuery =
query("SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers");

localQuery
.thenCombine(peersQuery, this::extractSchemaVersions)
Expand Down Expand Up @@ -142,9 +146,15 @@ private Set<UUID> extractSchemaVersions(AdminResult controlNodeResult, AdminResu
channel.getEndPoint());
}

boolean allowZeroTokenNodes =
context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS);

Map<UUID, Node> nodes = context.getMetadataManager().getMetadata().getNodes();
for (AdminRow peerRow : peersResult) {
if (isPeerValid(peerRow, nodes)) {
if (isPeerValid(peerRow, nodes, allowZeroTokenNodes)) {
UUID schemaVersion = Objects.requireNonNull(peerRow.getUuid("schema_version"));
schemaVersions.add(schemaVersion);
}
Expand Down Expand Up @@ -189,13 +199,13 @@ protected CompletionStage<AdminResult> query(String queryString) {
.start();
}

protected boolean isPeerValid(AdminRow peerRow, Map<UUID, Node> nodes) {
protected boolean isPeerValid(
AdminRow peerRow, Map<UUID, Node> nodes, boolean allowZeroTokenNodes) {
if (PeerRowValidator.isValid(
peerRow,
context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS))) {
// allowZeroTokenPeers is true since `tokens` column is not pulled, but it will make it
// ignore `tokens` column.
true)) {
UUID hostId = peerRow.getUuid("host_id");
Node node = nodes.get(hostId);
if (node == null) {
Expand All @@ -205,7 +215,16 @@ protected boolean isPeerValid(AdminRow peerRow, Map<UUID, Node> nodes) {
LOG.debug("[{}] Peer {} is down, excluding from schema agreement check", logPrefix, hostId);
return false;
}
return true;

if (allowZeroTokenNodes) {
return true;
}

if (!(node instanceof DefaultNode)) {
return true;
}

return !((DefaultNode) node).getRawTokens().isEmpty();
} else {
LOG.warn(
"[{}] Found invalid system.peers row for peer: {}, excluding from schema agreement check.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ public void should_succeed_if_only_one_node() {
new StubbedQuery(
"SELECT schema_version FROM system.local WHERE key='local'",
mockResult(mockLocalRow(VERSION1))),
new StubbedQuery("SELECT * FROM system.peers", mockResult(/*empty*/ )));
new StubbedQuery(
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
mockResult(/*empty*/ )));

// When
CompletionStage<Boolean> future = checker.run();
Expand All @@ -156,7 +158,9 @@ public void should_succeed_if_versions_match_on_first_try() {
new StubbedQuery(
"SELECT schema_version FROM system.local WHERE key='local'",
mockResult(mockLocalRow(VERSION1))),
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
new StubbedQuery(
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
mockResult(mockValidPeerRow(VERSION1))));

// When
CompletionStage<Boolean> future = checker.run();
Expand All @@ -174,7 +178,9 @@ public void should_ignore_down_peers() {
new StubbedQuery(
"SELECT schema_version FROM system.local WHERE key='local'",
mockResult(mockLocalRow(VERSION1))),
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION2))));
new StubbedQuery(
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
mockResult(mockValidPeerRow(VERSION2))));

// When
CompletionStage<Boolean> future = checker.run();
Expand Down Expand Up @@ -210,7 +216,9 @@ public void should_ignore_malformed_rows(AdminRow malformedPeer) {
new StubbedQuery(
"SELECT schema_version FROM system.local WHERE key='local'",
mockResult(mockLocalRow(VERSION1))),
new StubbedQuery("SELECT * FROM system.peers", mockResult(malformedPeer)));
new StubbedQuery(
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
mockResult(malformedPeer)));

// When
CompletionStage<Boolean> future = checker.run();
Expand All @@ -228,13 +236,17 @@ public void should_reschedule_if_versions_do_not_match_on_first_try() {
new StubbedQuery(
"SELECT schema_version FROM system.local WHERE key='local'",
mockResult(mockLocalRow(VERSION1))),
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION2))),
new StubbedQuery(
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
mockResult(mockValidPeerRow(VERSION2))),

// Second round
new StubbedQuery(
"SELECT schema_version FROM system.local WHERE key='local'",
mockResult(mockLocalRow(VERSION1))),
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
new StubbedQuery(
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
mockResult(mockValidPeerRow(VERSION1))));

// When
CompletionStage<Boolean> future = checker.run();
Expand All @@ -253,7 +265,9 @@ public void should_fail_if_versions_do_not_match_after_timeout() {
new StubbedQuery(
"SELECT schema_version FROM system.local WHERE key='local'",
mockResult(mockLocalRow(VERSION1))),
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
new StubbedQuery(
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
mockResult(mockValidPeerRow(VERSION1))));

// When
CompletionStage<Boolean> future = checker.run();
Expand Down
Loading