Skip to content

Conversation

afoucret
Copy link
Contributor

@afoucret afoucret commented Jul 28, 2025

This PR implements a batch of evolution in the ES|QL inference runner that are required in the context of the TEXT_EMBEDDING function implementation.

Changes:

  • Full rewrite of the tasks runner to get rid of a race condition
  • Improve the way the inference resolution is working:
    • More flexible implementation, so it will be easier to resolve inference ids from plans and functions
    • Reduced footprint in the PreAnalyzer
  • InferenceOperator simplification
    • Remove the extra step in addInput
    • Simplification of the output building
    • Better test coverages (using randomized input/output columns)

Related Issue:

@afoucret afoucret marked this pull request as ready for review July 28, 2025 07:00
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Jul 28, 2025
new ResolveInference(),
new ResolveLookupTables(),
new ResolveFunctions(),
new ResolveInference(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Inference resolution is moved after function resolution, so we can resolve inference ids use in functions like TEXT_EMBEDDING

/**
* Collects and resolves inference deployments inference IDs from ES|QL logical plans.
*/
public class InferenceResolver {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Now a separate class as the logic of inference resolution is becoming much more complex.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice :)

* and other non-thread-safe components.
* </p>
*/
public class BulkInferenceRunner {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ This is a full rewrite of the InferenceRunner that was causing race conditions.

Copy link
Contributor

@ioanatia ioanatia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition mentioned in the PR description.
What is the race condition?
Do we need to backport a fix to 9.1 if we found a race condition since COMPLETION is already released for 9.1?
Is it necessary to rewrite the implementation of BulkInferenceExecutor to fix the race condition? Or can we have a more surgical fix that we can review separately?

}
}

private static class ResolveInference extends ParameterizedAnalyzerRule<InferencePlan<?>, AnalyzerContext> {
Copy link
Contributor

@ioanatia ioanatia Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any difference between this implementation and the new one you added?
I don't think it is - they both do the same thing, even if this one overrides the rule method and the new one the apply one.

So this diff here looks like it was entirely unnecessary.
If we needed to make a change to ResolveInference, we could have made it here, not move it entirely later in the file. This forces the reviewers to check each line to figure out what actually changed.

Maybe the one thing that we needed in Analyzer.java was the order in which we apply rules. But even that is debatable, because it's not necessary yet, we could have switched the order when we actually add the text_embedding function. 🤷‍♀️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no major difference but the new method is easier to extends since I can not chain more transform.

return plan.transformDown(InferencePlan.class, p -> resolveInferencePlan(p, context));

will become when implementing the resolution of InferenceFunction

return plan.transformDown(InferencePlan.class, p -> resolveInferencePlan(p, context))
    .transformExpressionsOnly(InferenceFunction.class, f -> resolveInferenceFunction(f, context));

This is the whole point of this PR to anticipate change that are required in the existing framework and to isolate these changes, so we can focus on testing potential regression.

BTW, I am 100% sure that we will have to change the order of the resolution and that I will use this change to finish the implementation of TEXT_EMBEDDING.

@afoucret
Copy link
Contributor Author

The race condition can happen when submitted several batch of inferences.
If previous batches have already exhausted the number of allocated permits, the newly submitted batch will never be started and. As a consequence the listener will never be called and the request will timeout.

In our execution model we are submitted one batch per page (then each request inside the batch are executed in parallel) up to 10 concurrent batches (this last point is managed by the AsyncOperator). It means that the problem affects mostly big request that are handling multiple pages (hundreds / thousands of row). So not the most likely case.

I have added a test to verify this parallel execution that I will be adding to the 9.1 branch :

