Skip to content

Commit 25390eb

Browse files
alanwang67dcapwell
authored andcommitted
Log queries scanning too many SSTables per read
patch by Alan Wang; reviewed by David Capwell, Marcus Eriksson for CASSANDRA-21048
1 parent a84abe7 commit 25390eb

File tree

9 files changed

+169
-1
lines changed

9 files changed

+169
-1
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Log queries scanning too many SSTables per read (CASSANDRA-21048)
23
* Extend nodetool verify to (optionally) validate SAI files (CASSANDRA-20949)
34
* Fix CompressionDictionary being closed while still in use (CASSANDRA-21047)
45
* When updating a multi cell collection element, if the update is rejected then the shared Row.Builder is not freed causing all future mutations to be rejected (CASSANDRA-21055)

conf/cassandra.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1990,6 +1990,10 @@ transparent_data_encryption_options:
19901990
# SAFETY THRESHOLDS #
19911991
#####################
19921992

1993+
# Log the query if it reads more than
1994+
# sstables_per_read_log_threshold SSTables
1995+
sstables_per_read_log_threshold: 100
1996+
19931997
# When executing a scan, within or across a partition, we need to keep the
19941998
# tombstones seen in memory so we can return them to the coordinator, which
19951999
# will use them to make sure other replicas also know about the deleted rows.

src/java/org/apache/cassandra/config/Config.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ public static class SSTableConfig
579579
public volatile DataStorageSpec.LongBytesBound row_index_read_size_warn_threshold = null;
580580
public volatile DataStorageSpec.LongBytesBound row_index_read_size_fail_threshold = null;
581581

582+
public volatile int sstables_per_read_log_threshold = 100;
582583
public volatile int tombstone_warn_threshold = 1000;
583584
public volatile int tombstone_failure_threshold = 100000;
584585

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3215,6 +3215,16 @@ public static int getMaxMutationSize()
32153215
return conf.max_mutation_size.toBytes();
32163216
}
32173217

3218+
public static int getSSTablesPerReadLogThreshold()
3219+
{
3220+
return conf.sstables_per_read_log_threshold;
3221+
}
3222+
3223+
public static void setSSTablesPerReadLogThreshold(int threshold)
3224+
{
3225+
conf.sstables_per_read_log_threshold = threshold;
3226+
}
3227+
32183228
public static int getTombstoneWarnThreshold()
32193229
{
32203230
return conf.tombstone_warn_threshold;

src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@
5858
import org.apache.cassandra.tcm.Epoch;
5959
import org.apache.cassandra.tracing.Tracing;
6060
import org.apache.cassandra.transport.Dispatcher;
61+
import org.apache.cassandra.utils.NoSpamLogger;
6162

6263
/**
6364
* A read command that selects a (part of a) range of partitions.
6465
*/
6566
public class PartitionRangeReadCommand extends ReadCommand implements PartitionRangeReadQuery
6667
{
68+
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
6769
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
6870

6971
protected final Slices requestedSlices;
@@ -434,6 +436,9 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea
434436

435437
final int finalSelectedSSTables = selectedSSTablesCnt;
436438

439+
if (finalSelectedSSTables > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
440+
noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), finalSelectedSSTables);
441+
437442
// iterators can be empty for offline tools
438443
if (inputCollector.isEmpty())
439444
return EmptyIterators.unfilteredPartition(metadata());

src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,14 @@
8787
import org.apache.cassandra.transport.Dispatcher;
8888
import org.apache.cassandra.utils.FBUtilities;
8989
import org.apache.cassandra.utils.btree.BTreeSet;
90+
import org.apache.cassandra.utils.NoSpamLogger;
9091

9192
/**
9293
* A read command that selects a (part of a) single partition.
9394
*/
9495
public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
9596
{
97+
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
9698
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
9799
protected static final Function<Seekable, SelectionDeserializer> accordSelectionDeserializer = AccordDeserializer::new;
98100

@@ -876,7 +878,13 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
876878
StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
877879

878880
List<UnfilteredRowIterator> iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone());
879-
return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
881+
882+
UnfilteredRowIterator result = withSSTablesIterated(iterators, cfs.metric, metricsCollector);
883+
884+
if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
885+
noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
886+
887+
return result;
880888
}
881889
catch (RuntimeException | Error e)
882890
{
@@ -1084,6 +1092,9 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
10841092

10851093
cfs.metric.updateSSTableIterated(metricsCollector.getMergedSSTables());
10861094

1095+
if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
1096+
noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
1097+
10871098
if (result == null || result.isEmpty())
10881099
return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
10891100

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4719,6 +4719,17 @@ public void setInvalidateKeycacheOnSSTableDeletion(boolean invalidate)
47194719
DatabaseDescriptor.setInvalidateKeycacheOnSSTableDeletion(invalidate);
47204720
}
47214721

