Skip to content

Commit cb09e30

Browse files
Cleaning up from failed merge
1 parent eba5fce commit cb09e30

File tree

12 files changed

+65
-93
lines changed

12 files changed

+65
-93
lines changed

x-pack/plugin/inference/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
apply plugin: 'elasticsearch.internal-es-plugin'
99
apply plugin: 'elasticsearch.internal-cluster-test'
1010
apply plugin: 'elasticsearch.internal-yaml-rest-test'
11+
apply plugin: 'elasticsearch.internal-test-artifact'
1112

1213
restResources {
1314
restApi {

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99

1010
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1111

12-
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1312
import org.elasticsearch.action.bulk.BulkItemResponse;
1413
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1514
import org.elasticsearch.action.bulk.BulkResponse;
1615
import org.elasticsearch.action.delete.DeleteRequestBuilder;
1716
import org.elasticsearch.action.index.IndexRequestBuilder;
1817
import org.elasticsearch.action.search.SearchRequest;
1918
import org.elasticsearch.action.search.SearchResponse;
19+
import org.elasticsearch.action.support.WriteRequest;
2020
import org.elasticsearch.action.update.UpdateRequestBuilder;
2121
import org.elasticsearch.cluster.metadata.IndexMetadata;
2222
import org.elasticsearch.common.settings.Settings;
@@ -242,12 +242,10 @@ public void testRestart() throws Exception {
242242

243243
private void assertRandomBulkOperations(String indexName, Function<Boolean, Map<String, Object>> sourceSupplier) throws Exception {
244244
int numHits = numHits(indexName);
245-
int totalBulkReqs = randomIntBetween(2, 100);
246-
long totalDocs = numHits;
245+
int totalBulkReqs = randomIntBetween(2, 10);
247246
Set<String> ids = new HashSet<>();
248-
249-
for (int bulkReqs = numHits; bulkReqs < totalBulkReqs; bulkReqs++) {
250-
BulkRequestBuilder bulkReqBuilder = client().prepareBulk();
247+
for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) {
248+
BulkRequestBuilder bulkReqBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
251249
int totalBulkSize = randomIntBetween(1, 100);
252250
for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) {
253251
if (ids.size() > 0 && rarely(random())) {
@@ -257,24 +255,15 @@ private void assertRandomBulkOperations(String indexName, Function<Boolean, Map<
257255
bulkReqBuilder.add(request);
258256
continue;
259257
}
260-
String id = Long.toString(totalDocs++);
261-
boolean isIndexRequest = randomBoolean();
258+
boolean isIndexRequest = ids.size() == 0 || randomBoolean();
262259
Map<String, Object> source = sourceSupplier.apply(isIndexRequest);
263260
if (isIndexRequest) {
261+
String id = randomAlphaOfLength(20);
264262
bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(indexName).setId(id).setSource(source));
265263
ids.add(id);
266264
} else {
267-
boolean isUpsert = randomBoolean();
268-
UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(indexName).setDoc(source);
269-
if (isUpsert || ids.size() == 0) {
270-
request.setDocAsUpsert(true);
271-
} else {
272-
// Update already existing document
273-
id = randomFrom(ids);
274-
}
275-
request.setId(id);
276-
bulkReqBuilder.add(request);
277-
ids.add(id);
265+
String id = randomFrom(ids);
266+
bulkReqBuilder.add(new UpdateRequestBuilder(client()).setIndex(indexName).setId(id).setDoc(source));
278267
}
279268
}
280269
BulkResponse bulkResponse = bulkReqBuilder.get();
@@ -293,8 +282,7 @@ private void assertRandomBulkOperations(String indexName, Function<Boolean, Map<
293282
}
294283
assertFalse(bulkResponse.hasFailures());
295284
}
296-
client().admin().indices().refresh(new RefreshRequest(indexName)).get();
297-
assertThat(numHits(indexName), equalTo(ids.size() + numHits));
285+
assertThat(numHits(indexName), equalTo(numHits + ids.size()));
298286
}
299287

300288
private int numHits(String indexName) throws Exception {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,13 +277,17 @@ public Collection<?> createComponents(PluginServices services) {
277277
var inferenceServices = new ArrayList<>(inferenceServiceExtensions);
278278
inferenceServices.add(this::getInferenceServiceFactories);
279279

280+
var inferenceServiceSettings = new ElasticInferenceServiceSettings(settings);
281+
inferenceServiceSettings.init(services.clusterService());
282+
280283
// Create a separate instance of HttpClientManager with its own SSL configuration (`xpack.inference.elastic.http.ssl.*`).
281284
var elasticInferenceServiceHttpClientManager = HttpClientManager.create(
282285
settings,
283286
services.threadPool(),
284287
services.clusterService(),
285288
throttlerManager,
286-
getSslService()
289+
getSslService(),
290+
inferenceServiceSettings.getConnectionTtl()
287291
);
288292

289293
var elasticInferenceServiceRequestSenderFactory = new HttpRequestSender.Factory(
@@ -293,9 +297,6 @@ public Collection<?> createComponents(PluginServices services) {
293297
);
294298
elasicInferenceServiceFactory.set(elasticInferenceServiceRequestSenderFactory);
295299

296-
var inferenceServiceSettings = new ElasticInferenceServiceSettings(settings);
297-
inferenceServiceSettings.init(services.clusterService());
298-
299300
var authorizationHandler = new ElasticInferenceServiceAuthorizationRequestHandler(
300301
inferenceServiceSettings.getElasticInferenceServiceUrl(),
301302
services.threadPool()

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportPutInferenceModelAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ protected void masterOperation(
177177
return;
178178
}
179179

180-
parseAndStoreModel(service.get(), request.getInferenceEntityId(), resolvedTaskType, requestAsMap, request.ackTimeout(), listener);
180+
parseAndStoreModel(service.get(), request.getInferenceEntityId(), resolvedTaskType, requestAsMap, request.getTimeout(), listener);
181181
}
182182

183183
private void parseAndStoreModel(

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClientManager.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.Closeable;
3333
import java.io.IOException;
3434
import java.util.List;
35+
import java.util.concurrent.TimeUnit;
3536

3637
import static org.elasticsearch.core.Strings.format;
3738
import static org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings.ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_PREFIX;
@@ -112,14 +113,15 @@ public static HttpClientManager create(
112113
ThreadPool threadPool,
113114
ClusterService clusterService,
114115
ThrottlerManager throttlerManager,
115-
SSLService sslService
116+
SSLService sslService,
117+
TimeValue connectionTtl
116118
) {
117119
// Set the sslStrategy to ensure an encrypted connection, as Elastic Inference Service requires it.
118120
SSLIOSessionStrategy sslioSessionStrategy = sslService.sslIOSessionStrategy(
119121
sslService.getSSLConfiguration(ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_PREFIX)
120122
);
121123

122-
PoolingNHttpClientConnectionManager connectionManager = createConnectionManager(sslioSessionStrategy);
124+
PoolingNHttpClientConnectionManager connectionManager = createConnectionManager(sslioSessionStrategy, connectionTtl);
123125
return new HttpClientManager(settings, connectionManager, threadPool, clusterService, throttlerManager);
124126
}
125127

@@ -146,7 +148,7 @@ public static HttpClientManager create(
146148
this.addSettingsUpdateConsumers(clusterService);
147149
}
148150

149-
private static PoolingNHttpClientConnectionManager createConnectionManager(SSLIOSessionStrategy sslStrategy) {
151+
private static PoolingNHttpClientConnectionManager createConnectionManager(SSLIOSessionStrategy sslStrategy, TimeValue connectionTtl) {
150152
ConnectingIOReactor ioReactor;
151153
try {
152154
var configBuilder = IOReactorConfig.custom().setSoKeepAlive(true);
@@ -162,7 +164,15 @@ private static PoolingNHttpClientConnectionManager createConnectionManager(SSLIO
162164
.register("https", sslStrategy)
163165
.build();
164166

165-
return new PoolingNHttpClientConnectionManager(ioReactor, registry);
167+
return new PoolingNHttpClientConnectionManager(
168+
ioReactor,
169+
null,
170+
registry,
171+
null,
172+
null,
173+
Math.toIntExact(connectionTtl.getMillis()),
174+
TimeUnit.MILLISECONDS
175+
);
166176
}
167177

168178
private static PoolingNHttpClientConnectionManager createConnectionManager() {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/StreamingHttpResult.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.ExceptionsHelper;
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.rest.RestStatus;
14-
import org.elasticsearch.xpack.inference.external.request.HttpRequest;
1514

1615
import java.io.ByteArrayOutputStream;
1716
import java.util.concurrent.Flow;
@@ -22,7 +21,7 @@ public boolean isSuccessfulResponse() {
2221
return RestStatus.isSuccessful(response.getStatusLine().getStatusCode());
2322
}
2423

25-
public Flow.Publisher<HttpResult> toHttpResult(HttpRequest httpRequest) {
24+
public Flow.Publisher<HttpResult> toHttpResult() {
2625
return subscriber -> body().subscribe(new Flow.Subscriber<>() {
2726
@Override
2827
public void onSubscribe(Flow.Subscription subscription) {
@@ -46,7 +45,7 @@ public void onComplete() {
4645
});
4746
}
4847

49-
public void readFullResponse(HttpRequest httpRequest, ActionListener<HttpResult> fullResponse) {
48+
public void readFullResponse(ActionListener<HttpResult> fullResponse) {
5049
var stream = new ByteArrayOutputStream();
5150
AtomicReference<Flow.Subscription> upstream = new AtomicReference<>(null);
5251
body.subscribe(new Flow.Subscriber<>() {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/retry/RetryingHttpSender.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,11 @@ public void tryAction(ActionListener<InferenceServiceResults> listener) {
115115

116116
try {
117117
if (request.isStreaming() && responseHandler.canHandleStreamingResponses()) {
118-
var httpRequest = request.createHttpRequest();
119-
httpClient.stream(httpRequest, context, retryableListener.delegateFailure((l, r) -> {
118+
httpClient.stream(request.createHttpRequest(), context, retryableListener.delegateFailure((l, r) -> {
120119
if (r.isSuccessfulResponse()) {
121-
l.onResponse(responseHandler.parseResult(request, r.toHttpResult(httpRequest)));
120+
l.onResponse(responseHandler.parseResult(request, r.toHttpResult()));
122121
} else {
123-
r.readFullResponse(httpRequest, l.delegateFailureAndWrap((ll, httpResult) -> {
122+
r.readFullResponse(l.delegateFailureAndWrap((ll, httpResult) -> {
124123
try {
125124
responseHandler.validateResponse(throttlerManager, logger, request, httpResult, true);
126125
InferenceServiceResults inferenceResults = responseHandler.parseResult(request, httpResult);

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.features.NodeFeature;
3333
import org.elasticsearch.index.IndexSettings;
3434
import org.elasticsearch.index.IndexVersion;
35+
import org.elasticsearch.index.IndexVersions;
3536
import org.elasticsearch.index.fielddata.FieldDataContext;
3637
import org.elasticsearch.index.fielddata.IndexFieldData;
3738
import org.elasticsearch.index.mapper.BlockLoader;
@@ -98,6 +99,7 @@
9899
import java.util.function.Supplier;
99100

100101
import static org.elasticsearch.index.IndexVersions.SEMANTIC_TEXT_DEFAULTS_TO_BBQ;
102+
import static org.elasticsearch.index.IndexVersions.SEMANTIC_TEXT_DEFAULTS_TO_BBQ_BACKPORT_8_X;
101103
import static org.elasticsearch.inference.TaskType.SPARSE_EMBEDDING;
102104
import static org.elasticsearch.inference.TaskType.TEXT_EMBEDDING;
103105
import static org.elasticsearch.search.SearchService.DEFAULT_SIZE;
@@ -1077,7 +1079,8 @@ private static Mapper.Builder createEmbeddingsField(
10771079
denseVectorMapperBuilder.elementType(modelSettings.elementType());
10781080

10791081
DenseVectorFieldMapper.IndexOptions defaultIndexOptions = null;
1080-
if (indexVersionCreated.onOrAfter(SEMANTIC_TEXT_DEFAULTS_TO_BBQ)) {
1082+
if (indexVersionCreated.onOrAfter(SEMANTIC_TEXT_DEFAULTS_TO_BBQ)
1083+
|| indexVersionCreated.between(SEMANTIC_TEXT_DEFAULTS_TO_BBQ_BACKPORT_8_X, IndexVersions.UPGRADE_TO_LUCENE_10_0_0)) {
10811084
defaultIndexOptions = defaultSemanticDenseIndexOptions();
10821085
}
10831086
if (defaultIndexOptions != null

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceServiceSettings.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,33 @@ public class ElasticInferenceServiceSettings {
7070
Setting.Property.NodeScope
7171
);
7272

73+
/**
74+
* Total time to live (TTL) defines the maximum life span of persistent connections regardless of their
75+
* expiration setting. No persistent connection will be re-used past its TTL value.
76+
* Using a TTL of -1 will disable the expiration of persistent connections (the idle connection evictor will still apply).
77+
*/
78+
public static final Setting<TimeValue> CONNECTION_TTL_SETTING = Setting.timeSetting(
79+
"xpack.inference.elastic.http.connection_ttl",
80+
TimeValue.timeValueSeconds(60),
81+
Setting.Property.NodeScope
82+
);
83+
7384
@Deprecated
7485
private final String eisGatewayUrl;
7586

7687
private final String elasticInferenceServiceUrl;
7788
private final boolean periodicAuthorizationEnabled;
7889
private volatile TimeValue authRequestInterval;
7990
private volatile TimeValue maxAuthorizationRequestJitter;
91+
private final TimeValue connectionTtl;
8092

8193
public ElasticInferenceServiceSettings(Settings settings) {
8294
eisGatewayUrl = EIS_GATEWAY_URL.get(settings);
8395
elasticInferenceServiceUrl = ELASTIC_INFERENCE_SERVICE_URL.get(settings);
8496
periodicAuthorizationEnabled = PERIODIC_AUTHORIZATION_ENABLED.get(settings);
8597
authRequestInterval = AUTHORIZATION_REQUEST_INTERVAL.get(settings);
8698
maxAuthorizationRequestJitter = MAX_AUTHORIZATION_REQUEST_JITTER.get(settings);
99+
connectionTtl = CONNECTION_TTL_SETTING.get(settings);
87100
}
88101

89102
/**
@@ -115,6 +128,10 @@ public TimeValue getMaxAuthorizationRequestJitter() {
115128
return maxAuthorizationRequestJitter;
116129
}
117130

131+
public TimeValue getConnectionTtl() {
132+
return connectionTtl;
133+
}
134+
118135
public static List<Setting<?>> getSettingsDefinitions() {
119136
ArrayList<Setting<?>> settings = new ArrayList<>();
120137
settings.add(EIS_GATEWAY_URL);
@@ -124,6 +141,7 @@ public static List<Setting<?>> getSettingsDefinitions() {
124141
settings.add(PERIODIC_AUTHORIZATION_ENABLED);
125142
settings.add(AUTHORIZATION_REQUEST_INTERVAL);
126143
settings.add(MAX_AUTHORIZATION_REQUEST_JITTER);
144+
settings.add(CONNECTION_TTL_SETTING);
127145
return settings;
128146
}
129147

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/OpenAiUnifiedStreamingProcessor.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Deque;
2727
import java.util.Iterator;
2828
import java.util.List;
29-
import java.util.concurrent.LinkedBlockingDeque;
3029
import java.util.function.BiFunction;
3130

3231
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
@@ -60,21 +59,11 @@ public class OpenAiUnifiedStreamingProcessor extends DelegatingProcessor<
6059
public static final String TOTAL_TOKENS_FIELD = "total_tokens";
6160

6261
private final BiFunction<String, Exception, Exception> errorParser;
63-
private final Deque<StreamingUnifiedChatCompletionResults.ChatCompletionChunk> buffer = new LinkedBlockingDeque<>();
6462

6563
public OpenAiUnifiedStreamingProcessor(BiFunction<String, Exception, Exception> errorParser) {
6664
this.errorParser = errorParser;
6765
}
6866

69-
@Override
70-
protected void upstreamRequest(long n) {
71-
if (buffer.isEmpty()) {
72-
super.upstreamRequest(n);
73-
} else {
74-
downstream().onNext(new StreamingUnifiedChatCompletionResults.Results(singleItem(buffer.poll())));
75-
}
76-
}
77-
7867
@Override
7968
protected void next(Deque<ServerSentEvent> item) throws Exception {
8069
var parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE);
@@ -96,15 +85,8 @@ protected void next(Deque<ServerSentEvent> item) throws Exception {
9685

9786
if (results.isEmpty()) {
9887
upstream().request(1);
99-
} else if (results.size() == 1) {
100-
downstream().onNext(new StreamingUnifiedChatCompletionResults.Results(results));
10188
} else {
102-
// results > 1, but openai spec only wants 1 chunk per SSE event
103-
var firstItem = singleItem(results.poll());
104-
while (results.isEmpty() == false) {
105-
buffer.offer(results.poll());
106-
}
107-
downstream().onNext(new StreamingUnifiedChatCompletionResults.Results(firstItem));
89+
downstream().onNext(new StreamingUnifiedChatCompletionResults.Results(results));
10890
}
10991
}
11092

@@ -297,12 +279,4 @@ public static StreamingUnifiedChatCompletionResults.ChatCompletionChunk.Usage pa
297279
}
298280
}
299281
}
300-
301-
private Deque<StreamingUnifiedChatCompletionResults.ChatCompletionChunk> singleItem(
302-
StreamingUnifiedChatCompletionResults.ChatCompletionChunk result
303-
) {
304-
var deque = new ArrayDeque<StreamingUnifiedChatCompletionResults.ChatCompletionChunk>(1);
305-
deque.offer(result);
306-
return deque;
307-
}
308282
}

0 commit comments

Comments
 (0)