    public void testParallelBulkExecution() throws Exception {
        int batches = between(50, 100);
        CountDownLatch latch = new CountDownLatch(batches);

        for (int i = 0; i < batches; i++) {
            List<InferenceAction.Request> requests = randomInferenceRequestList(between(1, 1_000));
            List<InferenceAction.Response> responses = randomInferenceResponseList(requests.size());

            Client client = mockClient(invocation -> {
                runWithRandomDelay(() -> {
                    ActionListener<InferenceAction.Response> l = invocation.getArgument(2);
                    l.onResponse(responses.get(requests.indexOf(invocation.getArgument(1, InferenceAction.Request.class))));
                });
                return null;
            });

            ActionListener<List<InferenceAction.Response>> listener = ActionListener.wrap(r -> {
                assertThat(r, equalTo(responses));
                LogManager.getLogger(BulkInferenceRunnerTests.class).warn("Received [{}] responses", responses.size());
                latch.countDown();
            }, ESTestCase::fail);

            inferenceRunnerFactory(client).create(randomBulkExecutionConfig()).executeBulk(requestIterator(requests), listener);
        }

        latch.await();
    }

I think a surgical patch will be not be to complicated.

@afoucret
Copy link
Contributor Author

@ioanatia Here is the patch for the race condition in 9.1: https://github.com/elastic/elasticsearch/pull/130991/files

@afoucret afoucret force-pushed the esql-inference-runner-refactoring branch from 56d6597 to 57fe411 Compare July 31, 2025 10:20
Copy link
Contributor

@tteofili tteofili left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, nice work Aurelien 💯

/**
* Collects and resolves inference deployments inference IDs from ES|QL logical plans.
*/
public class InferenceResolver {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice :)

protected RerankOperatorRequestIterator requests(Page inputPage) {
int inputBlockChannel = inputPage.getBlockCount() - 1;
return new RerankOperatorRequestIterator(inputPage.getBlock(inputBlockChannel), inferenceId(), queryText, batchSize);
return new RerankOperatorRequestIterator((BytesRefBlock) rowEncoder.eval(inputPage), inferenceId(), queryText, batchSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it always a BytesRefBlock ? probably it is, but I wonder if we should be safe and do a check here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a small issue here but I will handle it as part of another PR because it does also applies to branch 9.1 and 8.19, so it will be easier to backport.

@afoucret afoucret merged commit dd3e3c9 into elastic:main Jul 31, 2025
33 checks passed
szybia added a commit to szybia/elasticsearch that referenced this pull request Aug 15, 2025
* upstream/main: (822 commits)
  Improve Semantic Text Exists Query Tests (elastic#132283)
  Make hierarchical k-means over centroids cheaper (elastic#132316)
  Remove unnecessary listener.delegateFailure in IndexShard#ensureMutable (elastic#132294)
  Add missing release note (elastic#132319)
  Unmute elastic#131803 (elastic#132295)
  Include bytes for live docs in ShardFieldStats (elastic#132232)
  Fix default missing index sort value of data_nanos pre 7.14 (elastic#132162)
  [DiskBBQ] Quantize centroids using 7 bits instead of 4 bits (elastic#132261)
  Use panamized version for windows in Int7VectorScorer (elastic#132311)
  Mute org.elasticsearch.xpack.ml.integration.AutodetectMemoryLimitIT testTooManyByAndOverFields elastic#132310
  Mute org.elasticsearch.xpack.ml.integration.AutodetectMemoryLimitIT testManyDistinctOverFields elastic#132308
  Update 8.17 version to 8.17.10 (elastic#132303)
  Mute org.elasticsearch.datastreams.DataStreamsClientYamlTestSuiteIT test {p0=data_stream/10_basic/Create hidden data stream with match all template} elastic#132298
  Add random queries to logsdb data generation tests (elastic#132109)
  ES|QL Inference runner refactoring (elastic#131986)
  Add basic example to linear-retriever.md (elastic#132196)
  Refactor RemoteClusterService to be multi-project aware (elastic#131894)
  ESQL: Mark csv-spec tests (elastic#132098)
  Mute org.elasticsearch.common.logging.JULBridgeTests testThrowable elastic#132280
  Bump versions after 8.19.0 release
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.2.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants