Skip to content

Commit 793b0ae

Browse files
authored
Adjust page size estimate for time-series queries (#136003)
Today, we aimed for a page_size of around 250KB, but found that its value is highly sensitive to the number of columns loaded. For example, in time-series queries: loading one keyword dimension and one metric yields a page_size of 2509; two keywords result in 1684; no keywords give 4923. This causes performance fluctuations in time-series queries. Performance in time-series queries is sensitive to page_size because large pages can disable bulk loading and constant block optimizations, while smaller pages incur more overhead. Additionally, the page_size should be a multiple of the numeric block size of tsdb codec to maximize these optimizations. This change proposes using jumbo_size (defaulting to the larger of 1/1000th of the heap or 1MB) to calculate page_size, then adjusting it to be a multiple of 128 and capping it at 2048. Benchmarks show that 2048 is a good upper limit, balancing the overhead per page with the benefits of bulk loading and constant blocks.
1 parent 202f444 commit 793b0ae

File tree

9 files changed

+103
-24
lines changed

9 files changed

+103
-24
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
* such as slice index and future max timestamp, to allow downstream operators to optimize processing.
2323
*/
2424
public final class TimeSeriesSourceOperator extends LuceneSourceOperator {
25+
private static final int MAX_TARGET_PAGE_SIZE = 2048;
26+
private static final int CHUNK_SIZE = 128;
2527

2628
public static final class Factory extends LuceneSourceOperator.Factory {
2729
public Factory(
@@ -76,4 +78,17 @@ protected void buildMetadataBlocks(Block[] blocks, int offset, int currentPagePo
7678
blocks[offset] = blockFactory.newConstantIntVector(currentSlice.slicePosition(), currentPagePos).asBlock();
7779
blocks[offset + 1] = blockFactory.newConstantLongVector(Long.MAX_VALUE, currentPagePos).asBlock();
7880
}
81+
82+
/**
83+
* For time-series queries, try to use a page size that is a multiple of CHUNK_SIZE (see NUMERIC_BLOCK_SIZE in the tsdb codec)
84+
* to enable bulk loading of numeric or tsid fields. Avoid pages that are too large, as this can disable bulk loading if there are
85+
* holes in the doc IDs and disable constant block optimizations. Therefore, we cap the page size at 2048, which balances the
86+
* overhead per page with the benefits of bulk loading and constant blocks.
87+
*/
88+
public static int pageSize(long estimateRowSizeInBytes, long maxPageSizeInBytes) {
89+
long chunkSizeInBytes = CHUNK_SIZE * estimateRowSizeInBytes;
90+
long numChunks = Math.ceilDiv(maxPageSizeInBytes, chunkSizeInBytes);
91+
long pageSize = Math.clamp(numChunks * CHUNK_SIZE, CHUNK_SIZE, MAX_TARGET_PAGE_SIZE);
92+
return Math.toIntExact(pageSize);
93+
}
7994
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ else if (aggregatorMode.isOutputPartial()) {
179179
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
180180
aggregatorMode,
181181
aggregatorFactories,
182-
context.pageSize(aggregateExec.estimatedRowSize()),
182+
context.pageSize(aggregateExec, aggregateExec.estimatedRowSize()),
183183
analysisRegistry
184184
);
185185
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
301301
querySupplier(esQueryExec.query()),
302302
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
303303
context.queryPragmas().taskConcurrency(),
304-
context.pageSize(rowEstimatedSize),
304+
context.pageSize(esQueryExec, rowEstimatedSize),
305305
limit,
306306
sortBuilders,
307307
estimatedPerRowSortSize,
@@ -312,17 +312,17 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
312312
shardContexts,
313313
querySupplier(esQueryExec.queryBuilderAndTags()),
314314
context.queryPragmas().taskConcurrency(),
315-
context.pageSize(rowEstimatedSize),
315+
context.pageSize(esQueryExec, rowEstimatedSize),
316316
limit
317317
);
318318
} else {
319319
luceneFactory = new LuceneSourceOperator.Factory(
320320
shardContexts,
321321
querySupplier(esQueryExec.queryBuilderAndTags()),
322322
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
323-
context.autoPartitioningStrategy().get(),
323+
context.autoPartitioningStrategy(),
324324
context.queryPragmas().taskConcurrency(),
325-
context.pageSize(rowEstimatedSize),
325+
context.pageSize(esQueryExec, rowEstimatedSize),
326326
limit,
327327
scoring
328328
);
@@ -408,7 +408,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
408408
groupSpecs,
409409
aggregatorMode,
410410
aggregatorFactories,
411-
context.pageSize(ts.estimatedRowSize())
411+
context.pageSize(ts, ts.estimatedRowSize())
412412
);
413413
}
414414

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.compute.data.Page;
2222
import org.elasticsearch.compute.lucene.DataPartitioning;
2323
import org.elasticsearch.compute.lucene.LuceneOperator;
24+
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
2425
import org.elasticsearch.compute.operator.ChangePointOperator;
2526
import org.elasticsearch.compute.operator.ColumnExtractOperator;
2627
import org.elasticsearch.compute.operator.ColumnLoadOperator;
@@ -204,8 +205,13 @@ public LocalExecutionPlanner(
204205
/**
205206
* turn the given plan into a list of drivers to execute
206207
*/
207-
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
208-
208+
public LocalExecutionPlan plan(
209+
String description,
210+
FoldContext foldCtx,
211+
PlannerSettings plannerSettings,
212+
PhysicalPlan localPhysicalPlan
213+
) {
214+
final boolean timeSeries = localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec);
209215
var context = new LocalExecutionPlannerContext(
210216
description,
211217
new ArrayList<>(),
@@ -214,12 +220,8 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
214220
bigArrays,
215221
blockFactory,
216222
foldCtx,
217-
settings,
218-
new Holder<>(
219-
localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec)
220-
? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
221-
: DataPartitioning.AutoStrategy.DEFAULT
222-
),
223+
plannerSettings,
224+
timeSeries,
223225
shardContexts
224226
);
225227

@@ -511,7 +513,7 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
511513
throw new EsqlIllegalArgumentException("limit only supported with literal values");
512514
}
513515
return source.with(
514-
new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(rowSize)),
516+
new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(topNExec, rowSize)),
515517
source.layout
516518
);
517519
}
@@ -1032,8 +1034,8 @@ public record LocalExecutionPlannerContext(
10321034
BigArrays bigArrays,
10331035
BlockFactory blockFactory,
10341036
FoldContext foldCtx,
1035-
Settings settings,
1036-
Holder<DataPartitioning.AutoStrategy> autoPartitioningStrategy,
1037+
PlannerSettings plannerSettings,
1038+
boolean timeSeries,
10371039
List<EsPhysicalOperationProviders.ShardContext> shardContexts
10381040
) {
10391041
void addDriverFactory(DriverFactory driverFactory) {
@@ -1044,7 +1046,11 @@ void driverParallelism(DriverParallelism parallelism) {
10441046
driverParallelism.set(parallelism);
10451047
}
10461048

1047-
int pageSize(Integer estimatedRowSize) {
1049+
DataPartitioning.AutoStrategy autoPartitioningStrategy() {
1050+
return timeSeries ? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES : DataPartitioning.AutoStrategy.DEFAULT;
1051+
}
1052+
1053+
int pageSize(PhysicalPlan node, Integer estimatedRowSize) {
10481054
if (estimatedRowSize == null) {
10491055
throw new IllegalStateException("estimated row size hasn't been set");
10501056
}
@@ -1054,7 +1060,11 @@ int pageSize(Integer estimatedRowSize) {
10541060
if (queryPragmas.pageSize() != 0) {
10551061
return queryPragmas.pageSize();
10561062
}
1057-
return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize);
1063+
if (timeSeries && node instanceof EsQueryExec) {
1064+
return TimeSeriesSourceOperator.pageSize(estimatedRowSize, plannerSettings.valuesLoadingJumboSize().getBytes());
1065+
} else {
1066+
return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize);
1067+
}
10581068
}
10591069
}
10601070

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
648648
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
649649
// it's doing this in the planning of EsQueryExec (the source of the data)
650650
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
651-
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), localPlan);
651+
var localExecutionPlan = planner.plan(context.description(), context.foldCtx(), plannerSettings, localPlan);
652652
if (LOGGER.isDebugEnabled()) {
653653
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
654654
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,7 @@ void executeSubPlan(
724724
LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(
725725
"final",
726726
foldCtx,
727+
TEST_PLANNER_SETTINGS,
727728
new OutputExec(coordinatorPlan, collectedPages::add)
728729
);
729730
drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(getTestName()));
@@ -745,7 +746,12 @@ void executeSubPlan(
745746
throw new AssertionError("expected no failure", e);
746747
})
747748
);
748-
LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan);
749+
LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(
750+
"data",
751+
foldCtx,
752+
EsqlTestUtils.TEST_PLANNER_SETTINGS,
753+
csvDataNodePhysicalPlan
754+
);
749755

