Skip to content

Commit 5f0fb60

Browse files
committed
Moving back InferenceRunnerConfig to BulkInferenceRunnerConfig
1 parent 1f9ac99 commit 5f0fb60

File tree

8 files changed

+91
-129
lines changed

8 files changed

+91
-129
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.sandbox.document.HalfFloatPoint;
1313
import org.apache.lucene.util.BytesRef;
1414
import org.elasticsearch.ExceptionsHelper;
15+
import org.elasticsearch.client.internal.Client;
1516
import org.elasticsearch.cluster.RemoteException;
1617
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1718
import org.elasticsearch.cluster.project.ProjectResolver;
@@ -75,8 +76,7 @@
7576
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.NotEquals;
7677
import org.elasticsearch.xpack.esql.index.EsIndex;
7778
import org.elasticsearch.xpack.esql.inference.InferenceResolution;
78-
import org.elasticsearch.xpack.esql.inference.InferenceResolver;
79-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
79+
import org.elasticsearch.xpack.esql.inference.InferenceService;
8080
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
8181
import org.elasticsearch.xpack.esql.parser.QueryParam;
8282
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -420,8 +420,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
420420
mock(ProjectResolver.class),
421421
mock(IndexNameExpressionResolver.class),
422422
null,
423-
mock(InferenceResolver.Factory.class),
424-
mock(BulkInferenceRunner.Factory.class)
423+
new InferenceService(mock(Client.class))
425424
);
426425

