Skip to content

Commit feb60d9

Browse files
Investigating some things
1 parent e1c2760 commit feb60d9

File tree

9 files changed

+130
-42
lines changed

9 files changed

+130
-42
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ public class InferencePlugin extends Plugin
201201

202202
public static final String NAME = "inference";
203203
public static final String UTILITY_THREAD_POOL_NAME = "inference_utility";
204+
public static final String UTILITY_RESPONSE_THREAD_POOL_NAME = "inference_utility_response";
204205

205206
private static final Logger log = LogManager.getLogger(InferencePlugin.class);
206207

@@ -495,20 +496,31 @@ protected Settings getSecretsIndexSettings() {
495496

496497
@Override
497498
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
498-
return List.of(inferenceUtilityExecutor(settings));
499+
return List.of(inferenceUtilityExecutor(settings), inferenceResponseUtilityExecutor(settings));
499500
}
500501

501502
public static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
502503
return new ScalingExecutorBuilder(
503504
UTILITY_THREAD_POOL_NAME,
504505
0,
505-
10,
506+
20,
506507
TimeValue.timeValueMinutes(10),
507508
false,
508509
"xpack.inference.utility_thread_pool"
509510
);
510511
}
511512

513+
public static ExecutorBuilder<?> inferenceResponseUtilityExecutor(Settings settings) {
514+
return new ScalingExecutorBuilder(
515+
UTILITY_RESPONSE_THREAD_POOL_NAME,
516+
0,
517+
20,
518+
TimeValue.timeValueMinutes(10),
519+
false,
520+
"xpack.inference.utility_response_thread_pool"
521+
);
522+
}
523+
512524
@Override
513525
public List<Setting<?>> getSettings() {
514526
return List.copyOf(getInferenceSettings());

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import java.util.concurrent.atomic.AtomicReference;
3131

3232
import static org.elasticsearch.core.Strings.format;
33-
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;
33+
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_RESPONSE_THREAD_POOL_NAME;
3434

3535
/**
3636
* Provides a wrapper around a {@link CloseableHttpAsyncClient} to move the responses to a separate thread for processing.
@@ -135,7 +135,7 @@ public void cancelled() {
135135
}
136136

137137
private void respondUsingUtilityThread(HttpResponse response, HttpRequest request, ActionListener<HttpResult> listener) {
138-
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
138+
threadPool.executor(UTILITY_RESPONSE_THREAD_POOL_NAME).execute(() -> {
139139
try {
140140
listener.onResponse(HttpResult.create(settings.getMaxResponseSize(), response));
141141
} catch (Exception e) {
@@ -150,7 +150,7 @@ private void respondUsingUtilityThread(HttpResponse response, HttpRequest reques
150150
}
151151

152152
private void failUsingUtilityThread(Exception exception, ActionListener<?> listener) {
153-
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> listener.onFailure(exception));
153+
threadPool.executor(UTILITY_RESPONSE_THREAD_POOL_NAME).execute(() -> listener.onFailure(exception));
154154
}
155155

156156
public void stream(HttpRequest request, HttpContext context, ActionListener<StreamingHttpResult> listener) throws IOException {
@@ -167,12 +167,12 @@ public void completed(Void response) {
167167

168168
@Override
169169
public void failed(Exception ex) {
170-
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> streamingProcessor.failed(ex));
170+
threadPool.executor(UTILITY_RESPONSE_THREAD_POOL_NAME).execute(() -> streamingProcessor.failed(ex));
171171
}
172172

173173
@Override
174174
public void cancelled() {
175-
threadPool.executor(UTILITY_THREAD_POOL_NAME)
175+
threadPool.executor(UTILITY_RESPONSE_THREAD_POOL_NAME)
176176
.execute(
177177
() -> streamingProcessor.failed(
178178
new CancellationException(

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClientManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class HttpClientManager implements Closeable {
4848
*/
4949
public static final Setting<Integer> MAX_TOTAL_CONNECTIONS = Setting.intSetting(
5050
"xpack.inference.http.max_total_connections",
51-
50, // default
51+
1000, // default
5252
1, // min
5353
Setting.Property.NodeScope,
5454
Setting.Property.Dynamic
@@ -60,7 +60,7 @@ public class HttpClientManager implements Closeable {
6060
*/
6161
public static final Setting<Integer> MAX_ROUTE_CONNECTIONS = Setting.intSetting(
6262
"xpack.inference.http.max_route_connections",
63-
20, // default
63+
1000, // default
6464
1, // min
6565
Setting.Property.NodeScope,
6666
Setting.Property.Dynamic

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/retry/RetrySettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class RetrySettings {
1818

1919
static final Setting<TimeValue> RETRY_INITIAL_DELAY_SETTING = Setting.timeSetting(
2020
"xpack.inference.http.retry.initial_delay",
21-
TimeValue.timeValueSeconds(1),
21+
TimeValue.timeValueMillis(5),
2222
Setting.Property.NodeScope,
2323
Setting.Property.Dynamic
2424
);

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/retry/RetryingHttpSender.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -191,16 +191,17 @@ private Exception wrapWithElasticsearchException(Exception e, String inferenceEn
191191

192192
@Override
193193
public boolean shouldRetry(Exception e) {
194-
if (retryCount.get() >= MAX_RETIES) {
195-
return false;
196-
}
197-
198-
if (e instanceof Retryable retry) {
199-
request = retry.rebuildRequest(request);
200-
return retry.shouldRetry();
201-
}
202-
203194
return false;
195+
// if (retryCount.get() >= MAX_RETIES) {
196+
// return false;
197+
// }
198+
//
199+
// if (e instanceof Retryable retry) {
200+
// request = retry.rebuildRequest(request);
201+
// return retry.shouldRetry();
202+
// }
203+
//
204+
// return false;
204205
}
205206
}
206207

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -252,16 +252,19 @@ private void cleanup() {
252252

253253
private void handleTasks() {
254254
try {
255-
if (shutdown.get()) {
256-
logger.debug("Shutdown requested while handling tasks, cleaning up");
257-
cleanup();
258-
return;
259-
}
255+
TimeValue timeToWait;
256+
do {
257+
if (shutdown.get()) {
258+
logger.debug("Shutdown requested while handling tasks, cleaning up");
259+
cleanup();
260+
return;
261+
}
260262

261-
var timeToWait = settings.getTaskPollFrequency();
262-
for (var endpoint : rateLimitGroupings.values()) {
263-
timeToWait = TimeValue.min(endpoint.executeEnqueuedTask(), timeToWait);
264-
}
263+
timeToWait = settings.getTaskPollFrequency();
264+
for (var endpoint : rateLimitGroupings.values()) {
265+
timeToWait = TimeValue.min(endpoint.executeEnqueuedTask(), timeToWait);
266+
}
267+
} while (timeToWait.compareTo(TimeValue.ZERO) <= 0);
265268

266269
scheduleNextHandleTasks(timeToWait);
267270
} catch (Exception e) {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -249,15 +249,17 @@ public MinimalServiceSettings getMinimalServiceSettings(String inferenceEntityId
249249
* @param listener Model listener
250250
*/
251251
public void getModelWithSecrets(String inferenceEntityId, ActionListener<UnparsedModel> listener) {
252+
var maybeDefault = defaultConfigIds.get(inferenceEntityId);
253+
if (maybeDefault != null) {
254+
getDefaultConfig(false, maybeDefault, listener);
255+
logger.debug("Returning default inference endpoint [{}] with secrets", inferenceEntityId);
256+
return;
257+
}
258+
252259
ActionListener<SearchResponse> searchListener = ActionListener.wrap((searchResponse) -> {
253260
// There should be a hit for the configurations
254261
if (searchResponse.getHits().getHits().length == 0) {
255-
var maybeDefault = defaultConfigIds.get(inferenceEntityId);
256-
if (maybeDefault != null) {
257-
getDefaultConfig(true, maybeDefault, listener);
258-
} else {
259-
listener.onFailure(inferenceNotFoundException(inferenceEntityId));
260-
}
262+
listener.onFailure(inferenceNotFoundException(inferenceEntityId));
261263
return;
262264
}
263265

@@ -289,15 +291,16 @@ public void getModelWithSecrets(String inferenceEntityId, ActionListener<Unparse
289291
* @param listener Model listener
290292
*/
291293
public void getModel(String inferenceEntityId, ActionListener<UnparsedModel> listener) {
294+
var maybeDefault = defaultConfigIds.get(inferenceEntityId);
295+
if (maybeDefault != null) {
296+
getDefaultConfig(false, maybeDefault, listener);
297+
return;
298+
}
299+
292300
ActionListener<SearchResponse> searchListener = ActionListener.wrap((searchResponse) -> {
293301
// There should be a hit for the configurations
294302
if (searchResponse.getHits().getHits().length == 0) {
295-
var maybeDefault = defaultConfigIds.get(inferenceEntityId);
296-
if (maybeDefault != null) {
297-
getDefaultConfig(true, maybeDefault, listener);
298-
} else {
299-
listener.onFailure(inferenceNotFoundException(inferenceEntityId));
300-
}
303+
listener.onFailure(inferenceNotFoundException(inferenceEntityId));
301304
return;
302305
}
303306

@@ -428,7 +431,11 @@ private void getDefaultConfig(
428431
if (persistDefaultEndpoints) {
429432
storeDefaultEndpoint(m, () -> listener.onResponse(modelToUnparsedModel(m)));
430433
} else {
431-
listener.onResponse(modelToUnparsedModel(m));
434+
if (m.getSecrets() != null) {
435+
listener.onResponse(modelToUnparsedModelWithSecrets(m));
436+
} else {
437+
listener.onResponse(modelToUnparsedModel(m));
438+
}
432439
}
433440
break;
434441
}
@@ -922,6 +929,26 @@ private static IndexRequest createIndexRequest(String docId, String indexName, T
922929
}
923930
}
924931

932+
private static UnparsedModel modelToUnparsedModelWithSecrets(Model model) {
933+
try (XContentBuilder builder = XContentFactory.jsonBuilder(); var secretsBuilder = XContentFactory.jsonBuilder()) {
934+
model.getConfigurations()
935+
.toXContent(builder, new ToXContent.MapParams(Map.of(ModelConfigurations.USE_ID_FOR_INDEX, Boolean.TRUE.toString())));
936+
937+
model.getSecrets()
938+
.toXContent(
939+
secretsBuilder,
940+
new ToXContent.MapParams(Map.of(ModelConfigurations.USE_ID_FOR_INDEX, Boolean.TRUE.toString()))
941+
);
942+
943+
var modelConfigMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2();
944+
var modelSecretsMap = XContentHelper.convertToMap(BytesReference.bytes(secretsBuilder), false, builder.contentType()).v2();
945+
return unparsedModelFromMap(new ModelConfigMap(modelConfigMap, modelSecretsMap));
946+
947+
} catch (IOException ex) {
948+
throw new ElasticsearchException("[{}] Error serializing inference endpoint configuration", model.getInferenceEntityId(), ex);
949+
}
950+
}
951+
925952
private static UnparsedModel modelToUnparsedModel(Model model) {
926953
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
927954
model.getConfigurations()

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/OpenAiService.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.cluster.service.ClusterService;
1515
import org.elasticsearch.common.ValidationException;
16+
import org.elasticsearch.common.settings.SecureString;
1617
import org.elasticsearch.common.util.LazyInitializable;
1718
import org.elasticsearch.core.Nullable;
1819
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper;
1921
import org.elasticsearch.inference.ChunkedInference;
2022
import org.elasticsearch.inference.ChunkingSettings;
2123
import org.elasticsearch.inference.InferenceServiceConfiguration;
2224
import org.elasticsearch.inference.InferenceServiceExtension;
2325
import org.elasticsearch.inference.InferenceServiceResults;
2426
import org.elasticsearch.inference.InputType;
27+
import org.elasticsearch.inference.MinimalServiceSettings;
2528
import org.elasticsearch.inference.Model;
2629
import org.elasticsearch.inference.ModelConfigurations;
2730
import org.elasticsearch.inference.ModelSecrets;
@@ -48,6 +51,7 @@
4851
import org.elasticsearch.xpack.inference.services.openai.completion.OpenAiChatCompletionModel;
4952
import org.elasticsearch.xpack.inference.services.openai.embeddings.OpenAiEmbeddingsModel;
5053
import org.elasticsearch.xpack.inference.services.openai.embeddings.OpenAiEmbeddingsServiceSettings;
54+
import org.elasticsearch.xpack.inference.services.openai.embeddings.OpenAiEmbeddingsTaskSettings;
5155
import org.elasticsearch.xpack.inference.services.openai.request.OpenAiUnifiedChatCompletionRequest;
5256
import org.elasticsearch.xpack.inference.services.openai.response.OpenAiChatCompletionResponseEntity;
5357
import org.elasticsearch.xpack.inference.services.settings.DefaultSecretSettings;
@@ -58,7 +62,9 @@
5862
import java.util.List;
5963
import java.util.Map;
6064
import java.util.Set;
65+
import java.util.concurrent.TimeUnit;
6166

67+
import static org.elasticsearch.xpack.inference.chunking.ChunkingSettingsBuilder.DEFAULT_SETTINGS;
6268
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
6369
import static org.elasticsearch.xpack.inference.services.ServiceFields.DIMENSIONS;
6470
import static org.elasticsearch.xpack.inference.services.ServiceFields.MODEL_ID;
@@ -77,6 +83,8 @@
7783
public class OpenAiService extends SenderService {
7884
public static final String NAME = "openai";
7985

86+
private static final String DEFAULT_EMBEDDING_ID = ".openai_text_embedding";
87+
8088
private static final String SERVICE_NAME = "OpenAI";
8189
// The task types exposed via the _inference/_services API
8290
private static final EnumSet<TaskType> SUPPORTED_TASK_TYPES_FOR_SERVICES_API = EnumSet.of(
@@ -395,6 +403,43 @@ public Set<TaskType> supportedStreamingTasks() {
395403
return EnumSet.of(TaskType.COMPLETION, TaskType.CHAT_COMPLETION);
396404
}
397405

406+
@Override
407+
public List<DefaultConfigId> defaultConfigIds() {
408+
return List.of(
409+
new DefaultConfigId(
410+
DEFAULT_EMBEDDING_ID,
411+
MinimalServiceSettings.textEmbedding(name(), 1536, SimilarityMeasure.DOT_PRODUCT, DenseVectorFieldMapper.ElementType.FLOAT),
412+
this
413+
)
414+
);
415+
}
416+
417+
@Override
418+
public void defaultConfigs(ActionListener<List<Model>> listener) {
419+
listener.onResponse(
420+
List.of(
421+
new OpenAiEmbeddingsModel(
422+
".openai_text_embedding",
423+
TaskType.TEXT_EMBEDDING,
424+
NAME,
425+
new OpenAiEmbeddingsServiceSettings(
426+
"text-embedding-3-small",
427+
null,
428+
null,
429+
null,
430+
1536,
431+
null,
432+
false,
433+
new RateLimitSettings(200000, TimeUnit.MINUTES)
434+
),
435+
new OpenAiEmbeddingsTaskSettings((String) null),
436+
DEFAULT_SETTINGS,
437+
new DefaultSecretSettings(new SecureString(("todo").toCharArray()))
438+
)
439+
)
440+
);
441+
}
442+
398443
/**
399444
* Model was originally defined in task settings, but it should
400445
* have been part of the service settings.

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/embeddings/OpenAiEmbeddingsModel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public OpenAiEmbeddingsModel(
6060
}
6161

6262
// Should only be used directly for testing
63-
OpenAiEmbeddingsModel(
63+
public OpenAiEmbeddingsModel(
6464
String inferenceEntityId,
6565
TaskType taskType,
6666
String service,

0 commit comments

Comments
 (0)