Skip to content

Commit 087e33e

Browse files
authored
CNDB-13770: Separate timeout for aggregation queries (#1740)
CNDB-13770 Separate timeout for aggregation queries # What is the issue In CC, aggregate user queries use the same range read timeout. In DSE, we had a separate timeout that defaulted to 120s. We would like to retain that functionality. # What does this PR fix and why was it fixed This PR adds a separate 120s timeout for aggregate queries. The timeout is configurable with aggregation_request_timeout_in_ms Config parameter
1 parent 12a30b5 commit 087e33e

File tree

8 files changed

+307
-26
lines changed

8 files changed

+307
-26
lines changed

conf/cassandra.yaml

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ num_tokens: 16
4444
allocate_tokens_for_local_replication_factor: 3
4545

4646
# initial_token allows you to specify tokens manually. While you can use it with
47-
# vnodes (num_tokens > 1, above) -- in which case you should provide a
48-
# comma-separated list -- it's primarily used when adding nodes to legacy clusters
47+
# vnodes (num_tokens > 1, above) -- in which case you should provide a
48+
# comma-separated list -- it's primarily used when adding nodes to legacy clusters
4949
# that do not have vnodes enabled.
5050
# initial_token:
5151

@@ -243,7 +243,7 @@ credentials_validity_in_ms: 2000
243243
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
244244

245245
# Directories where Cassandra should store data on disk. If multiple
246-
# directories are specified, Cassandra will spread data evenly across
246+
# directories are specified, Cassandra will spread data evenly across
247247
# them by partitioning the token ranges.
248248
# If not set, the default directory is $CASSANDRA_HOME/data/data.
249249
# data_file_directories:
@@ -434,8 +434,8 @@ counter_cache_save_period: 7200
434434
# while still having the cache during runtime.
435435
# cache_load_timeout_seconds: 30
436436

437-
# commitlog_sync may be either "periodic", "group", or "batch."
438-
#
437+
# commitlog_sync may be either "periodic", "group", or "batch."
438+
#
439439
# When in batch mode, Cassandra won't ack writes until the commit log
440440
# has been flushed to disk. Each incoming write will trigger the flush task.
441441
# commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
@@ -871,7 +871,7 @@ incremental_backups: false
871871
snapshot_before_compaction: false
872872

873873
# Whether or not a snapshot is taken of the data before keyspace truncation
874-
# or dropping of column families. The STRONGLY advised default of true
874+
# or dropping of column families. The STRONGLY advised default of true
875875
# should be used to provide data safety. If you set this flag to false, you will
876876
# lose data on truncation or drop.
877877
auto_snapshot: true
@@ -915,7 +915,7 @@ column_index_cache_size_in_kb: 2
915915
#
916916
# concurrent_compactors defaults to the smaller of (number of disks,
917917
# number of cores), with a minimum of 2 and a maximum of 8.
918-
#
918+
#
919919
# If your data directories are backed by SSD, you should increase this
920920
# to the number of cores.
921921
#concurrent_compactors: 1
@@ -943,7 +943,7 @@ compaction_throughput_mb_per_sec: 64
943943

944944
# When compacting, the replacement sstable(s) can be opened before they
945945
# are completely written, and used in place of the prior sstables for
946-
# any range that has been written. This helps to smoothly transfer reads
946+
# any range that has been written. This helps to smoothly transfer reads
947947
# between the sstables, reducing page cache churn and keeping hot rows hot
948948
sstable_preemptive_open_interval_in_mb: 50
949949

@@ -992,12 +992,18 @@ enable_uuid_sstable_identifiers: true
992992
# low is equally ill-advised since clients could get timeouts even for successful
993993
# operations just because the timeout setting is too tight.
994994

995-
# How long the coordinator should wait for read operations to complete.
995+
# How long the coordinator should wait for read operations to complete. This
996+
# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc.
996997
# Lowest acceptable value is 10 ms.
997998
read_request_timeout_in_ms: 5000
998-
# How long the coordinator should wait for seq or index scans to complete.
999+
# How long the coordinator should wait for seq or index scans to complete. This
1000+
# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc.
9991001
# Lowest acceptable value is 10 ms.
10001002
range_request_timeout_in_ms: 10000
1003+
# How long the coordinator should wait for aggregation read operations to complete,
1004+
# such as SELECT COUNT(*), MIN(x), etc.
1005+
# Lowest acceptable value is 10 ms.
1006+
aggregation_request_timeout_in_ms: 120000
10011007
# How long the coordinator should wait for writes to complete.
10021008
# Lowest acceptable value is 10 ms.
10031009
write_request_timeout_in_ms: 2000
@@ -1073,10 +1079,10 @@ slow_query_log_timeout_in_ms: 500
10731079
# Enable operation timeout information exchange between nodes to accurately
10741080
# measure request timeouts. If disabled, replicas will assume that requests
10751081
# were forwarded to them instantly by the coordinator, which means that
1076-
# under overload conditions we will waste that much extra time processing
1082+
# under overload conditions we will waste that much extra time processing
10771083
# already-timed-out requests.
10781084
#
1079-
# Warning: It is generally assumed that users have setup NTP on their clusters, and that clocks are modestly in sync,
1085+
# Warning: It is generally assumed that users have setup NTP on their clusters, and that clocks are modestly in sync,
10801086
# since this is a requirement for general correctness of last write wins.
10811087
#cross_node_timeout: true
10821088

src/java/org/apache/cassandra/config/Config.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.lang.reflect.Field;
2121
import java.lang.reflect.Modifier;
2222
import java.util.ArrayList;
23-
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Set;
@@ -38,7 +37,6 @@
3837
import org.apache.cassandra.fql.FullQueryLoggerOptions;
3938
import org.apache.cassandra.db.ConsistencyLevel;
4039
import org.apache.cassandra.guardrails.GuardrailsConfig;
41-
import org.apache.cassandra.io.compress.AdaptiveCompressor;
4240
import org.apache.cassandra.utils.FBUtilities;
4341

4442
/**
@@ -103,6 +101,8 @@ public class Config
103101

104102
public volatile long range_request_timeout_in_ms = 10000L;
105103

104+
public volatile long aggregation_request_timeout_in_ms = 120000L;
105+
106106
public volatile long write_request_timeout_in_ms = 2000L;
107107

108108
public volatile long hints_request_timeout_in_ms = 2000L;

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public class DatabaseDescriptor
154154
private static boolean toolInitialized;
155155
private static boolean daemonInitialized;
156156
private static boolean enableMemtableAndCommitLog;
157-
157+
158158
private static final int searchConcurrencyFactor = Integer.parseInt(System.getProperty(Config.PROPERTY_PREFIX + "search_concurrency_factor", "1"));
159159

160160
private static volatile boolean disableSTCSInL0 = Boolean.getBoolean(Config.PROPERTY_PREFIX + "disable_stcs_in_l0");
@@ -187,7 +187,7 @@ static void updateInitialized(boolean clientInitialized, boolean toolInitialized
187187
* It cannot delete directories because on remote storage this would result
188188
* in errors if the test containers for remote storage are being shutdown
189189
* concurrently. The caller should delete any directories if required.
190-
* TODO If you run into problems with undeleted directories or with the
190+
* TODO If you run into problems with undeleted directories or with the
191191
* caller deleting them, please add additional details here.
192192
* <p/>
193193
* This method is called by integration tests that run in the same JVM.
@@ -345,7 +345,7 @@ public static void setEnableMemtableAndCommitLog()
345345
{
346346
enableMemtableAndCommitLog = true;
347347
}
348-
348+
349349
public static boolean enableMemtableAndCommitLog()
350350
{
351351
return daemonInitialized || enableMemtableAndCommitLog;
@@ -638,7 +638,7 @@ else if (conf.repair_session_space_in_mb > (int) (Runtime.getRuntime().maxMemory
638638
{
639639
conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40;
640640
}
641-
641+
642642
if (conf.native_transport_rate_limiting_enabled)
643643
logger.info("Native transport rate-limiting enabled at {} requests/second.", conf.native_transport_max_requests_per_second);
644644
else
@@ -1196,6 +1196,12 @@ static void checkForLowestAcceptedTimeouts(Config conf)
11961196
conf.range_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT;
11971197
}
11981198

1199+
if(conf.aggregation_request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT)
1200+
{
1201+
logInfo("aggregation_request_timeout_in_ms", conf.aggregation_request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT);
1202+
conf.aggregation_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT;
1203+
}
1204+
11991205
if(conf.request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT)
12001206
{
12011207
logInfo("request_timeout_in_ms", conf.request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT);
@@ -1716,6 +1722,16 @@ public static void setRangeRpcTimeout(long timeOutInMillis)
17161722
conf.range_request_timeout_in_ms = timeOutInMillis;
17171723
}
17181724

1725+
public static long getAggregationRpcTimeout(TimeUnit unit)
1726+
{
1727+
return unit.convert(conf.aggregation_request_timeout_in_ms, MILLISECONDS);
1728+
}
1729+
1730+
public static void setAggregationRpcTimeout(long timeOutInMillis)
1731+
{
1732+
conf.aggregation_request_timeout_in_ms = timeOutInMillis;
1733+
}
1734+
17191735
public static long getWriteRpcTimeout(TimeUnit unit)
17201736
{
17211737
return unit.convert(conf.write_request_timeout_in_ms, MILLISECONDS);
@@ -1787,13 +1803,14 @@ public static long getSlowQueryTimeout(TimeUnit units)
17871803
}
17881804

17891805
/**
1790-
* @return the minimum configured {read, write, range, truncate, misc} timeout
1806+
* @return the minimum configured {read, write, range, aggregated, truncate, misc} timeout
17911807
*/
17921808
public static long getMinRpcTimeout(TimeUnit unit)
17931809
{
17941810
return Longs.min(getRpcTimeout(unit),
17951811
getReadRpcTimeout(unit),
17961812
getRangeRpcTimeout(unit),
1813+
getAggregationRpcTimeout(unit),
17971814
getWriteRpcTimeout(unit),
17981815
getCounterWriteRpcTimeout(unit),
17991816
getTruncateRpcTimeout(unit));
@@ -3422,7 +3439,7 @@ private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
34223439
if (value > getConcurrentCompactors())
34233440
logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
34243441
}
3425-
3442+
34263443
public static AuditLogOptions getAuditLoggingOptions()
34273444
{
34283445
return conf.audit_logging_options;
@@ -3706,7 +3723,7 @@ public static void setSAIZeroCopyUsedThreshold(double threshold)
37063723
{
37073724
conf.sai_options.zerocopy_used_threshold = threshold;
37083725
}
3709-
3726+
37103727
public static GuardrailsConfig getGuardrailsConfig()
37113728
{
37123729
return conf.guardrails;

src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,10 @@ public ReadTimeoutException(ConsistencyLevel consistency, int received, int bloc
2828
super(ExceptionCode.READ_TIMEOUT, consistency, received, blockFor);
2929
this.dataPresent = dataPresent;
3030
}
31+
32+
public ReadTimeoutException(ConsistencyLevel consistency)
33+
{
34+
super(ExceptionCode.READ_TIMEOUT, consistency);
35+
this.dataPresent = false;
36+
}
3137
}

src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,12 @@ protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel consisten
4040
this.received = received;
4141
this.blockFor = blockFor;
4242
}
43+
44+
public RequestTimeoutException(ExceptionCode exceptionCode, ConsistencyLevel consistency)
45+
{
46+
super(exceptionCode, "Operation timeout out");
47+
this.consistency = consistency;
48+
this.received = 0;
49+
this.blockFor = 0;
50+
}
4351
}

