From 87312041e2a8adaaacf8e70b28d9ac49eb48d033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20=C5=BBytka?= Date: Fri, 23 May 2025 14:41:52 +0200 Subject: [PATCH] CNDB-14261: Port CNDB-13770 to January release CNDB-13770 is Separate timeout for aggregation queries (#1741) In CC, aggregate user queries use the same range read timeout. In DSE, we had a separate timeout that defaulted to 120s. We would like to retain that functionality. In addition, aggregate queries in metrics look just like range reads. We'd like to distinguish them. This PR adds a separate 120s timeout for aggregate queries. The timeout is configurable with `aggregation_request_timeout_in_ms` Config parameter --- conf/cassandra.yaml | 30 ++- .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 31 ++- .../exceptions/ReadTimeoutException.java | 6 + .../exceptions/RequestTimeoutException.java | 8 + .../service/pager/AggregationQueryPager.java | 40 +++- test/conf/cassandra.yaml | 1 + .../operations/AggregationQueriesTest.java | 213 ++++++++++++++++++ .../metadata/MetadataCollectorTest.java | 78 +++++++ 9 files changed, 385 insertions(+), 24 deletions(-) create mode 100644 test/unit/org/apache/cassandra/cql3/validation/operations/AggregationQueriesTest.java create mode 100644 test/unit/org/apache/cassandra/io/sstable/metadata/MetadataCollectorTest.java diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index c10771a61092..183ac2514646 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -44,8 +44,8 @@ num_tokens: 16 allocate_tokens_for_local_replication_factor: 3 # initial_token allows you to specify tokens manually. While you can use it with -# vnodes (num_tokens > 1, above) -- in which case you should provide a -# comma-separated list -- it's primarily used when adding nodes to legacy clusters +# vnodes (num_tokens > 1, above) -- in which case you should provide a +# comma-separated list -- it's primarily used when adding nodes to legacy clusters # that do not have vnodes enabled. # initial_token: @@ -227,7 +227,7 @@ credentials_validity_in_ms: 2000 partitioner: org.apache.cassandra.dht.Murmur3Partitioner # Directories where Cassandra should store data on disk. If multiple -# directories are specified, Cassandra will spread data evenly across +# directories are specified, Cassandra will spread data evenly across # them by partitioning the token ranges. # If not set, the default directory is $CASSANDRA_HOME/data/data. # data_file_directories: @@ -418,8 +418,8 @@ counter_cache_save_period: 7200 # while still having the cache during runtime. # cache_load_timeout_seconds: 30 -# commitlog_sync may be either "periodic", "group", or "batch." -# +# commitlog_sync may be either "periodic", "group", or "batch." +# # When in batch mode, Cassandra won't ack writes until the commit log # has been flushed to disk. Each incoming write will trigger the flush task. # commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had @@ -853,7 +853,7 @@ incremental_backups: false snapshot_before_compaction: false # Whether or not a snapshot is taken of the data before keyspace truncation -# or dropping of column families. The STRONGLY advised default of true +# or dropping of column families. The STRONGLY advised default of true # should be used to provide data safety. If you set this flag to false, you will # lose data on truncation or drop. auto_snapshot: true @@ -897,7 +897,7 @@ column_index_cache_size_in_kb: 2 # # concurrent_compactors defaults to the smaller of (number of disks, # number of cores), with a minimum of 2 and a maximum of 8. -# +# # If your data directories are backed by SSD, you should increase this # to the number of cores. #concurrent_compactors: 1 @@ -925,7 +925,7 @@ compaction_throughput_mb_per_sec: 64 # When compacting, the replacement sstable(s) can be opened before they # are completely written, and used in place of the prior sstables for -# any range that has been written. This helps to smoothly transfer reads +# any range that has been written. This helps to smoothly transfer reads # between the sstables, reducing page cache churn and keeping hot rows hot sstable_preemptive_open_interval_in_mb: 50 @@ -974,12 +974,18 @@ enable_uuid_sstable_identifiers: true # low is equally ill-advised since clients could get timeouts even for successful # operations just because the timeout setting is too tight. -# How long the coordinator should wait for read operations to complete. +# How long the coordinator should wait for read operations to complete. This +# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc. # Lowest acceptable value is 10 ms. read_request_timeout_in_ms: 5000 -# How long the coordinator should wait for seq or index scans to complete. +# How long the coordinator should wait for seq or index scans to complete. This +# timeout does not apply to aggregated queries such as SELECT COUNT(*), MIN(x), etc. # Lowest acceptable value is 10 ms. range_request_timeout_in_ms: 10000 +# How long the coordinator should wait for aggregation read operations to complete, +# such as SELECT COUNT(*), MIN(x), etc. +# Lowest acceptable value is 10 ms. +aggregation_request_timeout_in_ms: 120000 # How long the coordinator should wait for writes to complete. # Lowest acceptable value is 10 ms. write_request_timeout_in_ms: 2000 @@ -1055,10 +1061,10 @@ slow_query_log_timeout_in_ms: 500 # Enable operation timeout information exchange between nodes to accurately # measure request timeouts. If disabled, replicas will assume that requests # were forwarded to them instantly by the coordinator, which means that -# under overload conditions we will waste that much extra time processing +# under overload conditions we will waste that much extra time processing # already-timed-out requests. # -# Warning: It is generally assumed that users have setup NTP on their clusters, and that clocks are modestly in sync, +# Warning: It is generally assumed that users have setup NTP on their clusters, and that clocks are modestly in sync, # since this is a requirement for general correctness of last write wins. #cross_node_timeout: true diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 4a526eeb38da..aaefbfc71f48 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -101,6 +101,8 @@ public class Config public volatile long range_request_timeout_in_ms = 10000L; + public volatile long aggregation_request_timeout_in_ms = 120000L; + public volatile long write_request_timeout_in_ms = 2000L; public volatile long hints_request_timeout_in_ms = 2000L; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index c5f80fdaac2a..44ed4fc1c264 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -154,7 +154,7 @@ public class DatabaseDescriptor private static boolean toolInitialized; private static boolean daemonInitialized; private static boolean enableMemtableAndCommitLog; - + private static final int searchConcurrencyFactor = Integer.parseInt(System.getProperty(Config.PROPERTY_PREFIX + "search_concurrency_factor", "1")); private static volatile boolean disableSTCSInL0 = Boolean.getBoolean(Config.PROPERTY_PREFIX + "disable_stcs_in_l0"); @@ -188,7 +188,7 @@ static void updateInitialized(boolean clientInitialized, boolean toolInitialized * It cannot delete directories because on remote storage this would result * in errors if the test containers for remote storage are being shutdown * concurrently. The caller should delete any directories if required. - * TODO If you run into problems with undeleted directories or with the + * TODO If you run into problems with undeleted directories or with the * caller deleting them, please add additional details here. *

* This method is called by integration tests that run in the same JVM. @@ -346,7 +346,7 @@ public static void setEnableMemtableAndCommitLog() { enableMemtableAndCommitLog = true; } - + public static boolean enableMemtableAndCommitLog() { return daemonInitialized || enableMemtableAndCommitLog; @@ -639,7 +639,7 @@ else if (conf.repair_session_space_in_mb > (int) (Runtime.getRuntime().maxMemory { conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40; } - + if (conf.native_transport_rate_limiting_enabled) logger.info("Native transport rate-limiting enabled at {} requests/second.", conf.native_transport_max_requests_per_second); else @@ -1197,6 +1197,12 @@ static void checkForLowestAcceptedTimeouts(Config conf) conf.range_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT; } + if(conf.aggregation_request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT) + { + logInfo("aggregation_request_timeout_in_ms", conf.aggregation_request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT); + conf.aggregation_request_timeout_in_ms = LOWEST_ACCEPTED_TIMEOUT; + } + if(conf.request_timeout_in_ms < LOWEST_ACCEPTED_TIMEOUT) { logInfo("request_timeout_in_ms", conf.request_timeout_in_ms, LOWEST_ACCEPTED_TIMEOUT); @@ -1717,6 +1723,16 @@ public static void setRangeRpcTimeout(long timeOutInMillis) conf.range_request_timeout_in_ms = timeOutInMillis; } + public static long getAggregationRpcTimeout(TimeUnit unit) + { + return unit.convert(conf.aggregation_request_timeout_in_ms, MILLISECONDS); + } + + public static void setAggregationRpcTimeout(long timeOutInMillis) + { + conf.aggregation_request_timeout_in_ms = timeOutInMillis; + } + public static long getWriteRpcTimeout(TimeUnit unit) { return unit.convert(conf.write_request_timeout_in_ms, MILLISECONDS); @@ -1788,13 +1804,14 @@ public static long getSlowQueryTimeout(TimeUnit units) } /** - * @return the minimum configured {read, write, range, truncate, misc} timeout + * @return the minimum configured {read, write, range, aggregated, truncate, misc} timeout */ public static long getMinRpcTimeout(TimeUnit unit) { return Longs.min(getRpcTimeout(unit), getReadRpcTimeout(unit), getRangeRpcTimeout(unit), + getAggregationRpcTimeout(unit), getWriteRpcTimeout(unit), getCounterWriteRpcTimeout(unit), getTruncateRpcTimeout(unit)); @@ -3416,7 +3433,7 @@ private static void validateMaxConcurrentAutoUpgradeTasksConf(int value) if (value > getConcurrentCompactors()) logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors()); } - + public static AuditLogOptions getAuditLoggingOptions() { return conf.audit_logging_options; @@ -3700,7 +3717,7 @@ public static void setSAIZeroCopyUsedThreshold(double threshold) { conf.sai_options.zerocopy_used_threshold = threshold; } - + public static GuardrailsConfig getGuardrailsConfig() { return conf.guardrails; diff --git a/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java b/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java index 05f3510e7b39..b0ccc91fd8b8 100644 --- a/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java +++ b/src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java @@ -28,4 +28,10 @@ public ReadTimeoutException(ConsistencyLevel consistency, int received, int bloc super(ExceptionCode.READ_TIMEOUT, consistency, received, blockFor); this.dataPresent = dataPresent; } + + public ReadTimeoutException(ConsistencyLevel consistency) + { + super(ExceptionCode.READ_TIMEOUT, consistency); + this.dataPresent = false; + } } diff --git a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java index 853ba2fd1c46..d8d2a9c6f73b 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java +++ b/src/java/org/apache/cassandra/exceptions/RequestTimeoutException.java @@ -40,4 +40,12 @@ protected RequestTimeoutException(ExceptionCode code, ConsistencyLevel consisten this.received = received; this.blockFor = blockFor; } + + public RequestTimeoutException(ExceptionCode exceptionCode, ConsistencyLevel consistency) + { + super(exceptionCode, "Operation timeout out"); + this.consistency = consistency; + this.received = 0; + this.blockFor = 0; + } } diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java index 306dec55dc8a..b9849d86b1ce 100644 --- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java @@ -20,20 +20,27 @@ import java.nio.ByteBuffer; import java.util.NoSuchElementException; import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.PageSize; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.OperationExecutionException; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.aggregation.GroupingState; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.OperationExecutionException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.QueryState; /** @@ -55,11 +62,20 @@ public final class AggregationQueryPager implements QueryPager // The sub-pager, used to retrieve the next sub-page. private QueryPager subPager; + // the timeout in nanoseconds, if more time has elapsed, a ReadTimeoutException will be raised + private final long timeoutNanos; + public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits) + { + this(subPager, subPageSize, limits, DatabaseDescriptor.getAggregationRpcTimeout(TimeUnit.NANOSECONDS)); + } + + public AggregationQueryPager(QueryPager subPager, PageSize subPageSize, DataLimits limits, long timeoutNanos) { this.subPager = subPager; this.limits = limits; this.subPageSize = subPageSize; + this.timeoutNanos = timeoutNanos; } /** @@ -260,6 +276,18 @@ public final boolean hasNext() return next != null; } + private void checkTimeout() + { + // internal queries are not guarded by a timeout because cont. paging queries can be aborted + // and system queries should not be aborted + if (consistency == null) + return; + + long elapsed = System.nanoTime() - queryStartNanoTime; + if (elapsed > timeoutNanos) + throw new ReadTimeoutException(consistency); + } + /** * Loads the next RowIterator to be returned. The iteration finishes when we reach either the * user groups limit or the groups page size. The user provided limit is initially set in subPager.maxRemaining(). @@ -326,7 +354,9 @@ protected QueryPager updatePagerLimit(QueryPager pager, */ private final PartitionIterator fetchSubPage(PageSize subPageSize) { - return consistency != null ? subPager.fetchPage(subPageSize, consistency, queryState, queryStartNanoTime) + checkTimeout(); + + return consistency != null ? subPager.fetchPage(subPageSize, consistency, queryState, System.nanoTime()) : subPager.fetchPageInternal(subPageSize, executionController); } diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index 2b7d5232c1c6..cac827739e61 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -61,6 +61,7 @@ write_request_timeout_in_ms: 20000 counter_write_request_timeout_in_ms: 20000 cas_contention_timeout_in_ms: 20000 request_timeout_in_ms: 20000 +aggregation_request_timeout_in_ms: 120000 default_compaction: class_name: UnifiedCompactionStrategy parameters: diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationQueriesTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationQueriesTest.java new file mode 100644 index 000000000000..d9ebb8155f91 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationQueriesTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3.validation.operations; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.exceptions.ReadTimeoutException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +@RunWith(BMUnitRunner.class) +@BMRule( +name = "delay page read", +targetClass = "org.apache.cassandra.service.StorageProxy", +targetMethod = "getRangeSlice", +targetLocation = "AT ENTRY", +action = "org.apache.cassandra.cql3.validation.operations.AggregationQueriesTest.delayPageRead();") +public class AggregationQueriesTest extends CQLTester +{ + private static final AtomicLong pageReadDelayMillis = new AtomicLong(); + + @Before + public void setup() + { + pageReadDelayMillis.set(0); + } + + @Test + public void testAggregationQueryShouldTimeoutWhenSinglePageReadExceedesReadTimeout() throws Throwable + { + logger.info("Creating table"); + createTable("CREATE TABLE %s (a int, b int, c double, primary key (a, b))"); + + logger.info("Inserting data"); + for (int i = 0; i < 4; i++) + for (int j = 0; j < 200; j++) + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)", i, j); + + // connect net session + sessionNet(); + + logger.info("Setting timeouts"); + var oldTimeouts = getDBTimeouts(); + try + { + // 3rd and subsequent page reads should be delayed enough to time out the query + int rangeTimeoutMs = 50; + DatabaseDescriptor.setRangeRpcTimeout(rangeTimeoutMs); + pageReadDelayMillis.set(100); + + logger.info("Running aggregate, multi-page query"); + + long queryStartTime = System.nanoTime(); + ReadTimeoutException exception = assertThrows("expected read timeout", + ReadTimeoutException.class, + () -> executeNet("SELECT a, count(c) FROM %s group by a").all()); + long queryDuration = System.nanoTime() - queryStartTime; + assertTrue("Query duration " + queryDuration + " should be greater than range read timeout " + rangeTimeoutMs + "ms", + queryDuration > MILLISECONDS.toNanos(rangeTimeoutMs)); + logger.info("Query failed after {} ms as expected with ", NANOSECONDS.toMillis(queryDuration), exception); + } + finally + { + setDBTimeouts(oldTimeouts); + } + } + + @Test + public void testAggregationQueryShouldNotTimeoutWhenItExceedesReadTimeout() throws Throwable + { + logger.info("Creating table"); + createTable("CREATE TABLE %s (a int, b int, c double, primary key (a, b))"); + + logger.info("Inserting data"); + for (int i = 0; i < 4; i++) + for (int j = 0; j < 40000; j++) + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)", i, j); + + // connect net session + sessionNet(); + + logger.info("Setting timeouts"); + var oldTimeouts = getDBTimeouts(); + try + { + // single page read should fit in the range timeout, but multiple pages should not; + // the query should complete nevertheless because aggregate timeout is large + int rangeTimeoutMs = 2000; + pageReadDelayMillis.set(400); + DatabaseDescriptor.setRangeRpcTimeout(rangeTimeoutMs); + DatabaseDescriptor.setAggregationRpcTimeout(120000); + + logger.info("Running aggregate, multi-page query"); + + long queryStartTime = System.nanoTime(); + List result = executeNet("SELECT a, count(c) FROM %s group by a").all(); + long queryDuration = System.nanoTime() - queryStartTime; + assertTrue("Query duration " + queryDuration + " should be greater than range read timeout " + rangeTimeoutMs + "ms", + queryDuration > MILLISECONDS.toNanos(rangeTimeoutMs)); + logger.info("Query succeeded in {} ms as expected; result={}", NANOSECONDS.toMillis(queryDuration), result); + } + finally + { + setDBTimeouts(oldTimeouts); + } + } + + @Test + public void testAggregationQueryShouldTimeoutWhenSinglePageReadIsFastButAggregationExceedesTimeout() throws Throwable + { + logger.info("Creating table"); + createTable("CREATE TABLE %s (a int, b int, c double, primary key (a, b))"); + + logger.info("Inserting data"); + for (int i = 0; i < 4; i++) + for (int j = 0; j < 40000; j++) + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)", i, j); + + // connect net session + sessionNet(); + + logger.info("Setting timeouts"); + var oldTimeouts = getDBTimeouts(); + try + { + // page reads should fit in the timeout, but the query should time out on aggregate timeout + // the query should complete nevertheless + int aggregateTimeoutMs = 1000; + pageReadDelayMillis.set(400); + DatabaseDescriptor.setRangeRpcTimeout(10000); + DatabaseDescriptor.setAggregationRpcTimeout(aggregateTimeoutMs); + + logger.info("Running aggregate, multi-page query"); + + long queryStartTime = System.nanoTime(); + ReadTimeoutException exception = assertThrows("expected read timeout", + ReadTimeoutException.class, + () -> executeNet("SELECT a, count(c) FROM %s group by a").all()); + long queryDuration = System.nanoTime() - queryStartTime; + assertTrue("Query duration " + queryDuration + " should be greater than aggregate timeout " + aggregateTimeoutMs + "ms", + queryDuration > MILLISECONDS.toNanos(aggregateTimeoutMs)); + logger.info("Query failed after {} ms as expected with ", NANOSECONDS.toMillis(queryDuration), exception); + } + catch (Exception e) + { + logger.error("Query failed", e); + logger.info("let's wait for a while and see what happens"); + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + throw e; + } + finally + { + setDBTimeouts(oldTimeouts); + } + } + + private long[] getDBTimeouts() + { + return new long[]{ + DatabaseDescriptor.getReadRpcTimeout(MILLISECONDS), + DatabaseDescriptor.getRangeRpcTimeout(MILLISECONDS), + DatabaseDescriptor.getAggregationRpcTimeout(MILLISECONDS) + }; + } + + private void setDBTimeouts(long[] timeouts) + { + DatabaseDescriptor.setReadRpcTimeout(timeouts[0]); + DatabaseDescriptor.setRangeRpcTimeout(timeouts[1]); + DatabaseDescriptor.setAggregationRpcTimeout(timeouts[2]); + } + + private static void delayPageRead() + { + long delay = pageReadDelayMillis.get(); + if (delay == 0) + return; + logger.info("Delaying page read for {} ms", delay); + Uninterruptibles.sleepUninterruptibly(delay, MILLISECONDS); + logger.info("Resuming page read"); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataCollectorTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataCollectorTest.java new file mode 100644 index 000000000000..b1e8e034900c --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataCollectorTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable.metadata; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.utils.EstimatedHistogram; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.io.sstable.metadata.MetadataCollector.defaultPartitionSizeHistogram; +import static org.junit.Assert.*; + +public class MetadataCollectorTest +{ + private static final Logger logger = LoggerFactory.getLogger(MetadataCollectorTest.class); + + @Test + public void testNoOverflow() + { + EstimatedHistogram histogram = defaultPartitionSizeHistogram(); + histogram.add(1697806495183L); + assertFalse(histogram.isOverflowed()); + } + + @Test + public void testFindMaxSampleWithoutOverflow() + { + logger.info("dse compatible boundaries: {}", CassandraRelevantProperties.USE_DSE_COMPATIBLE_HISTOGRAM_BOUNDARIES.getBoolean()); + + long low = 0; + long high = Long.MAX_VALUE; + long result = -1; + + while (low <= high) { + long mid = low + (high - low) / 2; // Avoid potential overflow in (low + high) / 2 + + // Create a fresh histogram for each test to avoid accumulated state + EstimatedHistogram testHistogram = defaultPartitionSizeHistogram(); + testHistogram.add(mid); + + if (testHistogram.isOverflowed()) { + high = mid - 1; + } else { + result = mid; // Keep track of the last working value + low = mid + 1; + } + } + + logger.info("Max value without overflow: {}", result); + + // Verify the result + EstimatedHistogram finalHistogram = defaultPartitionSizeHistogram(); + finalHistogram.add(result); + assertFalse(finalHistogram.isOverflowed()); + + // Verify that result + 1 causes overflow + EstimatedHistogram overflowHistogram = defaultPartitionSizeHistogram(); + overflowHistogram.add(result + 1); + assertTrue(overflowHistogram.isOverflowed()); + } +} \ No newline at end of file