Skip to content

Commit c133fab

Browse files
authored
CNDB-13925: Prefer not analyzed indexes for contains queries (#1718)
Prefer not-analyzed indexes over analyzed indexes for contains queries, so they have a deterministic behaviour. Also, emit a client warning when a not-analyzed index is selected over an analyzed index. Otherwise, different points in the codebase will make different, pseudo-random decisions about what index should be used for a certain contains expression, leading to erratic behaviour.
1 parent 4bc9132 commit c133fab

File tree

9 files changed

+423
-33
lines changed

9 files changed

+423
-33
lines changed

src/java/org/apache/cassandra/cql3/Operator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,16 @@ public boolean isSlice()
575575
return this == LT || this == LTE || this == GT || this == GTE;
576576
}
577577

578+
/**
579+
* Checks if this operator is any of the variations of contains ({@code [NOT] CONTAINS [KEY]}).
580+
*
581+
* @return {@code true} if this operator is any kind of contains operator, {@code false} otherwise.
582+
*/
583+
public boolean isContains()
584+
{
585+
return this == CONTAINS || this == CONTAINS_KEY || this == NOT_CONTAINS || this == NOT_CONTAINS_KEY;
586+
}
587+
578588
@Override
579589
public String toString()
580590
{

src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.cassandra.index.IndexRegistry;
3939
import org.apache.cassandra.schema.ColumnMetadata;
4040
import org.apache.cassandra.serializers.ListSerializer;
41+
import org.apache.cassandra.service.ClientWarn;
4142
import org.apache.cassandra.transport.ProtocolVersion;
4243
import org.apache.cassandra.utils.ByteBufferUtil;
4344
import org.apache.cassandra.utils.Pair;
@@ -564,6 +565,8 @@ public String toString()
564565
// This holds CONTAINS, CONTAINS_KEY, NOT CONTAINS, NOT CONTAINS KEY and map[key] = value restrictions because we might want to have any combination of them.
565566
public static final class ContainsRestriction extends SingleColumnRestriction
566567
{
568+
public static final String MULTIPLE_INDEXES_WARNING = "Multiple indexes found for CONTAINS restriction on %s. Using not-analyzed index %s";
569+
567570
private final List<Term> values = new ArrayList<>(); // for CONTAINS
568571
private final List<Term> negativeValues = new ArrayList<>(); // for NOT_CONTAINS
569572
private final List<Term> keys = new ArrayList<>(); // for CONTAINS_KEY
@@ -732,6 +735,38 @@ public int numberOfNegativeEntries()
732735
return negativeEntryKeys.size();
733736
}
734737

738+
@Override
739+
public Index findSupportingIndex(IndexRegistry indexRegistry)
740+
{
741+
// if there are multiple supporting indexes, we prefer those without an analyzer (see CNDB-13925)
742+
Index notAnalyzedIndex = null;
743+
Index analyzedIndex = null;
744+
for (Index index : indexRegistry.listIndexes())
745+
{
746+
if (isSupportedBy(index))
747+
{
748+
if (index.getAnalyzer(null).isPresent())
749+
analyzedIndex = index;
750+
else
751+
notAnalyzedIndex = index;
752+
}
753+
}
754+
755+
if (notAnalyzedIndex != null)
756+
{
757+
// We prefer the not analyzed index, but if there was also an analyzed index, we warn the user.
758+
// We use a client warning key so the warning is emitted just once per query.
759+
if (analyzedIndex != null)
760+
{
761+
String msg = String.format(MULTIPLE_INDEXES_WARNING, columnDef.name, notAnalyzedIndex.getIndexMetadata().name);
762+
ClientWarn.instance.warn(msg, "multiple_indexes_for_contains_on_" + columnDef.name);
763+
}
764+
return notAnalyzedIndex;
765+
}
766+
767+
return analyzedIndex;
768+
}
769+
735770
@Override
736771
public void addFunctionsTo(List<Function> functions)
737772
{

src/java/org/apache/cassandra/index/Index.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,4 +1033,41 @@ enum Status
10331033
BUILD_SUCCEEDED,
10341034
DROPPED
10351035
}
1036+
1037+
/**
1038+
* Returns the best index for a given column and operator among the specified collection of indexes.
1039+
* </p>
1040+
* The best index is simply the first one that supports the column and operator, because we shouldn't have multiple
1041+
* indexes for the same column and operator, so we can just return the first one. The only exception to this is when
1042+
* the operator is {@code [NOT] CONTAINS [KEY]}, in which case the best index be the first one that doesn't use an
1043+
* analyzer.
1044+
*
1045+
* @param indexes a collection on indexes
1046+
* @param column a column
1047+
* @param operator an operator
1048+
* @return the best index for the column and operator among the specified indexes
1049+
*/
1050+
static <T extends Index> Optional<T> getBestIndexFor(Collection<T> indexes, ColumnMetadata column, Operator operator)
1051+
{
1052+
// we simply return the first index that supports the expression, unless it's a contains operator
1053+
if (!operator.isContains())
1054+
return indexes.stream().filter((i) -> i.supportsExpression(column, operator)).findFirst();
1055+
1056+
// if we have a contains operator, we prefer indexes without an analyzer (see CNDB-13925)
1057+
T firstAnalyzedIndex = null;
1058+
for (T index : indexes)
1059+
{
1060+
if (index.supportsExpression(column, operator))
1061+
{
1062+
// we prefer indexes without an analyzer
1063+
if (index.getAnalyzer(null).isEmpty())
1064+
return Optional.of(index);
1065+
1066+
if (firstAnalyzedIndex == null)
1067+
firstAnalyzedIndex = index;
1068+
}
1069+
}
1070+
1071+
return Optional.ofNullable(firstAnalyzedIndex);
1072+
}
10361073
}