427426
private EsqlTestUtils() {}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.inference;
9+
10+
import org.elasticsearch.client.internal.Client;
11+
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
12+
13+
public class InferenceService {
14+
private final InferenceResolver.Factory inferenceResolverFactory;
15+
16+
private final BulkInferenceRunner.Factory bulkInferenceRunnerFactory;
17+
18+
/**
19+
* Creates a new inference service with the given client.
20+
*
21+
* @param client the Elasticsearch client for inference operations
22+
*/
23+
public InferenceService(Client client) {
24+
this(InferenceResolver.factory(client), BulkInferenceRunner.factory(client));
25+
}
26+
27+
private InferenceService(InferenceResolver.Factory inferenceResolverFactory, BulkInferenceRunner.Factory bulkInferenceRunnerFactory) {
28+
this.inferenceResolverFactory = inferenceResolverFactory;
29+
this.bulkInferenceRunnerFactory = bulkInferenceRunnerFactory;
30+
}
31+
32+
/**
33+
* Creates an inference resolver for resolving inference IDs in logical plans.
34+
*
35+
* @return a new inference resolver instance
36+
*/
37+
public InferenceResolver inferenceResolver() {
38+
return inferenceResolverFactory.create();
39+
}
40+
41+
/**
42+
* Returns the bulk inference runner factory for executing batched inference operations.
43+
*
44+
* @return the bulk inference runner factory
45+
*/
46+
public BulkInferenceRunner.Factory bulkInferenceRunnerFactory() {
47+
return bulkInferenceRunnerFactory;
48+
}
49+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@
8686
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
8787
import org.elasticsearch.xpack.esql.evaluator.command.GrokEvaluatorExtracter;
8888
import org.elasticsearch.xpack.esql.expression.Order;
89+
import org.elasticsearch.xpack.esql.inference.InferenceService;
8990
import org.elasticsearch.xpack.esql.inference.XContentRowEncoder;
90-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
9191
import org.elasticsearch.xpack.esql.inference.completion.CompletionOperator;
9292
import org.elasticsearch.xpack.esql.inference.rerank.RerankOperator;
9393
import org.elasticsearch.xpack.esql.plan.logical.Fork;
@@ -159,7 +159,7 @@ public class LocalExecutionPlanner {
159159
private final Supplier<ExchangeSink> exchangeSinkSupplier;
160160
private final EnrichLookupService enrichLookupService;
161161
private final LookupFromIndexService lookupFromIndexService;
162-
private final BulkInferenceRunner.Factory bulkInferenceRunnerFactory;
162+
private final InferenceService inferenceService;
163163
private final PhysicalOperationProviders physicalOperationProviders;
164164
private final List<ShardContext> shardContexts;
165165

@@ -175,7 +175,7 @@ public LocalExecutionPlanner(
175175
Supplier<ExchangeSink> exchangeSinkSupplier,
176176
EnrichLookupService enrichLookupService,
177177
LookupFromIndexService lookupFromIndexService,
178-
BulkInferenceRunner.Factory bulkInferenceRunnerFactory,
178+
InferenceService inferenceService,
179179
PhysicalOperationProviders physicalOperationProviders,
180180
List<ShardContext> shardContexts
181181
) {
@@ -191,7 +191,7 @@ public LocalExecutionPlanner(
191191
this.exchangeSinkSupplier = exchangeSinkSupplier;
192192
this.enrichLookupService = enrichLookupService;
193193
this.lookupFromIndexService = lookupFromIndexService;
194-
this.bulkInferenceRunnerFactory = bulkInferenceRunnerFactory;
194+
this.inferenceService = inferenceService;
195195
this.physicalOperationProviders = physicalOperationProviders;
196196
this.shardContexts = shardContexts;
197197
}
@@ -318,7 +318,10 @@ private PhysicalOperation planCompletion(CompletionExec completion, LocalExecuti
318318
source.layout
319319
);
320320

321-
return source.with(new CompletionOperator.Factory(bulkInferenceRunnerFactory, inferenceId, promptEvaluatorFactory), outputLayout);
321+
return source.with(
322+
new CompletionOperator.Factory(inferenceService.bulkInferenceRunnerFactory(), inferenceId, promptEvaluatorFactory),
323+
outputLayout
324+
);
322325
}
323326

324327
private PhysicalOperation planRrfScoreEvalExec(RrfScoreEvalExec rrf, LocalExecutionPlannerContext context) {
@@ -654,7 +657,13 @@ private PhysicalOperation planRerank(RerankExec rerank, LocalExecutionPlannerCon
654657
int scoreChannel = outputLayout.get(rerank.scoreAttribute().id()).channel();
655658

656659
return source.with(
657-
new RerankOperator.Factory(bulkInferenceRunnerFactory, inferenceId, queryText, rowEncoderFactory, scoreChannel),
660+
new RerankOperator.Factory(
661+
inferenceService.bulkInferenceRunnerFactory(),
662+
inferenceId,
663+
queryText,
664+
rowEncoderFactory,
665+
scoreChannel
666+
),
658667
outputLayout
659668
);
660669
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
5353
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
5454
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
55-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
55+
import org.elasticsearch.xpack.esql.inference.InferenceService;
5656
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
5757
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
5858
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
@@ -131,7 +131,7 @@ public class ComputeService {
131131
private final DriverTaskRunner driverRunner;
132132
private final EnrichLookupService enrichLookupService;
133133
private final LookupFromIndexService lookupFromIndexService;
134-
private final BulkInferenceRunner.Factory bulkInferenceRunnerFactory;
134+
private final InferenceService inferenceService;
135135
private final ClusterService clusterService;
136136
private final ProjectResolver projectResolver;
137137
private final AtomicLong childSessionIdGenerator = new AtomicLong();
@@ -158,7 +158,7 @@ public ComputeService(
158158
this.driverRunner = new DriverTaskRunner(transportService, esqlExecutor);
159159
this.enrichLookupService = enrichLookupService;
160160
this.lookupFromIndexService = lookupFromIndexService;
161-
this.bulkInferenceRunnerFactory = transportActionServices.bulkInferenceRunnerFactory();
161+
this.inferenceService = transportActionServices.inferenceService();
162162
this.clusterService = transportActionServices.clusterService();
163163
this.projectResolver = transportActionServices.projectResolver();
164164
this.dataNodeComputeHandler = new DataNodeComputeHandler(
@@ -622,7 +622,7 @@ public SourceProvider createSourceProvider() {
622622
context.exchangeSinkSupplier(),
623623
enrichLookupService,
624624
lookupFromIndexService,
625-
bulkInferenceRunnerFactory,
625+
inferenceService,
626626
physicalOperationProviders,
627627
contexts
628628
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java

Lines changed: 12 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -14,105 +14,15 @@
1414
import org.elasticsearch.search.SearchService;
1515
import org.elasticsearch.transport.TransportService;
1616
import org.elasticsearch.usage.UsageService;
17-
import org.elasticsearch.xpack.esql.inference.InferenceResolver;
18-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
19-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunnerConfig;
20-
21-
public class TransportActionServices {
22-
private final TransportService transportService;
23-
private final SearchService searchService;
24-
private final ExchangeService exchangeService;
25-
private final ClusterService clusterService;
26-
private final ProjectResolver projectResolver;
27-
private final IndexNameExpressionResolver indexNameExpressionResolver;
28-
private final UsageService usageService;
29-
private final InferenceResolver.Factory inferenceResolverFactory;
30-
private final BulkInferenceRunner.Factory inferenceRunnerFactory;
31-
32-
public TransportActionServices(
33-
TransportService transportService,
34-
SearchService searchService,
35-
ExchangeService exchangeService,
36-
ClusterService clusterService,
37-
ProjectResolver projectResolver,
38-
IndexNameExpressionResolver indexNameExpressionResolver,
39-
UsageService usageService,
40-
InferenceResolver.Factory inferenceResolverFactory,
41-
BulkInferenceRunner.Factory bulkInferenceRunnerFactory
42-
) {
43-
this.transportService = transportService;
44-
this.searchService = searchService;
45-
this.exchangeService = exchangeService;
46-
this.clusterService = clusterService;
47-
this.projectResolver = projectResolver;
48-
this.indexNameExpressionResolver = indexNameExpressionResolver;
49-
this.usageService = usageService;
50-
this.inferenceResolverFactory = inferenceResolverFactory;
51-
this.inferenceRunnerFactory = bulkInferenceRunnerFactory;
52-
}
53-
54-
public TransportService transportService() {
55-
return transportService;
56-
}
57-
58-
public SearchService searchService() {
59-
return searchService;
60-
}
61-
62-
public ExchangeService exchangeService() {
63-
return exchangeService;
64-
}
65-
66-
public ClusterService clusterService() {
67-
return clusterService;
68-
}
69-
70-
public ProjectResolver projectResolver() {
71-
return projectResolver;
72-
}
73-
74-
public IndexNameExpressionResolver indexNameExpressionResolver() {
75-
return indexNameExpressionResolver;
76-
}
77-
78-
public UsageService usageService() {
79-
return usageService;
80-
}
81-
82-
/**
83-
* Creates an inference resolver for collecting used inference IDs from logical plans.
84-
*
85-
* @return A resolver capable of extracting inference IDs from plans and expressions
86-
*/
87-
public InferenceResolver inferenceResolver() {
88-
return inferenceResolverFactory.create();
89-
}
90-
91-
/**
92-
* Returns the inference runner factory for creating multiple configured runners.
93-
*
94-
* @return The factory for creating inference runners
95-
*/
96-
public BulkInferenceRunner.Factory bulkInferenceRunnerFactory() {
97-
return inferenceRunnerFactory;
98-
}
99-
100-
/**
101-
* Creates an inference runner with the specified execution configuration using the default configuration.
102-
*
103-
* @return A configured inference runner capable of executing inference requests
104-
*/
105-
public BulkInferenceRunner bulkInferenceRunner() {
106-
return bulkInferenceRunner(BulkInferenceRunnerConfig.DEFAULT);
107-
}
108-
109-
/**
110-
* Creates an inference runner with the specified execution configuration.
111-
*
112-
* @param bulkInferenceRunnerConfig Configuration specifying concurrency limits and execution parameters
113-
* @return A configured inference runner capable of executing inference requests
114-
*/
115-
public BulkInferenceRunner bulkInferenceRunner(BulkInferenceRunnerConfig bulkInferenceRunnerConfig) {
116-
return inferenceRunnerFactory.create(bulkInferenceRunnerConfig);
117-
}
118-
}
17+
import org.elasticsearch.xpack.esql.inference.InferenceService;
18+
19+
public record TransportActionServices(
20+
TransportService transportService,
21+
SearchService searchService,
22+
ExchangeService exchangeService,
23+
ClusterService clusterService,
24+
ProjectResolver projectResolver,
25+
IndexNameExpressionResolver indexNameExpressionResolver,
26+
UsageService usageService,
27+
InferenceService inferenceService
28+
) {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@
5151
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
5252
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
5353
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
54-
import org.elasticsearch.xpack.esql.inference.InferenceResolver;
55-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
54+
import org.elasticsearch.xpack.esql.inference.InferenceService;
5655
import org.elasticsearch.xpack.esql.session.Configuration;
5756
import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner;
5857
import org.elasticsearch.xpack.esql.session.Result;
@@ -167,8 +166,7 @@ public TransportEsqlQueryAction(
167166
projectResolver,
168167
indexNameExpressionResolver,
169168
usageService,
170-
InferenceResolver.factory(client),
171-
BulkInferenceRunner.factory(client)
169+
new InferenceService(client)
172170
);
173171

174172
this.computeService = new ComputeService(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@
5454
import org.elasticsearch.xpack.esql.index.IndexResolution;
5555
import org.elasticsearch.xpack.esql.index.MappingException;
5656
import org.elasticsearch.xpack.esql.inference.InferenceResolution;
57-
import org.elasticsearch.xpack.esql.inference.InferenceResolver;
58-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
57+
import org.elasticsearch.xpack.esql.inference.InferenceService;
5958
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
6059
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
6160
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
@@ -119,8 +118,7 @@ public interface PlanRunner {
119118
private final PhysicalPlanOptimizer physicalPlanOptimizer;
120119
private final PlanTelemetry planTelemetry;
121120
private final IndicesExpressionGrouper indicesExpressionGrouper;
122-
private final BulkInferenceRunner.Factory inferenceRunnerFactory;
123-
private final InferenceResolver inferenceResolver;
121+
private final InferenceService inferenceService;
124122
private final RemoteClusterService remoteClusterService;
125123

126124
private boolean explainMode;
@@ -155,8 +153,7 @@ public EsqlSession(
155153
this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
156154
this.planTelemetry = planTelemetry;
157155
this.indicesExpressionGrouper = indicesExpressionGrouper;
158-
this.inferenceRunnerFactory = services.bulkInferenceRunnerFactory();
159-
this.inferenceResolver = services.inferenceResolver();
156+
this.inferenceService = services.inferenceService();
160157
this.preMapper = new PreMapper(services);
161158
this.remoteClusterService = services.transportService().getRemoteClusterService();
162159
}
@@ -749,7 +746,7 @@ private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enric
749746
}
750747

751748
private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
752-
inferenceResolver.resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution));
749+
inferenceService.inferenceResolver().resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution));
753750
}
754751

755752
private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
6868
import org.elasticsearch.xpack.esql.index.EsIndex;
6969
import org.elasticsearch.xpack.esql.index.IndexResolution;
70-
import org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceRunner;
70+
import org.elasticsearch.xpack.esql.inference.InferenceService;
7171
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
7272
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer;
7373
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
@@ -707,7 +707,7 @@ void executeSubPlan(
707707
() -> exchangeSink.createExchangeSink(() -> {}),
708708
mock(EnrichLookupService.class),
709709
mock(LookupFromIndexService.class),
710-
mock(BulkInferenceRunner.Factory.class),
710+
mock(InferenceService.class),
711711
physicalOperationProviders,
712712
List.of()
713713
);

0 commit comments

Comments
 (0)