src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,27 @@
2020
import java.nio.ByteBuffer;
2121
import java.util.NoSuchElementException;
2222
import java.util.StringJoiner;
23+
import java.util.concurrent.TimeUnit;
2324

2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627

28+
import org.apache.cassandra.config.DatabaseDescriptor;
2729
import org.apache.cassandra.cql3.PageSize;
28-
import org.apache.cassandra.exceptions.InvalidRequestException;
29-
import org.apache.cassandra.exceptions.OperationExecutionException;
30-
import org.apache.cassandra.schema.TableMetadata;
31-
import org.apache.cassandra.db.*;
30+
import org.apache.cassandra.db.Clustering;
31+
import org.apache.cassandra.db.ConsistencyLevel;
32+
import org.apache.cassandra.db.DecoratedKey;
33+
import org.apache.cassandra.db.ReadExecutionController;
34+
import org.apache.cassandra.db.RegularAndStaticColumns;
3235
import org.apache.cassandra.db.aggregation.GroupingState;
3336
import org.apache.cassandra.db.filter.DataLimits;
3437
import org.apache.cassandra.db.partitions.PartitionIterator;
3538
import org.apache.cassandra.db.rows.Row;
3639
import org.apache.cassandra.db.rows.RowIterator;
40+
import org.apache.cassandra.exceptions.InvalidRequestException;
41+
import org.apache.cassandra.exceptions.OperationExecutionException;
42+
import org.apache.cassandra.exceptions.ReadTimeoutException;
43+
import org.apache.cassandra.schema.TableMetadata;
3744
import org.apache.cassandra.service.QueryState;
3845

