Skip to content

Commit 492a4c9

Browse files
authored
4.x: Adjust BasicLoadBalancingPolicy#getReplicas (#405)
* Fetch additional keyspace metadata from scylla_keyspaces Extends schema related classes with methods for querying scylla_keyspaces in order to determine whether keyspace is tablets-enabled. Adjusts schema queries tests and several other integration tests that mock responses from system tables. Adds `isUsingTablets()` and `setUsingTablets()` to KeyspaceMetadata. * Use keyspace metadata in `BasicLoadBalancingPolicy#getReplicas` Modifies the getReplicas method to do the tablet lookup if and only if the keyspace metadata indicates that it's a tablets-based keyspace. Otherwise refer to the token map. Previous behavior was to try tablet map lookup first regardless of the keyspace configuration.
1 parent f9a87e1 commit 492a4c9

File tree

14 files changed

+162
-12
lines changed

14 files changed

+162
-12
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/metadata/schema/KeyspaceMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ public interface KeyspaceMetadata extends Describable {
4040
/** Whether this keyspace is virtual */
4141
boolean isVirtual();
4242

43+
default boolean isUsingTablets() {
44+
return false;
45+
}
46+
47+
default void setUsingTablets(boolean predicate) {}
48+
4349
/** The replication options defined for this keyspace. */
4450
@NonNull
4551
Map<String, String> getReplication();

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.datastax.oss.driver.api.core.metadata.Tablet;
3838
import com.datastax.oss.driver.api.core.metadata.TabletMap;
3939
import com.datastax.oss.driver.api.core.metadata.TokenMap;
40+
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
4041
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
4142
import com.datastax.oss.driver.api.core.metadata.token.Token;
4243
import com.datastax.oss.driver.api.core.session.Request;
@@ -328,22 +329,24 @@ protected Set<Node> getReplicas(@Nullable Request request, @Nullable Session ses
328329
return Collections.emptySet();
329330
}
330331

331-
if (table != null) {
332-
if (token == null) {
333-
if (partitioner != null) {
334-
token = partitioner.hash(key);
335-
}
332+
if (token == null && partitioner != null) {
333+
token = partitioner.hash(key);
334+
}
335+
336+
Optional<KeyspaceMetadata> ksMetadata =
337+
context.getMetadataManager().getMetadata().getKeyspace(keyspace);
338+
if (ksMetadata.isPresent() && ksMetadata.get().isUsingTablets()) {
339+
if (table == null) {
340+
return Collections.emptySet();
336341
}
337342
if (token instanceof TokenLong64) {
338343
Tablet targetTablet =
339344
tabletMap.getTablet(keyspace, table, ((TokenLong64) token).getValue());
340345
if (targetTablet != null) {
341-
Set<Node> replicas = targetTablet.getReplicaNodes();
342-
if (!replicas.isEmpty()) {
343-
return replicas;
344-
}
346+
return targetTablet.getReplicaNodes();
345347
}
346348
}
349+
return Collections.emptySet();
347350
}
348351

349352
if (!maybeTokenMap.isPresent()) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/DefaultKeyspaceMetadata.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class DefaultKeyspaceMetadata implements KeyspaceMetadata, Serializable {
4545
@NonNull private final Map<CqlIdentifier, ViewMetadata> views;
4646
@NonNull private final Map<FunctionSignature, FunctionMetadata> functions;
4747
@NonNull private final Map<FunctionSignature, AggregateMetadata> aggregates;
48+
private boolean usingTablets = false;
4849

4950
public DefaultKeyspaceMetadata(
5051
@NonNull CqlIdentifier name,
@@ -119,6 +120,16 @@ public Map<FunctionSignature, AggregateMetadata> getAggregates() {
119120
return aggregates;
120121
}
121122

123+
@Override
124+
public boolean isUsingTablets() {
125+
return this.usingTablets;
126+
}
127+
128+
@Override
129+
public void setUsingTablets(boolean predicate) {
130+
this.usingTablets = predicate;
131+
}
132+
122133
@Override
123134
public boolean equals(Object other) {
124135
if (other == this) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/parsing/CassandraSchemaParser.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public SchemaRefresh parse() {
7575
ImmutableMap.Builder<CqlIdentifier, KeyspaceMetadata> keyspacesBuilder = ImmutableMap.builder();
7676
for (AdminRow row : rows.keyspaces()) {
7777
KeyspaceMetadata keyspace = parseKeyspace(row);
78+
AdminRow scyllaRow = rows.scyllaKeyspaces().getOrDefault(keyspace.getName(), null);
79+
if (scyllaRow != null
80+
&& scyllaRow.contains("initial_tablets")
81+
&& !scyllaRow.isNull("initial_tablets")) {
82+
keyspace.setUsingTablets(true);
83+
}
7884
keyspacesBuilder.put(keyspace.getName(), keyspace);
7985
}
8086
for (AdminRow row : rows.virtualKeyspaces()) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra21SchemaQueries.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,9 @@ protected Optional<String> selectEdgesQuery() {
9494
protected Optional<String> selectVerticiesQuery() {
9595
return Optional.empty();
9696
}
97+
98+
@Override
99+
protected Optional<String> selectScyllaKeyspacesQuery() {
100+
return Optional.empty();
101+
}
97102
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra22SchemaQueries.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,9 @@ protected Optional<String> selectEdgesQuery() {
9494
protected Optional<String> selectVerticiesQuery() {
9595
return Optional.empty();
9696
}
97+
98+
@Override
99+
protected Optional<String> selectScyllaKeyspacesQuery() {
100+
return Optional.empty();
101+
}
97102
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra3SchemaQueries.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,9 @@ protected Optional<String> selectEdgesQuery() {
9494
protected Optional<String> selectVerticiesQuery() {
9595
return Optional.empty();
9696
}
97+
98+
@Override
99+
protected Optional<String> selectScyllaKeyspacesQuery() {
100+
return Optional.of("SELECT * FROM system_schema.scylla_keyspaces");
101+
}
97102
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/CassandraSchemaQueries.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
2929
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
3030
import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
31+
import com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException;
3132
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
3233
import com.datastax.oss.driver.internal.core.util.NanoTime;
3334
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
3435
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
36+
import com.datastax.oss.protocol.internal.ProtocolConstants;
3537
import io.netty.util.concurrent.EventExecutor;
3638
import java.time.Duration;
3739
import java.util.Collections;
@@ -111,6 +113,8 @@ protected CassandraSchemaQueries(
111113

112114
protected abstract Optional<String> selectVerticiesQuery();
113115

116+
protected abstract Optional<String> selectScyllaKeyspacesQuery();
117+
114118
@Override
115119
public CompletionStage<SchemaRows> execute() {
116120
RunOrSchedule.on(adminExecutor, this::executeOnAdminExecutor);
@@ -125,6 +129,12 @@ private void executeOnAdminExecutor() {
125129
String usingClause = shouldApplyUsingTimeout() ? usingTimeoutClause : "";
126130

127131
query(selectKeyspacesQuery() + whereClause + usingClause, schemaRowsBuilder::withKeyspaces);
132+
selectScyllaKeyspacesQuery()
133+
.ifPresent(
134+
select ->
135+
queryIfAvailable(
136+
select + whereClause + usingClause, schemaRowsBuilder::withScyllaKeyspaces));
137+
128138
query(selectTypesQuery() + whereClause + usingClause, schemaRowsBuilder::withTypes);
129139
query(selectTablesQuery() + whereClause + usingClause, schemaRowsBuilder::withTables);
130140
query(selectColumnsQuery() + whereClause + usingClause, schemaRowsBuilder::withColumns);
@@ -176,6 +186,17 @@ private void query(
176186
(result, error) -> handleResult(result, error, builderUpdater), adminExecutor);
177187
}
178188

189+
private void queryIfAvailable(
190+
String queryString,
191+
Function<Iterable<AdminRow>, CassandraSchemaRows.Builder> builderUpdater) {
192+
assert adminExecutor.inEventLoop();
193+
194+
pendingQueries += 1;
195+
query(queryString)
196+
.whenCompleteAsync(
197+
(result, error) -> handleResult(result, error, builderUpdater, true), adminExecutor);
198+
}
199+
179200
@VisibleForTesting
180201
protected CompletionStage<AdminResult> query(String query) {
181202
return AdminRequestHandler.query(channel, query, timeout, pageSize, logPrefix).start();
@@ -185,12 +206,37 @@ private void handleResult(
185206
AdminResult result,
186207
Throwable error,
187208
Function<Iterable<AdminRow>, CassandraSchemaRows.Builder> builderUpdater) {
209+
handleResult(result, error, builderUpdater, false);
210+
}
211+
212+
private void handleResult(
213+
AdminResult result,
214+
Throwable error,
215+
Function<Iterable<AdminRow>, CassandraSchemaRows.Builder> builderUpdater,
216+
boolean ignoreServerErrors) {
188217

189218
// If another query already failed, we've already propagated the failure so just ignore this one
190219
if (schemaRowsFuture.isCompletedExceptionally()) {
191220
return;
192221
}
193222

223+
// Meant to allow through "(keyspace/table) does not exist" or "unconfigured" errors for
224+
// specific, optional queries
225+
if (ignoreServerErrors && error instanceof UnexpectedResponseException) {
226+
UnexpectedResponseException castedError = (UnexpectedResponseException) error;
227+
if (castedError.message.opcode == ProtocolConstants.ErrorCode.SERVER_ERROR) {
228+
LOG.debug("Silencing error: ", error);
229+
// Consider such query 'done', but ignore its result
230+
pendingQueries -= 1;
231+
if (pendingQueries == 0) {
232+
LOG.debug(
233+
"[{}] Schema queries took {}", logPrefix, NanoTime.formatTimeSince(startTimeNs));
234+
schemaRowsFuture.complete(schemaRowsBuilder.build());
235+
}
236+
return;
237+
}
238+
}
239+
194240
if (error != null) {
195241
schemaRowsFuture.completeExceptionally(error);
196242
} else {
@@ -200,7 +246,8 @@ private void handleResult(
200246
result
201247
.nextPage()
202248
.whenCompleteAsync(
203-
(nextResult, nextError) -> handleResult(nextResult, nextError, builderUpdater),
249+
(nextResult, nextError) ->
250+
handleResult(nextResult, nextError, builderUpdater, ignoreServerErrors),
204251
adminExecutor);
205252
} else {
206253
pendingQueries -= 1;

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/CassandraSchemaRows.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class CassandraSchemaRows implements SchemaRows {
5656
private final Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> indexes;
5757
private final Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> vertices;
5858
private final Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges;
59+
private final Map<CqlIdentifier, AdminRow> scyllaKeyspaces;
5960

6061
private CassandraSchemaRows(
6162
Node node,
@@ -72,7 +73,8 @@ private CassandraSchemaRows(
7273
Multimap<CqlIdentifier, AdminRow> functions,
7374
Multimap<CqlIdentifier, AdminRow> aggregates,
7475
Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> vertices,
75-
Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges) {
76+
Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges,
77+
Map<CqlIdentifier, AdminRow> scyllaKeyspaces) {
7678
this.node = node;
7779
this.dataTypeParser = dataTypeParser;
7880
this.keyspaces = keyspaces;
@@ -88,6 +90,7 @@ private CassandraSchemaRows(
8890
this.aggregates = aggregates;
8991
this.vertices = vertices;
9092
this.edges = edges;
93+
this.scyllaKeyspaces = scyllaKeyspaces;
9194
}
9295

9396
@NonNull
@@ -166,6 +169,11 @@ public Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges() {
166169
return edges;
167170
}
168171

172+
@Override
173+
public Map<CqlIdentifier, AdminRow> scyllaKeyspaces() {
174+
return scyllaKeyspaces;
175+
}
176+
169177
public static class Builder {
170178
private static final Logger LOG = LoggerFactory.getLogger(Builder.class);
171179

@@ -198,6 +206,8 @@ public static class Builder {
198206
verticesBuilders = new LinkedHashMap<>();
199207
private final Map<CqlIdentifier, ImmutableMultimap.Builder<CqlIdentifier, AdminRow>>
200208
edgesBuilders = new LinkedHashMap<>();
209+
private final ImmutableMap.Builder<CqlIdentifier, AdminRow> scyllaKeyspacesBuilder =
210+
ImmutableMap.builder();
201211

202212
public Builder(Node node, KeyspaceFilter keyspaceFilter, String logPrefix) {
203213
this.node = node;
@@ -323,6 +333,13 @@ public Builder withEdges(Iterable<AdminRow> rows) {
323333
return this;
324334
}
325335

336+
public Builder withScyllaKeyspaces(Iterable<AdminRow> rows) {
337+
for (AdminRow row : rows) {
338+
putByKeyspacePk(row, scyllaKeyspacesBuilder);
339+
}
340+
return this;
341+
}
342+
326343
private void put(ImmutableList.Builder<AdminRow> builder, AdminRow row) {
327344
String keyspace = row.getString("keyspace_name");
328345
if (keyspace == null) {
@@ -342,6 +359,16 @@ private void putByKeyspace(
342359
}
343360
}
344361

362+
private void putByKeyspacePk(
363+
AdminRow row, ImmutableMap.Builder<CqlIdentifier, AdminRow> builder) {
364+
String keyspace = row.getString("keyspace_name");
365+
if (keyspace == null) {
366+
LOG.warn("[{}] Skipping system row with missing keyspace name", logPrefix);
367+
} else if (keyspaceFilter.includes(keyspace)) {
368+
builder.put(CqlIdentifier.fromInternal(keyspace), row);
369+
}
370+
}
371+
345372
private void putByKeyspaceAndTable(
346373
AdminRow row,
347374
Map<CqlIdentifier, ImmutableMultimap.Builder<CqlIdentifier, AdminRow>> builders) {
@@ -375,7 +402,8 @@ public CassandraSchemaRows build() {
375402
functionsBuilder.build(),
376403
aggregatesBuilder.build(),
377404
build(verticesBuilders),
378-
build(edgesBuilders));
405+
build(edgesBuilders),
406+
scyllaKeyspacesBuilder.build());
379407
}
380408

381409
private static <K1, K2, V> Map<K1, Multimap<K2, V>> build(

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/SchemaRows.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.DataTypeParser;
2424
import com.datastax.oss.driver.shaded.guava.common.collect.Multimap;
2525
import edu.umd.cs.findbugs.annotations.NonNull;
26+
import java.util.HashMap;
2627
import java.util.LinkedHashMap;
2728
import java.util.List;
2829
import java.util.Map;
@@ -70,4 +71,8 @@ default Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> vertices() {
7071
default Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges() {
7172
return new LinkedHashMap<>();
7273
}
74+
75+
default Map<CqlIdentifier, AdminRow> scyllaKeyspaces() {
76+
return new HashMap<>();
77+
}
7378
}

0 commit comments

Comments
 (0)