Skip to content

Commit eff235b

Browse files
committed
Make schema agreement query pull only columns that are used
Currently schema agreement logic run `select * from system.peers`, while only `schema_version` is used, it creates excessive load on cluster and driver side.
1 parent 8cec131 commit eff235b

File tree

2 files changed

+58
-17
lines changed

2 files changed

+58
-17
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementChecker.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2222
import com.datastax.oss.driver.api.core.metadata.Node;
2323
import com.datastax.oss.driver.api.core.metadata.NodeState;
24+
import com.datastax.oss.driver.api.core.metadata.TokenMap;
2425
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
2526
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
2627
import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
@@ -108,7 +109,11 @@ private void sendQueries() {
108109
} else {
109110
CompletionStage<AdminResult> localQuery =
110111
query("SELECT schema_version FROM system.local WHERE key='local'");
111-
CompletionStage<AdminResult> peersQuery = query("SELECT * FROM system.peers");
112+
113+
// `tokens` column is excluded, it is served from
114+
// context.getMetadataManager().getMetadata().getTokenMap()
115+
CompletionStage<AdminResult> peersQuery =
116+
query("SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers");
112117

113118
localQuery
114119
.thenCombine(peersQuery, this::extractSchemaVersions)
@@ -142,9 +147,20 @@ private Set<UUID> extractSchemaVersions(AdminResult controlNodeResult, AdminResu
142147
channel.getEndPoint());
143148
}
144149

