Skip to content

Commit 035ae04

Browse files
committed
Ensure InferenceOperatorTestCase execute InferenceOperation out of order.
1 parent 2387d14 commit 035ae04

File tree

2 files changed

+24
-10
lines changed

2 files changed

+24
-10
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,11 @@ private AbstractRunnable createTask(InferenceAction.Request request, ActionListe
237237
return new AbstractRunnable() {
238238
@Override
239239
protected void doRun() {
240-
inferenceRunner.doInference(request, completionListener);
240+
try {
241+
inferenceRunner.doInference(request, completionListener);
242+
} catch (Throwable e) {
243+
listener.onFailure(new RuntimeException("Unexpected failure while running inference", e));
244+
}
241245
}
242246

243247
@Override

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.common.util.concurrent.EsExecutors;
1919
import org.elasticsearch.compute.data.Block;
2020
import org.elasticsearch.compute.data.BlockFactory;
21-
import org.elasticsearch.compute.data.BlockUtils;
2221
import org.elasticsearch.compute.data.BooleanBlock;
2322
import org.elasticsearch.compute.data.BytesRefBlock;
2423
import org.elasticsearch.compute.data.DoubleBlock;
@@ -32,6 +31,7 @@
3231
import org.elasticsearch.compute.operator.SourceOperator;
3332
import org.elasticsearch.compute.test.AbstractBlockSourceOperator;
3433
import org.elasticsearch.compute.test.OperatorTestCase;
34+
import org.elasticsearch.core.TimeValue;
3535
import org.elasticsearch.inference.InferenceServiceResults;
3636
import org.elasticsearch.test.client.NoOpClient;
3737
import org.elasticsearch.threadpool.FixedExecutorBuilder;
@@ -89,7 +89,7 @@ protected int remaining() {
8989
protected Page createPage(int positionOffset, int length) {
9090
try (var builder = blockFactory.newBytesRefVectorBuilder(length)) {
9191
for (int i = 0; i < length; i++) {
92-
builder.appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
92+
builder.appendBytesRef(new BytesRef(randomAlphaOfLength(1000)));
9393
}
9494
currentPosition += length;
9595
return new Page(builder.build().asBlock());
@@ -120,13 +120,21 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
120120
Request request,
121121
ActionListener<Response> listener
122122
) {
123-
if (action == InferenceAction.INSTANCE && request instanceof InferenceAction.Request inferenceRequest) {
124-
InferenceAction.Response inferenceResponse = new InferenceAction.Response(mockInferenceResult(inferenceRequest));
125-
listener.onResponse((Response) inferenceResponse);
126-
return;
127-
}
123+
Runnable runnable = () -> {
124+
if (action == InferenceAction.INSTANCE && request instanceof InferenceAction.Request inferenceRequest) {
125+
InferenceAction.Response inferenceResponse = new InferenceAction.Response(mockInferenceResult(inferenceRequest));
126+
listener.onResponse((Response) inferenceResponse);
127+
return;
128+
}
128129

129-
fail("Unexpected call to action [" + action.name() + "]");
130+
fail("Unexpected call to action [" + action.name() + "]");
131+
};
132+
133+
if (randomBoolean()) {
134+
runnable.run();
135+
} else {
136+
threadPool.schedule(runnable, TimeValue.timeValueNanos(between(1, 100)), threadPool.executor(ThreadPool.Names.SEARCH));
137+
}
130138
}
131139
};
132140

@@ -187,7 +195,9 @@ protected EvalOperator.ExpressionEvaluator.Factory evaluatorFactory(int channel)
187195
return context -> new EvalOperator.ExpressionEvaluator() {
188196
@Override
189197
public Block eval(Page page) {
190-
return BlockUtils.deepCopyOf(page.getBlock(channel), context.blockFactory());
198+
Block b = page.getBlock(channel);
199+
b.incRef();
200+
return b;
191201
}
192202

193203
@Override

0 commit comments

Comments
 (0)