4722+
public int getSSTablesPerReadLogThreshold()
4723+
{
4724+
return DatabaseDescriptor.getSSTablesPerReadLogThreshold();
4725+
}
4726+
4727+
public void setSSTablesPerReadLogThreshold(int threshold)
4728+
{
4729+
DatabaseDescriptor.setSSTablesPerReadLogThreshold(threshold);
4730+
logger.info("updated sstables_per_read_log_threshold to {}", threshold);
4731+
}
4732+
47224733
public int getTombstoneWarnThreshold()
47234734
{
47244735
return DatabaseDescriptor.getTombstoneWarnThreshold();

src/java/org/apache/cassandra/service/StorageServiceMBean.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,11 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion,
10361036
/** Returns the cluster partitioner */
10371037
public String getPartitionerName();
10381038

1039+
/** Returns the threshold for logging queries that read more than threshold amount of SSTables */
1040+
public int getSSTablesPerReadLogThreshold();
1041+
/** Sets the threshold for logging queries that read more than threshold amount of SSTables */
1042+
public void setSSTablesPerReadLogThreshold(int threshold);
1043+
10391044
/** Returns the threshold for warning of queries with many tombstones */
10401045
public int getTombstoneWarnThreshold();
10411046
/** Sets the threshold for warning queries with many tombstones */
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test;
20+
21+
import org.junit.Test;
22+
import static org.junit.Assert.assertEquals;
23+
24+
import org.apache.cassandra.config.DatabaseDescriptor;
25+
import org.apache.cassandra.db.Keyspace;
26+
import org.apache.cassandra.distributed.Cluster;
27+
28+
public class SSTableReadLogsQueryTest extends TestBaseImpl
29+
{
30+
@Test
31+
public void logQueryTest() throws Throwable
32+
{
33+
try (Cluster cluster = init(Cluster.build(1)
34+
.start()))
35+
{
36+
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");
37+
38+
cluster.get(1).runOnInstance(() -> {
39+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
40+
});
41+
42+
for (int i = 0; i <= 100; i++)
43+
{
44+
cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
45+
cluster.get(1).flush(withKeyspace("%s"));
46+
}
47+
48+
cluster.get(1).runOnInstance(() -> {
49+
assertEquals(101, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
50+
});
51+
52+
String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
53+
cluster.get(1).executeInternalWithResult(query);
54+
55+
assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
56+
}
57+
}
58+
59+
@Test
60+
public void setSSTablesPerReadLogThresholdTest() throws Throwable
61+
{
62+
try (Cluster cluster = init(Cluster.build(1)
63+
.start()))
64+
{
65+
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");
66+
67+
cluster.get(1).runOnInstance(() -> {
68+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
69+
});
70+
71+
cluster.get(1).runOnInstance(() -> {
72+
DatabaseDescriptor.setSSTablesPerReadLogThreshold(25);
73+
});
74+
75+
for (int i = 0; i <= 25; i++)
76+
{
77+
cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
78+
cluster.get(1).flush(withKeyspace("%s"));
79+
}
80+
81+
cluster.get(1).runOnInstance(() -> {
82+
assertEquals(26, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
83+
});
84+
85+
String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
86+
cluster.get(1).executeInternalWithResult(query);
87+
88+
assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
89+
}
90+
}
91+
92+
@Test
93+
public void logRangeReadQueryTest() throws Throwable
94+
{
95+
try (Cluster cluster = init(Cluster.build(1)
96+
.start()))
97+
{
98+
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");
99+
100+
cluster.get(1).runOnInstance(() -> {
101+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
102+
});
103+
104+
for (int i = 0; i <= 100; i++)
105+
{
106+
cluster.get(1).executeInternal(String.format("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (%s, %s)", i, i));
107+
cluster.get(1).flush(withKeyspace("%s"));
108+
}
109+
110+
cluster.get(1).runOnInstance(() -> {
111+
assertEquals(101, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
112+
});
113+
114+
String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk >= 0 AND pk < 51 ALLOW FILTERING";
115+
cluster.get(1).executeInternalWithResult(query);
116+
117+
assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)