3946
/**
@@ -55,11 +62,20 @@ public final class AggregationQueryPager implements QueryPager
5562
// The sub-pager, used to retrieve the next sub-page.
5663
private QueryPager subPager;
5764

65+
// the timeout in nanoseconds, if more time has elapsed, a ReadTimeoutException will be raised
66+
private final long timeoutNanos;
67+
5868
public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits)
69+
{
70+
this(subPager, subPageSize, limits, DatabaseDescriptor.getAggregationRpcTimeout(TimeUnit.NANOSECONDS));
71+
}
72+
73+
public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits, long timeoutNanos)
5974
{
6075
this.subPager = subPager;
6176
this.limits = limits;
6277
this.subPageSize = subPageSize;
78+
this.timeoutNanos = timeoutNanos;
6379
}
6480

6581
/**
@@ -260,6 +276,18 @@ public final boolean hasNext()
260276
return next != null;
261277
}
262278

279+
private void checkTimeout()
280+
{
281+
// internal queries are not guarded by a timeout because cont. paging queries can be aborted
282+
// and system queries should not be aborted
283+
if (consistency == null)
284+
return;
285+
286+
long elapsed = System.nanoTime() - queryStartNanoTime;
287+
if (elapsed > timeoutNanos)
288+
throw new ReadTimeoutException(consistency);
289+
}
290+
263291
/**
264292
* Loads the next <code>RowIterator</code> to be returned. The iteration finishes when we reach either the
265293
* user groups limit or the groups page size. The user provided limit is initially set in subPager.maxRemaining().
@@ -326,7 +354,9 @@ protected QueryPager updatePagerLimit(QueryPager pager,
326354
*/
327355
private final PartitionIterator fetchSubPage(PageSize subPageSize)
328356
{
329-
return consistency != null ? subPager.fetchPage(subPageSize, consistency, queryState, queryStartNanoTime)
357+
checkTimeout();
358+
359+
return consistency != null ? subPager.fetchPage(subPageSize, consistency, queryState, System.nanoTime())
330360
: subPager.fetchPageInternal(subPageSize, executionController);
331361
}
332362

test/conf/cassandra.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ write_request_timeout_in_ms: 20000
6060
counter_write_request_timeout_in_ms: 20000
6161
cas_contention_timeout_in_ms: 20000
6262
request_timeout_in_ms: 20000
63+
aggregation_request_timeout_in_ms: 120000
6364
default_compaction:
6465
class_name: UnifiedCompactionStrategy
6566
parameters:

0 commit comments

Comments
 (0)