src/java/org/apache/cassandra/index/IndexRegistry.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Optional;
2727
import java.util.Set;
2828
import java.util.concurrent.Callable;
29-
import java.util.function.Function;
3029
import java.util.function.Predicate;
3130
import java.util.function.Supplier;
3231

@@ -95,7 +94,7 @@ public Index getIndex(IndexMetadata indexMetadata)
9594
}
9695

9796
@Override
98-
public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
97+
public Optional<Index> getBestIndexFor(ColumnMetadata column, Operator operator)
9998
{
10099
return Optional.empty();
101100
}
@@ -283,7 +282,8 @@ public Collection<Index.Group> listIndexGroups()
283282
return Collections.singletonList(group);
284283
}
285284

286-
public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
285+
@Override
286+
public Optional<Index> getBestIndexFor(ColumnMetadata column, Operator operator)
287287
{
288288
return Optional.empty();
289289
}
@@ -306,19 +306,15 @@ default void registerIndex(Index index)
306306

307307
default Optional<Index.Analyzer> getAnalyzerFor(ColumnMetadata column, Operator operator, ByteBuffer value)
308308
{
309-
for (Index index : listIndexes())
310-
{
311-
if (index.supportsExpression(column, operator))
312-
{
313-
Optional<Index.Analyzer> analyzer = index.getAnalyzer(value);
314-
if (analyzer.isPresent())
315-
return analyzer;
316-
}
317-
}
318-
return Optional.empty();
309+
return getBestIndexFor(column, operator).flatMap(i -> i.getAnalyzer(value));
310+
}
311+
312+
default Optional<Index> getBestIndexFor(RowFilter.Expression expression)
313+
{
314+
return getBestIndexFor(expression.column(), expression.operator());
319315
}
320316

321-
Optional<Index> getBestIndexFor(RowFilter.Expression expression);
317+
Optional<Index> getBestIndexFor(ColumnMetadata column, Operator operator);
322318

