Skip to content

Commit 5a6a195

Browse files
committed
Add USING TIMEOUT to CassandraSchemaQueries
The driver will now use existing configurable option `METADATA_SCHEMA_REQUEST_TIMEOUT` in a `USING TIMEOUT` clause with each schema query (if applicable). This will make it a server side timeout in addition to driver side. The driver will apply the `USING` clause if the sharding info of control connection channel is not null. This is used as a proxy check to differentiate between Scylla and Cassandra. The behaviour should remain unchanged for Cassandra clusters. USING TIMEOUT is a ScyllaDB CQL extension and it is available in versions 4.4/2022.1.0 or newer.
1 parent 521445e commit 5a6a195

File tree

4 files changed

+108
-23
lines changed

4 files changed

+108
-23
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/CassandraSchemaQueries.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
/*
19+
* Copyright (C) 2024 ScyllaDB
20+
*
21+
* Modified by ScyllaDB
22+
*/
1823
package com.datastax.oss.driver.internal.core.metadata.schema.queries;
1924

2025
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -54,6 +59,7 @@ public abstract class CassandraSchemaQueries implements SchemaQueries {
5459
// The future we return from execute, completes when all the queries are done.
5560
private final CompletableFuture<SchemaRows> schemaRowsFuture = new CompletableFuture<>();
5661
private final long startTimeNs = System.nanoTime();
62+
private final String usingTimeoutClause;
5763

5864
// All non-final fields are accessed exclusively on adminExecutor
5965
private CassandraSchemaRows.Builder schemaRowsBuilder;
@@ -73,6 +79,10 @@ protected CassandraSchemaQueries(
7379
DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList());
7480
assert refreshedKeyspaces != null; // per the default value
7581
this.keyspaceFilter = KeyspaceFilter.newInstance(logPrefix, refreshedKeyspaces);
82+
this.usingTimeoutClause =
83+
" USING TIMEOUT "
84+
+ config.getDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT).toMillis()
85+
+ "ms";
7686
}
7787

7888
protected abstract String selectKeyspacesQuery();
@@ -112,29 +122,47 @@ private void executeOnAdminExecutor() {
112122

113123
schemaRowsBuilder = new CassandraSchemaRows.Builder(node, keyspaceFilter, logPrefix);
114124
String whereClause = keyspaceFilter.getWhereClause();
125+
String usingClause = shouldApplyUsingTimeout() ? usingTimeoutClause : "";
115126

116-
query(selectKeyspacesQuery() + whereClause, schemaRowsBuilder::withKeyspaces);
117-
query(selectTypesQuery() + whereClause, schemaRowsBuilder::withTypes);
118-
query(selectTablesQuery() + whereClause, schemaRowsBuilder::withTables);
119-
query(selectColumnsQuery() + whereClause, schemaRowsBuilder::withColumns);
127+
query(selectKeyspacesQuery() + whereClause + usingClause, schemaRowsBuilder::withKeyspaces);
128+
query(selectTypesQuery() + whereClause + usingClause, schemaRowsBuilder::withTypes);
129+
query(selectTablesQuery() + whereClause + usingClause, schemaRowsBuilder::withTables);
130+
query(selectColumnsQuery() + whereClause + usingClause, schemaRowsBuilder::withColumns);
120131
selectIndexesQuery()
121-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withIndexes));
132+
.ifPresent(
133+
select -> query(select + whereClause + usingClause, schemaRowsBuilder::withIndexes));
122134
selectViewsQuery()
123-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withViews));
135+
.ifPresent(
136+
select -> query(select + whereClause + usingClause, schemaRowsBuilder::withViews));
124137
selectFunctionsQuery()
125-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withFunctions));
138+
.ifPresent(
139+
select -> query(select + whereClause + usingClause, schemaRowsBuilder::withFunctions));
126140
selectAggregatesQuery()
127-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withAggregates));
141+
.ifPresent(
142+
select -> query(select + whereClause + usingClause, schemaRowsBuilder::withAggregates));
128143
selectVirtualKeyspacesQuery()
129-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withVirtualKeyspaces));
144+
.ifPresent(
145+
select ->
146+
query(select + whereClause + usingClause, schemaRowsBuilder::withVirtualKeyspaces));
130147
selectVirtualTablesQuery()
131-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withVirtualTables));
148+
.ifPresent(
149+
select ->
150+
query(select + whereClause + usingClause, schemaRowsBuilder::withVirtualTables));
132151
selectVirtualColumnsQuery()
133-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withVirtualColumns));
152+
.ifPresent(
153+
select ->
154+
query(select + whereClause + usingClause, schemaRowsBuilder::withVirtualColumns));
134155
selectEdgesQuery()
135-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withEdges));
156+
.ifPresent(
157+
select -> query(select + whereClause + usingClause, schemaRowsBuilder::withEdges));
136158
selectVerticiesQuery()
137-
.ifPresent(select -> query(select + whereClause, schemaRowsBuilder::withVertices));
159+
.ifPresent(
160+
select -> query(select + whereClause + usingClause, schemaRowsBuilder::withVertices));
161+
}
162+
163+
protected boolean shouldApplyUsingTimeout() {
164+
// We use non-null sharding info as a proxy check for cluster being a ScyllaDB cluster
165+
return (channel.getShardingInfo() != null);
138166
}
139167

