Skip to content

Commit a72dcbc

Browse files
michaeljmarshallmaedhroz
authored andcommitted
Get SAI MemtableIndex refs before SSTableIndex refs at query time
SAI predicate search currently has a bug that could result in missing rows due to a concurrent flush during a query. The new test created in this PR shows the point of failure. The problem is that we get the SSTable index references before getting the Memtable index references. Note that we do it in the correct order in the ANN OF query path, but not in the WHERE query path. This commit updates the QueryView object to hold references to the appropriate Memtable indexes. It also removes the problematic search methods from MemtableIndexManager to prevent future misuse. patch by Michael Marshall; reviewed by Caleb Rackliffe and Ekaterina Dimitrova for CASSANDRA-20709
1 parent 53ccfd5 commit a72dcbc

File tree

6 files changed

+176
-79
lines changed

6 files changed

+176
-79
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.0.5
2+
* Get SAI MemtableIndex refs before SSTableIndex refs at query time (CASSANDRA-20709)
23
* Fix MAX_SEGMENT_SIZE < chunkSize in MmappedRegions::updateState (CASSANDRA-20636)
34
* Full Java 17 support (CASSANDRA-20681)
45
* Ensure replica filtering protection does not trigger unnecessary short read protection reads (CASSANDRA-20639)

src/java/org/apache/cassandra/index/sai/disk/IndexSearchResultIterator.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import org.apache.cassandra.dht.AbstractBounds;
3131
import org.apache.cassandra.exceptions.QueryCancelledException;
3232
import org.apache.cassandra.index.sai.QueryContext;
33+
import org.apache.cassandra.index.sai.memory.MemtableIndex;
3334
import org.apache.cassandra.index.sai.plan.Expression;
3435
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
3536
import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator;
37+
import org.apache.cassandra.index.sai.plan.QueryViewBuilder;
3638
import org.apache.cassandra.index.sai.utils.PrimaryKey;
3739
import org.apache.cassandra.io.util.FileUtils;
3840
import org.apache.cassandra.utils.Throwables;
@@ -51,22 +53,39 @@ private IndexSearchResultIterator(KeyRangeIterator union, Runnable onClose)
5153

5254
/**
5355
* Builds a new {@link IndexSearchResultIterator} that wraps a {@link KeyRangeUnionIterator} over the
54-
* results of searching the {@link org.apache.cassandra.index.sai.memory.MemtableIndex} and the {@link SSTableIndex}es.
56+
* results of searching the {@link QueryViewBuilder.QueryExpressionView}.
57+
*/
58+
public static IndexSearchResultIterator build(QueryViewBuilder.QueryExpressionView queryView,
59+
AbstractBounds<PartitionPosition> keyRange,
60+
QueryContext queryContext,
61+
boolean includeMemtables,
62+
Runnable onClose)
63+
{
64+
return build(queryView.expression, queryView.memtableIndexes, queryView.sstableIndexes, keyRange, queryContext, includeMemtables, onClose);
65+
}
66+
67+
/**
68+
* Builds a new {@link IndexSearchResultIterator} that wraps a {@link KeyRangeUnionIterator} over the
69+
* results of searching the {@link org.apache.cassandra.index.sai.memory.MemtableIndex}es and the {@link SSTableIndex}es.
5570
*/
5671
public static IndexSearchResultIterator build(Expression expression,
72+
Collection<MemtableIndex> memtableIndexes,
5773
Collection<SSTableIndex> sstableIndexes,
5874
AbstractBounds<PartitionPosition> keyRange,
5975
QueryContext queryContext,
6076
boolean includeMemtables,
6177
Runnable onClose)
6278
{
63-
List<KeyRangeIterator> subIterators = new ArrayList<>(sstableIndexes.size() + (includeMemtables ? 1 : 0));
79+
int size = sstableIndexes.size() + (includeMemtables ? memtableIndexes.size() : 0);
80+
List<KeyRangeIterator> subIterators = new ArrayList<>(size);
6481

6582
if (includeMemtables)
6683
{
67-
KeyRangeIterator memtableIterator = expression.getIndex().memtableIndexManager().searchMemtableIndexes(queryContext, expression, keyRange);
68-
if (memtableIterator != null)
84+
for (MemtableIndex memtableIndex : memtableIndexes)
85+
{
86+
KeyRangeIterator memtableIterator = memtableIndex.search(queryContext, expression, keyRange);
6987
subIterators.add(memtableIterator);
88+
}
7089
}
7190

7291
for (SSTableIndex sstableIndex : sstableIndexes)
@@ -98,7 +117,7 @@ public static IndexSearchResultIterator build(Expression expression,
98117
}
99118

100119
public static IndexSearchResultIterator build(List<KeyRangeIterator> sstableIntersections,
101-
KeyRangeIterator memtableResults,
120+
List<KeyRangeIterator> memtableResults,
102121
Set<SSTableIndex> referencedIndexes,
103122
QueryContext queryContext,
104123
Runnable onClose)

src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
package org.apache.cassandra.index.sai.memory;
2020

2121
import java.nio.ByteBuffer;
22+
import java.util.ArrayList;
2223
import java.util.Collection;
24+
import java.util.Collections;
2325
import java.util.Iterator;
24-
import java.util.List;
2526
import java.util.concurrent.ConcurrentHashMap;
2627
import java.util.concurrent.ConcurrentMap;
2728
import java.util.concurrent.TimeUnit;
@@ -31,17 +32,10 @@
3132
import com.google.common.annotations.VisibleForTesting;
3233

3334
import org.apache.cassandra.db.DecoratedKey;
34-
import org.apache.cassandra.db.PartitionPosition;
3535
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
3636
import org.apache.cassandra.db.memtable.Memtable;
3737
import org.apache.cassandra.db.rows.Row;
38-
import org.apache.cassandra.dht.AbstractBounds;
39-
import org.apache.cassandra.index.sai.QueryContext;
4038
import org.apache.cassandra.index.sai.StorageAttachedIndex;
41-
import org.apache.cassandra.index.sai.plan.Expression;
42-
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
43-
import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator;
44-
import org.apache.cassandra.index.sai.utils.PrimaryKey;
4539
import org.apache.cassandra.utils.Clock;
4640
import org.apache.cassandra.utils.FBUtilities;
4741

@@ -134,47 +128,19 @@ public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker)
134128
.orElse(null);
135129
}
136130

