Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f1e7419
Remove worst-case additional 50ms latency for non-rate limited requests
timgrein Oct 8, 2025
a326e31
Update docs/changelog/136167.yaml
timgrein Oct 8, 2025
191029f
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 8, 2025
edaa0f0
Do not use forbidden API
timgrein Oct 8, 2025
57c5605
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 8, 2025
f98b82e
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 8, 2025
f17ec00
Move startRequestQueueTask before start signal
timgrein Oct 8, 2025
90ee1a1
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 8, 2025
a9e7610
Cleanup in finally block
timgrein Oct 8, 2025
ec513be
Reject request on shutdown
timgrein Oct 8, 2025
174526c
Reuse rateLimitSettingsEnabled check
timgrein Oct 8, 2025
f506cb3
Add NoopTask to wake up queue on shutdown
timgrein Oct 9, 2025
ae349fd
Only add non-rate-limited tasks to fast-path request queue
timgrein Oct 9, 2025
540f49d
Extract rejection logic to common static method
timgrein Oct 9, 2025
0dca88a
Remove unnecessary cast
timgrein Oct 10, 2025
0590561
Use string placeholder in assertion
timgrein Oct 10, 2025
2e65475
Adjust test to check that a throwing task does not terminate the service
timgrein Oct 10, 2025
4fb2372
Adjust error message in general exception handler
timgrein Oct 10, 2025
2930151
Adjust warn to error
timgrein Oct 10, 2025
b2fd85f
Adjust error message when request gets rejected
timgrein Oct 13, 2025
91f387a
Rename id in RateLimitingEndpointHandler to rateLimitGroupingId
timgrein Oct 13, 2025
90e672b
Use Strings.format(...) in assertion
timgrein Oct 13, 2025
1cf24dc
Use thenAnswer instead of suppression
timgrein Oct 13, 2025
a92a7c0
Only reject requests of the respective execution path (rate-limited v…
timgrein Oct 13, 2025
8e52c22
Merge branch 'main' into es-eis-latency-issue
timgrein Oct 14, 2025
a868152
Submit only ingest embeddings requests to rate-limited execution path
timgrein Oct 14, 2025
dd53fcb
Merge remote-tracking branch 'origin/es-eis-latency-issue' into es-ei…
timgrein Oct 14, 2025
c575eba
Add rate limiting check
timgrein Oct 14, 2025
69db0e1
Make NoopTask all caps
timgrein Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public static boolean isInternalTypeOrUnspecified(InputType inputType) {
return inputType == InputType.INTERNAL_INGEST || inputType == InputType.INTERNAL_SEARCH || inputType == InputType.UNSPECIFIED;
}

public static boolean isIngest(InputType inputType) {
return inputType == InputType.INGEST || inputType == InputType.INTERNAL_INGEST;
}

public static boolean isSpecified(InputType inputType) {
return inputType != null && inputType != InputType.UNSPECIFIED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.inference.InferenceServiceResults;
import org.elasticsearch.inference.InputType;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueue;
Expand Down Expand Up @@ -336,8 +337,8 @@ void submitTaskToRateLimitedExecutionPath(RequestTask task) {
endpoint.enqueue(task);
}

private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) {
return rateLimitSettings != null && rateLimitSettings.isEnabled();
private static boolean isEmbeddingsIngestInput(InferenceInputs inputs) {
return inputs instanceof EmbeddingsInput embeddingsInput && InputType.isIngest(embeddingsInput.getInputType());
}

private void cleanup(CleanupStrategy cleanupStrategy) {
Expand Down Expand Up @@ -478,7 +479,7 @@ public void execute(
return;
}

if (rateLimitingEnabled(requestManager.rateLimitSettings())) {
if (isEmbeddingsIngestInput(inferenceInputs)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rateLimitingEnabled check should be retained

if (isEmbeddingsIngestInput(inferenceInputs) || rateLimitingEnabled(requestManager.rateLimitSettings())) {

submitTaskToRateLimitedExecutionPath(task);
} else {
boolean taskAccepted = requestQueue.offer(task);
Expand Down Expand Up @@ -592,7 +593,7 @@ public synchronized TimeValue executeEnqueuedTask() {
}

private TimeValue executeEnqueuedTaskInternal() {
if (rateLimitingEnabled(rateLimitSettings)) {
if (rateLimitSettings.isEnabled()) {
var timeBeforeAvailableToken = rateLimiter.timeToReserve(1);
if (shouldExecuteImmediately(timeBeforeAvailableToken) == false) {
return timeBeforeAvailableToken;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public static InputType randomWithInternalAndUnspecified() {
return randomFrom(InputType.INTERNAL_SEARCH, InputType.INTERNAL_INGEST, InputType.UNSPECIFIED);
}

public static InputType randomIngest() {
return randomFrom(InputType.INGEST, InputType.INTERNAL_INGEST);
}

public void testFromRestString_ValidInputType() {
for (String internal : List.of("search", "ingest", "classification", "clustering", "unspecified")) {
assertEquals(InputType.fromRestString(internal), InputType.fromString(internal));
Expand Down Expand Up @@ -211,4 +215,8 @@ public void testValidateInputTypeTranslationValues_ThrowsAnException_WhenValueIs
)
);
}

public void testIsIngest() {
assertTrue(InputType.isIngest(randomFrom(InputType.INGEST, InputType.INTERNAL_INGEST)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ public void testExecute_Throws_WhenRateLimitedQueueIsFull() {

service.execute(
RequestManagerTests.createMockWithRateLimitingEnabled(),
new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()),
new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()),
null,
new PlainActionFuture<>()
);

var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled("id");
var listener = new PlainActionFuture<InferenceServiceResults>();
service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener);
service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, listener);

var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT));

Expand Down Expand Up @@ -411,15 +411,15 @@ public void testChangingCapacity_SetsCapacityToTwo() throws ExecutionException,

service.execute(
RequestManagerTests.createMockWithRateLimitingEnabled(requestSender),
new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()),
new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()),
null,
new PlainActionFuture<>()
);
assertThat(service.queueSize(), is(1));

PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id");
service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener);
service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, listener);

var thrownException = expectThrows(EsRejectedExecutionException.class, () -> listener.actionGet(TIMEOUT));
assertThat(
Expand Down Expand Up @@ -524,13 +524,13 @@ public void testChangingCapacity_ToZero_SetsQueueCapacityToUnbounded() throws IO
var service = new RequestExecutorService(threadPool, null, settings, requestSender);
var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender);

service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, new PlainActionFuture<>());
service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, new PlainActionFuture<>());
assertThat(service.queueSize(), is(1));

PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
service.execute(
RequestManagerTests.createMockWithRateLimitingEnabled(requestSender, "id"),
new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()),
new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()),
null,
listener
);
Expand Down Expand Up @@ -656,7 +656,7 @@ public void testDoesNotExecuteTask_WhenCannotReserveTokens_AndThenCanReserve_And
var requestManager = RequestManagerTests.createMockWithRateLimitingEnabled(requestSender);

PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, listener);
service.execute(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomIngest()), null, listener);

when(mockRateLimiter.timeToReserve(anyInt())).thenReturn(TimeValue.timeValueDays(1)).thenReturn(TimeValue.timeValueDays(0));

Expand Down