Skip to content

Commit 41d7ca9

Browse files
authored
Adjust page size estimate for time-series queries (#136003) (#136095)
Today, we aim for pages of around 250KB, but we found that the resulting page_size is highly sensitive to the number of columns loaded. For example, in time-series queries: loading one keyword dimension and one metric yields a page_size of 2509; two keywords result in 1684; no keywords give 4923. This causes performance fluctuations in time-series queries. Performance in time-series queries is sensitive to page_size because large pages can disable bulk loading and constant block optimizations, while smaller pages incur more per-page overhead. Additionally, the page_size should be a multiple of the numeric block size of the tsdb codec to maximize these optimizations. This change proposes using jumbo_size (defaulting to the larger of 1/1000th of the heap or 1MB) to calculate page_size, then adjusting it to be a multiple of 128 and capping it at 2048. Benchmarks show that 2048 is a good upper limit, balancing the overhead per page with the benefits of bulk loading and constant blocks.
1 parent 304b573 commit 41d7ca9

File tree

9 files changed

+103
-24
lines changed

9 files changed

+103
-24
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
* such as slice index and future max timestamp, to allow downstream operators to optimize processing.
2323
*/
2424
public final class TimeSeriesSourceOperator extends LuceneSourceOperator {
25+
private static final int MAX_TARGET_PAGE_SIZE = 2048;
26+
private static final int CHUNK_SIZE = 128;
2527

2628
public static final class Factory extends LuceneSourceOperator.Factory {
2729
public Factory(
@@ -76,4 +78,17 @@ protected void buildMetadataBlocks(Block[] blocks, int offset, int currentPagePo
7678
blocks[offset] = blockFactory.newConstantIntVector(currentSlice.slicePosition(), currentPagePos).asBlock();
7779
blocks[offset + 1] = blockFactory.newConstantLongVector(Long.MAX_VALUE, currentPagePos).asBlock();
7880
}
81+
82+
/**
83+
* For time-series queries, try to use a page size that is a multiple of CHUNK_SIZE (see NUMERIC_BLOCK_SIZE in the tsdb codec)
84+
* to enable bulk loading of numeric or tsid fields. Avoid pages that are too large, as this can disable bulk loading if there are
85+
* holes in the doc IDs and disable constant block optimizations. Therefore, we cap the page size at 2048, which balances the
86+
* overhead per page with the benefits of bulk loading and constant blocks.
87+
*/
88+
public static int pageSize(long estimateRowSizeInBytes, long maxPageSizeInBytes) {
89+
long chunkSizeInBytes = CHUNK_SIZE * estimateRowSizeInBytes;
90+
long numChunks = Math.ceilDiv(maxPageSizeInBytes, chunkSizeInBytes);
91+
long pageSize = Math.clamp(numChunks * CHUNK_SIZE, CHUNK_SIZE, MAX_TARGET_PAGE_SIZE);
92+
return Math.toIntExact(pageSize);
93+
}
7994
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ else if (aggregatorMode.isOutputPartial()) {
181181
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
182182
aggregatorMode,
183183
aggregatorFactories,
184-
context.pageSize(aggregateExec.estimatedRowSize()),
184+
context.pageSize(aggregateExec, aggregateExec.estimatedRowSize()),
185185
analysisRegistry
186186
);
187187
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
301301
querySupplier(esQueryExec.query()),
302302
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
303303
context.queryPragmas().taskConcurrency(),
304-
context.pageSize(rowEstimatedSize),
304+
context.pageSize(esQueryExec, rowEstimatedSize),
305305
limit,
306306
sortBuilders,
307307
estimatedPerRowSortSize,
@@ -312,17 +312,17 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
312312
shardContexts,
313313
querySupplier(esQueryExec.queryBuilderAndTags()),
314314
context.queryPragmas().taskConcurrency(),
315-
context.pageSize(rowEstimatedSize),
315+
context.pageSize(esQueryExec, rowEstimatedSize),
316316
limit
317317
);
318318
} else {
319319
luceneFactory = new LuceneSourceOperator.Factory(
320320
shardContexts,
321321
querySupplier(esQueryExec.queryBuilderAndTags()),
322322
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
323-
context.autoPartitioningStrategy().get(),
323+
context.autoPartitioningStrategy(),
324324
context.queryPragmas().taskConcurrency(),
325-
context.pageSize(rowEstimatedSize),
325+
context.pageSize(esQueryExec, rowEstimatedSize),
326326
limit,
327327
scoring
328328
);
@@ -408,7 +408,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
408408
groupSpecs,
409409
aggregatorMode,
410410
aggregatorFactories,
411-
context.pageSize(ts.estimatedRowSize())
411+
context.pageSize(ts, ts.estimatedRowSize())
412412
);
413413
}
414414

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.compute.data.Page;
2222
import org.elasticsearch.compute.lucene.DataPartitioning;
2323
import org.elasticsearch.compute.lucene.LuceneOperator;
24+
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
2425
import org.elasticsearch.compute.operator.ChangePointOperator;
2526
import org.elasticsearch.compute.operator.ColumnExtractOperator;
2627
import org.elasticsearch.compute.operator.ColumnLoadOperator;
@@ -203,8 +204,13 @@ public LocalExecutionPlanner(
203204
/**
204205
* turn the given plan into a list of drivers to execute
205206
*/
206-
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
207-
207+
public LocalExecutionPlan plan(
208+
String description,
209+
FoldContext foldCtx,
210+
PlannerSettings plannerSettings,
211+
PhysicalPlan localPhysicalPlan
212+
) {
213+
final boolean timeSeries = localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec);
208214
var context = new LocalExecutionPlannerContext(
209215
description,
210216
new ArrayList<>(),
@@ -213,12 +219,8 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
213219
bigArrays,
214220
blockFactory,
215221
foldCtx,
216-
settings,
217-
new Holder<>(
218-
localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec)
219-
? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
220-
: DataPartitioning.AutoStrategy.DEFAULT
221-
),
222+
plannerSettings,
223+
timeSeries,
222224
shardContexts
223225
);
224226

@@ -503,7 +505,7 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
503505
throw new EsqlIllegalArgumentException("limit only supported with literal values");
504506
}
505507
return source.with(
506-
new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(rowSize)),
508+
new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(topNExec, rowSize)),
507509
source.layout
508510
);
509511
}
@@ -1024,8 +1026,8 @@ public record LocalExecutionPlannerContext(
10241026
BigArrays bigArrays,
10251027
BlockFactory blockFactory,
10261028
FoldContext foldCtx,
1027-
Settings settings,
1028-
Holder<DataPartitioning.AutoStrategy> autoPartitioningStrategy,
1029+
PlannerSettings plannerSettings,
1030+
boolean timeSeries,
10291031
List<EsPhysicalOperationProviders.ShardContext> shardContexts
10301032
) {
10311033
void addDriverFactory(DriverFactory driverFactory) {
@@ -1036,7 +1038,11 @@ void driverParallelism(DriverParallelism parallelism) {
10361038
driverParallelism.set(parallelism);
10371039
}
10381040

1039-
int pageSize(Integer estimatedRowSize) {
1041+
DataPartitioning.AutoStrategy autoPartitioningStrategy() {
1042+
return timeSeries ? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES : DataPartitioning.AutoStrategy.DEFAULT;
1043+
}
1044+
1045+
int pageSize(PhysicalPlan node, Integer estimatedRowSize) {
10401046
if (estimatedRowSize == null) {
10411047
throw new IllegalStateException("estimated row size hasn't been set");
10421048
}
@@ -1046,7 +1052,11 @@ int pageSize(Integer estimatedRowSize) {
10461052
if (queryPragmas.pageSize() != 0) {
10471053
return queryPragmas.pageSize();
10481054
}
1049-
return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize);
1055+
if (timeSeries && node instanceof EsQueryExec) {
1056+
return TimeSeriesSourceOperator.pageSize(estimatedRowSize, plannerSettings.valuesLoadingJumboSize().getBytes());
1057+
} else {
1058+
return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize);
1059+
}
10501060
}
10511061
}
10521062

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
648648
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
649649
// it's doing this in the planning of EsQueryExec (the source of the data)
650650
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
651-
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), localPlan);
651+
var localExecutionPlan = planner.plan(context.description(), context.foldCtx(), plannerSettings, localPlan);
652652
if (LOGGER.isDebugEnabled()) {
653653
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
654654
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,7 @@ void executeSubPlan(
724724
LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(
725725
"final",
726726
foldCtx,
727+
TEST_PLANNER_SETTINGS,
727728
new OutputExec(coordinatorPlan, collectedPages::add)
728729
);
729730
drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(getTestName()));
@@ -745,7 +746,12 @@ void executeSubPlan(
745746
throw new AssertionError("expected no failure", e);
746747
})
747748
);
748-
LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan);
749+
LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(
750+
"data",
751+
foldCtx,
752+
EsqlTestUtils.TEST_PLANNER_SETTINGS,
753+
csvDataNodePhysicalPlan
754+
);
749755

