Skip to content

Commit c94f366

Browse files
authored
ES|QL: Add FUSE operator tests (elastic#135307)
1 parent ee2327b commit c94f366

File tree

17 files changed

+447
-69
lines changed

17 files changed

+447
-69
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9171000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
transform_check_for_dangling_tasks,9170000
1+
esql_fuse_linear_operator_status,9171000

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/LinearScoreEvalOperator.java

Lines changed: 121 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,22 @@
88
package org.elasticsearch.compute.operator.fuse;
99

1010
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.TransportVersion;
12+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
1115
import org.elasticsearch.compute.data.Block;
1216
import org.elasticsearch.compute.data.BytesRefBlock;
1317
import org.elasticsearch.compute.data.DoubleVector;
1418
import org.elasticsearch.compute.data.DoubleVectorBlock;
1519
import org.elasticsearch.compute.data.Page;
1620
import org.elasticsearch.compute.operator.DriverContext;
1721
import org.elasticsearch.compute.operator.Operator;
22+
import org.elasticsearch.core.Releasables;
23+
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.xcontent.XContentBuilder;
1825

26+
import java.io.IOException;
1927
import java.util.ArrayDeque;
2028
import java.util.Collection;
2129
import java.util.Deque;
@@ -60,6 +68,12 @@ public String describe() {
6068
private final Deque<Page> outputPages;
6169
private boolean finished;
6270

71+
private long emitNanos;
72+
private int pagesReceived = 0;
73+
private int pagesProcessed = 0;
74+
private long rowsReceived = 0;
75+
private long rowsEmitted = 0;
76+
6377
public LinearScoreEvalOperator(int discriminatorPosition, int scorePosition, LinearConfig config) {
6478
this.scorePosition = scorePosition;
6579
this.discriminatorPosition = discriminatorPosition;
@@ -79,6 +93,8 @@ public boolean needsInput() {
7993
@Override
8094
public void addInput(Page page) {
8195
inputPages.add(page);
96+
pagesReceived++;
97+
rowsReceived += page.getPositionCount();
8298
}
8399

84100
@Override
@@ -90,35 +106,58 @@ public void finish() {
90106
}
91107

92108
private void createOutputPages() {
109+
final var emitStart = System.nanoTime();
93110
normalizer.preprocess(inputPages, scorePosition, discriminatorPosition);
111+
try {
112+
while (inputPages.isEmpty() == false) {
113+
Page inputPage = inputPages.peek();
114+
processInputPage(inputPage);
115+
inputPages.removeFirst();
116+
pagesProcessed += 1;
117+
}
118+
} finally {
119+
emitNanos = System.nanoTime() - emitStart;
120+
Releasables.close(inputPages);
121+
}
122+
}
94123

95-
while (inputPages.isEmpty() == false) {
96-
Page inputPage = inputPages.peek();
124+
private void processInputPage(Page inputPage) {
125+
BytesRefBlock discriminatorBlock = inputPage.getBlock(discriminatorPosition);
126+
DoubleVectorBlock initialScoreBlock = inputPage.getBlock(scorePosition);
97127

98-
BytesRefBlock discriminatorBlock = inputPage.getBlock(discriminatorPosition);
99-
DoubleVectorBlock initialScoreBlock = inputPage.getBlock(scorePosition);
128+
Page newPage = null;
129+
Block scoreBlock = null;
130+
DoubleVector.Builder scores = null;
100131

101-
DoubleVector.Builder scores = discriminatorBlock.blockFactory().newDoubleVectorBuilder(discriminatorBlock.getPositionCount());
132+
try {
133+
scores = discriminatorBlock.blockFactory().newDoubleVectorBuilder(discriminatorBlock.getPositionCount());
102134

103135
for (int i = 0; i < inputPage.getPositionCount(); i++) {
104136
String discriminator = discriminatorBlock.getBytesRef(i, new BytesRef()).utf8ToString();
105137

106138
var weight = config.weights().get(discriminator) == null ? 1.0 : config.weights().get(discriminator);
107139

108-
Double score = initialScoreBlock.getDouble(i);
140+
double score = initialScoreBlock.getDouble(i);
109141
scores.appendDouble(weight * normalizer.normalize(score, discriminator));
110142
}
111-
Block scoreBlock = scores.build().asBlock();
112-
inputPage = inputPage.appendBlock(scoreBlock);
113143

114-
int[] projections = new int[inputPage.getBlockCount() - 1];
144+
scoreBlock = scores.build().asBlock();
145+
newPage = inputPage.appendBlock(scoreBlock);
146+
147+
int[] projections = new int[newPage.getBlockCount() - 1];
115148

116-
for (int i = 0; i < inputPage.getBlockCount() - 1; i++) {
117-
projections[i] = i == scorePosition ? inputPage.getBlockCount() - 1 : i;
149+
for (int i = 0; i < newPage.getBlockCount() - 1; i++) {
150+
projections[i] = i == scorePosition ? newPage.getBlockCount() - 1 : i;
151+
}
152+
153+
outputPages.add(newPage.projectBlocks(projections));
154+
} finally {
155+
if (newPage != null) {
156+
newPage.releaseBlocks();
157+
}
158+
if (scoreBlock == null && scores != null) {
159+
Releasables.close(scores);
118160
}
119-
inputPages.removeFirst();
120-
outputPages.add(inputPage.projectBlocks(projections));
121-
inputPage.releaseBlocks();
122161
}
123162
}
124163

@@ -132,7 +171,11 @@ public Page getOutput() {
132171
if (finished == false || outputPages.isEmpty()) {
133172
return null;
134173
}
135-
return outputPages.removeFirst();
174+
175+
Page page = outputPages.removeFirst();
176+
rowsEmitted += page.getPositionCount();
177+
178+
return page;
136179
}
137180

138181
@Override
@@ -156,6 +199,69 @@ public String toString() {
156199
+ "]";
157200
}
158201

202+
@Override
203+
public Operator.Status status() {
204+
return new Status(emitNanos, pagesReceived, pagesProcessed, rowsReceived, rowsEmitted);
205+
}
206+
207+
public record Status(long emitNanos, int pagesReceived, int pagesProcessed, long rowsReceived, long rowsEmitted)
208+
implements
209+
Operator.Status {
210+
211+
public static final TransportVersion ESQL_FUSE_LINEAR_OPERATOR_STATUS = TransportVersion.fromName(
212+
"esql_fuse_linear_operator_status"
213+
);
214+
215+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
216+
Operator.Status.class,
217+
"linearScoreEval",
218+
Status::new
219+
);
220+
221+
Status(StreamInput streamInput) throws IOException {
222+
this(streamInput.readLong(), streamInput.readInt(), streamInput.readInt(), streamInput.readLong(), streamInput.readLong());
223+
}
224+
225+
@Override
226+
public String getWriteableName() {
227+
return ENTRY.name;
228+
}
229+
230+
@Override
231+
public boolean supportsVersion(TransportVersion version) {
232+
return version.supports(ESQL_FUSE_LINEAR_OPERATOR_STATUS);
233+
}
234+
235+
@Override
236+
public TransportVersion getMinimalSupportedVersion() {
237+
assert false : "must not be called when overriding supportsVersion";
238+
throw new UnsupportedOperationException("must not be called when overriding supportsVersion");
239+
}
240+
241+
@Override
242+
public void writeTo(StreamOutput out) throws IOException {
243+
out.writeLong(emitNanos);
244+
out.writeInt(pagesReceived);
245+
out.writeInt(pagesProcessed);
246+
out.writeLong(rowsReceived);
247+
out.writeLong(rowsEmitted);
248+
}
249+
250+
@Override
251+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
252+
builder.startObject();
253+
builder.field("emit_nanos", emitNanos);
254+
if (builder.humanReadable()) {
255+
builder.field("emit_time", TimeValue.timeValueNanos(emitNanos));
256+
}
257+
builder.field("pages_received", pagesReceived);
258+
builder.field("pages_processed", pagesProcessed);
259+
builder.field("rows_received", rowsReceived);
260+
builder.field("rows_emitted", rowsEmitted);
261+
return builder.endObject();
262+
}
263+
}
264+
159265
private Normalizer createNormalizer(LinearConfig.Normalizer normalizer) {
160266
return switch (normalizer) {
161267
case NONE -> new NoneNormalizer();

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/fuse/RrfScoreEvalOperator.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
1616
import org.elasticsearch.compute.operator.DriverContext;
1717
import org.elasticsearch.compute.operator.Operator;
18+
import org.elasticsearch.core.Releasables;
1819

1920
import java.util.HashMap;
2021

@@ -73,18 +74,30 @@ protected Page process(Page page) {
7374
scores.appendDouble(1.0 / (config.rankConstant() + rank) * weight);
7475
}
7576

76-
Block scoreBlock = scores.build().asBlock();
77-
page = page.appendBlock(scoreBlock);
77+
Page newPage = null;
78+
Block scoreBlock = null;
7879

79-
int[] projections = new int[page.getBlockCount() - 1];
80-
81-
for (int i = 0; i < page.getBlockCount() - 1; i++) {
82-
projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
83-
}
8480
try {
85-
return page.projectBlocks(projections);
81+
scoreBlock = scores.build().asBlock();
82+
newPage = page.appendBlock(scoreBlock);
83+
84+
int[] projections = new int[newPage.getBlockCount() - 1];
85+
86+
for (int i = 0; i < newPage.getBlockCount() - 1; i++) {
87+
projections[i] = i == scorePosition ? newPage.getBlockCount() - 1 : i;
88+
}
89+
return newPage.projectBlocks(projections);
8690
} finally {
87-
page.releaseBlocks();
91+
if (newPage != null) {
92+
newPage.releaseBlocks();
93+
} else {
94+
// we never got to a point where the new page was constructed, so we need to release the initial one
95+
page.releaseBlocks();
96+
}
97+
if (scoreBlock == null) {
98+
// we never built scoreBlock, so we need to release the scores builder
99+
Releasables.close(scores);
100+
}
88101
}
89102
}
90103

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator.fuse;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.common.TriConsumer;
12+
import org.elasticsearch.compute.data.Block;
13+
import org.elasticsearch.compute.data.BlockFactory;
14+
import org.elasticsearch.compute.data.BytesRefBlock;
15+
import org.elasticsearch.compute.data.DoubleVectorBlock;
16+
import org.elasticsearch.compute.data.Page;
17+
import org.elasticsearch.compute.operator.SourceOperator;
18+
import org.elasticsearch.compute.test.AbstractBlockSourceOperator;
19+
import org.elasticsearch.compute.test.OperatorTestCase;
20+
import org.elasticsearch.core.Releasables;
21+
import org.junit.Before;
22+
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
public abstract class FuseOperatorTestCase extends OperatorTestCase {
28+
protected int blocksCount;
29+
protected int discriminatorPosition;
30+
protected int scorePosition;
31+
protected int discriminatorCount;
32+
33+
@Before
34+
public void initialize() {
35+
discriminatorPosition = randomIntBetween(1, 20);
36+
scorePosition = randomIntBetween(discriminatorPosition + 1, 50);
37+
blocksCount = randomIntBetween(scorePosition + 1, 100);
38+
discriminatorCount = randomIntBetween(1, 20);
39+
}
40+
41+
protected void assertOutput(List<Page> input, List<Page> results, TriConsumer<String, Double, Double> assertScore) {
42+
assertEquals(input.size(), results.size());
43+
44+
for (int i = 0; i < results.size(); i++) {
45+
Page resultPage = results.get(i);
46+
Page initialPage = input.get(i);
47+
48+
assertEquals(initialPage.getPositionCount(), resultPage.getPositionCount());
49+
assertEquals(resultPage.getBlockCount(), blocksCount);
50+
51+
BytesRefBlock discriminatorBlock = resultPage.getBlock(discriminatorPosition);
52+
DoubleVectorBlock actualScoreBlock = resultPage.getBlock(scorePosition);
53+
DoubleVectorBlock initialScoreBlock = initialPage.getBlock(scorePosition);
54+
55+
for (int j = 0; j < resultPage.getPositionCount(); j++) {
56+
String discriminator = discriminatorBlock.getBytesRef(j, new BytesRef()).utf8ToString();
57+
double actualScore = actualScoreBlock.getDouble(j);
58+
double initialScore = initialScoreBlock.getDouble(j);
59+
assertScore.apply(discriminator, actualScore, initialScore);
60+
}
61+
}
62+
}
63+
64+
@Override
65+
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
66+
return new AbstractBlockSourceOperator(blockFactory, 8 * 1024) {
67+
@Override
68+
protected int remaining() {
69+
return size - currentPosition;
70+
}
71+
72+
@Override
73+
protected Page createPage(int positionOffset, int length) {
74+
length = Integer.min(length, remaining());
75+
Block[] blocks = new Block[blocksCount];
76+
77+
try {
78+
for (int b = 0; b < blocksCount; b++) {
79+
if (b == scorePosition) {
80+
try (var builder = blockFactory.newDoubleBlockBuilder(length)) {
81+
for (int i = 0; i < length; i++) {
82+
builder.appendDouble(randomDouble());
83+
}
84+
blocks[b] = builder.build();
85+
}
86+
} else {
87+
try (var builder = blockFactory.newBytesRefBlockBuilder(length)) {
88+
for (int i = 0; i < length; i++) {
89+
String stringInput = b == discriminatorPosition
90+
? "fork" + randomIntBetween(0, discriminatorCount)
91+
: randomAlphaOfLength(10);
92+
93+
builder.appendBytesRef(new BytesRef(stringInput));
94+
}
95+
blocks[b] = builder.build();
96+
}
97+
}
98+
}
99+
} catch (Exception e) {
100+
Releasables.closeExpectNoException(blocks);
101+
throw e;
102+
}
103+
104+
currentPosition += length;
105+
return new Page(blocks);
106+
}
107+
};
108+
}
109+
110+
protected Map<String, Double> randomWeights() {
111+
Map<String, Double> weights = new HashMap<>();
112+
for (int i = 0; i < discriminatorCount; i++) {
113+
if (randomBoolean()) {
114+
weights.put("fork" + i, randomDouble());
115+
}
116+
}
117+
return weights;
118+
}
119+
}

0 commit comments

Comments
 (0)