Skip to content

Commit fca2543

Browse files
Not sending enabled field across nodes
1 parent 2a7ff64 commit fca2543

File tree

8 files changed

+110
-72
lines changed

8 files changed

+110
-72
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -355,23 +355,8 @@ static TransportVersion def(int id) {
355355
public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
356356
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
357357
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
358-
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00);
359-
public static final TransportVersion NODE_WEIGHTS_ADDED_TO_NODE_BALANCE_STATS = def(9_129_0_00);
360-
public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00);
361-
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);
362-
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
363-
public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00);
364-
public static final TransportVersion ML_INFERENCE_AI21_COMPLETION_ADDED = def(9_134_0_00);
365-
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);
366-
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
367-
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
368-
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
369-
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
370-
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);
371-
public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00);
372358
public static final TransportVersion ALLOCATION_DECISION_NOT_PREFERRED = def(9_145_0_00);
373359
public static final TransportVersion ESQL_QUALIFIERS_IN_ATTRIBUTES = def(9_146_0_00);
374-
public static final TransportVersion INFERENCE_DISABLE_EIS_DISABLE_RATE_LIMITING_ADDED = def(9_147_0_00);
375360

376361
/*
377362
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -252,20 +252,16 @@ private void cleanup() {
252252

253253
private void handleTasks() {
254254
try {
255-
TimeValue timeToWait;
256-
257-
do {
258-
if (shutdown.get()) {
259-
logger.debug("Shutdown requested while handling tasks, cleaning up");
260-
cleanup();
261-
return;
262-
}
255+
if (shutdown.get()) {
256+
logger.debug("Shutdown requested while handling tasks, cleaning up");
257+
cleanup();
258+
return;
259+
}
263260

264-
timeToWait = settings.getTaskPollFrequency();
265-
for (var endpoint : rateLimitGroupings.values()) {
266-
timeToWait = TimeValue.min(endpoint.executeEnqueuedTask(), timeToWait);
267-
}
268-
} while (timeToWait.compareTo(TimeValue.ZERO) <= 0);
261+
var timeToWait = settings.getTaskPollFrequency();
262+
for (var endpoint : rateLimitGroupings.values()) {
263+
timeToWait = TimeValue.min(endpoint.executeEnqueuedTask(), timeToWait);
264+
}
269265

270266
scheduleNextHandleTasks(timeToWait);
271267
} catch (Exception e) {
@@ -453,9 +449,11 @@ public synchronized TimeValue executeEnqueuedTask() {
453449
}
454450

455451
private TimeValue executeEnqueuedTaskInternal() {
456-
var timeBeforeAvailableToken = rateLimiter.timeToReserve(1);
457-
if (shouldExecuteImmediately(timeBeforeAvailableToken) == false) {
458-
return timeBeforeAvailableToken;
452+
if (rateLimitSettings.isEnabled()) {
453+
var timeBeforeAvailableToken = rateLimiter.timeToReserve(1);
454+
if (shouldExecuteImmediately(timeBeforeAvailableToken) == false) {
455+
return timeBeforeAvailableToken;
456+
}
459457
}
460458

461459
var task = queue.poll();
@@ -467,9 +465,11 @@ private TimeValue executeEnqueuedTaskInternal() {
467465
return NO_TASKS_AVAILABLE;
468466
}
469467

470-
// We should never have to wait because we checked above
471-
var reserveRes = rateLimiter.reserve(1);
472-
assert shouldExecuteImmediately(reserveRes) : "Reserving request tokens required a sleep when it should not have";
468+
if (rateLimitSettings.isEnabled()) {
469+
// We should never have to wait because we checked above
470+
var reserveRes = rateLimiter.reserve(1);
471+
assert shouldExecuteImmediately(reserveRes) : "Reserving request tokens required a sleep when it should not have";
472+
}
473473

474474
task.getRequestManager()
475475
.execute(task.getInferenceInputs(), requestSender, task.getRequestCompletedFunction(), task.getListener());

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/completion/ElasticInferenceServiceCompletionServiceSettings.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.inference.ServiceSettings;
1717
import org.elasticsearch.xcontent.XContentBuilder;
1818
import org.elasticsearch.xpack.inference.services.ConfigurationParseContext;
19-
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceService;
2019
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceRateLimitServiceSettings;
2120
import org.elasticsearch.xpack.inference.services.settings.FilteredXContentObject;
2221
import org.elasticsearch.xpack.inference.services.settings.RateLimitSettings;
@@ -41,13 +40,7 @@ public static ElasticInferenceServiceCompletionServiceSettings fromMap(Map<Strin
4140
ValidationException validationException = new ValidationException();
4241

4342
String modelId = extractRequiredString(map, MODEL_ID, ModelConfigurations.SERVICE_SETTINGS, validationException);
44-
RateLimitSettings rateLimitSettings = RateLimitSettings.of(
45-
map,
46-
DEFAULT_RATE_LIMIT_SETTINGS,
47-
validationException,
48-
ElasticInferenceService.NAME,
49-
context
50-
);
43+
RateLimitSettings rateLimitSettings = RateLimitSettings.disabledRateLimiting(map);
5144

5245
if (validationException.validationErrors().isEmpty() == false) {
5346
throw validationException;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/rerank/ElasticInferenceServiceRerankServiceSettings.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.inference.ServiceSettings;
1717
import org.elasticsearch.xcontent.XContentBuilder;
1818
import org.elasticsearch.xpack.inference.services.ConfigurationParseContext;
19-
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceService;
2019
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceRateLimitServiceSettings;
2120
import org.elasticsearch.xpack.inference.services.settings.FilteredXContentObject;
2221
import org.elasticsearch.xpack.inference.services.settings.RateLimitSettings;
@@ -41,13 +40,7 @@ public static ElasticInferenceServiceRerankServiceSettings fromMap(Map<String, O
4140
ValidationException validationException = new ValidationException();
4241

4342
String modelId = extractRequiredString(map, MODEL_ID, ModelConfigurations.SERVICE_SETTINGS, validationException);
44-
RateLimitSettings rateLimitSettings = RateLimitSettings.of(
45-
map,
46-
DEFAULT_RATE_LIMIT_SETTINGS,
47-
validationException,
48-
ElasticInferenceService.NAME,
49-
context
50-
);
43+
RateLimitSettings rateLimitSettings = RateLimitSettings.disabledRateLimiting(map);
5144

5245
return new ElasticInferenceServiceRerankServiceSettings(modelId, rateLimitSettings);
5346
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/sparseembeddings/ElasticInferenceServiceSparseEmbeddingsServiceSettings.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.inference.ServiceSettings;
1818
import org.elasticsearch.xcontent.XContentBuilder;
1919
import org.elasticsearch.xpack.inference.services.ConfigurationParseContext;
20-
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceService;
2120
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceRateLimitServiceSettings;
2221
import org.elasticsearch.xpack.inference.services.settings.FilteredXContentObject;
2322
import org.elasticsearch.xpack.inference.services.settings.RateLimitSettings;
@@ -54,13 +53,7 @@ public static ElasticInferenceServiceSparseEmbeddingsServiceSettings fromMap(
5453
validationException
5554
);
5655

57-
RateLimitSettings rateLimitSettings = RateLimitSettings.of(
58-
map,
59-
DEFAULT_RATE_LIMIT_SETTINGS,
60-
validationException,
61-
ElasticInferenceService.NAME,
62-
context
63-
);
56+
RateLimitSettings rateLimitSettings = RateLimitSettings.disabledRateLimiting(map);
6457

6558
if (validationException.validationErrors().isEmpty() == false) {
6659
throw validationException;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/settings/RateLimitSettings.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,14 @@
2626
import java.util.concurrent.TimeUnit;
2727

2828
import static org.elasticsearch.xpack.inference.services.ServiceUtils.extractOptionalPositiveLong;
29+
import static org.elasticsearch.xpack.inference.services.ServiceUtils.removeFromMap;
2930
import static org.elasticsearch.xpack.inference.services.ServiceUtils.removeFromMapOrDefaultEmpty;
3031
import static org.elasticsearch.xpack.inference.services.ServiceUtils.throwIfNotEmptyMap;
3132

3233
public class RateLimitSettings implements Writeable, ToXContentFragment {
3334
public static final String FIELD_NAME = "rate_limit";
3435
public static final String REQUESTS_PER_MINUTE_FIELD = "requests_per_minute";
3536

36-
private final long requestsPerTimeUnit;
37-
private final TimeUnit timeUnit;
38-
3937
public static RateLimitSettings of(
4038
Map<String, Object> map,
4139
RateLimitSettings defaultValue,
@@ -53,9 +51,11 @@ public static RateLimitSettings of(
5351
return requestsPerMinute == null ? defaultValue : new RateLimitSettings(requestsPerMinute);
5452
}
5553

56-
// public static RateLimitSettings disabledRateLimiting() {
57-
//
58-
// }
54+
public static RateLimitSettings disabledRateLimiting(Map<String, Object> map) {
55+
removeFromMap(map, FIELD_NAME);
56+
57+
return new RateLimitSettings(1, TimeUnit.MINUTES, false);
58+
}
5959

6060
public static Map<String, SettingsConfiguration> toSettingsConfigurationWithDescription(
6161
String description,
@@ -79,6 +79,10 @@ public static Map<String, SettingsConfiguration> toSettingsConfiguration(EnumSet
7979
return RateLimitSettings.toSettingsConfigurationWithDescription("Minimize the number of rate limit errors.", supportedTaskTypes);
8080
}
8181

82+
private final long requestsPerTimeUnit;
83+
private final TimeUnit timeUnit;
84+
private final boolean enabled;
85+
8286
/**
8387
* Defines the settings in requests per minute
8488
* @param requestsPerMinute _
@@ -97,22 +101,23 @@ public RateLimitSettings(long requestsPerMinute) {
97101
* Note: The time unit is not serialized
98102
*/
99103
public RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit) {
104+
this(requestsPerTimeUnit, timeUnit, true);
105+
}
106+
107+
// This should only be used for testing.
108+
RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean enabled) {
100109
if (requestsPerTimeUnit <= 0) {
101110
throw new IllegalArgumentException("requests per minute must be positive");
102111
}
103-
this.requestsPerTimeUnit = requestsPerTimeUnit;
104-
this.timeUnit = Objects.requireNonNull(timeUnit);
105-
}
106-
107-
private RateLimitSettings(long requestsPerTimeUnit, TimeUnit timeUnit, boolean disableRateLimiting) {
108112
this.requestsPerTimeUnit = 0;
109113
this.timeUnit = Objects.requireNonNull(timeUnit);
110-
114+
this.enabled = enabled;
111115
}
112116