150+
boolean allowZeroTokenNodes =
151+
context
152+
.getConfig()
153+
.getDefaultProfile()
154+
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS);
155+
156+
TokenMap tokenMap =
157+
context.getMetadataManager().getMetadata().getTokenMap().isPresent()
158+
? context.getMetadataManager().getMetadata().getTokenMap().get()
159+
: null;
160+
145161
Map<UUID, Node> nodes = context.getMetadataManager().getMetadata().getNodes();
146162
for (AdminRow peerRow : peersResult) {
147-
if (isPeerValid(peerRow, nodes)) {
163+
if (isPeerValid(peerRow, nodes, allowZeroTokenNodes, tokenMap)) {
148164
UUID schemaVersion = Objects.requireNonNull(peerRow.getUuid("schema_version"));
149165
schemaVersions.add(schemaVersion);
150166
}
@@ -189,13 +205,9 @@ protected CompletionStage<AdminResult> query(String queryString) {
189205
.start();
190206
}
191207

192-
protected boolean isPeerValid(AdminRow peerRow, Map<UUID, Node> nodes) {
193-
if (PeerRowValidator.isValid(
194-
peerRow,
195-
context
196-
.getConfig()
197-
.getDefaultProfile()
198-
.getBoolean(DefaultDriverOption.METADATA_ALLOW_ZERO_TOKEN_PEERS))) {
208+
protected boolean isPeerValid(
209+
AdminRow peerRow, Map<UUID, Node> nodes, boolean allowZeroTokenNodes, TokenMap tokenMap) {
210+
if (PeerRowValidator.isValid(peerRow, true)) {
199211
UUID hostId = peerRow.getUuid("host_id");
200212
Node node = nodes.get(hostId);
201213
if (node == null) {
@@ -205,7 +217,10 @@ protected boolean isPeerValid(AdminRow peerRow, Map<UUID, Node> nodes) {
205217
LOG.debug("[{}] Peer {} is down, excluding from schema agreement check", logPrefix, hostId);
206218
return false;
207219
}
208-
return true;
220+
return allowZeroTokenNodes
221+
|| tokenMap == null
222+
|| tokenMap.getTokenRanges().isEmpty()
223+
|| !tokenMap.getTokenRanges(node).isEmpty();
209224
} else {
210225
LOG.warn(
211226
"[{}] Found invalid system.peers row for peer: {}, excluding from schema agreement check.",

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/SchemaAgreementCheckerTest.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
3535
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
3636
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
37+
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
38+
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
39+
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3TokenRange;
3740
import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
3841
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
3942
import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
@@ -46,6 +49,7 @@
4649
import java.util.Arrays;
4750
import java.util.Map;
4851
import java.util.Objects;
52+
import java.util.Optional;
4953
import java.util.Queue;
5054
import java.util.UUID;
5155
import java.util.concurrent.CompletableFuture;
@@ -56,6 +60,7 @@
5660
import org.junit.runner.RunWith;
5761
import org.mockito.Mock;
5862
import org.mockito.MockitoAnnotations;
63+
import org.mockito.internal.util.collections.Sets;
5964

6065
@RunWith(DataProviderRunner.class)
6166
public class SchemaAgreementCheckerTest {
@@ -72,6 +77,7 @@ public class SchemaAgreementCheckerTest {
7277
@Mock private EventLoop eventLoop;
7378
@Mock private MetadataManager metadataManager;
7479
@Mock private MetricsFactory metricsFactory;
80+
@Mock private DefaultTokenMap tokenMap;
7581
@Mock private Metadata metadata;
7682
@Mock private DefaultNode node1;
7783
@Mock private DefaultNode node2;
@@ -102,6 +108,12 @@ public void setup() {
102108
Objects.requireNonNull(node2.getHostId()),
103109
node2);
104110
when(metadata.getNodes()).thenReturn(nodes);
111+
when(tokenMap.getTokenRanges())
112+
.thenReturn(
113+
Sets.newSet(
114+
new Murmur3TokenRange(new Murmur3Token(1), new Murmur3Token(2)),
115+
new Murmur3TokenRange(new Murmur3Token(3), new Murmur3Token(4))));
116+
when(metadata.getTokenMap()).thenReturn(Optional.of(tokenMap));
105117
when(metadataManager.getMetadata()).thenReturn(metadata);
106118
when(context.getMetadataManager()).thenReturn(metadataManager);
107119

@@ -139,7 +151,9 @@ public void should_succeed_if_only_one_node() {
139151
new StubbedQuery(
140152
"SELECT schema_version FROM system.local WHERE key='local'",
141153
mockResult(mockLocalRow(VERSION1))),
142-
new StubbedQuery("SELECT * FROM system.peers", mockResult(/*empty*/ )));
154+
new StubbedQuery(
155+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
156+
mockResult(/*empty*/ )));
143157

144158
// When
145159
CompletionStage<Boolean> future = checker.run();
@@ -156,7 +170,9 @@ public void should_succeed_if_versions_match_on_first_try() {
156170
new StubbedQuery(
157171
"SELECT schema_version FROM system.local WHERE key='local'",
158172
mockResult(mockLocalRow(VERSION1))),
159-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
173+
new StubbedQuery(
174+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
175+
mockResult(mockValidPeerRow(VERSION1))));
160176

161177
// When
162178
CompletionStage<Boolean> future = checker.run();
@@ -174,7 +190,9 @@ public void should_ignore_down_peers() {
174190
new StubbedQuery(
175191
"SELECT schema_version FROM system.local WHERE key='local'",
176192
mockResult(mockLocalRow(VERSION1))),
177-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION2))));
193+
new StubbedQuery(
194+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
195+
mockResult(mockValidPeerRow(VERSION2))));
178196

179197
// When
180198
CompletionStage<Boolean> future = checker.run();
@@ -210,7 +228,9 @@ public void should_ignore_malformed_rows(AdminRow malformedPeer) {
210228
new StubbedQuery(
211229
"SELECT schema_version FROM system.local WHERE key='local'",
212230
mockResult(mockLocalRow(VERSION1))),
213-
new StubbedQuery("SELECT * FROM system.peers", mockResult(malformedPeer)));
231+
new StubbedQuery(
232+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
233+
mockResult(malformedPeer)));
214234

215235
// When
216236
CompletionStage<Boolean> future = checker.run();
@@ -228,13 +248,17 @@ public void should_reschedule_if_versions_do_not_match_on_first_try() {
228248
new StubbedQuery(
229249
"SELECT schema_version FROM system.local WHERE key='local'",
230250
mockResult(mockLocalRow(VERSION1))),
231-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION2))),
251+
new StubbedQuery(
252+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
253+
mockResult(mockValidPeerRow(VERSION2))),
232254

233255
// Second round
234256
new StubbedQuery(
235257
"SELECT schema_version FROM system.local WHERE key='local'",
236258
mockResult(mockLocalRow(VERSION1))),
237-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
259+
new StubbedQuery(
260+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
261+
mockResult(mockValidPeerRow(VERSION1))));
238262

239263
// When
240264
CompletionStage<Boolean> future = checker.run();
@@ -253,7 +277,9 @@ public void should_fail_if_versions_do_not_match_after_timeout() {
253277
new StubbedQuery(
254278
"SELECT schema_version FROM system.local WHERE key='local'",
255279
mockResult(mockLocalRow(VERSION1))),
256-
new StubbedQuery("SELECT * FROM system.peers", mockResult(mockValidPeerRow(VERSION1))));
280+
new StubbedQuery(
281+
"SELECT host_id, schema_version, rpc_address, data_center, rack FROM system.peers",
282+
mockResult(mockValidPeerRow(VERSION1))));
257283

258284
// When
259285
CompletionStage<Boolean> future = checker.run();

0 commit comments

Comments
 (0)