137-
public KeyRangeIterator searchMemtableIndexes(QueryContext queryContext, Expression e, AbstractBounds<PartitionPosition> keyRange)
131+
public long liveMemtableWriteCount()
138132
{
139-
Collection<MemtableIndex> memtableIndexes = liveMemtableIndexMap.values();
140-
141-
if (memtableIndexes.isEmpty())
142-
{
143-
return KeyRangeIterator.empty();
144-
}
145-
146-
KeyRangeIterator.Builder builder = KeyRangeUnionIterator.builder(memtableIndexes.size());
147-
148-
for (MemtableIndex memtableIndex : memtableIndexes)
149-
{
150-
builder.add(memtableIndex.search(queryContext, e, keyRange));
151-
}
152-
153-
return builder.build();
133+
return liveMemtableIndexMap.values().stream().mapToLong(MemtableIndex::writeCount).sum();
154134
}
155135

156-
public KeyRangeIterator limitToTopResults(QueryContext context, List<PrimaryKey> source, Expression e)
136+
public Collection<MemtableIndex> getLiveMemtableIndexesSnapshot()
157137
{
158-
Collection<MemtableIndex> memtables = liveMemtableIndexMap.values();
159-
160-
if (memtables.isEmpty())
161-
{
162-
return KeyRangeIterator.empty();
163-
}
164-
165-
KeyRangeUnionIterator.Builder builder = KeyRangeUnionIterator.builder(memtables.size());
166-
167-
for (MemtableIndex index : memtables)
168-
{
169-
builder.add(index.limitToTopResults(source, e, context.vectorContext().limit()));
170-
}
171-
172-
return builder.build();
173-
}
138+
Collection<MemtableIndex> memtableIndexes = liveMemtableIndexMap.values();
139+
if (memtableIndexes.isEmpty())
140+
return Collections.emptyList();
174141

175-
public long liveMemtableWriteCount()
176-
{
177-
return liveMemtableIndexMap.values().stream().mapToLong(MemtableIndex::writeCount).sum();
142+
// Copy the values. Otherwise, we'll only have a view of the map's values which is subject to change.
143+
return new ArrayList<>(memtableIndexes);
178144
}
179145

180146
public long estimatedMemIndexMemoryUsed()

src/java/org/apache/cassandra/index/sai/plan/QueryController.java

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import org.apache.cassandra.schema.TableMetadata;
6666
import org.apache.cassandra.tracing.Tracing;
6767
import org.apache.cassandra.utils.InsertionOrderedNavigableSet;
68-
import org.apache.cassandra.utils.Pair;
6968
import org.apache.cassandra.utils.Throwables;
7069