140168
private void query(

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/Cassandra3SchemaQueriesTest.java

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
/*
19+
* Copyright (C) 2024 ScyllaDB
20+
*
21+
* Modified by ScyllaDB
22+
*/
1823
package com.datastax.oss.driver.internal.core.metadata.schema.queries;
1924

2025
import static com.datastax.oss.driver.Assertions.assertThat;
@@ -28,6 +33,7 @@
2833
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
2934
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
3035
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
36+
import java.time.Duration;
3137
import java.util.Collections;
3238
import java.util.Queue;
3339
import java.util.concurrent.CompletionStage;
@@ -63,52 +69,72 @@ public void should_query_with_keyspace_filter() {
6369
should_query_with_where_clause(" WHERE keyspace_name IN ('ks1','ks2')");
6470
}
6571

72+
@Test
73+
public void should_query_with_using_clause() {
74+
when(config.getDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT))
75+
.thenReturn(Duration.ofMillis(100));
76+
should_query_with_clauses("", " USING TIMEOUT 100ms");
77+
}
78+
6679
private void should_query_with_where_clause(String whereClause) {
80+
should_query_with_clauses(whereClause, "");
81+
}
82+
83+
private void should_query_with_clauses(String whereClause, String usingClause) {
6784
SchemaQueriesWithMockedChannel queries =
68-
new SchemaQueriesWithMockedChannel(driverChannel, node, config, "test");
85+
new SchemaQueriesWithMockedChannel(
86+
driverChannel, node, config, "test", !usingClause.equals(""));
6987
CompletionStage<SchemaRows> result = queries.execute();
7088

7189
// Keyspace
7290
Call call = queries.calls.poll();
73-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.keyspaces" + whereClause);
91+
assertThat(call.query)
92+
.isEqualTo("SELECT * FROM system_schema.keyspaces" + whereClause + usingClause);
7493
call.result.complete(
7594
mockResult(mockRow("keyspace_name", "ks1"), mockRow("keyspace_name", "ks2")));
7695

7796
// Types
7897
call = queries.calls.poll();
79-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.types" + whereClause);
98+
assertThat(call.query)
99+
.isEqualTo("SELECT * FROM system_schema.types" + whereClause + usingClause);
80100
call.result.complete(mockResult(mockRow("keyspace_name", "ks1", "type_name", "type")));
81101

82102
// Tables
83103
call = queries.calls.poll();
84-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.tables" + whereClause);
104+
assertThat(call.query)
105+
.isEqualTo("SELECT * FROM system_schema.tables" + whereClause + usingClause);
85106
call.result.complete(mockResult(mockRow("keyspace_name", "ks1", "table_name", "foo")));
86107

87108
// Columns
88109
call = queries.calls.poll();
89-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.columns" + whereClause);
110+
assertThat(call.query)
111+
.isEqualTo("SELECT * FROM system_schema.columns" + whereClause + usingClause);
90112
call.result.complete(
91113
mockResult(mockRow("keyspace_name", "ks1", "table_name", "foo", "column_name", "k")));
92114

93115
// Indexes
94116
call = queries.calls.poll();
95-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.indexes" + whereClause);
117+
assertThat(call.query)
118+
.isEqualTo("SELECT * FROM system_schema.indexes" + whereClause + usingClause);
96119
call.result.complete(
97120
mockResult(mockRow("keyspace_name", "ks1", "table_name", "foo", "index_name", "index")));
98121

99122
// Views
100123
call = queries.calls.poll();
101-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.views" + whereClause);
124+
assertThat(call.query)
125+
.isEqualTo("SELECT * FROM system_schema.views" + whereClause + usingClause);
102126
call.result.complete(mockResult(mockRow("keyspace_name", "ks2", "view_name", "foo")));
103127

104128
// Functions
105129
call = queries.calls.poll();
106-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.functions" + whereClause);
130+
assertThat(call.query)
131+
.isEqualTo("SELECT * FROM system_schema.functions" + whereClause + usingClause);
107132
call.result.complete(mockResult(mockRow("keyspace_name", "ks2", "function_name", "add")));
108133

109134
// Aggregates
110135
call = queries.calls.poll();
111-
assertThat(call.query).isEqualTo("SELECT * FROM system_schema.aggregates" + whereClause);
136+
assertThat(call.query)
137+
.isEqualTo("SELECT * FROM system_schema.aggregates" + whereClause + usingClause);
112138
call.result.complete(mockResult(mockRow("keyspace_name", "ks2", "aggregate_name", "add")));
113139

114140
channel.runPendingTasks();
@@ -349,10 +375,26 @@ public void should_abort_if_query_fails() {
349375
static class SchemaQueriesWithMockedChannel extends Cassandra3SchemaQueries {
350376

351377
final Queue<Call> calls = new LinkedBlockingDeque<>();
378+
final boolean shouldApplyUsingTimeout;
352379

353380
SchemaQueriesWithMockedChannel(
354381
DriverChannel channel, Node node, DriverExecutionProfile config, String logPrefix) {
382+
this(channel, node, config, logPrefix, false);
383+
}
384+
385+
SchemaQueriesWithMockedChannel(
386+
DriverChannel channel,
387+
Node node,
388+
DriverExecutionProfile config,
389+
String logPrefix,
390+
boolean shouldApplyUsingTimeout) {
355391
super(channel, node, config, logPrefix);
392+
this.shouldApplyUsingTimeout = shouldApplyUsingTimeout;
393+
}
394+
395+
@Override
396+
protected boolean shouldApplyUsingTimeout() {
397+
return shouldApplyUsingTimeout;
356398
}
357399

358400
@Override

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/DefaultSchemaQueriesFactoryTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
/*
19+
* Copyright (C) 2024 ScyllaDB
20+
*
21+
* Modified by ScyllaDB
22+
*/
1823
package com.datastax.oss.driver.internal.core.metadata.schema.queries;
1924

2025
import static org.assertj.core.api.Assertions.assertThat;
@@ -23,6 +28,7 @@
2328

2429
import com.datastax.dse.driver.api.core.metadata.DseNodeProperties;
2530
import com.datastax.oss.driver.api.core.Version;
31+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2632
import com.datastax.oss.driver.api.core.config.DriverConfig;
2733
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2834
import com.datastax.oss.driver.api.core.metadata.Node;
@@ -33,6 +39,8 @@
3339
import com.tngtech.java.junit.dataprovider.DataProvider;
3440
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
3541
import com.tngtech.java.junit.dataprovider.UseDataProvider;
42+
import java.time.Duration;
43+
import java.time.temporal.ChronoUnit;
3644
import java.util.Optional;
3745
import org.junit.Test;
3846
import org.junit.runner.RunWith;
@@ -130,6 +138,8 @@ private DefaultSchemaQueriesFactory buildFactory() {
130138
when(mockConfig.getDefaultProfile()).thenReturn(mockProfile);
131139
final InternalDriverContext mockInternalCtx = mock(InternalDriverContext.class);
132140
when(mockInternalCtx.getConfig()).thenReturn(mockConfig);
141+
when(mockProfile.getDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT))
142+
.thenReturn(Duration.of(5, ChronoUnit.SECONDS));
133143

134144
return new DefaultSchemaQueriesFactory(mockInternalCtx);
135145
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/schema/queries/SchemaQueriesTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
/*
19+
* Copyright (C) 2024 ScyllaDB
20+
*
21+
* Modified by ScyllaDB
22+
*/
1823
package com.datastax.oss.driver.internal.core.metadata.schema.queries;
1924

2025
import static org.assertj.core.api.Assertions.assertThat;
@@ -54,7 +59,7 @@ public abstract class SchemaQueriesTest {
5459
public void setup() {
5560
// Whatever, not actually used because the requests are mocked
5661
when(config.getDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT))
57-
.thenReturn(Duration.ZERO);
62+
.thenReturn(Duration.ofSeconds(2));
5863
when(config.getInt(DefaultDriverOption.METADATA_SCHEMA_REQUEST_PAGE_SIZE)).thenReturn(5000);
5964

6065
channel = new EmbeddedChannel();

0 commit comments

Comments
 (0)