750756
drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName()));
751757
Randomness.shuffle(drivers);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@
136136
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
137137
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
138138
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
139+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
139140
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
140141
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
141142
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
@@ -247,6 +248,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
247248
private TestDataSource countriesBboxWeb; // cartesian_shape field tests
248249

249250
private final Configuration config;
251+
private PlannerSettings plannerSettings;
250252

251253
private record TestDataSource(Map<String, EsField> mapping, EsIndex index, Analyzer analyzer, SearchStats stats) {}
252254

@@ -360,6 +362,7 @@ public void init() {
360362
functionRegistry,
361363
enrichResolution
362364
);
365+
this.plannerSettings = TEST_PLANNER_SETTINGS;
363366
}
364367

365368
TestDataSource makeTestDataSource(
@@ -7878,7 +7881,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
78787881
List.of()
78797882
);
78807883

7881-
return planner.plan("test", FoldContext.small(), plan);
7884+
return planner.plan("test", FoldContext.small(), plannerSettings, plan);
78827885
}
78837886

78847887
private List<Set<String>> findFieldNamesInLookupJoinDescription(LocalExecutionPlanner.LocalExecutionPlan physicalOperations) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import org.apache.lucene.tests.index.RandomIndexWriter;
2020
import org.elasticsearch.cluster.ClusterName;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.unit.ByteSizeValue;
2223
import org.elasticsearch.common.util.BigArrays;
24+
import org.elasticsearch.compute.aggregation.AggregatorMode;
25+
import org.elasticsearch.compute.lucene.DataPartitioning;
2326
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
2427
import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
2528
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
@@ -42,6 +45,7 @@
4245
import org.elasticsearch.plugins.Plugin;
4346
import org.elasticsearch.search.internal.AliasFilter;
4447
import org.elasticsearch.search.internal.ContextIndexSearcher;
48+
import org.elasticsearch.xpack.esql.core.expression.Alias;
4549
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
4650
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4751
import org.elasticsearch.xpack.esql.core.expression.Literal;
@@ -51,9 +55,11 @@
5155
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
5256
import org.elasticsearch.xpack.esql.core.util.StringUtils;
5357
import org.elasticsearch.xpack.esql.expression.Order;
58+
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
5459
import org.elasticsearch.xpack.esql.index.EsIndex;
5560
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
5661
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
62+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
5763
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
5864
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
5965
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -125,6 +131,7 @@ public void testLuceneSourceOperatorHugeRowSize() throws IOException {
125131
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
126132
"test",
127133
FoldContext.small(),
134+
TEST_PLANNER_SETTINGS,
128135
new EsQueryExec(
129136
Source.EMPTY,
130137
index().name(),
@@ -156,6 +163,7 @@ public void testLuceneTopNSourceOperator() throws IOException {
156163
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
157164
"test",
158165
FoldContext.small(),
166+
TEST_PLANNER_SETTINGS,
159167
new EsQueryExec(
160168
Source.EMPTY,
161169
index().name(),
@@ -187,6 +195,7 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException {
187195
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
188196
"test",
189197
FoldContext.small(),
198+
TEST_PLANNER_SETTINGS,
190199
new EsQueryExec(
191200
Source.EMPTY,
192201
index().name(),
@@ -211,6 +220,7 @@ public void testDriverClusterAndNodeName() throws IOException {
211220
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
212221
"test",
213222
FoldContext.small(),
223+
TEST_PLANNER_SETTINGS,
214224
new EsQueryExec(
215225
Source.EMPTY,
216226
index().name(),
@@ -244,6 +254,41 @@ public void testPlanUnmappedFieldExtractSyntheticSource() throws Exception {
244254
assertThat(blockLoader, instanceOf(FallbackSyntheticSourceBlockLoader.class));
245255
}
246256

257+
public void testTimeSeries() throws IOException {
258+
int estimatedRowSize = estimatedRowSizeIsHuge ? randomIntBetween(20000, Integer.MAX_VALUE) : randomIntBetween(1, 50);
259+
EsQueryExec queryExec = new EsQueryExec(
260+
Source.EMPTY,
261+
index().name(),
262+
IndexMode.STANDARD,
263+
index().indexNameWithModes(),
264+
List.of(),
265+
new Literal(Source.EMPTY, 10, DataType.INTEGER),
266+
List.of(),
267+
estimatedRowSize,
268+
List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of()))
269+
);
270+
TimeSeriesAggregateExec aggExec = new TimeSeriesAggregateExec(
271+
Source.EMPTY,
272+
queryExec,
273+
List.of(),
274+
List.of(new Alias(Source.EMPTY, "count(*)", new Count(Source.EMPTY, Literal.keyword(Source.EMPTY, "*")))),
275+
AggregatorMode.SINGLE,
276+
List.of(),
277+
10,
278+
null
279+
);
280+
PlannerSettings plannerSettings = new PlannerSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000, ByteSizeValue.ofMb(1));
281+
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), plannerSettings, aggExec);
282+
assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency()));
283+
LocalExecutionPlanner.DriverSupplier supplier = plan.driverFactories.get(0).driverSupplier();
284+
var factory = (LuceneSourceOperator.Factory) supplier.physicalOperation().sourceOperatorFactory;
285+
if (estimatedRowSizeIsHuge) {
286+
assertThat(factory.maxPageSize(), equalTo(128));
287+
} else {
288+
assertThat(factory.maxPageSize(), equalTo(2048));
289+
}
290+
}
291+
247292
private BlockLoader constructBlockLoader() throws IOException {
248293
EsQueryExec queryExec = new EsQueryExec(
249294
Source.EMPTY,
@@ -264,7 +309,7 @@ private BlockLoader constructBlockLoader() throws IOException {
264309
),
265310
MappedFieldType.FieldExtractPreference.NONE
266311
);
267-
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), fieldExtractExec);
312+
var plan = planner().plan("test", FoldContext.small(), TEST_PLANNER_SETTINGS, fieldExtractExec);
268313
var p = plan.driverFactories.get(0).driverSupplier().physicalOperation();
269314
var fieldInfo = ((ValuesSourceReaderOperator.Factory) p.intermediateOperatorFactories.get(0)).fields().get(0);
270315
return fieldInfo.blockLoader().apply(0);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
137137
groupSpecs,
138138
aggregatorMode,
139139
aggregatorFactories,
140-
context.pageSize(ts.estimatedRowSize())
140+
context.pageSize(ts, ts.estimatedRowSize())
141141
);
142142
}
143143

0 commit comments

Comments
 (0)