diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c10771a61092..183ac2514646 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -44,8 +44,8 @@ num_tokens: 16
 allocate_tokens_for_local_replication_factor: 3
 
 # initial_token allows you to specify tokens manually. While you can use it with
-# vnodes (num_tokens > 1, above) -- in which case you should provide a 
-# comma-separated list -- it's primarily used when adding nodes to legacy clusters 
+# vnodes (num_tokens > 1, above) -- in which case you should provide a
+# comma-separated list -- it's primarily used when adding nodes to legacy clusters
 # that do not have vnodes enabled.
 # initial_token:
@@ -227,7 +227,7 @@ credentials_validity_in_ms: 2000
 partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 
 # Directories where Cassandra should store data on disk. If multiple
-# directories are specified, Cassandra will spread data evenly across 
+# directories are specified, Cassandra will spread data evenly across
 # them by partitioning the token ranges.
 # If not set, the default directory is $CASSANDRA_HOME/data/data.
 # data_file_directories:
@@ -418,8 +418,8 @@ counter_cache_save_period: 7200
 # while still having the cache during runtime.
 # cache_load_timeout_seconds: 30
 
-# commitlog_sync may be either "periodic", "group", or "batch." 
-# 
+# commitlog_sync may be either "periodic", "group", or "batch."
+#
 # When in batch mode, Cassandra won't ack writes until the commit log
 # has been flushed to disk. Each incoming write will trigger the flush task.
 # commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
@@ -853,7 +853,7 @@ incremental_backups: false
 snapshot_before_compaction: false
 
 # Whether or not a snapshot is taken of the data before keyspace truncation
-# or dropping of column families. The STRONGLY advised default of true 
+# or dropping of column families. The STRONGLY advised default of true
 # should be used to provide data safety. If you set this flag to false, you will
 # lose data on truncation or drop.
 auto_snapshot: true
@@ -897,7 +897,7 @@ column_index_cache_size_in_kb: 2
 #
 # concurrent_compactors defaults to the smaller of (number of disks,
 # number of cores), with a minimum of 2 and a maximum of 8.
-# 
+#
 # If your data directories are backed by SSD, you should increase this
 # to the number of cores.
 #concurrent_compactors: 1
@@ -925,7 +925,7 @@ compaction_throughput_mb_per_sec: 64
 
 # When compacting, the replacement sstable(s) can be opened before they
 # are completely written, and used in place of the prior sstables for
-# any range that has been written. This helps to smoothly transfer reads 
+# any range that has been written. This helps to smoothly transfer reads
 # between the sstables, reducing page cache churn and keeping hot rows hot
 sstable_preemptive_open_interval_in_mb: 50
@@ -974,12 +974,18 @@ enable_uuid_sstable_identifiers: true
 # low is equally ill-advised since clients could get timeouts even for successful
 # operations just because the timeout setting is too tight.
 
-# How long the coordinator should wait for read operations to complete.
+# How long the coordinator should wait for read operations to complete. This
+# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc.
 # Lowest acceptable value is 10 ms.
 read_request_timeout_in_ms: 5000
-# How long the coordinator should wait for seq or index scans to complete.
+# How long the coordinator should wait for seq or index scans to complete. This
+# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc.
 # Lowest acceptable value is 10 ms.
 range_request_timeout_in_ms: 10000
+# How long the coordinator should wait for aggregation read operations to complete,
+# such as SELECT COUNT(*), MIN(x), etc.
+# Lowest acceptable value is 10 ms.
+aggregation_request_timeout_in_ms: 120000
 # How long the coordinator should wait for writes to complete.
 # Lowest acceptable value is 10 ms.
 write_request_timeout_in_ms: 2000
@@ -1055,10 +1061,10 @@ slow_query_log_timeout_in_ms: 500
 # Enable operation timeout information exchange between nodes to accurately
 # measure request timeouts. If disabled, replicas will assume that requests
 # were forwarded to them instantly by the coordinator, which means that
-# under overload conditions we will waste that much extra time processing 
+# under overload conditions we will waste that much extra time processing
 # already-timed-out requests.
 #
-# Warning: It is generally assumed that users have setup NTP on their clusters, and that clocks are modestly in sync, 
+# Warning: It is generally assumed that users have setup NTP on their clusters, and that clocks are modestly in sync,
 # since this is a requirement for general correctness of last write wins.
 #cross_node_timeout: true
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 4a526eeb38da..aaefbfc71f48 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -101,6 +101,8 @@ public class Config
 
     public volatile long range_request_timeout_in_ms = 10000L;
 
+    public volatile long aggregation_request_timeout_in_ms = 120000L;
+
     public volatile long write_request_timeout_in_ms = 2000L;
 
     public volatile long hints_request_timeout_in_ms = 2000L;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c5f80fdaac2a..44ed4fc1c264 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -154,7 +154,7 @@ public class DatabaseDescriptor
     private static boolean toolInitialized;
     private static boolean daemonInitialized;
     private static boolean enableMemtableAndCommitLog;
-    
+
     private static final int searchConcurrencyFactor = Integer.parseInt(System.getProperty(Config.PROPERTY_PREFIX + "search_concurrency_factor", "1"));
 
     private static volatile boolean disableSTCSInL0 = Boolean.getBoolean(Config.PROPERTY_PREFIX + "disable_stcs_in_l0");
@@ -188,7 +188,7 @@ static void updateInitialized(boolean clientInitialized, boolean toolInitialized
      * It cannot delete directories because on remote storage this would result
      * in errors if the test containers for remote storage are being shutdown
      * concurrently. The caller should delete any directories if required.
-     * TODO If you run into problems with undeleted directories or with the 
+     * TODO If you run into problems with undeleted directories or with the
      * caller deleting them, please add additional details here.
      *
      * This method is called by integration tests that run in the same JVM.
@@ -346,7 +346,7 @@ public static void setEnableMemtableAndCommitLog()
     {
         enableMemtableAndCommitLog = true;
     }
-    
+
     public static boolean enableMemtableAndCommitLog()
     {
         return daemonInitialized || enableMemtableAndCommitLog;
@@ -639,7 +639,7 @@ else if (conf.repair_session_space_in_mb > (int) (Runtime.getRuntime().maxMemory
     {
         conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40;
     }
-    
+
     if (conf.native_transport_rate_limiting_enabled)
         logger.info("Native transport rate-limiting enabled at {} requests/second.", conf.native_transport_max_requests_per_second);
     else
@@ -1197,6 +1197,12 @@ static void checkForLowestAcceptedTimeouts(Config conf)
         conf.range_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT;
     }
 
+    if(conf.aggregation_request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT)
+    {
+        logInfo("aggregation_request_timeout_in_ms", conf.aggregation_request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT);
+        conf.aggregation_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT;
+    }
+
     if(conf.request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT)
     {
         logInfo("request_timeout_in_ms", conf.request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT);
@@ -1717,6 +1723,16 @@ public static void setRangeRpcTimeout(long timeOutInMillis)
         conf.range_request_timeout_in_ms = timeOutInMillis;
     }
 
+    public static long getAggregationRpcTimeout(TimeUnit unit)
+    {
+        return unit.convert(conf.aggregation_request_timeout_in_ms, MILLISECONDS);
+    }
+
+    public static void setAggregationRpcTimeout(long timeOutInMillis)
+    {
+        conf.aggregation_request_timeout_in_ms = timeOutInMillis;
+    }
+
     public static long getWriteRpcTimeout(TimeUnit unit)
     {
         return unit.convert(conf.write_request_timeout_in_ms, MILLISECONDS);
@@ -1788,13 +1804,14 @@ public static long getSlowQueryTimeout(TimeUnit units)
     }
 
     /**
-     * @return the minimum configured {read, write, range, truncate, misc} timeout
+     * @return the minimum configured {read, write, range, aggregation, truncate, misc} timeout
      */
     public static long getMinRpcTimeout(TimeUnit unit)
     {
         return Longs.min(getRpcTimeout(unit),
                          getReadRpcTimeout(unit),
                          getRangeRpcTimeout(unit),
+                         getAggregationRpcTimeout(unit),
                          getWriteRpcTimeout(unit),
                          getCounterWriteRpcTimeout(unit),
                          getTruncateRpcTimeout(unit));
@@ -3416,7 +3433,7 @@ private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
         if (value > getConcurrentCompactors())
             logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
     }
-    
+
     public static AuditLogOptions getAuditLoggingOptions()
     {
         return conf.audit_logging_options;
@@ -3700,7 +3717,7 @@ public static void setSAIZeroCopyUsedThreshold(double threshold)
     {
         conf.sai_options.zerocopy_used_threshold = threshold;
     }
-    
+
     public static GuardrailsConfig getGuardrailsConfig()
     {
         return conf.guardrails;
diff --git a/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java b/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java
index 05f3510e7b39..b0ccc91fd8b8 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java
@@ -28,4 +28,10 @@ public ReadTimeoutException(ConsistencyLevel consistency, int received, int bloc
         super(ExceptionCode.READ_TIMEOUT, consistency, received, blockFor);
         this.dataPresent = dataPresent;
     }
+
+    public ReadTimeoutException(ConsistencyLevel consistency)
+    {
+        super(ExceptionCode.READ_TIMEOUT, consistency);
+        this.dataPresent = false;
+    }
 }
diff --git a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
index 853ba2fd1c46..d8d2a9c6f73b 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
@@ -40,4 +40,12 @@ protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel consisten
         this.received = received;
         this.blockFor = blockFor;
     }
+
+    public RequestTimeoutException(ExceptionCode exceptionCode, ConsistencyLevel consistency)
+    {
+        super(exceptionCode, "Operation timed out");
+        this.consistency = consistency;
+        this.received = 0;
+        this.blockFor = 0;
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
index 306dec55dc8a..b9849d86b1ce 100644
--- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -20,20 +20,27 @@
 import java.nio.ByteBuffer;
 import java.util.NoSuchElementException;
 import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.PageSize;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.OperationExecutionException;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.aggregation.GroupingState;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.OperationExecutionException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.QueryState;
 
 /**
@@ -55,11 +62,20 @@ public final class AggregationQueryPager implements QueryPager
     // The sub-pager, used to retrieve the next sub-page.
     private QueryPager subPager;
 
+    // the timeout in nanoseconds; if more time has elapsed, a ReadTimeoutException will be raised
+    private final long timeoutNanos;
+
     public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits)
+    {
+        this(subPager, subPageSize, limits, DatabaseDescriptor.getAggregationRpcTimeout(TimeUnit.NANOSECONDS));
+    }
+
+    public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits, long timeoutNanos)
     {
         this.subPager = subPager;
         this.limits = limits;
         this.subPageSize = subPageSize;
+        this.timeoutNanos = timeoutNanos;
     }
 
     /**
@@ -260,6 +276,18 @@ public final boolean hasNext()
             return next != null;
         }
 
+        private void checkTimeout()
+        {
+            // internal queries are not guarded by a timeout because continuous paging queries
+            // can be aborted and system queries should not be aborted
+            if (consistency == null)
+                return;
+
+            long elapsed = System.nanoTime() - queryStartNanoTime;
+            if (elapsed > timeoutNanos)
+                throw new ReadTimeoutException(consistency);
+        }
+
         /**
         * Loads the next RowIterator
         * to be returned. The iteration finishes when we reach either the
         * user groups limit or the groups page size. The user provided limit is initially set in subPager.maxRemaining().
@@ -326,7 +354,9 @@ protected QueryPager updatePagerLimit(QueryPager pager,
         */
        private final PartitionIterator fetchSubPage(PageSize subPageSize)
        {
-            return consistency != null ? subPager.fetchPage(subPageSize, consistency, queryState, queryStartNanoTime)
+            checkTimeout();
+
+            return consistency != null ? subPager.fetchPage(subPageSize, consistency, queryState, System.nanoTime())
                                        : subPager.fetchPageInternal(subPageSize, executionController);
        }
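
The hunk above completes the two-clock design of the patch: each sub-page fetch is now timed from its own start (System.nanoTime() is passed to fetchPage instead of queryStartNanoTime), so the shorter read/range timeouts apply per sub-page, while checkTimeout() enforces the overall aggregation deadline measured from queryStartNanoTime. A minimal standalone sketch of that idea follows; the class, method bodies, and exception type here are illustrative stand-ins, not part of the patch:

import java.util.concurrent.TimeUnit;

public final class TwoClockDeadlineSketch
{
    private final long overallTimeoutNanos; // plays the role of timeoutNanos
    private final long queryStartNanoTime;  // plays the role of queryStartNanoTime

    TwoClockDeadlineSketch(long overallTimeoutMillis)
    {
        this.overallTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(overallTimeoutMillis);
        this.queryStartNanoTime = System.nanoTime();
    }

    // Mirrors AggregationQueryPager.checkTimeout(): abort once the overall deadline passes.
    void checkTimeout()
    {
        if (System.nanoTime() - queryStartNanoTime > overallTimeoutNanos)
            throw new IllegalStateException("aggregation timed out");
    }

    // Mirrors fetchSubPage(): check the overall deadline, then time the page on a fresh clock.
    void fetchSubPage() throws InterruptedException
    {
        checkTimeout();
        long pageStartNanoTime = System.nanoTime(); // fresh start for the per-page timeout
        Thread.sleep(10);                           // stand-in for the actual page read
        System.out.println("page took " + (System.nanoTime() - pageStartNanoTime) + " ns");
    }

    public static void main(String[] args) throws InterruptedException
    {
        TwoClockDeadlineSketch pager = new TwoClockDeadlineSketch(25);
        try
        {
            while (true)
                pager.fetchSubPage(); // eventually trips the 25 ms overall deadline
        }
        catch (IllegalStateException e)
        {
            System.out.println(e.getMessage());
        }
    }
}

Resetting the per-page clock is what allows a long multi-page aggregation to outlive range_request_timeout_in_ms while still being bounded by aggregation_request_timeout_in_ms.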
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 2b7d5232c1c6..cac827739e61 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -61,6 +61,7 @@ write_request_timeout_in_ms: 20000
 counter_write_request_timeout_in_ms: 20000
 cas_contention_timeout_in_ms: 20000
 request_timeout_in_ms: 20000
+aggregation_request_timeout_in_ms: 120000
 default_compaction:
   class_name: UnifiedCompactionStrategy
   parameters:
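
For operators, the new knob works like the other request timeouts: set aggregation_request_timeout_in_ms in cassandra.yaml (120000 ms by default, as above), or adjust it at runtime through the DatabaseDescriptor accessors added in this patch. A sketch of the runtime route, assuming DatabaseDescriptor has already been initialized (e.g. inside a daemon or a CQLTester-style test):

import java.util.concurrent.TimeUnit;

import org.apache.cassandra.config.DatabaseDescriptor;

public final class AggregationTimeoutTuning
{
    public static void main(String[] args)
    {
        // read the current value; defaults to 120000 ms unless overridden in cassandra.yaml
        long before = DatabaseDescriptor.getAggregationRpcTimeout(TimeUnit.MILLISECONDS);

        DatabaseDescriptor.setAggregationRpcTimeout(30_000); // tighten to 30 s
        try
        {
            // run aggregation-heavy work here, e.g. SELECT COUNT(*) / GROUP BY queries
        }
        finally
        {
            DatabaseDescriptor.setAggregationRpcTimeout(before); // restore the old value
        }
    }
}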
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationQueriesTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationQueriesTest.java
new file mode 100644
index 000000000000..d9ebb8155f91
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationQueriesTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.operations;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(BMUnitRunner.class)
+@BMRule(
+    name = "delay page read",
+    targetClass = "org.apache.cassandra.service.StorageProxy",
+    targetMethod = "getRangeSlice",
+    targetLocation = "AT ENTRY",
+    action = "org.apache.cassandra.cql3.validation.operations.AggregationQueriesTest.delayPageRead();")
+public class AggregationQueriesTest extends CQLTester
+{
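+    // The Byteman rule above injects a delayPageRead() call at the entry of
+    // StorageProxy.getRangeSlice, i.e. at the start of every page read, so each test can
+    // stall individual page reads by the amount held in pageReadDelayMillis below.
+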
+    private static final AtomicLong pageReadDelayMillis = new AtomicLong();
+
+    @Before
+    public void setup()
+    {
+        pageReadDelayMillis.set(0);
+    }
+
+    @Test
+    public void testAggregationQueryShouldTimeoutWhenSinglePageReadExceedsReadTimeout() throws Throwable
+    {
+        logger.info("Creating table");
+        createTable("CREATE TABLE %s (a int, b int, c double, primary key (a, b))");
+
+        logger.info("Inserting data");
+        for (int i = 0; i < 4; i++)
+            for (int j = 0; j < 200; j++)
+                execute("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)", i, j);
+
+        // connect net session
+        sessionNet();
+
+        logger.info("Setting timeouts");
+        var oldTimeouts = getDBTimeouts();
+        try
+        {
+            // 3rd and subsequent page reads should be delayed enough to time out the query
+            int rangeTimeoutMs = 50;
+            DatabaseDescriptor.setRangeRpcTimeout(rangeTimeoutMs);
+            pageReadDelayMillis.set(100);
+
+            logger.info("Running aggregate, multi-page query");
+
+            long queryStartTime = System.nanoTime();
+            ReadTimeoutException exception = assertThrows("expected read timeout",
+                                                          ReadTimeoutException.class,
+                                                          () -> executeNet("SELECT a, count(c) FROM %s group by a").all());
+            long queryDuration = System.nanoTime() - queryStartTime;
+            assertTrue("Query duration " + queryDuration + " should be greater than range read timeout " + rangeTimeoutMs + "ms",
+                       queryDuration > MILLISECONDS.toNanos(rangeTimeoutMs));
+            logger.info("Query failed after {} ms as expected with {}", NANOSECONDS.toMillis(queryDuration), exception);
+        }
+        finally
+        {
+            setDBTimeouts(oldTimeouts);
+        }
+    }
+
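+    // Complement of the test above: each delayed page read (400 ms) fits within the 2000 ms
+    // range timeout, so only the aggregation timeout governs the whole multi-page query,
+    // and it is set high enough for the query to complete.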
+    @Test
+    public void testAggregationQueryShouldNotTimeoutWhenItExceedsReadTimeout() throws Throwable
+    {
+        logger.info("Creating table");
+        createTable("CREATE TABLE %s (a int, b int, c double, primary key (a, b))");
+
+        logger.info("Inserting data");
+        for (int i = 0; i < 4; i++)
+            for (int j = 0; j < 40000; j++)
+                execute("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)", i, j);
+
+        // connect net session
+        sessionNet();
+
+        logger.info("Setting timeouts");
+        var oldTimeouts = getDBTimeouts();
+        try
+        {
+            // a single page read should fit in the range timeout, but multiple pages should not;
+            // the query should complete nevertheless because the aggregation timeout is large
+            int rangeTimeoutMs = 2000;
+            pageReadDelayMillis.set(400);
+            DatabaseDescriptor.setRangeRpcTimeout(rangeTimeoutMs);
+            DatabaseDescriptor.setAggregationRpcTimeout(120000);
+
+            logger.info("Running aggregate, multi-page query");
+
+            long queryStartTime = System.nanoTime();
+            List