Skip to content

Commit 3e5eed8

Browse files
authored
Add indexing pressure tracking to OTLP endpoints (elastic#144009)
Wire IndexingPressure into AbstractOTLPRestAction so protobuf request bodies are streamed and accumulated under coordinating memory pressure, bounded by http.max_protobuf_content_length. Refactor IndexingPressureAwareContentAggregator to own the full reservation lifecycle: it now takes IndexingPressure directly and performs the reservation in accept(), routing failures through the onFailure callback instead of throwing. This ensures both 413 (body too large) and 429 (pressure rejected) produce format-appropriate responses for OTLP and Prometheus endpoints. Add AbstractOTLPRestActionTests with coverage for success, empty body, transport failure, 413, and 429 paths.
1 parent 1cb9630 commit 3e5eed8

File tree

12 files changed

+492
-34
lines changed

12 files changed

+492
-34
lines changed

docs/changelog/144009.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: TSDB
2+
issues: []
3+
pr: 144009
4+
summary: Add indexing pressure tracking to OTLP endpoints
5+
type: enhancement

server/src/main/java/org/elasticsearch/rest/IndexingPressureAwareContentAggregator.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@
2727
* which must be fully accumulated before processing. It provides backpressure by reserving memory
2828
* up front and rejecting oversized requests with a 413 status.
2929
* <p>
30-
* The caller must reserve memory via {@link IndexingPressure#markCoordinatingOperationStarted} before
31-
* constructing this aggregator. If the reservation fails ({@code EsRejectedExecutionException}),
32-
* the caller should let it propagate to produce a 429 response.
30+
* When {@link #accept(RestChannel)} is called, the aggregator reserves memory via
31+
* {@link IndexingPressure#markCoordinatingOperationStarted}. If the reservation fails
32+
* (e.g. {@code EsRejectedExecutionException} under heavy load), the {@link CompletionHandler#onFailure}
33+
* callback is invoked so the caller can produce a format-appropriate error response (e.g. protobuf).
3334
* <p>
3435
* Once all chunks are accumulated, the reservation is lowered to the actual size and the
3536
* {@link CompletionHandler} is invoked with the aggregated content and the pressure reservation
@@ -73,7 +74,8 @@ public interface CompletionHandler {
7374
void onComplete(RestChannel channel, ReleasableBytesReference content, Releasable indexingPressureRelease);
7475

7576
/**
76-
* Called when the request body exceeds the maximum allowed size.
77+
* Called when a failure occurs before or during content accumulation, such as the request body
78+
* exceeding the maximum allowed size or the indexing pressure reservation being rejected.
7779
*
7880
* @param channel the REST channel for sending the error response
7981
* @param e the exception describing the failure
@@ -82,31 +84,39 @@ public interface CompletionHandler {
8284
}
8385

8486
private final RestRequest request;
85-
private final IndexingPressure.Coordinating coordinating;
87+
private final IndexingPressure indexingPressure;
8688
private final long maxRequestSize;
8789
private final CompletionHandler completionHandler;
8890
private final BodyPostProcessor bodyPostProcessor;
8991

92+
private IndexingPressure.Coordinating coordinating;
9093
private ArrayList<ReleasableBytesReference> chunks;
9194
private long accumulatedSize;
9295
private boolean closed;
9396

9497
public IndexingPressureAwareContentAggregator(
9598
RestRequest request,
96-
IndexingPressure.Coordinating coordinating,
99+
IndexingPressure indexingPressure,
97100
long maxRequestSize,
98101
CompletionHandler completionHandler,
99102
BodyPostProcessor bodyPostProcessor
100103
) {
101104
this.request = request;
102-
this.coordinating = coordinating;
105+
this.indexingPressure = indexingPressure;
103106
this.maxRequestSize = maxRequestSize;
104107
this.completionHandler = completionHandler;
105108
this.bodyPostProcessor = Objects.requireNonNull(bodyPostProcessor);
106109
}
107110

108111
@Override
109112
public void accept(RestChannel channel) {
113+
try {
114+
coordinating = indexingPressure.markCoordinatingOperationStarted(1, maxRequestSize, false);
115+
} catch (Exception e) {
116+
closed = true;
117+
completionHandler.onFailure(channel, e);
118+
return;
119+
}
110120
request.contentStream().next();
111121
}
112122

server/src/test/java/org/elasticsearch/rest/IndexingPressureAwareContentAggregatorTests.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.bytes.CompositeBytesReference;
1414
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1515
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.common.unit.ByteSizeValue;
1617
import org.elasticsearch.core.Releasable;
1718
import org.elasticsearch.http.HttpBody;
1819
import org.elasticsearch.index.IndexingPressure;
@@ -163,6 +164,53 @@ public void testBodyExactlyAtMaxSizeSucceeds() {
163164
assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes());
164165
}
165166

167+
public void testRejectsWhenIndexingPressureLimitExceeded() {
168+
long limitBytes = 1024;
169+
var tightPressure = new IndexingPressure(
170+
Settings.builder().put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), ByteSizeValue.ofBytes(limitBytes)).build()
171+
);
172+
var request = newStreamedRequest(stream);
173+
channel = new FakeRestChannel(request, true, 1);
174+
var failureRef = new AtomicReference<Exception>();
175+
aggregator = new IndexingPressureAwareContentAggregator(
176+
request,
177+
tightPressure,
178+
limitBytes + 1,
179+
new IndexingPressureAwareContentAggregator.CompletionHandler() {
180+
@Override
181+
public void onComplete(RestChannel ch, ReleasableBytesReference content, Releasable pressure) {
182+
fail("should not complete");
183+
}
184+
185+
@Override
186+
public void onFailure(RestChannel ch, Exception e) {
187+
failureRef.set(e);
188+
}
189+
},
190+
IndexingPressureAwareContentAggregator.BodyPostProcessor.NOOP
191+
);
192+
stream.setHandler(new HttpBody.ChunkHandler() {
193+
@Override
194+
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
195+
aggregator.handleChunk(channel, chunk, isLast);
196+
}
197+
198+
@Override
199+
public void close() {
200+
aggregator.streamClose();
201+
}
202+
});
203+
try {
204+
aggregator.accept(channel);
205+
} catch (Exception e) {
206+
throw new AssertionError(e);
207+
}
208+
209+
assertNotNull("onFailure should have been called", failureRef.get());
210+
assertNull(contentRef.get());
211+
assertEquals(0, tightPressure.stats().getCurrentCoordinatingBytes());
212+
}
213+
166214
public void testReducesPressureToActualSize() {
167215
long maxSize = 1024;
168216
int actualSize = 100;
@@ -257,10 +305,9 @@ private void initAggregator(long maxSize) {
257305
private void initAggregator(long maxSize, IndexingPressureAwareContentAggregator.BodyPostProcessor postProcessor) {
258306
var request = newStreamedRequest(stream);
259307
channel = new FakeRestChannel(request, true, 1);
260-
var coordinating = indexingPressure.markCoordinatingOperationStarted(1, maxSize, false);
261308
aggregator = new IndexingPressureAwareContentAggregator(
262309
request,
263-
coordinating,
310+
indexingPressure,
264311
maxSize,
265312
new IndexingPressureAwareContentAggregator.CompletionHandler() {
266313
@Override

x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/AbstractOTLPIndexingRestIT.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1212
import io.opentelemetry.sdk.resources.Resource;
1313

14+
import org.apache.http.entity.ByteArrayEntity;
15+
import org.apache.http.entity.ContentType;
1416
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.client.ResponseException;
1518
import org.elasticsearch.common.settings.SecureString;
1619
import org.elasticsearch.common.settings.Settings;
1720
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -26,6 +29,7 @@
2629
import java.io.IOException;
2730

2831
import static io.opentelemetry.api.common.AttributeKey.stringKey;
32+
import static org.hamcrest.Matchers.equalTo;
2933

3034
public abstract class AbstractOTLPIndexingRestIT extends ESRestTestCase {
3135

@@ -55,6 +59,9 @@ protected Settings restClientSettings() {
5559
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
5660
}
5761

62+
/** The OTLP endpoint path for this signal type, e.g. {@code "/_otlp/v1/metrics"}. */
63+
protected abstract String otlpEndpointPath();
64+
5865
@Before
5966
@Override
6067
public void setUp() throws Exception {
@@ -68,6 +75,20 @@ public void tearDown() throws Exception {
6875
super.tearDown();
6976
}
7077

78+
/**
79+
* A request body exceeding the default {@code http.max_protobuf_content_length} (8MiB) must be rejected with 413.
80+
* Uses the main {@link #cluster} where the coordinating limit is not tight, so the upfront reservation
81+
* of 8MiB succeeds and the body size check is what triggers the rejection.
82+
*/
83+
public void testOversizedRequestReturns413() throws Exception {
84+
// 9MiB exceeds the default 8MiB http.max_protobuf_content_length
85+
byte[] oversizedBody = new byte[9 * 1024 * 1024];
86+
Request request = new Request("POST", otlpEndpointPath());
87+
request.setEntity(new ByteArrayEntity(oversizedBody, ContentType.create("application/x-protobuf")));
88+
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
89+
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(413));
90+
}
91+
7192
protected static String createApiKey(String indexPattern) throws IOException {
7293
Request createApiKeyRequest = new Request("POST", "/_security/api_key");
7394
createApiKeyRequest.setJsonEntity("""
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.oteldata.otlp;
9+
10+
import org.apache.http.HttpHost;
11+
import org.apache.http.entity.ByteArrayEntity;
12+
import org.apache.http.entity.ContentType;
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.client.ResponseException;
15+
import org.elasticsearch.client.RestClient;
16+
import org.elasticsearch.common.settings.SecureString;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.util.concurrent.ThreadContext;
19+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
20+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.junit.ClassRule;
23+
24+
import static org.hamcrest.Matchers.equalTo;
25+
26+
/**
27+
* Standalone IT for the 429 indexing-pressure rejection. Uses a single cluster whose
28+
* coordinating limit (4MiB) is below the OTLP handler's upfront reservation of
29+
* {@code http.max_protobuf_content_length} (8MiB), so every OTLP request is rejected
30+
* with 429 before a single byte of the body is read.
31+
* <p>
32+
* This class intentionally does NOT extend {@link AbstractOTLPIndexingRestIT} so that
33+
* signal-specific ITs (e.g. {@link OTLPMetricsIndexingRestIT}) never pay the cost of
34+
* starting this pressure cluster.
35+
*/
36+
public class OTLPIndexingPressureRestIT extends ESRestTestCase {
37+
38+
private static final String USER = "test_admin";
39+
private static final String PASS = "x-pack-test-password";
40+
41+
@ClassRule
42+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
43+
.distribution(DistributionType.DEFAULT)
44+
.user(USER, PASS, "superuser", false)
45+
.setting("xpack.security.enabled", "true")
46+
.setting("xpack.security.autoconfiguration.enabled", "false")
47+
.setting("xpack.license.self_generated.type", "trial")
48+
.setting("xpack.ml.enabled", "false")
49+
.setting("xpack.watcher.enabled", "false")
50+
.setting("indexing_pressure.memory.coordinating.limit", "4mb")
51+
.build();
52+
53+
@Override
54+
protected String getTestRestCluster() {
55+
return cluster.getHttpAddresses();
56+
}
57+
58+
@Override
59+
protected Settings restClientSettings() {
60+
String token = basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray()));
61+
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
62+
}
63+
64+
/**
65+
* Any OTLP request is rejected with 429 because the handler's upfront reservation of
66+
* {@code http.max_protobuf_content_length} (8MiB) exceeds the 4MiB coordinating limit.
67+
* The body size is irrelevant — the 429 fires before any bytes are read.
68+
*/
69+
public void testIndexingPressureLimitReturns429() throws Exception {
70+
byte[] body = new byte[1024];
71+
Request request = new Request("POST", "/_otlp/v1/metrics");
72+
request.setEntity(new ByteArrayEntity(body, ContentType.create("application/x-protobuf")));
73+
HttpHost[] hosts = parseClusterHosts(cluster.getHttpAddresses()).toArray(HttpHost[]::new);
74+
try (RestClient pressureClient = buildClient(restClientSettings(), hosts)) {
75+
ResponseException e = expectThrows(ResponseException.class, () -> pressureClient.performRequest(request));
76+
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(429));
77+
}
78+
}
79+
}

x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public class OTLPMetricsIndexingRestIT extends AbstractOTLPIndexingRestIT {
6161
private OtlpHttpMetricExporter exporter;
6262
private SdkMeterProvider meterProvider;
6363

64+
@Override
65+
protected String otlpEndpointPath() {
66+
return "/_otlp/v1/metrics";
67+
}
68+
6469
@Before
6570
@Override
6671
public void setUp() throws Exception {

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/OTelPlugin.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.elasticsearch.common.settings.Setting;
1616
import org.elasticsearch.common.settings.Settings;
1717
import org.elasticsearch.features.NodeFeature;
18+
import org.elasticsearch.http.HttpTransportSettings;
19+
import org.elasticsearch.index.IndexingPressure;
1820
import org.elasticsearch.plugins.ActionPlugin;
1921
import org.elasticsearch.plugins.Plugin;
2022
import org.elasticsearch.rest.RestHandler;
@@ -55,10 +57,13 @@ public enum HistogramMappingSettingValues {
5557
private static final Logger logger = LogManager.getLogger(OTelPlugin.class);
5658

5759
private final SetOnce<OTelIndexTemplateRegistry> registry = new SetOnce<>();
60+
private final SetOnce<IndexingPressure> indexingPressure = new SetOnce<>();
5861
private final boolean enabled;
62+
private final long maxProtobufContentLengthBytes;
5963

6064
public OTelPlugin(Settings settings) {
6165
this.enabled = XPackSettings.OTEL_DATA_ENABLED.get(settings);
66+
this.maxProtobufContentLengthBytes = HttpTransportSettings.SETTING_HTTP_MAX_PROTOBUF_CONTENT_LENGTH.get(settings).getBytes();
6267
}
6368

6469
@Override
@@ -67,14 +72,16 @@ public Collection<RestHandler> getRestHandlers(
6772
Supplier<DiscoveryNodes> nodesInCluster,
6873
Predicate<NodeFeature> clusterSupportsFeature
6974
) {
70-
return List.of(new OTLPMetricsRestAction());
75+
assert indexingPressure.get() != null : "indexing pressure must be set";
76+
return List.of(new OTLPMetricsRestAction(indexingPressure.get(), maxProtobufContentLengthBytes));
7177
}
7278

7379
@Override
7480
public Collection<?> createComponents(PluginServices services) {
7581
logger.info("OTel ingest plugin is {}", enabled ? "enabled" : "disabled");
7682
Settings settings = services.environment().settings();
7783
ClusterService clusterService = services.clusterService();
84+
indexingPressure.set(services.indexingPressure());
7885
registry.set(
7986
new OTelIndexTemplateRegistry(settings, clusterService, services.threadPool(), services.client(), services.xContentRegistry())
8087
);

0 commit comments

Comments
 (0)