113117
public RateLimitSettings(StreamInput in) throws IOException {
114118
requestsPerTimeUnit = in.readVLong();
115119
timeUnit = TimeUnit.MINUTES;
120+
enabled = true;
116121
}
117122

118123
public long requestsPerTimeUnit() {
@@ -123,8 +128,16 @@ public TimeUnit timeUnit() {
123128
return timeUnit;
124129
}
125130

131+
public boolean isEnabled() {
132+
return enabled;
133+
}
134+
126135
@Override
127136
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
137+
if (enabled == false) {
138+
return builder;
139+
}
140+
128141
builder.startObject(FIELD_NAME);
129142
builder.field(REQUESTS_PER_MINUTE_FIELD, requestsPerTimeUnit);
130143
builder.endObject();
@@ -141,11 +154,13 @@ public boolean equals(Object o) {
141154
if (this == o) return true;
142155
if (o == null || getClass() != o.getClass()) return false;
143156
RateLimitSettings that = (RateLimitSettings) o;
144-
return Objects.equals(requestsPerTimeUnit, that.requestsPerTimeUnit) && Objects.equals(timeUnit, that.timeUnit);
157+
return Objects.equals(requestsPerTimeUnit, that.requestsPerTimeUnit)
158+
&& Objects.equals(timeUnit, that.timeUnit)
159+
&& enabled == that.enabled;
145160
}
146161

147162
@Override
148163
public int hashCode() {
149-
return Objects.hash(requestsPerTimeUnit, timeUnit);
164+
return Objects.hash(requestsPerTimeUnit, timeUnit, enabled);
150165
}
151166
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,36 @@ public void testDoesNotExecuteTask_WhenCannotReserveTokens() {
569569
verifyNoInteractions(requestSender);
570570
}
571571

572+
public void testDoesNotExecuteTask_WhenCannotReserveTokens() {
573+
var mockRateLimiter = mock(RateLimiter.class);
574+
RequestExecutorService.RateLimiterCreator rateLimiterCreator = (a, b, c) -> mockRateLimiter;
575+
576+
var requestSender = mock(RetryingHttpSender.class);
577+
var settings = createRequestExecutorServiceSettings(1);
578+
var service = new RequestExecutorService(
579+
threadPool,
580+
RequestExecutorService.DEFAULT_QUEUE_CREATOR,
581+
null,
582+
settings,
583+
requestSender,
584+
Clock.systemUTC(),
585+
rateLimiterCreator
586+
);
587+
var requestManager = RequestManagerTests.createMock(requestSender);
588+
589+
PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
590+
service.execute(requestManager, new EmbeddingsInput(List.of(), null), null, listener);
591+
592+
doAnswer(invocation -> {
593+
service.shutdown();
594+
return TimeValue.timeValueDays(1);
595+
}).when(mockRateLimiter).timeToReserve(anyInt());
596+
597+
service.start();
598+
599+
verifyNoInteractions(requestSender);
600+
}
601+
572602
public void testDoesNotExecuteTask_WhenCannotReserveTokens_AndThenCanReserve_AndExecutesTask() {
573603
var mockRateLimiter = mock(RateLimiter.class);
574604
when(mockRateLimiter.reserve(anyInt())).thenReturn(TimeValue.timeValueDays(0));

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/settings/RateLimitSettingsTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.Strings;
1212
import org.elasticsearch.common.ValidationException;
1313
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.common.xcontent.XContentHelper;
1415
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1516
import org.elasticsearch.xcontent.XContentBuilder;
1617
import org.elasticsearch.xcontent.XContentFactory;
@@ -54,6 +55,7 @@ public void testOf() {
5455
var res = RateLimitSettings.of(settings, new RateLimitSettings(1), validation, "test", ConfigurationParseContext.PERSISTENT);
5556

5657
assertThat(res, is(new RateLimitSettings(100)));
58+
assertTrue(res.isEnabled());
5759
assertTrue(validation.validationErrors().isEmpty());
5860
}
5961

@@ -65,6 +67,7 @@ public void testOf_UsesDefaultValue_WhenRateLimit_IsAbsent() {
6567
var res = RateLimitSettings.of(settings, new RateLimitSettings(1), validation, "test", ConfigurationParseContext.PERSISTENT);
6668

6769
assertThat(res, is(new RateLimitSettings(1)));
70+
assertTrue(res.isEnabled());
6871
assertTrue(validation.validationErrors().isEmpty());
6972
}
7073

@@ -74,6 +77,7 @@ public void testOf_UsesDefaultValue_WhenRequestsPerMinute_IsAbsent() {
7477
var res = RateLimitSettings.of(settings, new RateLimitSettings(1), validation, "test", ConfigurationParseContext.PERSISTENT);
7578

7679
assertThat(res, is(new RateLimitSettings(1)));
80+
assertTrue(res.isEnabled());
7781
assertTrue(validation.validationErrors().isEmpty());
7882
}
7983

@@ -102,6 +106,31 @@ public void testToXContent() throws IOException {
102106
{"rate_limit":{"requests_per_minute":100}}"""));
103107
}
104108

109+
public void testDisableRateLimiting() {
110+
Map<String, Object> settings = new HashMap<>(
111+
Map.of(RateLimitSettings.FIELD_NAME, new HashMap<>(Map.of(RateLimitSettings.REQUESTS_PER_MINUTE_FIELD, 100)))
112+
);
113+
var res = RateLimitSettings.disabledRateLimiting(settings);
114+
115+
assertThat(res, is(new RateLimitSettings(1, TimeUnit.MINUTES, false)));
116+
assertFalse(res.isEnabled());
117+
assertThat(settings, is(new HashMap<>()));
118+
}
119+
120+
public void testToXContent_WhenDisabled() throws IOException {
121+
var settings = new RateLimitSettings(1, TimeUnit.MINUTES, false);
122+
123+
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
124+
builder.startObject();
125+
settings.toXContent(builder, null);
126+
builder.endObject();
127+
String xContentResult = Strings.toString(builder);
128+
129+
assertThat(xContentResult, is(XContentHelper.stripWhitespace("""
130+
{
131+
}""")));
132+
}
133+
105134
@Override
106135
protected Writeable.Reader<RateLimitSettings> instanceReader() {
107136
return RateLimitSettings::new;

0 commit comments

Comments
 (0)