Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
bc62301
Added circuit breaker
Mikep86 Mar 19, 2025
0a58daa
Pass circuit breaker to action filter
Mikep86 Mar 20, 2025
a77516f
Estimate memory usage before performing inference
Mikep86 Mar 20, 2025
fee2c90
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Mar 20, 2025
83395f4
Reset circuit breaker on completion of request handling
Mikep86 Mar 20, 2025
f84ff66
Calculate actual memory usage
Mikep86 Mar 20, 2025
8698ecd
Spotless
Mikep86 Mar 20, 2025
c7b1af1
Added TODOs
Mikep86 Mar 20, 2025
6458bb3
Added more comments
Mikep86 Mar 20, 2025
6b7db55
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Mar 21, 2025
f4a4689
Track memory usage of requests that don't perform inference
Mikep86 Mar 21, 2025
c74ca3c
Fix test failures
Mikep86 Mar 21, 2025
f5e8a94
Add circuit breaker unit test
Mikep86 Mar 21, 2025
1d5e5bd
Circuit breaker test development
Mikep86 Mar 21, 2025
d93050f
Fix memory usage tracking in estimateMemoryUsage
Mikep86 Mar 24, 2025
2480955
Make circuit breaker limit setting dynamically updatable
Mikep86 Mar 24, 2025
5d76384
Updated estimateMemoryUsage to throw InferenceException
Mikep86 Mar 24, 2025
4bcff47
Updated InferenceException to retain the original message when it is …
Mikep86 Mar 24, 2025
080ae60
Added circuit breaker trips on estimated inference bytes unit test
Mikep86 Mar 24, 2025
30e0a08
[CI] Auto commit changes from spotless
Mar 24, 2025
8939687
Increment byte counters after updating breaker
Mikep86 Mar 28, 2025
6071cb0
Check that circuit breaker usage is 0
Mikep86 Mar 28, 2025
0507d94
Add indexing pressure to plugin services
Mikep86 Mar 28, 2025
a643877
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Mar 28, 2025
a6200d5
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Apr 1, 2025
c211ff6
Pass indexing pressure to action filter
Mikep86 Apr 1, 2025
a106f87
Pass coordinating object to AsyncBulkShardInferenceAction
Mikep86 Apr 1, 2025
4a2976c
Use coordinating indexing pressure in ShardBulkInferenceActionFilter
Mikep86 Apr 1, 2025
56a5f97
Update circuit breaker test
Mikep86 Apr 1, 2025
111bb1f
Update circuit breaker trips on estimated inference bytes test
Mikep86 Apr 1, 2025
02d70a0
Remove inference bytes circuit breaker
Mikep86 Apr 1, 2025
bf9d118
Adjust coordinating indexing pressure lifetime
Mikep86 Apr 2, 2025
456fc59
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 2, 2025
fee592b
Account for indexing pressure from source in batches
Mikep86 Apr 3, 2025
437ca6b
Account for indexing pressure from empty chunk inference updates
Mikep86 Apr 4, 2025
96f4037
Add indexing pressure from source modifications
Mikep86 Apr 4, 2025
fcf7387
Fix testIndexingPressure
Mikep86 Apr 4, 2025
e5f64ff
Fix testIndexingPressureTripsOnEstimatedInferenceBytes
Mikep86 Apr 4, 2025
b183b20
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Apr 4, 2025
3349168
Cleanup
Mikep86 Apr 4, 2025
8a28093
Fix compilation errors
Mikep86 Apr 4, 2025
bf420c8
Added unit test
Mikep86 Apr 4, 2025
2e479a9
Revert changes to InferenceException
Mikep86 Apr 4, 2025
2330e3e
Resolve TODO
Mikep86 Apr 4, 2025
0bb32cc
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 4, 2025
0065a87
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 7, 2025
0ff48cf
Resolve TODOs
Mikep86 Apr 8, 2025
ecb8e02
Pass indexing pressure in constructor
Mikep86 Apr 8, 2025
6a3a4fd
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 8, 2025
f4aef73
Added partial failure test
Mikep86 Apr 8, 2025
118c27f
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 8, 2025
8cc4402
Update docs/changelog/125517.yaml
Mikep86 Apr 11, 2025
0a138f2
Fix changelog
Mikep86 Apr 11, 2025
e8742d2
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,24 @@
package org.elasticsearch.xpack.inference;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.rest.RestStatus;

