Skip to content

Commit 0703ce3

Browse files
authored
CNDB-13353: Queries with index-based ordering should reject aggregation queries (#1638)
Queries with index-based ordering (ANN, BM25, generic `ORDER BY`) don't support CQL's aggregation queries, such as `GROUP BY` or `SELECT COUNT(*)`. The reason is that the coordinator internally uses paging to fetch more results from the replicas until the aggregated limit is satisfied. Since top-k index-based ordering doesn't support paging, these queries will fail. We should reject these queries until we have a way to support them.
1 parent 7653af9 commit 0703ce3

File tree

4 files changed

+96
-9
lines changed

4 files changed

+96
-9
lines changed

src/java/org/apache/cassandra/cql3/statements/SelectStatement.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
114114

115115
private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
116116
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(SelectStatement.logger, 1, TimeUnit.MINUTES);
117+
public static final String TOPK_AGGREGATION_ERROR = "Top-K queries can not be run with aggregation";
117118
public static final String TOPK_CONSISTENCY_LEVEL_ERROR = "Top-K queries can only be run with consistency level ONE/LOCAL_ONE. Consistency level %s was used.";
118119
public static final String TOPK_CONSISTENCY_LEVEL_WARNING = "Top-K queries can only be run with consistency level ONE " +
119120
"/ LOCAL_ONE / NODE_LOCAL. Consistency level %s was requested. " +
@@ -431,10 +432,16 @@ public ReadQuery getQuery(QueryState queryState,
431432

432433
// We don't support offset for top-k queries.
433434
checkFalse(userOffset != NO_OFFSET, String.format(TOPK_OFFSET_ERROR, userOffset));
435+
436+
// We don't support aggregation for top-k queries because we don't support paging.
437+
checkFalse(aggregationSpec != null, TOPK_AGGREGATION_ERROR);
434438
}
435439

436440
selectOptions.validate(queryState, table.keyspace, userLimit);
437441

442+
// If there's a secondary index that the command can use, have it validate the request parameters.
443+
query.maybeValidateIndexes();
444+
438445
return query;
439446
}
440447

@@ -783,13 +790,7 @@ private ReadQuery getRangeCommand(QueryOptions options, ColumnFilter columnFilte
783790
if (keyBounds == null)
784791
return ReadQuery.empty(table);
785792

786-
ReadQuery command =
787-
PartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
788-
789-
// If there's a secondary index that the command can use, have it validate the request parameters.
790-
command.maybeValidateIndexes();
791-
792-
return command;
793+
return PartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
793794
}
794795

795796
private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options, ColumnFilter columnFilter, QueryState queryState)

test/unit/org/apache/cassandra/index/sai/cql/BM25Test.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import org.junit.Test;
2828

2929
import org.apache.cassandra.cql3.UntypedResultSet;
30+
import org.apache.cassandra.cql3.statements.SelectStatement;
31+
import org.apache.cassandra.exceptions.InvalidRequestException;
3032
import org.apache.cassandra.index.sai.SAITester;
3133
import org.apache.cassandra.index.sai.SAIUtil;
3234
import org.apache.cassandra.index.sai.disk.format.Version;
3335
import org.apache.cassandra.index.sai.disk.v1.SegmentBuilder;
3436
import org.apache.cassandra.index.sai.plan.QueryController;
3537

3638
import static org.apache.cassandra.index.sai.analyzer.AnalyzerEqOperatorSupport.EQ_AMBIGUOUS_ERROR;
39+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3740
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
3841
import static org.junit.Assert.assertEquals;
3942

@@ -597,8 +600,32 @@ public void testWildcardSelection()
597600

598601
var result = execute("SELECT * FROM %s ORDER BY v BM25 OF 'apple' LIMIT 3");
599602
assertThat(result).hasSize(1);
603+
}
604+
605+
@Test
606+
public void cannotHaveAggregationOnBM25Query()
607+
{
608+
createSimpleTable();
609+
610+
execute("INSERT INTO %s (k, v) VALUES (1, '4')");
611+
execute("INSERT INTO %s (k, v) VALUES (2, '3')");
612+
execute("INSERT INTO %s (k, v) VALUES (3, '2')");
613+
execute("INSERT INTO %s (k, v) VALUES (4, '1')");
614+
615+
assertThatThrownBy(() -> execute("SELECT max(v) FROM %s ORDER BY v BM25 OF 'apple' LIMIT 4"))
616+
.isInstanceOf(InvalidRequestException.class)
617+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
618+
619+
assertThatThrownBy(() -> execute("SELECT max(v) FROM %s WHERE k = 1 ORDER BY v BM25 OF 'apple' LIMIT 4"))
620+
.isInstanceOf(InvalidRequestException.class)
621+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
622+
623+
assertThatThrownBy(() -> execute("SELECT * FROM %s GROUP BY k ORDER BY v BM25 OF 'apple' LIMIT 4"))
624+
.isInstanceOf(InvalidRequestException.class)
625+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
600626

601-
var result2 = execute("SELECT * FROM %s GROUP BY k, c ORDER BY v BM25 OF 'apple' LIMIT 3");
602-
assertThat(result2).hasSize(1);
627+
assertThatThrownBy(() -> execute("SELECT count(*) FROM %s ORDER BY v BM25 OF 'apple' LIMIT 4"))
628+
.isInstanceOf(InvalidRequestException.class)
629+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
603630
}
604631
}

