Skip to content

Commit a568e59

Browse files
committed
Fetch additional keyspace metadata from scylla_keyspaces
Extends schema related classes with methods for querying scylla_keyspaces in order to determine whether keyspace is tablets-enabled. Adjusts schema queries tests and several other integration tests that mock responses from system tables. Adds `isUsingTablets()` and `setUsingTablets()` to KeyspaceMetadata.
1 parent 47477e5 commit a568e59

File tree

13 files changed

+148
-3
lines changed

13 files changed

+148
-3
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/metadata/schema/KeyspaceMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ public interface KeyspaceMetadata extends Describable {
4040
/** Whether this keyspace is virtual */
4141
boolean isVirtual();
4242

43+
default boolean isUsingTablets() {
44+
return false;
45+
}
46+
47+
default void setUsingTablets(boolean predicate) {}
48+
4349
/** The replication options defined for this keyspace. */
4450
@NonNull
4551
Map<String, String> getReplication();

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/DefaultKeyspaceMetadata.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class DefaultKeyspaceMetadata implements KeyspaceMetadata, Serializable {
4545
@NonNull private final Map<CqlIdentifier, ViewMetadata> views;
4646
@NonNull private final Map<FunctionSignature, FunctionMetadata> functions;
4747
@NonNull private final Map<FunctionSignature, AggregateMetadata> aggregates;
48+
private boolean usingTablets = false;
4849

4950
public DefaultKeyspaceMetadata(
5051
@NonNull CqlIdentifier name,
@@ -119,6 +120,16 @@ public Map<FunctionSignature, AggregateMetadata> getAggregates() {
119120
return aggregates;
120121
}
121122

123+
@Override
124+
public boolean isUsingTablets() {
125+
return this.usingTablets;
126+
}
127+
128+
@Override
129+
public void setUsingTablets(boolean predicate) {
130+
this.usingTablets = predicate;
131+
}
132+
122133
@Override
123134
public boolean equals(Object other) {
124135
if (other == this) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/parsing/CassandraSchemaParser.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public SchemaRefresh parse() {
7575
ImmutableMap.Builder<CqlIdentifier, KeyspaceMetadata> keyspacesBuilder = ImmutableMap.builder();
7676
for (AdminRow row : rows.keyspaces()) {
7777
KeyspaceMetadata keyspace = parseKeyspace(row);
78+
AdminRow scyllaRow = rows.scyllaKeyspaces().getOrDefault(keyspace.getName(), null);
79+
if (scyllaRow != null
80+
&& scyllaRow.contains("initial_tablets")
81+
&& !scyllaRow.isNull("initial_tablets")) {
82+
keyspace.setUsingTablets(true);
83+
}
7884
keyspacesBuilder.put(keyspace.getName(), keyspace);
7985
}
8086
for (AdminRow row : rows.virtualKeyspaces()) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra21SchemaQueries.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,9 @@ protected Optional<String> selectEdgesQuery() {
9494
protected Optional<String> selectVerticiesQuery() {
9595
return Optional.empty();
9696
}
97+
98+
@Override
99+
protected Optional<String> selectScyllaKeyspacesQuery() {
100+
return Optional.empty();
101+
}
97102
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra22SchemaQueries.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,9 @@ protected Optional<String> selectEdgesQuery() {
9494
protected Optional<String> selectVerticiesQuery() {
9595
return Optional.empty();
9696
}
97+
98+
@Override
99+
protected Optional<String> selectScyllaKeyspacesQuery() {
100+
return Optional.empty();
101+
}
97102
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra3SchemaQueries.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,9 @@ protected Optional<String> selectEdgesQuery() {
9494
protected Optional<String> selectVerticiesQuery() {
9595
return Optional.empty();
9696
}
97+
98+
@Override
99+
protected Optional<String> selectScyllaKeyspacesQuery() {
100+
return Optional.of("SELECT * FROM system_schema.scylla_keyspaces");
101+
}
97102
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/CassandraSchemaQueries.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
2929
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
3030
import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
31+
import com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException;
3132
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
3233
import com.datastax.oss.driver.internal.core.util.NanoTime;
3334
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
3435
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
36+
import com.datastax.oss.protocol.internal.ProtocolConstants;
3537
import io.netty.util.concurrent.EventExecutor;
3638
import java.time.Duration;
3739
import java.util.Collections;
@@ -111,6 +113,8 @@ protected CassandraSchemaQueries(
111113

112114
protected abstract Optional<String> selectVerticiesQuery();
113115

116+
protected abstract Optional<String> selectScyllaKeyspacesQuery();
117+
114118
@Override
115119
public CompletionStage<SchemaRows> execute() {
116120
RunOrSchedule.on(adminExecutor, this::executeOnAdminExecutor);
@@ -125,6 +129,12 @@ private void executeOnAdminExecutor() {
125129
String usingClause = shouldApplyUsingTimeout() ? usingTimeoutClause : "";
126130

127131
query(selectKeyspacesQuery() + whereClause + usingClause, schemaRowsBuilder::withKeyspaces);
132+
selectScyllaKeyspacesQuery()
133+
.ifPresent(
134+
select ->
135+
queryIfAvailable(
136+
select + whereClause + usingClause, schemaRowsBuilder::withScyllaKeyspaces));
137+
128138
query(selectTypesQuery() + whereClause + usingClause, schemaRowsBuilder::withTypes);
129139
query(selectTablesQuery() + whereClause + usingClause, schemaRowsBuilder::withTables);
130140
query(selectColumnsQuery() + whereClause + usingClause, schemaRowsBuilder::withColumns);
@@ -176,6 +186,17 @@ private void query(
176186
(result, error) -> handleResult(result, error, builderUpdater), adminExecutor);
177187
}
178188

189+
private void queryIfAvailable(
190+
String queryString,
191+
Function<Iterable<AdminRow>, CassandraSchemaRows.Builder> builderUpdater) {
192+
assert adminExecutor.inEventLoop();
193+
194+
pendingQueries += 1;
195+
query(queryString)
196+
.whenCompleteAsync(
197+
(result, error) -> handleResult(result, error, builderUpdater, true), adminExecutor);
198+
}
199+
179200
@VisibleForTesting
180201
protected CompletionStage<AdminResult> query(String query) {
181202
return AdminRequestHandler.query(channel, query, timeout, pageSize, logPrefix).start();
@@ -185,12 +206,34 @@ private void handleResult(
185206
AdminResult result,
186207
Throwable error,
187208
Function<Iterable<AdminRow>, CassandraSchemaRows.Builder> builderUpdater) {
209+
handleResult(result, error, builderUpdater, false);
210+
}
211+
212+
private void handleResult(
213+
AdminResult result,
214+
Throwable error,
215+
Function<Iterable<AdminRow>, CassandraSchemaRows.Builder> builderUpdater,
216+
boolean ignoreTargetDoesNotExistErrors) {
188217

189218
// If another query already failed, we've already propagated the failure so just ignore this one
190219
if (schemaRowsFuture.isCompletedExceptionally()) {
191220
return;
192221
}
193222

223+
if (ignoreTargetDoesNotExistErrors && error instanceof UnexpectedResponseException) {
224+
UnexpectedResponseException castedError = (UnexpectedResponseException) error;
225+
if (castedError.message.opcode == ProtocolConstants.ErrorCode.SERVER_ERROR
226+
&& castedError.getMessage().contains("does not exist")) {
227+
pendingQueries -= 1;
228+
if (pendingQueries == 0) {
229+
LOG.debug(
230+
"[{}] Schema queries took {}", logPrefix, NanoTime.formatTimeSince(startTimeNs));
231+
schemaRowsFuture.complete(schemaRowsBuilder.build());
232+
}
233+
return;
234+
}
235+
}
236+
194237
if (error != null) {
195238
schemaRowsFuture.completeExceptionally(error);
196239
} else {
@@ -200,7 +243,9 @@ private void handleResult(
200243
result
201244
.nextPage()
202245
.whenCompleteAsync(
203-
(nextResult, nextError) -> handleResult(nextResult, nextError, builderUpdater),
246+
(nextResult, nextError) ->
247+
handleResult(
248+
nextResult, nextError, builderUpdater, ignoreTargetDoesNotExistErrors),
204249
adminExecutor);
205250
} else {
206251
pendingQueries -= 1;

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/CassandraSchemaRows.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class CassandraSchemaRows implements SchemaRows {
5656
private final Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> indexes;
5757
private final Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> vertices;
5858
private final Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges;
59+
private final Map<CqlIdentifier, AdminRow> scyllaKeyspaces;
5960

6061
private CassandraSchemaRows(
6162
Node node,
@@ -72,7 +73,8 @@ private CassandraSchemaRows(
7273
Multimap<CqlIdentifier, AdminRow> functions,
7374
Multimap<CqlIdentifier, AdminRow> aggregates,
7475
Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> vertices,
75-
Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges) {
76+
Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges,
77+
Map<CqlIdentifier, AdminRow> scyllaKeyspaces) {
7678
this.node = node;
7779
this.dataTypeParser = dataTypeParser;
7880
this.keyspaces = keyspaces;
@@ -88,6 +90,7 @@ private CassandraSchemaRows(
8890
this.aggregates = aggregates;
8991
this.vertices = vertices;
9092
this.edges = edges;
93+
this.scyllaKeyspaces = scyllaKeyspaces;
9194
}
9295

9396
@NonNull
@@ -166,6 +169,11 @@ public Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges() {
166169
return edges;
167170
}
168171

172+
@Override
173+
public Map<CqlIdentifier, AdminRow> scyllaKeyspaces() {
174+
return scyllaKeyspaces;
175+
}
176+
169177
public static class Builder {
170178
private static final Logger LOG = LoggerFactory.getLogger(Builder.class);
171179

@@ -198,6 +206,8 @@ public static class Builder {
198206
verticesBuilders = new LinkedHashMap<>();
199207
private final Map<CqlIdentifier, ImmutableMultimap.Builder<CqlIdentifier, AdminRow>>
200208
edgesBuilders = new LinkedHashMap<>();
209+
private final ImmutableMap.Builder<CqlIdentifier, AdminRow> scyllaKeyspacesBuilder =
210+
ImmutableMap.builder();
201211

202212
public Builder(Node node, KeyspaceFilter keyspaceFilter, String logPrefix) {
203213
this.node = node;
@@ -323,6 +333,13 @@ public Builder withEdges(Iterable<AdminRow> rows) {
323333
return this;
324334
}
325335

336+
public Builder withScyllaKeyspaces(Iterable<AdminRow> rows) {
337+
for (AdminRow row : rows) {
338+
putByKeyspacePk(row, scyllaKeyspacesBuilder);
339+
}
340+
return this;
341+
}
342+
326343
private void put(ImmutableList.Builder<AdminRow> builder, AdminRow row) {
327344
String keyspace = row.getString("keyspace_name");
328345
if (keyspace == null) {
@@ -342,6 +359,16 @@ private void putByKeyspace(
342359
}
343360
}
344361

362+
private void putByKeyspacePk(
363+
AdminRow row, ImmutableMap.Builder<CqlIdentifier, AdminRow> builder) {
364+
String keyspace = row.getString("keyspace_name");
365+
if (keyspace == null) {
366+
LOG.warn("[{}] Skipping system row with missing keyspace name", logPrefix);
367+
} else if (keyspaceFilter.includes(keyspace)) {
368+
builder.put(CqlIdentifier.fromInternal(keyspace), row);
369+
}
370+
}
371+
345372
private void putByKeyspaceAndTable(
346373
AdminRow row,
347374
Map<CqlIdentifier, ImmutableMultimap.Builder<CqlIdentifier, AdminRow>> builders) {
@@ -375,7 +402,8 @@ public CassandraSchemaRows build() {
375402
functionsBuilder.build(),
376403
aggregatesBuilder.build(),
377404
build(verticesBuilders),
378-
build(edgesBuilders));
405+
build(edgesBuilders),
406+
scyllaKeyspacesBuilder.build());
379407
}
380408

381409
private static <K1, K2, V> Map<K1, Multimap<K2, V>> build(

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/SchemaRows.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.DataTypeParser;
2424
import com.datastax.oss.driver.shaded.guava.common.collect.Multimap;
2525
import edu.umd.cs.findbugs.annotations.NonNull;
26+
import java.util.HashMap;
2627
import java.util.LinkedHashMap;
2728
import java.util.List;
2829
import java.util.Map;
@@ -70,4 +71,8 @@ default Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> vertices() {
7071
default Map<CqlIdentifier, Multimap<CqlIdentifier, AdminRow>> edges() {
7172
return new LinkedHashMap<>();
7273
}
74+
75+
default Map<CqlIdentifier, AdminRow> scyllaKeyspaces() {
76+
return new HashMap<>();
77+
}
7378
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra3SchemaQueriesTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ private void should_query_with_clauses(String whereClause, String usingClause) {
9393
call.result.complete(
9494
mockResult(mockRow("keyspace_name", "ks1"), mockRow("keyspace_name", "ks2")));
9595

96+
// Scylla keyspaces
97+
call = queries.calls.poll();
98+
assertThat(call.query)
99+
.isEqualTo("SELECT * FROM system_schema.scylla_keyspaces" + whereClause + usingClause);
100+
call.result.complete(mockResult());
101+
96102
// Types
97103
call = queries.calls.poll();
98104
assertThat(call.query)
@@ -217,6 +223,11 @@ public void should_query_with_paging() {
217223
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.keyspaces");
218224
call.result.complete(mockResult(mockRow("keyspace_name", "ks1")));
219225

226+
// Scylla keyspaces
227+
call = queries.calls.poll();
228+
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.scylla_keyspaces");
229+
call.result.complete(mockResult());
230+
220231
// No types
221232
call = queries.calls.poll();
222233
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.types");
@@ -281,6 +292,11 @@ public void should_ignore_malformed_rows() {
281292
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.keyspaces");
282293
call.result.complete(mockResult(mockRow("keyspace_name", "ks1")));
283294

295+
// Scylla keyspaces
296+
call = queries.calls.poll();
297+
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.scylla_keyspaces");
298+
call.result.complete(mockResult());
299+
284300
// No types
285301
call = queries.calls.poll();
286302
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.types");

0 commit comments

Comments
 (0)