public class InferenceException extends ElasticsearchException implements ElasticsearchWrapperException {
public class InferenceException extends ElasticsearchException {
/**
 * Creates an inference exception wrapping an underlying failure.
 *
 * @param message the exception message, forwarded to the {@link ElasticsearchException} constructor
 * @param cause the underlying failure, forwarded to the superclass constructor
 * @param args additional arguments forwarded unchanged to the superclass constructor
 *             (presumably message-format arguments; confirm against {@link ElasticsearchException})
 */
public InferenceException(String message, Throwable cause, Object... args) {
super(message, cause, args);
}

/**
 * Reports the REST status of the underlying cause rather than a status of this wrapper, so that the
 * cause's status is surfaced while the inference exception's own message is retained when emitting
 * to XContent.
 *
 * @return the status derived from the cause via {@link ExceptionsHelper#status(Throwable)}, or
 *         {@link RestStatus#INTERNAL_SERVER_ERROR} when this exception has no cause
 */
@Override
public RestStatus status() {
    final Throwable underlying = getCause();
    // Without a cause there is nothing to derive a status from, so fall back to a generic server error.
    return underlying == null ? RestStatus.INTERNAL_SERVER_ERROR : ExceptionsHelper.status(underlying);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,29 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.inference.InferenceServiceExtension;
import org.elasticsearch.inference.InferenceServiceRegistry;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicensedFeature;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.node.PluginComponentBinding;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.CircuitBreakerPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.MapperPlugin;
Expand Down Expand Up @@ -157,7 +161,8 @@ public class InferencePlugin extends Plugin
MapperPlugin,
SearchPlugin,
InternalSearchPlugin,
ClusterPlugin {
ClusterPlugin,
CircuitBreakerPlugin {

/**
* When this setting is true the verification check that
Expand All @@ -175,6 +180,15 @@ public class InferencePlugin extends Plugin
Setting.Property.Dynamic
);

/** Name under which the inference bytes circuit breaker is registered. */
public static final String INFERENCE_BYTES_CIRCUIT_BREAKER_NAME = "inference.bytes";
/** Overhead multiplier passed to the inference bytes circuit breaker. */
public static final double INFERENCE_BYTES_CIRCUIT_BREAKER_OVERHEAD = 1.0;
/**
 * Node-scoped, dynamically updatable setting that holds the limit of the inference bytes circuit breaker.
 * The default value is {@code "25%"}.
 * NOTE(review): presumably the percentage is resolved against the JVM heap size; confirm against
 * {@code Setting.memorySizeSetting}.
 */
public static final Setting<ByteSizeValue> INFERENCE_BYTES_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.memorySizeSetting(
"indices.breaker.inference.bytes.limit",
"25%",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final LicensedFeature.Momentary INFERENCE_API_FEATURE = LicensedFeature.momentary(
"inference",
"api",
Expand All @@ -199,6 +213,7 @@ public class InferencePlugin extends Plugin
private final SetOnce<ElasticInferenceServiceComponents> elasticInferenceServiceComponents = new SetOnce<>();
private final SetOnce<InferenceServiceRegistry> inferenceServiceRegistry = new SetOnce<>();
private final SetOnce<ShardBulkInferenceActionFilter> shardBulkInferenceActionFilter = new SetOnce<>();
private final SetOnce<CircuitBreaker> inferenceBytesBreaker = new SetOnce<>();
private List<InferenceServiceExtension> inferenceServiceExtensions;

public InferencePlugin(Settings settings) {
Expand Down Expand Up @@ -324,8 +339,14 @@ public Collection<?> createComponents(PluginServices services) {
inferenceServiceRegistry.set(serviceRegistry);

var actionFilter = new ShardBulkInferenceActionFilter(services.clusterService(), serviceRegistry, modelRegistry, getLicenseState());
actionFilter.setInferenceBytesCircuitBreaker(inferenceBytesBreaker.get());
shardBulkInferenceActionFilter.set(actionFilter);

services.clusterService().getClusterSettings().addSettingsUpdateConsumer(
INFERENCE_BYTES_CIRCUIT_BREAKER_LIMIT_SETTING,
this::updateCircuitBreaker
);

var meterRegistry = services.telemetryProvider().getMeterRegistry();
var inferenceStats = new PluginComponentBinding<>(InferenceStats.class, InferenceStats.create(meterRegistry));

Expand Down Expand Up @@ -467,6 +488,7 @@ public List<Setting<?>> getSettings() {
settings.add(SKIP_VALIDATE_AND_START);
settings.add(INDICES_INFERENCE_BATCH_SIZE);
settings.addAll(ElasticInferenceServiceSettings.getSettingsDefinitions());
settings.add(INFERENCE_BYTES_CIRCUIT_BREAKER_LIMIT_SETTING);

return settings;
}
Expand Down Expand Up @@ -538,6 +560,32 @@ public Map<String, Highlighter> getHighlighters() {
return Map.of(SemanticTextHighlighter.NAME, new SemanticTextHighlighter());
}

/**
 * Describes the inference bytes circuit breaker contributed by this plugin.
 *
 * @param settings the settings from which the breaker limit is read
 * @return breaker settings using the inference bytes breaker name, the configured limit in bytes,
 *         the fixed overhead, the {@code MEMORY} type and {@code TRANSIENT} durability
 */
@Override
public BreakerSettings getCircuitBreaker(Settings settings) {
    // Resolve the configured limit up front so the constructor call below reads as a plain list of values.
    final ByteSizeValue configuredLimit = INFERENCE_BYTES_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings);
    final long limitInBytes = configuredLimit.getBytes();

    return new BreakerSettings(
        INFERENCE_BYTES_CIRCUIT_BREAKER_NAME,
        limitInBytes,
        INFERENCE_BYTES_CIRCUIT_BREAKER_OVERHEAD,
        CircuitBreaker.Type.MEMORY,
        CircuitBreaker.Durability.TRANSIENT
    );
}

/**
 * Stores the circuit breaker instance handed to this plugin so that it can be used later.
 * NOTE(review): presumably called with the breaker built from {@code getCircuitBreaker}; confirm
 * against the {@code CircuitBreakerPlugin} contract.
 *
 * @param circuitBreaker the breaker to retain; expected to be named {@link #INFERENCE_BYTES_CIRCUIT_BREAKER_NAME}
 */
@Override
public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
// Sanity check only: the name comparison is evaluated solely when JVM assertions are enabled.
assert circuitBreaker.getName().equals(INFERENCE_BYTES_CIRCUIT_BREAKER_NAME);
this.inferenceBytesBreaker.set(circuitBreaker);
}

/**
 * Applies a new limit to the inference bytes circuit breaker, keeping the fixed overhead.
 *
 * @param inferenceBytesLimit the new limit for the breaker
 * @throws IllegalStateException if no circuit breaker has been stored yet
 */
private void updateCircuitBreaker(ByteSizeValue inferenceBytesLimit) {
    final CircuitBreaker breaker = inferenceBytesBreaker.get();
    if (breaker == null) {
        // A limit cannot be applied before the breaker has been stored.
        throw new IllegalStateException("[" + INFERENCE_BYTES_CIRCUIT_BREAKER_NAME + "] circuit breaker not set");
    }

    final long newLimitInBytes = inferenceBytesLimit.getBytes();
    breaker.setLimitAndOverhead(newLimitInBytes, INFERENCE_BYTES_CIRCUIT_BREAKER_OVERHEAD);
}

@Override
public void onNodeStarted() {
var registry = inferenceServiceRegistry.get();
Expand Down
Loading