7170
import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_VECTOR_SEARCH_ORDER_CHUNK_SIZE;
@@ -178,12 +177,12 @@ public UnfilteredRowIterator queryStorage(List<PrimaryKey> keys, ReadExecutionCo
178177
* This is achieved by creating an on-disk view of the query that maps the expressions to
179178
* the {@link SSTableIndex}s that will satisfy the expression.
180179
* <p>
181-
* Each (expression, SSTable indexes) pair is then passed to
182-
* {@link IndexSearchResultIterator#build(Expression, Collection, AbstractBounds, QueryContext, boolean, Runnable)}
183-
* to search the in-memory index associated with the expression and the SSTable indexes, the results of
180+
* Each {@link QueryViewBuilder.QueryExpressionView} is then passed to
181+
* {@link IndexSearchResultIterator#build(QueryViewBuilder.QueryExpressionView, AbstractBounds, QueryContext, boolean, Runnable)}
182+
* to search the in-memory indexes associated with the expression and the SSTable indexes, the results of
184183
* which are unioned and returned.
185184
* <p>
186-
* The results from each call to {@link IndexSearchResultIterator#build(Expression, Collection, AbstractBounds, QueryContext, boolean, Runnable)}
185+
* The results from each call to {@link IndexSearchResultIterator#build(QueryViewBuilder.QueryExpressionView, AbstractBounds, QueryContext, boolean, Runnable)}
187186
* are added to a {@link KeyRangeIntersectionIterator} and returned if strict filtering is allowed.
188187
* <p>
189188
* If strict filtering is not allowed, indexes are split into two groups according to the repaired status of their
@@ -214,30 +213,31 @@ public KeyRangeIterator.Builder getIndexQueryResults(Collection<Expression> expr
214213
// If strict filtering is enabled, evaluate indexes for both repaired and un-repaired SSTables together.
215214
// This usually means we are making this local index query in the context of a user query that reads
216215
// from a single replica and thus can safely perform local intersections.
217-
for (Pair<Expression, Collection<SSTableIndex>> queryViewPair : queryView.view)
218-
builder.add(IndexSearchResultIterator.build(queryViewPair.left, queryViewPair.right, mergeRange, queryContext, true, () -> {}));
216+
for (QueryViewBuilder.QueryExpressionView queryExpressionView : queryView.view)
217+
builder.add(IndexSearchResultIterator.build(queryExpressionView, mergeRange, queryContext, true, () -> {}));
219218
}
220219
else
221220
{
222221
KeyRangeIterator.Builder repairedBuilder = KeyRangeIntersectionIterator.builder(expressions.size(), () -> {});
223222

224-
for (Pair<Expression, Collection<SSTableIndex>> queryViewPair : queryView.view)
223+
for (QueryViewBuilder.QueryExpressionView queryExpressionView : queryView.view)
225224
{
225+
Expression expression = queryExpressionView.expression;
226226
// The initial sizes here reflect little more than an effort to avoid resizing for
227227
// partition-restricted searches w/ LCS:
228228
List<SSTableIndex> repaired = new ArrayList<>(5);
229229
List<SSTableIndex> unrepaired = new ArrayList<>(5);
230230

231231
// Split SSTable indexes into repaired and un-reparired:
232-
for (SSTableIndex index : queryViewPair.right)
232+
for (SSTableIndex index : queryExpressionView.sstableIndexes)
233233
if (index.getSSTable().isRepaired())
234234
repaired.add(index);
235235
else
236236
unrepaired.add(index);
237237

238238
// Always build an iterator for the un-repaired set, given this must include Memtable indexes...
239239
IndexSearchResultIterator unrepairedIterator =
240-
IndexSearchResultIterator.build(queryViewPair.left, unrepaired, mergeRange, queryContext, true, () -> {});
240+
IndexSearchResultIterator.build(expression, queryExpressionView.memtableIndexes, unrepaired, mergeRange, queryContext, true, () -> {});
241241

242242
// ...but ignore it if our combined results are empty.
243243
if (unrepairedIterator.getMaxKeys() > 0)
@@ -253,7 +253,7 @@ public KeyRangeIterator.Builder getIndexQueryResults(Collection<Expression> expr
253253

254254
// ...then only add an iterator to the repaired intersection if repaired SSTable indexes exist.
255255
if (!repaired.isEmpty())
256-
repairedBuilder.add(IndexSearchResultIterator.build(queryViewPair.left, repaired, mergeRange, queryContext, false, () -> {}));
256+
repairedBuilder.add(IndexSearchResultIterator.build(expression, Collections.emptyList(), repaired, mergeRange, queryContext, false, () -> {}));
257257
}
258258

259259
if (repairedBuilder.rangeCount() > 0)
@@ -274,8 +274,8 @@ private void maybeTriggerGuardrails(QueryViewBuilder.QueryView queryView)
274274
int referencedIndexes = 0;
275275

276276
// We want to make sure that no individual column expression touches too many SSTable-attached indexes:
277-
for (Pair<Expression, Collection<SSTableIndex>> expressionSSTables : queryView.view)
278-
referencedIndexes = Math.max(referencedIndexes, expressionSSTables.right.size());
277+
for (QueryViewBuilder.QueryExpressionView expressionSSTables : queryView.view)
278+
referencedIndexes = Math.max(referencedIndexes, expressionSSTables.sstableIndexes.size());
279279

280280
if (Guardrails.saiSSTableIndexesPerQuery.failsOn(referencedIndexes, null))
281281
{
@@ -319,14 +319,19 @@ public KeyRangeIterator getTopKRows(RowFilter.Expression expression)
319319
StorageAttachedIndex index = indexFor(expression);
320320
assert index != null;
321321
Expression planExpression = Expression.create(index).add(Operator.ANN, expression.getIndexValue().duplicate());
322-
// search memtable before referencing sstable indexes; otherwise we may miss newly flushed memtable index
323-
KeyRangeIterator memtableResults = index.memtableIndexManager().searchMemtableIndexes(queryContext, planExpression, mergeRange);
324322

325323
QueryViewBuilder.QueryView queryView = new QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
326324
Runnable onClose = () -> queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
327325

328326
try
329327
{
328+
List<KeyRangeIterator> memtableResults = queryView.view
329+
.stream()
330+
.map(v -> v.memtableIndexes)
331+
.flatMap(Collection::stream)
332+
.map(idx -> idx.search(queryContext, planExpression, mergeRange))
333+
.collect(Collectors.toList());
334+
330335
List<KeyRangeIterator> sstableIntersections = queryView.view
331336
.stream()
332337
.map(this::createRowIdIterator)
@@ -360,16 +365,21 @@ private KeyRangeIterator getTopKRows(List<PrimaryKey> rawSourceKeys, RowFilter.E
360365
Expression planExpression = Expression.create(index);
361366
planExpression.add(Operator.ANN, expression.getIndexValue().duplicate());
362367

363-
// search memtable before referencing sstable indexes; otherwise we may miss newly flushed memtable index
364-
KeyRangeIterator memtableResults = index.memtableIndexManager().limitToTopResults(queryContext, sourceKeys, planExpression);
365368
QueryViewBuilder.QueryView queryView = new QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
366369
Runnable onClose = () -> queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
367370

368371
try
369372
{
373+
List<KeyRangeIterator> memtableResults = queryView.view
374+
.stream()
375+
.map(v -> v.memtableIndexes)
376+
.flatMap(Collection::stream)
377+
.map(idx -> idx.limitToTopResults(sourceKeys, planExpression, vectorQueryContext.limit()))
378+
.collect(Collectors.toList());
379+
370380
List<KeyRangeIterator> sstableIntersections = queryView.view
371381
.stream()
372-
.flatMap(pair -> pair.right.stream())
382+
.flatMap(pair -> pair.sstableIndexes.stream())
373383
.map(idx -> {
374384
try
375385
{
@@ -395,15 +405,15 @@ private KeyRangeIterator getTopKRows(List<PrimaryKey> rawSourceKeys, RowFilter.E
395405
/**
396406
* Create row id iterator from different indexes' on-disk searcher of the same sstable
397407
*/
398-
private KeyRangeIterator createRowIdIterator(Pair<Expression, Collection<SSTableIndex>> indexExpression)
408+
private KeyRangeIterator createRowIdIterator(QueryViewBuilder.QueryExpressionView indexExpression)
399409
{
400-
List<KeyRangeIterator> subIterators = indexExpression.right
410+
List<KeyRangeIterator> subIterators = indexExpression.sstableIndexes
401411
.stream()
402412
.map(index ->
403413
{
404414
try
405415
{
406-
List<KeyRangeIterator> iterators = index.search(indexExpression.left, mergeRange, queryContext);
416+
List<KeyRangeIterator> iterators = index.search(indexExpression.expression, mergeRange, queryContext);
407417
// concat the result from multiple segments for the same index
408418
return KeyRangeConcatIterator.builder(iterators.size()).add(iterators).build();
409419
}

0 commit comments

Comments
 (0)