Commit 632478d

dnhatn authored and joshua-adams-1 committed
Run field extraction concurrently in TS (elastic#128643)
This change extracts the field extraction logic previously run inside `TimeSeriesSourceOperator` into a separate operator, executed in a separate driver. We should consider consolidating this operator with `ValuesSourceReaderOperator`. I tried to extend `ValuesSourceReaderOperator` to support this, but it may take some time to complete. Our current goal is to move quickly with experimental support for querying time-series data in ES|QL, so I am proceeding with a separate operator and will spend more time on combining the two later.

With elastic#128419 and this PR, the query time for the example below decreased from 41ms to 27ms (roughly a one-third reduction):

```
POST /_query
{
  "profile": true,
  "query": "TS metrics-hostmetricsreceiver.otel-default | WHERE @timestamp >= \"2025-05-08T18:00:08.001Z\" | STATS cpu = avg(rate(`metrics.process.cpu.time`)) BY host.name, BUCKET(@timestamp, 5 minute)"
}
```
1 parent fac84e9 commit 632478d
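For orientation, here is a hedged sketch of how a downstream driver would obtain the new operator from its factory. This is an editor's illustration, not code from this commit; the helper method and the provenance of `driverContext`, `fields`, and `shardContexts` are assumptions.

```java
// Sketch only: Factory(fields, shardContexts) and Factory#get(DriverContext)
// match the signatures introduced in this commit; everything else is assumed.
static Operator extractFieldsOperator(
    DriverContext driverContext,
    List<ValuesSourceReaderOperator.FieldInfo> fields,
    List<? extends ShardContext> shardContexts
) {
    // The operator runs in its own driver, downstream of TimeSeriesSourceOperator.
    return new TimeSeriesExtractFieldOperator.Factory(fields, shardContexts).get(driverContext);
}
```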

File tree: 12 files changed, +556 −450 lines changed

TimeSeriesExtractFieldOperator.java: 359 additions & 0 deletions (new file)
@@ -0,0 +1,359 @@
```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.lucene;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.DocBlock;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

/**
 * A variant of {@link ValuesSourceReaderOperator} for extracting fields in time-series indices. The differences are:
 * 1. Caches all segments of the last shard instead of only the last segment, since data in time-series can come from
 *    any segment at any time.
 * 2. Although docs do not arrive in the global order (by shard, then segment, then docId), they are still sorted
 *    within each segment; hence, this reader does not perform sorting and regrouping, which are expensive.
 * 3. For dimension fields, values are read only once per tsid.
 * These changes are made purely for performance reasons. We should look into consolidating this operator with
 * {@link ValuesSourceReaderOperator} by adding some metadata to the {@link DocVector} and handling them accordingly.
 */
public class TimeSeriesExtractFieldOperator extends AbstractPageMappingOperator {

    public record Factory(List<ValuesSourceReaderOperator.FieldInfo> fields, List<? extends ShardContext> shardContexts)
        implements
            OperatorFactory {
        @Override
        public Operator get(DriverContext driverContext) {
            return new TimeSeriesExtractFieldOperator(driverContext.blockFactory(), fields, shardContexts);
        }

        @Override
        public String describe() {
            StringBuilder sb = new StringBuilder();
            sb.append("TimeSeriesExtractFieldOperator[fields = [");
            if (fields.size() < 10) {
                boolean first = true;
                for (var f : fields) {
                    if (first) {
                        first = false;
                    } else {
                        sb.append(", ");
                    }
                    sb.append(f.name());
                }
            } else {
                sb.append(fields.size()).append(" fields");
            }
            return sb.append("]]").toString();
        }
    }

    private final BlockFactory blockFactory;
    private final List<ValuesSourceReaderOperator.FieldInfo> fields;
    private final List<? extends ShardContext> shardContexts;

    private ShardLevelFieldsReader fieldsReader;

    public TimeSeriesExtractFieldOperator(
        BlockFactory blockFactory,
        List<ValuesSourceReaderOperator.FieldInfo> fields,
        List<? extends ShardContext> shardContexts
    ) {
        this.blockFactory = blockFactory;
        this.fields = fields;
        this.shardContexts = shardContexts;
    }

    private OrdinalBytesRefVector getTsid(Page page, int channel) {
        BytesRefBlock block = page.getBlock(channel);
        OrdinalBytesRefBlock ordinals = block.asOrdinals();
        if (ordinals == null) {
            throw new IllegalArgumentException("tsid must be an ordinals block, got: " + block.getClass().getName());
        }
        OrdinalBytesRefVector vector = ordinals.asVector();
        if (vector == null) {
            throw new IllegalArgumentException("tsid must be an ordinals vector, got: " + block.getClass().getName());
        }
        return vector;
    }

    private DocVector getDocVector(Page page, int channel) {
        DocBlock docBlock = page.getBlock(channel);
        DocVector docVector = docBlock.asVector();
        if (docVector == null) {
            throw new IllegalArgumentException("doc must be a doc vector, got: " + docBlock.getClass().getName());
        }
        return docVector;
    }

    @Override
    protected Page process(Page page) {
        try {
            return processUnchecked(page);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private Page processUnchecked(Page page) throws IOException {
        DocVector docVector = getDocVector(page, 0);
        IntVector shards = docVector.shards();
        if (shards.isConstant() == false) {
            throw new IllegalArgumentException("shards must be a constant vector, got: " + shards.getClass().getName());
        }
        OrdinalBytesRefVector tsidVector = getTsid(page, 1);
        IntVector tsidOrdinals = tsidVector.getOrdinalsVector();
        int shardIndex = shards.getInt(0);
        if (fieldsReader == null || fieldsReader.shardIndex != shardIndex) {
            Releasables.close(fieldsReader);
            fieldsReader = new ShardLevelFieldsReader(shardIndex, blockFactory, shardContexts.get(shardIndex), fields);
        }
        fieldsReader.prepareForReading(page.getPositionCount());
        IntVector docs = docVector.docs();
        IntVector segments = docVector.segments();
        int lastTsidOrdinal = -1;
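        // Rows are grouped by tsid: the first row of each run reads every field, and
        // later rows of the same run skip dimension fields, whose values are constant
        // within a time series.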
        for (int p = 0; p < docs.getPositionCount(); p++) {
            int doc = docs.getInt(p);
            int segment = segments.getInt(p);
            int tsidOrd = tsidOrdinals.getInt(p);
            if (tsidOrd == lastTsidOrdinal) {
                fieldsReader.readValues(segment, doc, true);
            } else {
                fieldsReader.readValues(segment, doc, false);
                lastTsidOrdinal = tsidOrd;
            }
        }
        Block[] blocks = new Block[fields.size()];
        Page result = null;
        try {
            fieldsReader.buildBlocks(blocks, tsidOrdinals);
            result = page.appendBlocks(blocks);
            return result;
        } finally {
            if (result == null) {
                Releasables.close(blocks);
            }
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("TimeSeriesExtractFieldOperator[fields = [");
        if (fields.size() < 10) {
            boolean first = true;
            for (var f : fields) {
                if (first) {
                    first = false;
                } else {
                    sb.append(", ");
                }
                sb.append(f.name());
            }
        } else {
            sb.append(fields.size()).append(" fields");
        }
        return sb.append("]]").toString();
    }

    @Override
    public void close() {
        Releasables.close(fieldsReader, super::close);
    }

    static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
        BlockLoaderFactory(BlockFactory factory) {
            super(factory);
        }

        @Override
        public BlockLoader.Block constantNulls() {
            throw new UnsupportedOperationException("must not be used by column readers");
        }

        @Override
        public BlockLoader.Block constantBytes(BytesRef value) {
            throw new UnsupportedOperationException("must not be used by column readers");
        }

        @Override
        public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
            throw new UnsupportedOperationException("must not be used by column readers");
        }
    }

    static final class ShardLevelFieldsReader implements Releasable {
        final int shardIndex;
        private final BlockLoaderFactory blockFactory;
        private final SegmentLevelFieldsReader[] segments;
        private final BlockLoader[] loaders;
        private final boolean[] dimensions;
        private final Block.Builder[] builders;
        private final StoredFieldsSpec storedFieldsSpec;
        private final SourceLoader sourceLoader;

        ShardLevelFieldsReader(
            int shardIndex,
            BlockFactory blockFactory,
            ShardContext shardContext,
            List<ValuesSourceReaderOperator.FieldInfo> fields
        ) {
            this.shardIndex = shardIndex;
            this.blockFactory = new BlockLoaderFactory(blockFactory);
            final IndexReader indexReader = shardContext.searcher().getIndexReader();
            this.segments = new SegmentLevelFieldsReader[indexReader.leaves().size()];
            this.loaders = new BlockLoader[fields.size()];
            this.builders = new Block.Builder[loaders.length];
            StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
            for (int i = 0; i < fields.size(); i++) {
                BlockLoader loader = fields.get(i).blockLoader().apply(shardIndex);
                storedFieldsSpec = storedFieldsSpec.merge(loader.rowStrideStoredFieldSpec());
                loaders[i] = loader;
            }
            for (int i = 0; i < indexReader.leaves().size(); i++) {
                LeafReaderContext leafReaderContext = indexReader.leaves().get(i);
                segments[i] = new SegmentLevelFieldsReader(leafReaderContext, loaders);
            }
            if (storedFieldsSpec.requiresSource()) {
                sourceLoader = shardContext.newSourceLoader();
                storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(false, false, sourceLoader.requiredStoredFields()));
            } else {
                sourceLoader = null;
            }
            this.storedFieldsSpec = storedFieldsSpec;
            this.dimensions = new boolean[fields.size()];
            for (int i = 0; i < fields.size(); i++) {
                dimensions[i] = shardContext.fieldType(fields.get(i).name()).isDimension();
            }
        }

        /**
         * For dimension fields, skips reading them when {@code nonDimensionFieldsOnly} is true,
         * since they only need to be read once per tsid.
         */
        void readValues(int segment, int docID, boolean nonDimensionFieldsOnly) throws IOException {
            segments[segment].read(docID, builders, nonDimensionFieldsOnly, dimensions);
        }

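        // Builders are created lazily per batch: buildBlocks() hands off the built
        // blocks and nulls the builders, so the next page re-creates them here.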
        void prepareForReading(int estimatedSize) throws IOException {
            if (this.builders.length > 0 && this.builders[0] == null) {
                for (int f = 0; f < builders.length; f++) {
                    builders[f] = (Block.Builder) loaders[f].builder(blockFactory, estimatedSize);
                }
            }
            for (SegmentLevelFieldsReader segment : segments) {
                segment.reinitializeIfNeeded(sourceLoader, storedFieldsSpec);
            }
        }

        void buildBlocks(Block[] blocks, IntVector tsidOrdinals) {
            for (int i = 0; i < builders.length; i++) {
                if (dimensions[i]) {
                    blocks[i] = buildBlockForDimensionField(builders[i], tsidOrdinals);
                } else {
                    blocks[i] = builders[i].build();
                }
            }
            Arrays.fill(builders, null);
        }

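        // A dimension field was read once per tsid, so the built block holds one value
        // per series; expand it back to one value per row via the page's tsid ordinals.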
        private Block buildBlockForDimensionField(Block.Builder builder, IntVector tsidOrdinals) {
            try (var values = builder.build()) {
                if (values.asVector() instanceof BytesRefVector bytes) {
                    tsidOrdinals.incRef();
                    values.incRef();
                    return new OrdinalBytesRefVector(tsidOrdinals, bytes).asBlock();
                } else if (values.areAllValuesNull()) {
                    return blockFactory.factory.newConstantNullBlock(tsidOrdinals.getPositionCount());
                } else {
                    final int positionCount = tsidOrdinals.getPositionCount();
                    try (var newBuilder = values.elementType().newBlockBuilder(positionCount, blockFactory.factory)) {
                        for (int p = 0; p < positionCount; p++) {
                            int pos = tsidOrdinals.getInt(p);
                            newBuilder.copyFrom(values, pos, pos + 1);
                        }
                        return newBuilder.build();
                    }
                }
            }
        }

        @Override
        public void close() {
            Releasables.close(builders);
        }
    }

    static final class SegmentLevelFieldsReader {
        private final BlockLoader.RowStrideReader[] rowStride;
        private final BlockLoader[] loaders;
        private final LeafReaderContext leafContext;
        private BlockLoaderStoredFieldsFromLeafLoader storedFields;
        private Thread loadedThread = null;

        SegmentLevelFieldsReader(LeafReaderContext leafContext, BlockLoader[] loaders) {
            this.leafContext = leafContext;
            this.loaders = loaders;
            this.rowStride = new BlockLoader.RowStrideReader[loaders.length];
        }

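        // Row-stride readers and stored-field loaders keep positional state that is
        // not shared across threads; rebuild them when a new driver thread starts
        // reading this segment.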
        private void reinitializeIfNeeded(SourceLoader sourceLoader, StoredFieldsSpec storedFieldsSpec) throws IOException {
            final Thread currentThread = Thread.currentThread();
            if (loadedThread != currentThread) {
                loadedThread = currentThread;
                for (int f = 0; f < loaders.length; f++) {
                    rowStride[f] = loaders[f].rowStrideReader(leafContext);
                }
                storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
                    StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(leafContext, null),
                    sourceLoader != null ? sourceLoader.leaf(leafContext.reader(), null) : null
                );
            }
        }

        void read(int docId, Block.Builder[] builder, boolean nonDimensionFieldsOnly, boolean[] dimensions) throws IOException {
            storedFields.advanceTo(docId);
            if (nonDimensionFieldsOnly) {
                for (int i = 0; i < rowStride.length; i++) {
                    if (dimensions[i] == false) {
                        rowStride[i].read(docId, storedFields, builder[i]);
                    }
                }
            } else {
                for (int i = 0; i < rowStride.length; i++) {
                    rowStride[i].read(docId, storedFields, builder[i]);
                }
            }
        }
    }
}
```
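The per-run skip described in the class Javadoc (point 3) is the heart of the optimization. Below is a minimal, self-contained illustration of that pattern; the data is hypothetical and this is not the operator's API:

```java
// Demonstrates the run-detection pattern from processUnchecked: per-series
// (dimension) work happens only when the tsid ordinal changes between rows.
public class TsidRunDemo {
    public static void main(String[] args) {
        int[] tsidOrdinals = { 0, 0, 0, 1, 1, 2 };  // rows grouped by series
        int lastOrdinal = -1;
        for (int ord : tsidOrdinals) {
            if (ord != lastOrdinal) {
                System.out.println("series " + ord + ": read dimension + metric fields");
                lastOrdinal = ord;
            } else {
                System.out.println("series " + ord + ": read metric fields only");
            }
        }
    }
}
```

With six rows across three series, dimension fields are read three times instead of six; for real pages with long runs per tsid, the saving grows with run length.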