750756
drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName()));
751757
Randomness.shuffle(drivers);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@
136136
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
137137
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
138138
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
139+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
139140
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
140141
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
141142
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
@@ -247,6 +248,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
247248
private TestDataSource countriesBboxWeb; // cartesian_shape field tests
248249

249250
private final Configuration config;
251+
private PlannerSettings plannerSettings;
250252

251253
private record TestDataSource(Map<String, EsField> mapping, EsIndex index, Analyzer analyzer, SearchStats stats) {}
252254

@@ -360,6 +362,7 @@ public void init() {
360362
functionRegistry,
361363
enrichResolution
362364
);
365+
this.plannerSettings = TEST_PLANNER_SETTINGS;
363366
}
364367

365368
TestDataSource makeTestDataSource(
@@ -7878,7 +7881,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
78787881
List.of()
78797882
);
78807883

7881-
return planner.plan("test", FoldContext.small(), plan);
7884+
return planner.plan("test", FoldContext.small(), plannerSettings, plan);
78827885
}
78837886

78847887
private List<Set<String>> findFieldNamesInLookupJoinDescription(LocalExecutionPlanner.LocalExecutionPlan physicalOperations) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import org.apache.lucene.tests.index.RandomIndexWriter;
2020
import org.elasticsearch.cluster.ClusterName;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.unit.ByteSizeValue;
2223
import org.elasticsearch.common.util.BigArrays;
24+
import org.elasticsearch.compute.aggregation.AggregatorMode;
25+
import org.elasticsearch.compute.lucene.DataPartitioning;
2326
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
2427
import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
2528
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
@@ -42,6 +45,7 @@
4245
import org.elasticsearch.plugins.Plugin;
4346
import org.elasticsearch.search.internal.AliasFilter;
4447
import org.elasticsearch.search.internal.ContextIndexSearcher;
48+
import org.elasticsearch.xpack.esql.core.expression.Alias;
4549
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
4650
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4751
import org.elasticsearch.xpack.esql.core.expression.Literal;
@@ -51,9 +55,11 @@
5155
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
5256
import org.elasticsearch.xpack.esql.core.util.StringUtils;
5357
import org.elasticsearch.xpack.esql.expression.Order;
58+
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
5459
import org.elasticsearch.xpack.esql.index.EsIndex;
5560
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
5661
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
62+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
5763
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
5864
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
5965
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -125,6 +131,7 @@ public void testLuceneSourceOperatorHugeRowSize() throws IOException {
125131
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
126132
"test",
127133
FoldContext.small(),
134+
TEST_PLANNER_SETTINGS,
128135
new EsQueryExec(
129136
Source.EMPTY,
130137
index().name(),
@@ -156,6 +163,7 @@ public void testLuceneTopNSourceOperator() throws IOException {
156163
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
157164
"test",
158165
FoldContext.small(),
166+
TEST_PLANNER_SETTINGS,
159167
new EsQueryExec(
160168
Source.EMPTY,
161169
index().name(),
@@ -187,6 +195,7 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException {
187195
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
188196
"test",
189197
FoldContext.small(),
198+
TEST_PLANNER_SETTINGS,
190199
new EsQueryExec(
191200
Source.EMPTY,
192201
index().name(),
@@ -211,6 +220,7 @@ public void testDriverClusterAndNodeName() throws IOException {
211220
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
212221
"test",
213222
FoldContext.small(),
223+
TEST_PLANNER_SETTINGS,
214224
new EsQueryExec(
215225
Source.EMPTY,
216226
index().name(),
@@ -244,6 +254,41 @@ public void testPlanUnmappedFieldExtractSyntheticSource() throws Exception {
244254
assertThat(blockLoader, instanceOf(FallbackSyntheticSourceBlockLoader.class));
245255
}
246256

257+
public void testTimeSeries() throws IOException {
258+
int estimatedRowSize = estimatedRowSizeIsHuge ? randomIntBetween(20000, Integer.MAX_VALUE) : randomIntBetween(1, 50);
259+
EsQueryExec queryExec = new EsQueryExec(
260+
Source.EMPTY,
261+
index().name(),
262+
IndexMode.STANDARD,
263+
index().indexNameWithModes(),
264+
List.of(),
265+
new Literal(Source.EMPTY, 10, DataType.INTEGER),
266+
List.of(),
267+
estimatedRowSize,
268+
List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of()))
269+
);
270+
TimeSeriesAggregateExec aggExec = new TimeSeriesAggregateExec(
271+
Source.EMPTY,
272+
queryExec,
273+
List.of(),
274+
List.of(new Alias(Source.EMPTY, "count(*)", new Count(Source.EMPTY, Literal.keyword(Source.EMPTY, "*")))),
275+
AggregatorMode.SINGLE,
276+
List.of(),
277+
10,
278+
null
279+
);
280+
PlannerSettings plannerSettings = new PlannerSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000, ByteSizeValue.ofMb(1));
281+
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), plannerSettings, aggExec);
282+
assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency()));
283+
LocalExecutionPlanner.DriverSupplier supplier = plan.driverFactories.get(0).driverSupplier();
284+
var factory = (LuceneSourceOperator.Factory) supplier.physicalOperation().sourceOperatorFactory;
285+
if (estimatedRowSizeIsHuge) {
286+
assertThat(factory.maxPageSize(), equalTo(128));
287+
} else {
288+
assertThat(factory.maxPageSize(), equalTo(2048));
289+
}
290+
}
291+
247292
private BlockLoader constructBlockLoader() throws IOException {
248293
EsQueryExec queryExec = new EsQueryExec(
249294
Source.EMPTY,
@@ -264,7 +309,7 @@ private BlockLoader constructBlockLoader() throws IOException {
264309
),
265310
MappedFieldType.FieldExtractPreference.NONE
266311
);
267-
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), fieldExtractExec);
312+
var plan = planner().plan("test", FoldContext.small(), TEST_PLANNER_SETTINGS, fieldExtractExec);
268313
var p = plan.driverFactories.get(0).driverSupplier().physicalOperation();
269314
var fieldInfo = ((ValuesSourceReaderOperator.Factory) p.intermediateOperatorFactories.get(0)).fields().get(0);
270315
return fieldInfo.blockLoader().apply(0);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
137137
groupSpecs,
138138
aggregatorMode,
139139
aggregatorFactories,
140-
context.pageSize(ts.estimatedRowSize())
140+
context.pageSize(ts, ts.estimatedRowSize())
141141
);
142142
}
143143

0 commit comments

Comments
 (0)