323319
/**
324320
* Called at write time to ensure that values present in the update

src/java/org/apache/cassandra/index/SecondaryIndexManager.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.cassandra.concurrent.NamedThreadFactory;
7272
import org.apache.cassandra.concurrent.Stage;
7373
import org.apache.cassandra.config.DatabaseDescriptor;
74+
import org.apache.cassandra.cql3.Operator;
7475
import org.apache.cassandra.cql3.PageSize;
7576
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
7677
import org.apache.cassandra.db.Clustering;
@@ -1261,19 +1262,20 @@ private static String commaSeparated(Collection<Index> indexes)
12611262
return indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(","));
12621263
}
12631264

1264-
1265-
public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
1265+
@Override
1266+
public Optional<Index> getBestIndexFor(ColumnMetadata column, Operator operator)
12661267
{
1267-
return indexes.values().stream().filter((i) -> i.supportsExpression(expression.column(), expression.operator())).findFirst();
1268+
return Index.getBestIndexFor(indexes.values(), column, operator);
12681269
}
12691270

12701271
public <T extends Index> Optional<T> getBestIndexFor(RowFilter.Expression expression, Class<T> indexType)
12711272
{
1272-
return indexes.values()
1273-
.stream()
1274-
.filter(i -> indexType.isInstance(i) && i.supportsExpression(expression.column(), expression.operator()))
1275-
.map(indexType::cast)
1276-
.findFirst();
1273+
Set<T> candidates = indexes.values()
1274+
.stream()
1275+
.filter(indexType::isInstance)
1276+
.map(indexType::cast)
1277+
.collect(Collectors.toSet());
1278+
return Index.getBestIndexFor(candidates, expression.column(), expression.operator());
12771279
}
12781280

12791281
/**

src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.cassandra.index.sai.plan;
1919

2020
import java.util.HashSet;
21+
import java.util.Optional;
2122
import java.util.Set;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.function.Function;
@@ -162,16 +163,14 @@ private static boolean selectedIndexes(RowFilter.Expression expression,
162163
if (expression.isUserDefined())
163164
return false;
164165

165-
boolean hasIndex = false;
166-
for (StorageAttachedIndex index : allIndexes)
166+
// collect the best index from those that support the specified expression
167+
Optional<StorageAttachedIndex> index = Index.getBestIndexFor(allIndexes, expression.column(), expression.operator());
168+
if (index.isPresent())
167169
{
168-
if (index.supportsExpression(expression.column(), expression.operator()))
169-
{
170-
selectedIndexes.add(index);
171-
hasIndex = true;
172-
}
170+
selectedIndexes.add(index.get());
171+
return true;
173172
}
174-
return hasIndex;
173+
return false;
175174
}
176175

177176
@Override
@@ -196,7 +195,7 @@ public boolean shouldEstimateInitialConcurrency()
196195
}
197196

198197
@Override
199-
public Index.Searcher searcherFor(ReadCommand command)
198+
public StorageAttachedIndexSearcher searcherFor(ReadCommand command)
200199
{
201200
return new StorageAttachedIndexSearcher(cfs,
202201
queryMetrics,

src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import java.util.Map;
2929
import java.util.PriorityQueue;
3030
import java.util.Queue;
31+
import java.util.Set;
3132
import java.util.function.Supplier;
3233
import java.util.stream.Collectors;
3334
import javax.annotation.Nonnull;
3435
import javax.annotation.Nullable;
3536

37+
import com.google.common.annotations.VisibleForTesting;
3638
import com.google.common.base.Preconditions;
3739
import org.slf4j.Logger;
3840
import org.slf4j.LoggerFactory;
@@ -99,6 +101,24 @@ public ReadCommand command()
99101
return command;
100102
}
101103

104+
@VisibleForTesting
105+
public final Set<String> plannedIndexes()
106+
{
107+
try
108+
{
109+
Plan plan = controller.buildPlan().optimize();
110+
Set<String> indexes = new HashSet<>();
111+
plan.nodesOfType(Plan.IndexScan.class)
112+
.forEach(s -> indexes.add(s.getIndexName()));
113+
return indexes;
114+
}
115+
finally
116+
{
117+
// we need to call this to clean up the resources opened by the plan
118+
controller.abort();
119+
}
120+
}
121+
102122
@Override
103123
@SuppressWarnings("unchecked")
104124
public UnfilteredPartitionIterator search(ReadExecutionController executionController) throws RequestTimeoutException

test/unit/org/apache/cassandra/index/sai/SAITester.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.cassandra.db.ColumnFamilyStore;
6363
import org.apache.cassandra.db.Directories;
6464
import org.apache.cassandra.db.Keyspace;
65+
import org.apache.cassandra.db.ReadCommand;
6566
import org.apache.cassandra.db.compaction.CompactionManager;
6667
import org.apache.cassandra.db.lifecycle.SSTableSet;
6768
import org.apache.cassandra.db.lifecycle.View;
@@ -73,6 +74,8 @@
7374
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
7475
import org.apache.cassandra.index.sai.disk.format.Version;
7576
import org.apache.cassandra.index.sai.plan.QueryController;
77+
import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan;
78+
import org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher;
7679
import org.apache.cassandra.index.sai.utils.PrimaryKey;
7780
import org.apache.cassandra.index.sai.utils.ResourceLeakDetector;
7881
import org.apache.cassandra.inject.ActionBuilder;
@@ -88,10 +91,12 @@
8891
import org.apache.cassandra.schema.MockSchema;
8992
import org.apache.cassandra.schema.Schema;
9093
import org.apache.cassandra.schema.TableId;
94+
import org.apache.cassandra.service.ClientWarn;
9195
import org.apache.cassandra.service.StorageService;
9296
import org.apache.cassandra.utils.FBUtilities;
9397
import org.apache.cassandra.utils.Throwables;
9498
import org.apache.lucene.codecs.CodecUtil;
99+
import org.assertj.core.api.Assertions;
95100

96101
import static org.apache.cassandra.inject.ActionBuilder.newActionBuilder;
97102
import static org.apache.cassandra.inject.Expression.expr;
@@ -959,4 +964,57 @@ public void start()
959964
}
960965
}
961966
}
967+
968+
protected PlanSelectionAssertion assertThatPlanFor(String query, Object[]... expectedRows)
969+
{
970+
// First execute the query and check the number of rows returned
971+
ClientWarn.instance.captureWarnings();
972+
assertRowsIgnoringOrder(execute(query), expectedRows);
973+
List<String> warnings = ClientWarn.instance.getWarnings();
974+
ClientWarn.instance.resetWarnings();
975+
976+
ReadCommand command = parseReadCommand(query);
977+
Index.QueryPlan queryPlan = command.indexQueryPlan();
978+
if (queryPlan == null)
979+
return new PlanSelectionAssertion(null, warnings);
980+
981+
StorageAttachedIndexQueryPlan saiQueryPlan = (StorageAttachedIndexQueryPlan) queryPlan;
982+
Assertions.assertThat(saiQueryPlan).isNotNull();
983+
StorageAttachedIndexSearcher searcher = saiQueryPlan.searcherFor(command);
984+
Set<String> selectedIndexes = searcher.plannedIndexes();
985+
return new PlanSelectionAssertion(selectedIndexes, warnings);
986+
}
987+
988+
protected static class PlanSelectionAssertion
989+
{
990+
private final List<String> warnings;
991+
private final Set<String> selectedIndexes;
992+
993+
public PlanSelectionAssertion(@Nullable Set<String> selectedIndexes, @Nullable List<String> warnings)
994+
{
995+
this.warnings = warnings;
996+
this.selectedIndexes = selectedIndexes;
997+
}
998+
999+
public PlanSelectionAssertion uses(String... indexes)
1000+
{
1001+
Assertions.assertThat(selectedIndexes)
1002+
.isNotNull()
1003+
.as("Expected to select only %s, but got: %s", indexes, selectedIndexes)
1004+
.isEqualTo(Set.of(indexes));
1005+
return this;
1006+
}
1007+
1008+
public void doesntWarn()
1009+
{
1010+
Assert.assertNull(warnings);
1011+
}
1012+
1013+
public void warns(String expectedWarning)
1014+
{
1015+
Assert.assertNotNull(warnings);
1016+
Assert.assertEquals(1, warnings.size());
1017+
Assert.assertEquals(expectedWarning, warnings.get(0));
1018+
}
1019+
}
9621020
}

0 commit comments

Comments
 (0)