CNDB-14261: Separate timeout for aggregation queries (#1741) #1775

Open · wants to merge 1 commit into base: cndb-main-release-202501
30 changes: 18 additions & 12 deletions conf/cassandra.yaml
@@ -44,8 +44,8 @@ num_tokens: 16
allocate_tokens_for_local_replication_factor: 3

# initial_token allows you to specify tokens manually. While you can use it with
# vnodes (num_tokens > 1, above) -- in which case you should provide a
# comma-separated list -- it's primarily used when adding nodes to legacy clusters
# that do not have vnodes enabled.
# initial_token:

@@ -227,7 +227,7 @@ credentials_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner

# Directories where Cassandra should store data on disk. If multiple
# directories are specified, Cassandra will spread data evenly across
# them by partitioning the token ranges.
# If not set, the default directory is $CASSANDRA_HOME/data/data.
# data_file_directories:
@@ -418,8 +418,8 @@ counter_cache_save_period: 7200
# while still having the cache during runtime.
# cache_load_timeout_seconds: 30

# commitlog_sync may be either "periodic", "group", or "batch."
#
# When in batch mode, Cassandra won't ack writes until the commit log
# has been flushed to disk. Each incoming write will trigger the flush task.
# commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
@@ -853,7 +853,7 @@ incremental_backups: false
snapshot_before_compaction: false

# Whether or not a snapshot is taken of the data before keyspace truncation
# or dropping of column families. The STRONGLY advised default of true
# should be used to provide data safety. If you set this flag to false, you will
# lose data on truncation or drop.
auto_snapshot: true
@@ -897,7 +897,7 @@ column_index_cache_size_in_kb: 2
#
# concurrent_compactors defaults to the smaller of (number of disks,
# number of cores), with a minimum of 2 and a maximum of 8.
#
# If your data directories are backed by SSD, you should increase this
# to the number of cores.
#concurrent_compactors: 1
@@ -925,7 +925,7 @@ compaction_throughput_mb_per_sec: 64

# When compacting, the replacement sstable(s) can be opened before they
# are completely written, and used in place of the prior sstables for
# any range that has been written. This helps to smoothly transfer reads
# between the sstables, reducing page cache churn and keeping hot rows hot
sstable_preemptive_open_interval_in_mb: 50

@@ -974,12 +974,18 @@ enable_uuid_sstable_identifiers: true
# low is equally ill-advised since clients could get timeouts even for successful
# operations just because the timeout setting is too tight.

# How long the coordinator should wait for read operations to complete. This
# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc.
# Lowest acceptable value is 10 ms.
read_request_timeout_in_ms: 5000
# How long the coordinator should wait for seq or index scans to complete. This
# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc.
# Lowest acceptable value is 10 ms.
range_request_timeout_in_ms: 10000
# How long the coordinator should wait for aggregation read operations to complete,
# such as SELECT COUNT(*), MIN(x), etc.
# Lowest acceptable value is 10 ms.
aggregation_request_timeout_in_ms: 120000
# How long the coordinator should wait for writes to complete.
# Lowest acceptable value is 10 ms.
write_request_timeout_in_ms: 2000
@@ -1055,10 +1061,10 @@ slow_query_log_timeout_in_ms: 500
# Enable operation timeout information exchange between nodes to accurately
# measure request timeouts. If disabled, replicas will assume that requests
# were forwarded to them instantly by the coordinator, which means that
# under overload conditions we will waste that much extra time processing
# already-timed-out requests.
#
# Warning: It is generally assumed that users have setup NTP on their clusters, and that clocks are modestly in sync,
# since this is a requirement for general correctness of last write wins.
#cross_node_timeout: true

2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
@@ -101,6 +101,8 @@ public class Config

public volatile long range_request_timeout_in_ms = 10000L;

public volatile long aggregation_request_timeout_in_ms = 120000L;

public volatile long write_request_timeout_in_ms = 2000L;

public volatile long hints_request_timeout_in_ms = 2000L;
31 changes: 24 additions & 7 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -154,7 +154,7 @@ public class DatabaseDescriptor
private static boolean toolInitialized;
private static boolean daemonInitialized;
private static boolean enableMemtableAndCommitLog;

private static final int searchConcurrencyFactor = Integer.parseInt(System.getProperty(Config.PROPERTY_PREFIX + "search_concurrency_factor", "1"));

private static volatile boolean disableSTCSInL0 = Boolean.getBoolean(Config.PROPERTY_PREFIX + "disable_stcs_in_l0");
@@ -188,7 +188,7 @@ static void updateInitialized(boolean clientInitialized, boolean toolInitialized
* It cannot delete directories because on remote storage this would result
* in errors if the test containers for remote storage are being shutdown
* concurrently. The caller should delete any directories if required.
* TODO If you run into problems with undeleted directories or with the
* caller deleting them, please add additional details here.
* <p/>
* This method is called by integration tests that run in the same JVM.
@@ -346,7 +346,7 @@ public static void setEnableMemtableAndCommitLog()
{
enableMemtableAndCommitLog = true;
}

public static boolean enableMemtableAndCommitLog()
{
return daemonInitialized || enableMemtableAndCommitLog;
@@ -639,7 +639,7 @@ else if (conf.repair_session_space_in_mb > (int) (Runtime.getRuntime().maxMemory
{
conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40;
}

if (conf.native_transport_rate_limiting_enabled)
logger.info("Native transport rate-limiting enabled at {} requests/second.", conf.native_transport_max_requests_per_second);
else
@@ -1197,6 +1197,12 @@ static void checkForLowestAcceptedTimeouts(Config conf)
conf.range_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT;
}

if(conf.aggregation_request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT)
{
logInfo("aggregation_request_timeout_in_ms", conf.aggregation_request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT);
conf.aggregation_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT;
}

if(conf.request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT)
{
logInfo("request_timeout_in_ms", conf.request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT);
@@ -1717,6 +1723,16 @@ public static void setRangeRpcTimeout(long timeOutInMillis)
conf.range_request_timeout_in_ms = timeOutInMillis;
}

public static long getAggregationRpcTimeout(TimeUnit unit)
{
return unit.convert(conf.aggregation_request_timeout_in_ms, MILLISECONDS);
}

public static void setAggregationRpcTimeout(long timeOutInMillis)
{
conf.aggregation_request_timeout_in_ms = timeOutInMillis;
}

public static long getWriteRpcTimeout(TimeUnit unit)
{
return unit.convert(conf.write_request_timeout_in_ms, MILLISECONDS);
@@ -1788,13 +1804,14 @@ public static long getSlowQueryTimeout(TimeUnit units)
}

/**
* @return the minimum configured {read, write, range, aggregated, truncate, misc} timeout
*/
public static long getMinRpcTimeout(TimeUnit unit)
{
return Longs.min(getRpcTimeout(unit),
getReadRpcTimeout(unit),
getRangeRpcTimeout(unit),
getAggregationRpcTimeout(unit),
getWriteRpcTimeout(unit),
getCounterWriteRpcTimeout(unit),
getTruncateRpcTimeout(unit));
@@ -3416,7 +3433,7 @@ private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
if (value > getConcurrentCompactors())
logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
}

public static AuditLogOptions getAuditLoggingOptions()
{
return conf.audit_logging_options;
@@ -3700,7 +3717,7 @@ public static void setSAIZeroCopyUsedThreshold(double threshold)
{
conf.sai_options.zerocopy_used_threshold = threshold;
}

public static GuardrailsConfig getGuardrailsConfig()
{
return conf.guardrails;
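
For reference, the two new accessors follow the pattern of the existing per-operation timeout accessors, so the aggregation deadline can be read or overridden at runtime (for example from tests). A minimal sketch, assuming an initialized DatabaseDescriptor; the daemonInitialization() call and class name here are illustrative, not part of this patch:

import java.util.concurrent.TimeUnit;

import org.apache.cassandra.config.DatabaseDescriptor;

public class AggregationTimeoutSketch
{
    public static void main(String[] args)
    {
        // Assumption: in a real daemon this initialization happens at startup.
        DatabaseDescriptor.daemonInitialization();

        // Reads aggregation_request_timeout_in_ms (120000 ms by default).
        long timeoutMs = DatabaseDescriptor.getAggregationRpcTimeout(TimeUnit.MILLISECONDS);
        System.out.println("aggregation timeout: " + timeoutMs + " ms");

        // Tighten the deadline to 30 seconds at runtime, e.g. from a test.
        DatabaseDescriptor.setAggregationRpcTimeout(30000L);
    }
}
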
src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java
@@ -28,4 +28,10 @@ public ReadTimeoutException(ConsistencyLevel consistency, int received, int bloc
super(ExceptionCode.READ_TIMEOUT, consistency, received, blockFor);
this.dataPresent = dataPresent;
}

public ReadTimeoutException(ConsistencyLevel consistency)
{
super(ExceptionCode.READ_TIMEOUT, consistency);
this.dataPresent = false;
}
}
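
Because an aggregation query times out on the coordinator between sub-pages, there are no per-replica received/blockFor counts to report; the new single-argument constructor therefore defaults dataPresent to false and, via the matching RequestTimeoutException constructor below, reports received = 0 and blockFor = 0. A small sketch of the calling pattern, mirroring the checkTimeout() method added to AggregationQueryPager further down (the helper name is illustrative):

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.ReadTimeoutException;

public class DeadlineCheckSketch
{
    // Raises a read timeout when the elapsed time exceeds the deadline.
    // No replica counts are available here, hence the single-argument constructor.
    static void failIfExpired(long startNanos, long timeoutNanos, ConsistencyLevel consistency)
    {
        if (System.nanoTime() - startNanos > timeoutNanos)
            throw new ReadTimeoutException(consistency);
    }
}
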
src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java
@@ -40,4 +40,12 @@ protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel consisten
this.received = received;
this.blockFor = blockFor;
}

public RequestTimeoutException(ExceptionCode exceptionCode, ConsistencyLevel consistency)
{
super(exceptionCode, "Operation timed out");
this.consistency = consistency;
this.received = 0;
this.blockFor = 0;
}
}
src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -20,20 +20,27 @@
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.PageSize;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.aggregation.GroupingState;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.OperationExecutionException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.QueryState;

/**
@@ -55,11 +62,20 @@ public final class AggregationQueryPager implements QueryPager
// The sub-pager, used to retrieve the next sub-page.
private QueryPager subPager;

// The timeout in nanoseconds; if more time has elapsed, a ReadTimeoutException will be raised
private final long timeoutNanos;

public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits)
{
this(subPager, subPageSize, limits, DatabaseDescriptor.getAggregationRpcTimeout(TimeUnit.NANOSECONDS));
}

public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits, long timeoutNanos)
{
this.subPager = subPager;
this.limits = limits;
this.subPageSize = subPageSize;
this.timeoutNanos = timeoutNanos;
}

/**
@@ -260,6 +276,18 @@ public final boolean hasNext()
return next != null;
}

private void checkTimeout()
{
// Internal queries are not guarded by a timeout, because continuous paging queries can be aborted
// and system queries should not be aborted
if (consistency == null)
return;

long elapsed = System.nanoTime() - queryStartNanoTime;
if (elapsed > timeoutNanos)
throw new ReadTimeoutException(consistency);
}

/**
* Loads the next <code>RowIterator</code> to be returned. The iteration finishes when we reach either the
* user groups limit or the groups page size. The user provided limit is initially set in subPager.maxRemaining().
@@ -326,7 +354,9 @@ protected QueryPager updatePagerLimit(QueryPager pager,
*/
private final PartitionIterator fetchSubPage(PageSize subPageSize)
{
checkTimeout();

return consistency != null ? subPager.fetchPage(subPageSize, consistency, queryState, System.nanoTime())
: subPager.fetchPageInternal(subPageSize, executionController);
}

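
The four-argument constructor makes the deadline injectable, which is useful for tests that want a deterministic timeout. A sketch under assumptions: subPager, pageSize, and limits come from ordinary query planning, and the pager lives in org.apache.cassandra.service.pager:

import java.util.concurrent.TimeUnit;

import org.apache.cassandra.cql3.PageSize;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.service.pager.AggregationQueryPager;
import org.apache.cassandra.service.pager.QueryPager;

public class PagerWiringSketch
{
    // Wraps a sub-pager with a 30-second aggregation deadline instead of the
    // configured default; each sub-page fetch first runs the deadline check.
    static QueryPager withTightDeadline(QueryPager subPager, PageSize pageSize, DataLimits limits)
    {
        long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
        return new AggregationQueryPager(subPager, pageSize, limits, timeoutNanos);
    }
}
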
1 change: 1 addition & 0 deletions test/conf/cassandra.yaml
@@ -61,6 +61,7 @@ write_request_timeout_in_ms: 20000
counter_write_request_timeout_in_ms: 20000
cas_contention_timeout_in_ms: 20000
request_timeout_in_ms: 20000
aggregation_request_timeout_in_ms: 120000
default_compaction:
class_name: UnifiedCompactionStrategy
parameters: