Skip to content

Commit 3241da6

Browse files
authored
Allow config the docs threshold for auto-partitioning (#144690)
Today, auto data partitioning uses a hardcoded threshold (250k docs) to decide between shard and doc partitioning. Below that threshold, shard partitioning is used because the overhead of doc partitioning outweighs the concurrency benefits for small shards. However, this threshold is not configurable, making it difficult to exercise the doc-partitioning path in tests. This change exposes the threshold as a cluster setting (esql.docs_threshold_auto_partitioning) and query pragma, allowing us to trigger this path more often in tests.
1 parent 3874506 commit 3241da6

File tree

16 files changed

+159
-1
lines changed

16 files changed

+159
-1
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public Factory(
7474
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
7575
DataPartitioning dataPartitioning,
7676
DataPartitioning.AutoStrategy autoStrategy,
77+
int docThresholdForAutoStrategy,
7778
int taskConcurrency,
7879
int maxPageSize,
7980
int limit,
@@ -86,7 +87,7 @@ public Factory(
8687
dataPartitioning == DataPartitioning.AUTO ? autoStrategy.pickStrategy(limit) : q -> {
8788
throw new UnsupportedOperationException("locked in " + dataPartitioning);
8889
},
89-
LuceneOperator.SMALL_INDEX_BOUNDARY,
90+
docThresholdForAutoStrategy,
9091
taskConcurrency,
9192
limit,
9293
needsScore,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/TimeSeriesSourceOperator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public static final class Factory extends LuceneSourceOperator.Factory {
3131
public Factory(
3232
IndexedByShardId<? extends ShardContext> contexts,
3333
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
34+
int docThresholdForAutoStrategy,
3435
int taskConcurrency,
3536
int maxPageSize,
3637
int limit
@@ -40,6 +41,7 @@ public Factory(
4041
queryFunction,
4142
DataPartitioning.SHARD,
4243
query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); },
44+
docThresholdForAutoStrategy,
4345
taskConcurrency,
4446
maxPageSize,
4547
limit,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,7 @@ static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, List<Luc
494494
ctx -> queryAndTags,
495495
randomFrom(DataPartitioning.values()),
496496
DataPartitioning.AutoStrategy.DEFAULT,
497+
LuceneOperator.SMALL_INDEX_BOUNDARY,
497498
randomIntBetween(1, 10),
498499
randomPageSize(),
499500
limit,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneQueryEvaluatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ private static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader,
285285
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
286286
randomFrom(DataPartitioning.values()),
287287
DataPartitioning.AutoStrategy.DEFAULT,
288+
LuceneOperator.SMALL_INDEX_BOUNDARY,
288289
randomIntBetween(1, 10),
289290
randomPageSize(),
290291
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i
246246
queryFunction,
247247
dataPartitioning,
248248
DataPartitioning.AutoStrategy.DEFAULT,
249+
LuceneOperator.SMALL_INDEX_BOUNDARY,
249250
taskConcurrency,
250251
maxPageSize,
251252
limit,
@@ -451,6 +452,7 @@ public void testAccumulateSearchLoad() throws IOException {
451452
queryFunction,
452453
DataPartitioning.SEGMENT,
453454
DataPartitioning.AutoStrategy.DEFAULT,
455+
LuceneOperator.SMALL_INDEX_BOUNDARY,
454456
taskConcurrency,
455457
maxPageSize,
456458
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/TimeSeriesSourceOperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ TimeSeriesSourceOperator.Factory simple(List<LuceneSliceQueue.QueryAndTags> quer
8383
return new TimeSeriesSourceOperator.Factory(
8484
new IndexedByShardIdFromList<>(contexts),
8585
c -> queryAndTags,
86+
LuceneOperator.SMALL_INDEX_BOUNDARY,
8687
randomIntBetween(1, 4),
8788
between(10, 100),
8889
between(1, 100)

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv
278278
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(Queries.ALL_DOCS_INSTANCE, List.of())),
279279
DataPartitioning.SHARD,
280280
DataPartitioning.AutoStrategy.DEFAULT,
281+
LuceneOperator.SMALL_INDEX_BOUNDARY,
281282
1,// randomIntBetween(1, 10),
282283
pageSize,
283284
LuceneOperator.NO_LIMIT,
@@ -858,6 +859,7 @@ public void testWithNulls() throws IOException {
858859
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(Queries.ALL_DOCS_INSTANCE, List.of())),
859860
randomFrom(DataPartitioning.values()),
860861
DataPartitioning.AutoStrategy.DEFAULT,
862+
LuceneOperator.SMALL_INDEX_BOUNDARY,
861863
randomIntBetween(1, 10),
862864
randomPageSize(),
863865
LuceneOperator.NO_LIMIT,
@@ -1034,6 +1036,7 @@ public void testManyShards() throws IOException {
10341036
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(Queries.ALL_DOCS_INSTANCE, List.of())),
10351037
DataPartitioning.SHARD,
10361038
DataPartitioning.AutoStrategy.DEFAULT,
1039+
LuceneOperator.SMALL_INDEX_BOUNDARY,
10371040
randomIntBetween(1, 10),
10381041
1000,
10391042
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ private SourceOperator sourceOperator(DriverContext context, int pageSize) {
226226
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(Queries.ALL_DOCS_INSTANCE, List.of())),
227227
DataPartitioning.SHARD,
228228
DataPartitioning.AutoStrategy.DEFAULT,
229+
LuceneOperator.SMALL_INDEX_BOUNDARY,
229230
randomIntBetween(1, 10),
230231
pageSize,
231232
LuceneOperator.NO_LIMIT,
@@ -1539,6 +1540,7 @@ public void testWithNulls() throws IOException {
15391540
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(Queries.ALL_DOCS_INSTANCE, List.of())),
15401541
randomFrom(DataPartitioning.values()),
15411542
DataPartitioning.AutoStrategy.DEFAULT,
1543+
LuceneOperator.SMALL_INDEX_BOUNDARY,
15421544
randomIntBetween(1, 10),
15431545
randomPageSize(),
15441546
LuceneOperator.NO_LIMIT,
@@ -2015,6 +2017,7 @@ public void testManyShards() throws IOException {
20152017
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(Queries.ALL_DOCS_INSTANCE, List.of())),
20162018
DataPartitioning.SHARD,
20172019
DataPartitioning.AutoStrategy.DEFAULT,
2020+
LuceneOperator.SMALL_INDEX_BOUNDARY,
20182021
randomIntBetween(1, 10),
20192022
1000,
20202023
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,67 @@ public void testProfileConditionalBlockLoader() throws IOException {
12611261
assertMap(reader, matchesMap().extraOk().entry("status", matchesMap().extraOk().entry("readers_built", readersBuiltMatcher)));
12621262
}
12631263

1264+
public void testAutoPartitioning() throws IOException {
1265+
indexTimestampData(1);
1266+
assumeTrue("require pragmas", Build.current().isSnapshot());
1267+
{
1268+
RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | STATS AVG(value)");
1269+
builder.profile(true);
1270+
builder.pragmas(
1271+
Settings.builder().put("data_partitioning", "auto").put("esql.docs_threshold_auto_partitioning", 1000_000).build()
1272+
);
1273+
Map<String, Object> result = runEsql(builder);
1274+
assertResultMap(
1275+
result,
1276+
getResultMatcher(result).entry("profile", getProfileMatcher()),
1277+
matchesList().item(matchesMap().entry("name", "AVG(value)").entry("type", "double")),
1278+
equalTo(List.of(List.of(499.5d)))
1279+
);
1280+
@SuppressWarnings("unchecked")
1281+
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
1282+
for (Map<String, Object> p : profiles) {
1283+
@SuppressWarnings("unchecked")
1284+
List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
1285+
for (Map<String, Object> o : operators) {
1286+
String name = signature(o);
1287+
if (name.equals("LuceneSourceOperator")) {
1288+
MapMatcher status = matchesMap().entry("total_slices", equalTo(1))
1289+
.entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"))
1290+
.extraOk();
1291+
assertMap(o, matchesMap().entry("operator", startsWith(name)).entry("status", status));
1292+
}
1293+
}
1294+
}
1295+
}
1296+
{
1297+
RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | STATS AVG(value)");
1298+
builder.profile(true);
1299+
builder.pragmas(Settings.builder().put("data_partitioning", "auto").put("esql.docs_threshold_auto_partitioning", 20).build());
1300+
Map<String, Object> result = runEsql(builder);
1301+
assertResultMap(
1302+
result,
1303+
getResultMatcher(result).entry("profile", getProfileMatcher()),
1304+
matchesList().item(matchesMap().entry("name", "AVG(value)").entry("type", "double")),
1305+
equalTo(List.of(List.of(499.5d)))
1306+
);
1307+
@SuppressWarnings("unchecked")
1308+
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
1309+
for (Map<String, Object> p : profiles) {
1310+
@SuppressWarnings("unchecked")
1311+
List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
1312+
for (Map<String, Object> o : operators) {
1313+
String name = signature(o);
1314+
if (name.equals("LuceneSourceOperator")) {
1315+
MapMatcher status = matchesMap().entry("total_slices", greaterThan(1))
1316+
.entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "DOC"))
1317+
.extraOk();
1318+
assertMap(o, matchesMap().entry("operator", startsWith(name)).entry("status", status));
1319+
}
1320+
}
1321+
}
1322+
}
1323+
}
1324+
12641325
private Map<String, Object> findSingleReaderProfile(String driverDescription, Map<String, Object> result) {
12651326
Map<String, Object> reader = null;
12661327
@SuppressWarnings("unchecked")

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,11 @@ protected void addRandomPragma(Settings.Builder pragma) {
461461
pragma.put(PlannerSettings.PARTIAL_AGGREGATION_EMIT_KEYS_THRESHOLD.getKey(), between(10, 1000))
462462
.put(PlannerSettings.PARTIAL_AGGREGATION_EMIT_UNIQUENESS_THRESHOLD.getKey(), randomDoubleBetween(0.1, 1.0, true));
463463
}
464+
if (enableRoundingDoubleValuesOnAsserting()
465+
&& hasCapabilities(client(), List.of("auto_partition_docs_threshold"))
466+
&& randomBoolean()) {
467+
pragma.put(PlannerSettings.DOC_THRESHOLD_AUTO_PARTITIONING.getKey(), between(1, 1000));
468+
}
464469
if (randomBoolean() && hasCapabilities(client(), List.of("fork_no_implicit_limit"))) {
465470
pragma.put("fork_implicit_limit", false);
466471
}

0 commit comments

Comments
 (0)