Skip to content

Commit 3ee3331

Browse files
authored
Late materialization of dimension fields in time-series (#135961)
This change adds an optimization rule for time-series queries that moves reading dimension fields from before the time-series operator to after, reading each dimension field once per group. This is possible because dimension field values for `_tsid` are identical across all documents in the same time-series. For example: ``` TS .. | STATS sum(rate(r1)), sum(rate(r2)) BY cluster, host, tbucket(1m) ``` Without this rule: ``` TS .. | EXTRACT_FIELDS(r1, r2, cluster, host) | STATS rate(r1), rate(r2), VALUES(cluster), VALUES(host) BY _tsid, tbucket(1m) | ... ``` With this rule: ``` TS .. | EXTRACT_FIELDS(r1, r2) | STATS rate(r1), rate(r2), FIRST_DOC_ID(_doc) BY _tsid, tbucket(1m) | EXTRACT_FIELDS(cluster, host) | ... ``` Ideally, dimension fields should be read once per _tsid in the final result, similar to the fetch phase. Currently, dimension fields are read once per group key in each pipeline; if there are multiple time buckets, dimensions for the same _tsid are read multiple times. This can be avoided by extending ValuesSourceReaderOperator to understand the ordinals of _tsid. I will follow up with this improvement later to keep this PR small.
1 parent 80b4d97 commit 3ee3331

File tree

13 files changed

+1113
-6
lines changed

13 files changed

+1113
-6
lines changed

docs/changelog/135961.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135961
2+
summary: Late materialization of dimension fields in time-series
3+
area: TSDB
4+
type: enhancement
5+
issues: []
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.compute.aggregation;
8+
9+
// begin generated imports
10+
11+
import org.apache.lucene.util.BytesRef;
12+
import org.elasticsearch.compute.data.Block;
13+
import org.elasticsearch.compute.data.BytesRefBlock;
14+
import org.elasticsearch.compute.data.BytesRefVector;
15+
import org.elasticsearch.compute.data.ElementType;
16+
import org.elasticsearch.compute.data.IntArrayBlock;
17+
import org.elasticsearch.compute.data.IntBigArrayBlock;
18+
import org.elasticsearch.compute.data.IntBlock;
19+
import org.elasticsearch.compute.data.IntVector;
20+
import org.elasticsearch.compute.data.Page;
21+
import org.elasticsearch.compute.operator.DriverContext;
22+
23+
import java.util.List;
24+
// end generated imports
25+
26+
public final class DimensionValuesByteRefGroupingAggregatorFunction implements GroupingAggregatorFunction {
27+
28+
public static final class FunctionSupplier implements AggregatorFunctionSupplier {
29+
30+
@Override
31+
public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
32+
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
33+
}
34+
35+
@Override
36+
public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
37+
return INTERMEDIATE_STATE_DESC;
38+
}
39+
40+
@Override
41+
public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
42+
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
43+
}
44+
45+
@Override
46+
public DimensionValuesByteRefGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List<Integer> channels) {
47+
return new DimensionValuesByteRefGroupingAggregatorFunction(channels, driverContext);
48+
}
49+
50+
@Override
51+
public String describe() {
52+
return "dimensions of bytes";
53+
}
54+
}
55+
56+
static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(new IntermediateStateDesc("values", ElementType.BYTES_REF));
57+
58+
private final BytesRefBlock.Builder builder;
59+
private final int channel;
60+
private final DriverContext driverContext;
61+
private int maxGroupId = -1;
62+
63+
public DimensionValuesByteRefGroupingAggregatorFunction(List<Integer> channels, DriverContext driverContext) {
64+
this.channel = channels.getFirst();
65+
this.driverContext = driverContext;
66+
this.builder = driverContext.blockFactory().newBytesRefBlockBuilder(4096);
67+
}
68+
69+
@Override
70+
public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
71+
// manage nulls
72+
}
73+
74+
@Override
75+
public AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, Page page) {
76+
BytesRefBlock valuesBlock = page.getBlock(0);
77+
if (valuesBlock.areAllValuesNull()) {
78+
return new AddInput() {
79+
@Override
80+
public void add(int positionOffset, IntArrayBlock groupIds) {
81+
82+
}
83+
84+
@Override
85+
public void add(int positionOffset, IntBigArrayBlock groupIds) {
86+
87+
}
88+
89+
@Override
90+
public void add(int positionOffset, IntVector groupIds) {
91+
92+
}
93+
94+
@Override
95+
public void close() {
96+
97+
}
98+
};
99+
}
100+
return new AddInput() {
101+
@Override
102+
public void add(int positionOffset, IntArrayBlock groupIds) {
103+
addInputValuesBlock(positionOffset, groupIds, valuesBlock);
104+
}
105+
106+
@Override
107+
public void add(int positionOffset, IntBigArrayBlock groupIds) {
108+
addInputValuesBlock(positionOffset, groupIds, valuesBlock);
109+
}
110+
111+
@Override
112+
public void add(int positionOffset, IntVector groupIds) {
113+
var valuesVector = valuesBlock.asVector();
114+
if (valuesVector != null) {
115+
addInputValuesVector(positionOffset, groupIds, valuesVector);
116+
} else {
117+
addInputValuesBlock(positionOffset, groupIds, valuesBlock);
118+
}
119+
}
120+
121+
@Override
122+
public void close() {
123+
124+
}
125+
};
126+
}
127+
128+
// Note that this path can be executed randomly in tests, not in production
129+
private void addInputValuesBlock(int positionOffset, IntBlock groups, BytesRefBlock valueBlock) {
130+
var scratch = new BytesRef();
131+
int positionCount = groups.getPositionCount();
132+
for (int p = 0; p < positionCount; p++) {
133+
if (groups.isNull(p)) {
134+
continue;
135+
}
136+
int valuePosition = p + positionOffset;
137+
int groupStart = groups.getFirstValueIndex(p);
138+
int groupEnd = groupStart + groups.getValueCount(p);
139+
for (int g = groupStart; g < groupEnd; g++) {
140+
final int groupId = groups.getInt(g);
141+
if (maxGroupId < groupId) {
142+
fillNullsUpTo(groupId);
143+
builder.copyFrom(valueBlock, valuePosition, scratch);
144+
maxGroupId = groupId;
145+
}
146+
}
147+
}
148+
}
149+
150+
private void addInputValuesBlock(int positionOffset, IntVector groups, BytesRefBlock valueBlock) {
151+
var scratch = new BytesRef();
152+
int positionCount = groups.getPositionCount();
153+
if (groups.isConstant()) {
154+
int groupId = groups.getInt(0);
155+
if (groupId > maxGroupId) {
156+
fillNullsUpTo(groupId);
157+
builder.copyFrom(valueBlock, positionOffset, scratch);
158+
maxGroupId = groupId;
159+
}
160+
} else {
161+
for (int p = 0; p < positionCount; p++) {
162+
int groupId = groups.getInt(p);
163+
if (groupId > maxGroupId) {
164+
fillNullsUpTo(groupId);
165+
builder.copyFrom(valueBlock, positionOffset + p, scratch);
166+
maxGroupId = groupId;
167+
}
168+
}
169+
}
170+
}
171+
172+
private void addInputValuesVector(int positionOffset, IntVector groups, BytesRefVector valueVector) {
173+
var scratch = new BytesRef();
174+
int positionCount = groups.getPositionCount();
175+
if (groups.isConstant()) {
176+
int groupId = groups.getInt(0);
177+
if (groupId > maxGroupId) {
178+
fillNullsUpTo(groupId);
179+
builder.appendBytesRef(valueVector.getBytesRef(positionOffset, scratch));
180+
maxGroupId = groupId;
181+
}
182+
} else {
183+
for (int p = 0; p < positionCount; p++) {
184+
int groupId = groups.getInt(p);
185+
if (groupId > maxGroupId) {
186+
fillNullsUpTo(groupId);
187+
builder.appendBytesRef(valueVector.getBytesRef(positionOffset + p, scratch));
188+
maxGroupId = groupId;
189+
}
190+
}
191+
}
192+
}
193+
194+
private void fillNullsUpTo(int groupId) {
195+
for (int i = maxGroupId + 1; i < groupId; i++) {
196+
builder.appendNull();
197+
}
198+
}
199+
200+
@Override
201+
public int intermediateBlockCount() {
202+
return INTERMEDIATE_STATE_DESC.size();
203+
}
204+
205+
@Override
206+
public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) {
207+
BytesRefBlock valuesBlock = page.getBlock(channel);
208+
if (valuesBlock.areAllValuesNull()) {
209+
return;
210+
}
211+
addInputValuesBlock(positionOffset, groups, valuesBlock);
212+
}
213+
214+
@Override
215+
public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) {
216+
BytesRefBlock valuesBlock = page.getBlock(channel);
217+
if (valuesBlock.areAllValuesNull()) {
218+
return;
219+
}
220+
addInputValuesBlock(positionOffset, groups, valuesBlock);
221+
}
222+
223+
@Override
224+
public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
225+
BytesRefBlock valuesBlock = page.getBlock(channel);
226+
if (valuesBlock.areAllValuesNull()) {
227+
return;
228+
}
229+
var valuesVector = valuesBlock.asVector();
230+
if (valuesVector != null) {
231+
addInputValuesVector(positionOffset, groups, valuesVector);
232+
} else {
233+
addInputValuesBlock(positionOffset, groups, valuesBlock);
234+
}
235+
}
236+
237+
@Override
238+
public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
239+
int positionCount = selected.getPositionCount();
240+
boolean allSelected = positionCount == maxGroupId + 1;
241+
if (allSelected) {
242+
for (int i = 0; i < selected.getPositionCount(); i++) {
243+
if (selected.getInt(i) == i) {
244+
allSelected = false;
245+
break;
246+
}
247+
}
248+
}
249+
if (allSelected) {
250+
blocks[offset] = builder.build();
251+
return;
252+
}
253+
BytesRef scratch = new BytesRef();
254+
try (var block = builder.build(); var outputBuilder = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
255+
for (int p = 0; p < positionCount; p++) {
256+
int groupId = selected.getInt(p);
257+
if (groupId <= maxGroupId) {
258+
outputBuilder.copyFrom(block, groupId, scratch);
259+
} else {
260+
outputBuilder.appendNull();
261+
}
262+
}
263+
blocks[offset] = outputBuilder.build();
264+
}
265+
}
266+
267+
@Override
268+
public void close() {
269+
builder.close();
270+
}
271+
272+
@Override
273+
public void evaluateFinal(Block[] blocks, int offset, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
274+
evaluateIntermediate(blocks, offset, selected);
275+
}
276+
277+
@Override
278+
public String toString() {
279+
StringBuilder sb = new StringBuilder();
280+
sb.append(getClass().getSimpleName()).append("[");
281+
sb.append("channels=").append(channel);
282+
sb.append("]");
283+
return sb.toString();
284+
}
285+
}

0 commit comments

Comments
 (0)