test/unit/org/apache/cassandra/index/sai/cql/GenericOrderByTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@
2323
import org.junit.Test;
2424

2525
import org.apache.cassandra.cql3.CQLTester;
26+
import org.apache.cassandra.cql3.statements.SelectStatement;
27+
import org.apache.cassandra.exceptions.InvalidRequestException;
2628
import org.apache.cassandra.index.sai.SAITester;
2729
import org.apache.cassandra.index.sai.plan.QueryController;
2830

31+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
32+
2933
public class GenericOrderByTest extends SAITester
3034
{
3135
@Test
@@ -226,4 +230,31 @@ public void testSelectionAndOrderByOnTheSameColumnWithLargeRowCountDesc() throws
226230
testSelectionAndOrderByOnTheSameColumnWithLargeRowCount(false);
227231
}
228232

233+
@Test
234+
public void cannotHaveAggregationOnOrderByQuery()
235+
{
236+
createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
237+
createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex'");
238+
239+
execute("INSERT INTO %s (k, v) VALUES (1, 4)");
240+
execute("INSERT INTO %s (k, v) VALUES (2, 3)");
241+
execute("INSERT INTO %s (k, v) VALUES (3, 2)");
242+
execute("INSERT INTO %s (k, v) VALUES (4, 1)");
243+
244+
assertThatThrownBy(() -> execute("SELECT sum(v) FROM %s ORDER BY v LIMIT 4"))
245+
.isInstanceOf(InvalidRequestException.class)
246+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
247+
248+
assertThatThrownBy(() -> execute("SELECT sum(v) FROM %s WHERE k = 1 ORDER BY v LIMIT 4"))
249+
.isInstanceOf(InvalidRequestException.class)
250+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
251+
252+
assertThatThrownBy(() -> execute("SELECT * FROM %s GROUP BY k ORDER BY v LIMIT 4"))
253+
.isInstanceOf(InvalidRequestException.class)
254+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
255+
256+
assertThatThrownBy(() -> execute("SELECT count(*) FROM %s ORDER BY v LIMIT 4"))
257+
.isInstanceOf(InvalidRequestException.class)
258+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
259+
}
229260
}

test/unit/org/apache/cassandra/index/sai/cql/VectorInvalidQueryTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,34 @@ public void disallowClusteringColumnPredicateWithoutSupportingIndex()
240240
assertRows(execute("SELECT num FROM %s WHERE pk=3 AND num > 3 ORDER BY v ANN OF [1,1] LIMIT 1"), row(4));
241241
}
242242

243+
@Test
244+
public void cannotHaveAggregationOnANNQuery()
245+
{
246+
createTable("CREATE TABLE %s (k int PRIMARY KEY, v vector<float, 1>, c int)");
247+
createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function' : 'euclidean'}");
248+
249+
execute("INSERT INTO %s (k, v, c) VALUES (1, [4], 1)");
250+
execute("INSERT INTO %s (k, v, c) VALUES (2, [3], 10)");
251+
execute("INSERT INTO %s (k, v, c) VALUES (3, [2], 100)");
252+
execute("INSERT INTO %s (k, v, c) VALUES (4, [1], 1000)");
253+
254+
assertThatThrownBy(() -> execute("SELECT sum(c) FROM %s ORDER BY v ANN OF [0] LIMIT 4"))
255+
.isInstanceOf(InvalidRequestException.class)
256+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
257+
258+
assertThatThrownBy(() -> execute("SELECT sum(c) FROM %s WHERE k = 1 ORDER BY v ANN OF [0] LIMIT 4"))
259+
.isInstanceOf(InvalidRequestException.class)
260+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
261+
262+
assertThatThrownBy(() -> execute("SELECT * FROM %s GROUP BY k ORDER BY v ANN OF [0] LIMIT 4"))
263+
.isInstanceOf(InvalidRequestException.class)
264+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
265+
266+
assertThatThrownBy(() -> execute("SELECT count(*) FROM %s ORDER BY v ANN OF [0] LIMIT 4"))
267+
.isInstanceOf(InvalidRequestException.class)
268+
.hasMessage(SelectStatement.TOPK_AGGREGATION_ERROR);
269+
}
270+
243271
@Test
244272
public void canOnlyExecuteWithCorrectConsistencyLevel()
245273
{

0 commit comments

Comments
 (0)