diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
index 12f039a5a122f..f312075c0c05d 100644
--- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
+++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
@@ -8,6 +8,7 @@
  */
 package org.elasticsearch.plugin.noop.action.bulk;

+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -16,6 +17,7 @@
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestChannel;
@@ -25,6 +27,7 @@
 import org.elasticsearch.xcontent.XContentBuilder;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;

 import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -78,10 +81,12 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
             request.getRestApiVersion()
         );

+        // The actual bulk request items are mutable during the bulk process so we must create a copy
+        List<DocWriteRequest<?>> toClose = new ArrayList<>(bulkRequest.requests());
         // short circuit the call to the transport layer
         return channel -> {
             BulkRestBuilderListener listener = new BulkRestBuilderListener(channel, request);
-            listener.onResponse(bulkRequest);
+            ActionListener.releaseAfter(listener, () -> Releasables.close(toClose)).onResponse(bulkRequest);
         };
     }
diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java
index 4ae28e8309007..ee95750135f67 100644
--- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.action.datastreams.CreateDataStreamAction;
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexSource;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
@@ -25,7 +26,6 @@
 import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.index.Index;
@@ -89,7 +89,7 @@ public void testShortCircuitFailure() throws Exception {

         String coordinatingOnlyNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        final ArrayList<IndexSource> contextsToRelease = new ArrayList<>();
         IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, coordinatingOnlyNode);
         try (IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest()) {
@@ -97,8 +97,9 @@ public void testShortCircuitFailure() throws Exception {
             int successfullyStored = 0;
             while (nextRequested.get()) {
                 nextRequested.set(false);
-                refCounted.incRef();
-                handler.addItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, () -> nextRequested.set(true));
+                IndexRequest indexRequest = indexRequest(DATA_STREAM_NAME);
+                contextsToRelease.add(indexRequest.indexSource());
+                handler.addItems(List.of(indexRequest), () -> nextRequested.set(true));
                 successfullyStored++;
             }
             assertBusy(() -> assertTrue(nextRequested.get()));
@@ -116,12 +117,13 @@ public void testShortCircuitFailure() throws Exception {
             while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
                 while (nextRequested.get()) {
                     nextRequested.set(false);
-                    refCounted.incRef();
                     List<DocWriteRequest<?>> requests = new ArrayList<>();
                     for (int i = 0; i < 20; ++i) {
-                        requests.add(indexRequest(DATA_STREAM_NAME));
+                        IndexRequest indexRequest = indexRequest(DATA_STREAM_NAME);
+                        contextsToRelease.add(indexRequest.indexSource());
+                        requests.add(indexRequest);
                     }
-                    handler.addItems(requests, refCounted::decRef, () -> nextRequested.set(true));
+                    handler.addItems(requests, () -> nextRequested.set(true));
                 }
                 assertBusy(() -> assertTrue(nextRequested.get()));
             }
@@ -129,16 +131,20 @@ public void testShortCircuitFailure() throws Exception {

             while (nextRequested.get()) {
                 nextRequested.set(false);
-                refCounted.incRef();
-                handler.addItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, () -> nextRequested.set(true));
+                IndexRequest indexRequest = indexRequest(DATA_STREAM_NAME);
+                contextsToRelease.add(indexRequest.indexSource());
+                handler.addItems(List.of(indexRequest), () -> nextRequested.set(true));
             }

             assertBusy(() -> assertTrue(nextRequested.get()));

             PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-            handler.lastItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, future);
+            IndexRequest lastRequest = indexRequest(DATA_STREAM_NAME);
+            contextsToRelease.add(lastRequest.indexSource());
+            handler.lastItems(List.of(lastRequest), future);

             BulkResponse bulkResponse = safeGet(future);
+            assertThat(contextsToRelease.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));

             for (int i = 0; i < bulkResponse.getItems().length; ++i) {
                 // the first requests were successful
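Note on the "items are mutable" comment in RestNoopBulkAction above: the bulk machinery may rewrite or clear the live request list before the response listener fires, so deferred cleanup must hold its own snapshot. A minimal, self-contained sketch of that hazard, using a hypothetical Source/Releasable pair rather than the Elasticsearch classes:

import java.util.ArrayList;
import java.util.List;

public class SnapshotBeforeReleaseSketch {

    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    static class Source implements Releasable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        List<Source> requests = new ArrayList<>(List.of(new Source(), new Source()));

        // Snapshot the items before handing the request off; the bulk process may
        // mutate the live list before the deferred cleanup runs.
        List<Source> toClose = new ArrayList<>(requests);

        requests.clear(); // simulates the bulk process mutating the original list

        // The deferred cleanup still sees every original item because it holds the copy.
        toClose.forEach(Source::close);
        System.out.println("all closed: " + toClose.stream().allMatch(s -> s.closed));
    }
}

Had the cleanup closed `requests` directly, the clear() above would have made it a no-op and leaked both sources.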
"POST" : "PUT", "/_bulk"); - request.setJsonEntity(""); - ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); - assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); - assertThat(responseException.getMessage(), containsString("request body is required")); + sendMissingBody(); } public void testBulkInvalidIndexNameString() throws IOException { @@ -79,16 +75,7 @@ public void testBulkInvalidIndexNameString() throws IOException { } public void testBulkRequestBodyImproperlyTerminated() throws IOException { - Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); - // missing final line of the bulk body. cannot process - request.setJsonEntity( - "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" - + "{\"field\":1}\n" - + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}" - ); - ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); - assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); - assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline")); + sendImproperlyTerminated(); } public void testBulkRequest() throws IOException { @@ -156,6 +143,9 @@ public void testBulkWithIncrementalDisabled() throws IOException { try { sendLargeBulk(); + sendMalFormedActionLine(); + sendImproperlyTerminated(); + sendMissingBody(); } finally { internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true)); updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null)); @@ -177,6 +167,31 @@ public void testMalformedActionLineBulk() throws IOException { final Response indexCreatedResponse = getRestClient().performRequest(createRequest); assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + sendMalFormedActionLine(); + } + + private static void sendMissingBody() { + Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); + request.setJsonEntity(""); + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); + assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); + assertThat(responseException.getMessage(), containsString("request body is required")); + } + + private static void sendImproperlyTerminated() { + Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); + // missing final line of the bulk body. 
cannot process + request.setJsonEntity( + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + + "{\"field\":1}\n" + + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}" + ); + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); + assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); + assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline")); + } + + private static void sendMalFormedActionLine() throws IOException { Request bulkRequest = new Request("POST", "/index_name/_bulk"); final StringBuilder bulk = new StringBuilder(); diff --git a/qa/system-indices/src/main/java/org/elasticsearch/system/indices/SystemIndicesQA.java b/qa/system-indices/src/main/java/org/elasticsearch/system/indices/SystemIndicesQA.java index 46c6d1b9228d6..2cd5907cbf2b9 100644 --- a/qa/system-indices/src/main/java/org/elasticsearch/system/indices/SystemIndicesQA.java +++ b/qa/system-indices/src/main/java/org/elasticsearch/system/indices/SystemIndicesQA.java @@ -180,10 +180,10 @@ public List routes() { protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { var content = request.requiredContent(); IndexRequest indexRequest = new IndexRequest(".net-new-system-index-primary"); - indexRequest.source(content, request.getXContentType()); + indexRequest.indexSource().source(content.retain(), request.getXContentType()); indexRequest.id(request.param("id")); indexRequest.setRefreshPolicy(request.param("refresh")); - return channel -> client.index(indexRequest, ActionListener.withRef(new RestToXContentListener<>(channel), content)); + return channel -> client.index(indexRequest, ActionListener.releaseAfter(new RestToXContentListener<>(channel), indexRequest)); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkSourceReleaseIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkSourceReleaseIT.java new file mode 100644 index 0000000000000..64a0350d0bb82 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkSourceReleaseIT.java @@ -0,0 +1,223 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
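The SystemIndicesQA change above swaps releasing the raw HTTP content for retaining it into the request and releasing the request itself once the response is sent. Assuming the usual retain/release contract (each retain() paired with exactly one release()), the ownership handoff looks roughly like this hypothetical sketch, not the Elasticsearch implementation:

import java.util.concurrent.atomic.AtomicInteger;

public class RetainOnHandoffSketch {

    static class RefCountedBuffer {
        final AtomicInteger refs = new AtomicInteger(1); // the creator owns one reference

        RefCountedBuffer retain() {
            refs.incrementAndGet();
            return this;
        }

        void release() {
            if (refs.decrementAndGet() == 0) {
                System.out.println("buffer freed");
            }
        }
    }

    static class Request implements AutoCloseable {
        final RefCountedBuffer source;

        Request(RefCountedBuffer source) {
            this.source = source; // takes ownership of one reference
        }

        @Override
        public void close() {
            source.release();
        }
    }

    public static void main(String[] args) {
        RefCountedBuffer httpContent = new RefCountedBuffer();
        // The request retains its own reference, so it no longer matters when
        // the HTTP layer drops the original one.
        Request request = new Request(httpContent.retain());
        httpContent.release(); // HTTP layer is done with the content
        request.close();       // response sent; the request releases its reference -> freed
    }
}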
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkSourceReleaseIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkSourceReleaseIT.java
new file mode 100644
index 0000000000000..64a0350d0bb82
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkSourceReleaseIT.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.ingest.IngestClientIT;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 1)
+public class BulkSourceReleaseIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
+    }
+
+    public void testBulkSourceReleaseWhenIngestReplacesSource() throws Exception {
+        String index = "test1";
+        createIndex(index);
+
+        String pipelineId = "pipeline_id";
+        putPipeline(pipelineId);
+
+        // Get the data node to ensure the ingest pipeline can be performed on this node
+        IncrementalBulkService incrementalBulkService = internalCluster().getDataNodeInstance(IncrementalBulkService.class);
+
+        ReleasableBytesReference originalBytes = new ReleasableBytesReference(new BytesArray("{\"field\": \"value\"}"), () -> {});
+
+        IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+        IndexRequest indexRequest = new IndexRequest();
+        indexRequest.index(index);
+        indexRequest.indexSource().source(originalBytes, XContentType.JSON);
+        indexRequest.setPipeline(pipelineId);
+
+        CountDownLatch blockLatch = new CountDownLatch(1);
+        blockWritePool(internalCluster().getDataNodeInstance(ThreadPool.class), blockLatch);
+
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+
+        try {
+            handler.lastItems(List.of(indexRequest), future);
+            assertBusy(() -> assertFalse(originalBytes.hasReferences()));
+        } finally {
+            blockLatch.countDown();
+        }
+
+        BulkResponse bulkResponse = safeGet(future);
+        assertNoFailures(bulkResponse);
+    }
+
+    public void testBytesReferencedByTwoSourcesNotReleasedIfOnlyOneIngestPipeline() throws Exception {
+        String index = "test1";
+        createIndex(index);
+
+        String pipelineId = "pipeline_id";
+        putPipeline(pipelineId);
+
+        // Get the data node to ensure the ingest pipeline can be performed on this node
+        IncrementalBulkService incrementalBulkService = internalCluster().getDataNodeInstance(IncrementalBulkService.class);
+
+        ReleasableBytesReference originalBytes = new ReleasableBytesReference(
+            new BytesArray("{\"field\": \"value1\"}{\"field\": \"value2\"}"),
+            () -> {}
+        );
+        int splitPoint = originalBytes.indexOf((byte) '}', 0) + 1;
+
+        IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+        IndexRequest indexRequest = new IndexRequest();
+        indexRequest.index(index);
+        indexRequest.indexSource().source(originalBytes.retainedSlice(0, splitPoint), XContentType.JSON);
+        indexRequest.setPipeline(pipelineId);
+
+        IndexRequest indexRequestNoIngest = new IndexRequest();
+        indexRequestNoIngest.index(index);
+        indexRequestNoIngest.indexSource()
+            .source(originalBytes.retainedSlice(splitPoint, originalBytes.length() - splitPoint), XContentType.JSON);
+
+        originalBytes.decRef();
+        assertTrue(originalBytes.hasReferences());
+
+        CountDownLatch blockLatch = new CountDownLatch(1);
+        blockWritePool(internalCluster().getDataNodeInstance(ThreadPool.class), blockLatch);
+
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+        try {
+            handler.lastItems(List.of(indexRequest, indexRequestNoIngest), future);
+
+            // Pause briefly to allow bytes to theoretically be released after ingest processing
+            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
+
+            assertTrue(originalBytes.hasReferences());
+        } finally {
+            blockLatch.countDown();
+        }
+
+        blockLatch.countDown();
+
+        BulkResponse bulkResponse = safeGet(future);
+        assertNoFailures(bulkResponse);
+    }
+
+    public void testSomeReferencesCanBeReleasedWhileOthersRetained() throws Exception {
+        String index = "test1";
+        createIndex(index);
+
+        String pipelineId = "pipeline_id";
+        putPipeline(pipelineId);
+
+        // Get the data node to ensure the ingest pipeline can be performed on this node
+        IncrementalBulkService incrementalBulkService = internalCluster().getDataNodeInstance(IncrementalBulkService.class);
+
+        ReleasableBytesReference releasedBytes = new ReleasableBytesReference(new BytesArray("{\"field\": \"value1\"}"), () -> {});
+        ReleasableBytesReference retainedBytes = new ReleasableBytesReference(
+            new BytesArray("{\"field\": \"value2\"}{\"field\": \"value3\"}"),
+            () -> {}
+        );
+        int splitPoint = retainedBytes.indexOf((byte) '}', 0) + 1;
+
+        IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest();
+        indexRequest1.index(index);
+        indexRequest1.indexSource().source(releasedBytes, XContentType.JSON);
+        indexRequest1.setPipeline(pipelineId);
+
+        IndexRequest indexRequest2 = new IndexRequest();
+        indexRequest2.index(index);
+        indexRequest2.indexSource().source(retainedBytes.slice(0, splitPoint), XContentType.JSON);
+        indexRequest2.setPipeline(pipelineId);
+
+        IndexRequest indexRequestNoIngest = new IndexRequest();
+        indexRequestNoIngest.index(index);
+        indexRequestNoIngest.indexSource()
+            .source(retainedBytes.retainedSlice(splitPoint, retainedBytes.length() - splitPoint), XContentType.JSON);
+
+        assertTrue(retainedBytes.hasReferences());
+
+        CountDownLatch blockLatch = new CountDownLatch(1);
+        blockWritePool(internalCluster().getDataNodeInstance(ThreadPool.class), blockLatch);
+
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+        try {
+            handler.lastItems(List.of(indexRequest2, indexRequest1, indexRequestNoIngest), future);
+
+            assertBusy(() -> assertFalse(releasedBytes.hasReferences()));
+
+            assertTrue(retainedBytes.hasReferences());
+        } finally {
+            blockLatch.countDown();
+        }
+
+        blockLatch.countDown();
+
+        BulkResponse bulkResponse = safeGet(future);
+        assertNoFailures(bulkResponse);
+    }
+
+    private static void putPipeline(String pipelineId) throws IOException {
+        BytesReference pipelineSource = BytesReference.bytes(
+            jsonBuilder().startObject()
+                .field("description", "my_pipeline")
+                .startArray("processors")
+                .startObject()
+                .startObject("test")
+                .endObject()
+                .endObject()
+                .endArray()
+                .endObject()
+        );
+        putJsonPipeline(pipelineId, pipelineSource);
+    }
+
+    private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
+        final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
+        final var startBarrier = new CyclicBarrier(threadCount + 1);
+        final var blockingTask = new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                fail(e);
+            }
+
+            @Override
+            protected void doRun() {
+                safeAwait(startBarrier);
+                safeAwait(finishLatch);
+            }
+
+            @Override
+            public boolean isForceExecution() {
+                return true;
+            }
+        };
+        for (int i = 0; i < threadCount; i++) {
+            threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
+        }
+        safeAwait(startBarrier);
+    }
+}
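The BulkSourceReleaseIT tests above lean on retained slices keeping their parent buffer alive: a retainedSlice holds its own reference, so the parent is freed only once the original reference and every slice are closed. A rough standalone model of that behavior, with a hypothetical Buffer class rather than the ES implementation:

import java.util.concurrent.atomic.AtomicInteger;

public class RetainedSliceSketch {

    static class Buffer {
        final AtomicInteger refs = new AtomicInteger(1);

        // Each slice holds its own reference on the parent, mirroring retainedSlice.
        AutoCloseable retainedSlice(String name) {
            refs.incrementAndGet();
            return () -> decRef(name + " closed");
        }

        void decRef(String reason) {
            int remaining = refs.decrementAndGet();
            System.out.println(reason + "; remaining refs: " + remaining);
            if (remaining == 0) {
                System.out.println("parent buffer freed");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Buffer parent = new Buffer();
        AutoCloseable doc1 = parent.retainedSlice("doc1");
        AutoCloseable doc2 = parent.retainedSlice("doc2");
        parent.decRef("original reference dropped");
        doc1.close(); // parent still alive: doc2 holds a reference
        doc2.close(); // last reference gone -> parent freed
    }
}

This is exactly why the "two sources, one pipeline" test can release one document's slice after ingest while the shared backing bytes stay referenced.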
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java
index 528ab5760d9a6..9d2257e639bcb 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java
@@ -11,13 +11,13 @@
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexSource;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
@@ -80,8 +80,7 @@ public void testSingleBulkRequest() {
         IndexRequest indexRequest = indexRequest(index);

         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
-        handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
+        handler.lastItems(List.of(indexRequest), future);

         BulkResponse bulkResponse = safeGet(future);
         assertNoFailures(bulkResponse);
@@ -93,7 +92,7 @@ public void testSingleBulkRequest() {
             assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) 1));
         });

-        assertFalse(refCounted.hasReferences());
+        assertTrue(indexRequest.indexSource().isClosed());
     }

     public void testBufferedResourcesReleasedOnClose() {
@@ -107,15 +106,14 @@ public void testBufferedResourcesReleasedOnClose() {
         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
         IndexRequest indexRequest = indexRequest(index);

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
-        handler.addItems(List.of(indexRequest), refCounted::decRef, () -> {});
+        handler.addItems(List.of(indexRequest), () -> {});

-        assertTrue(refCounted.hasReferences());
+        assertFalse(indexRequest.indexSource().isClosed());
         assertThat(indexingPressure.stats().getCurrentCoordinatingBytes(), greaterThan(0L));

         handler.close();

-        assertFalse(refCounted.hasReferences());
+        assertTrue(indexRequest.indexSource().isClosed());
         assertThat(indexingPressure.stats().getCurrentCoordinatingBytes(), equalTo(0L));
     }

@@ -129,20 +127,20 @@ public void testIndexingPressureRejection() {
         try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) {
             IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
-            AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+            ArrayList<IndexSource> contextsToClose = new ArrayList<>();

             if (randomBoolean()) {
                 AtomicBoolean nextPage = new AtomicBoolean(false);
-                refCounted.incRef();
-                handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+                handler.addItems(List.of(indexRequest(index, contextsToClose)), () -> nextPage.set(true));
                 assertTrue(nextPage.get());
             }

             PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-            handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+            handler.lastItems(List.of(indexRequest(index, contextsToClose)), future);

             expectThrows(EsRejectedExecutionException.class, future::actionGet);
-            assertFalse(refCounted.hasReferences());
+
+            assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));
         }
     }

@@ -156,7 +154,7 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {

         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        ArrayList<IndexSource> contextsToClose = new ArrayList<>();
         AtomicBoolean nextPage = new AtomicBoolean(false);

         IndexRequest indexRequest = indexRequest(index);
@@ -164,8 +162,8 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {
         long lowWaterMarkSplits = indexingPressure.stats().getLowWaterMarkSplits();
         long highWaterMarkSplits = indexingPressure.stats().getHighWaterMarkSplits();
         while (total < 2048) {
-            refCounted.incRef();
-            handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
+            contextsToClose.add(indexRequest.indexSource());
+            handler.addItems(List.of(indexRequest), () -> nextPage.set(true));
             assertTrue(nextPage.get());
             nextPage.set(false);
             indexRequest = indexRequest(index);
@@ -173,19 +171,18 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {
         }

         assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
-        refCounted.incRef();
-        handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+        handler.addItems(List.of(indexRequest(index, contextsToClose)), () -> nextPage.set(true));

         assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
         assertBusy(() -> assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits + 1)));
         assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits));

         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-        handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
+        handler.lastItems(List.of(indexRequest(index, contextsToClose)), future);

         BulkResponse bulkResponse = safeGet(future);
         assertNoFailures(bulkResponse);
-        assertFalse(refCounted.hasReferences());
+        assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));
     }

     public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
@@ -199,28 +196,26 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
         long lowWaterMarkSplits = indexingPressure.stats().getLowWaterMarkSplits();
         long highWaterMarkSplits = indexingPressure.stats().getHighWaterMarkSplits();

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        ArrayList<IndexSource> contextsToClose = new ArrayList<>();
         AtomicBoolean nextPage = new AtomicBoolean(false);

         ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
         for (int i = 0; i < 4; ++i) {
             ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
-            add512BRequests(requests, index);
+            add512BRequests(requests, contextsToClose, index);
             IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
             handlers.add(handler);
-            refCounted.incRef();
-            handler.addItems(requests, refCounted::decRef, () -> nextPage.set(true));
+            handler.addItems(requests, () -> nextPage.set(true));
             assertTrue(nextPage.get());
             nextPage.set(false);
         }

         // Test that a request smaller than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is not throttled
         ArrayList<DocWriteRequest<?>> requestsNoThrottle = new ArrayList<>();
-        add512BRequests(requestsNoThrottle, index);
+        add512BRequests(requestsNoThrottle, contextsToClose, index);
         IncrementalBulkService.Handler handlerNoThrottle = incrementalBulkService.newBulkRequest();
         handlers.add(handlerNoThrottle);
-        refCounted.incRef();
-        handlerNoThrottle.addItems(requestsNoThrottle, refCounted::decRef, () -> nextPage.set(true));
+        handlerNoThrottle.addItems(requestsNoThrottle, () -> nextPage.set(true));
         assertTrue(nextPage.get());
         nextPage.set(false);
         assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits));
@@ -228,14 +223,13 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {

         ArrayList<DocWriteRequest<?>> requestsThrottle = new ArrayList<>();
         // Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
-        add512BRequests(requestsThrottle, index);
-        add512BRequests(requestsThrottle, index);
+        add512BRequests(requestsThrottle, contextsToClose, index);
+        add512BRequests(requestsThrottle, contextsToClose, index);

         CountDownLatch finishLatch = new CountDownLatch(1);
         blockWriteCoordinationPool(threadPool, finishLatch);

         IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
-        refCounted.incRef();
-        handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
+        handlerThrottled.addItems(requestsThrottle, () -> nextPage.set(true));
         assertFalse(nextPage.get());

         finishLatch.countDown();
@@ -247,16 +241,14 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
         assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits));

         for (IncrementalBulkService.Handler h : handlers) {
-            refCounted.incRef();
             PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-            h.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+            h.lastItems(List.of(indexRequest(index, contextsToClose)), future);
             BulkResponse bulkResponse = safeGet(future);
             assertNoFailures(bulkResponse);
         }

         assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
-        refCounted.decRef();
-        assertFalse(refCounted.hasReferences());
+        assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));
     }

     public void testMultipleBulkPartsWithBackoff() {
@@ -306,10 +298,10 @@ public void testGlobalBulkFailure() throws InterruptedException {
                 );
             } else {
                 PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-                AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
-                handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
-                assertFalse(refCounted.hasReferences());
+                IndexRequest indexRequest = indexRequest(index);
+                handler.lastItems(List.of(indexRequest), future);
                 expectThrows(EsRejectedExecutionException.class, future::actionGet);
+                assertTrue(indexRequest.indexSource().isClosed());
             }
         }
     }
@@ -325,7 +317,7 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except
         IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, randomNodeName);
         ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, randomNodeName);
         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        ArrayList<IndexSource> contextsToClose = new ArrayList<>();
         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();

         CountDownLatch blockingLatch1 = new CountDownLatch(1);
@@ -336,8 +328,7 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except
             blockWriteCoordinationPool(threadPool, blockingLatch1);
             while (nextRequested.get()) {
                 nextRequested.set(false);
-                refCounted.incRef();
-                handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextRequested.set(true));
+                handler.addItems(List.of(indexRequest(index, contextsToClose)), () -> nextRequested.set(true));
                 hits.incrementAndGet();
             }
         } finally {
@@ -351,7 +342,7 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except
             blockWriteCoordinationPool(threadPool, blockingLatch2);
             fillWriteCoordinationQueue(threadPool);

-            handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+            handler.lastItems(List.of(indexRequest(index, contextsToClose)), future);
         } finally {
             blockingLatch2.countDown();
         }
@@ -379,7 +370,7 @@ public void testShortCircuitShardLevelFailure() throws Exception {

         String coordinatingOnlyNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        ArrayList<IndexSource> contextsToClose = new ArrayList<>();
         IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, coordinatingOnlyNode);
         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
@@ -387,8 +378,7 @@ public void testShortCircuitShardLevelFailure() throws Exception {
         AtomicLong hits = new AtomicLong(0);
         while (nextRequested.get()) {
             nextRequested.set(false);
-            refCounted.incRef();
-            handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextRequested.set(true));
+            handler.addItems(List.of(indexRequest(index, contextsToClose)), () -> nextRequested.set(true));
             hits.incrementAndGet();
         }

@@ -403,12 +393,11 @@ public void testShortCircuitShardLevelFailure() throws Exception {
             while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
                 while (nextRequested.get()) {
                     nextRequested.set(false);
-                    refCounted.incRef();
                     List<DocWriteRequest<?>> requests = new ArrayList<>();
                     for (int i = 0; i < 20; ++i) {
-                        requests.add(indexRequest(index));
+                        requests.add(indexRequest(index, contextsToClose));
                     }
-                    handler.addItems(requests, refCounted::decRef, () -> nextRequested.set(true));
+                    handler.addItems(requests, () -> nextRequested.set(true));
                 }
                 assertBusy(() -> assertTrue(nextRequested.get()));
             }
@@ -416,14 +405,13 @@ public void testShortCircuitShardLevelFailure() throws Exception {

         while (nextRequested.get()) {
             nextRequested.set(false);
-            refCounted.incRef();
-            handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextRequested.set(true));
+            handler.addItems(List.of(indexRequest(index, contextsToClose)), () -> nextRequested.set(true));
         }

         assertBusy(() -> assertTrue(nextRequested.get()));

         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-        handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+        handler.lastItems(List.of(indexRequest(index, contextsToClose)), future);

         BulkResponse bulkResponse = safeGet(future);
         assertTrue(bulkResponse.hasFailures());
@@ -441,6 +429,8 @@ public void testShortCircuitShardLevelFailure() throws Exception {
                 assertThat(item.getFailure().getCause().getCause(), instanceOf(EsRejectedExecutionException.class));
             }
         }
+
+        assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));
     }

     public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exception {
@@ -478,7 +468,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
         // a node with the ingest role.
         String coordinatingOnlyNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        ArrayList<IndexSource> contextsToClose = new ArrayList<>();
         IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, coordinatingOnlyNode);
         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
@@ -486,8 +476,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
         AtomicLong hits = new AtomicLong(0);
         while (nextRequested.get()) {
             nextRequested.set(false);
-            refCounted.incRef();
-            handler.addItems(List.of(indexRequest(index).setPipeline(pipelineId)), refCounted::decRef, () -> nextRequested.set(true));
+            handler.addItems(List.of(indexRequest(index, contextsToClose).setPipeline(pipelineId)), () -> nextRequested.set(true));
             hits.incrementAndGet();
         }

@@ -500,8 +489,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
         try (Releasable releasable = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
             while (nextRequested.get()) {
                 nextRequested.set(false);
-                refCounted.incRef();
-                handler.addItems(List.of(indexRequest(index).setPipeline(pipelineId)), refCounted::decRef, () -> nextRequested.set(true));
+                handler.addItems(List.of(indexRequest(index, contextsToClose).setPipeline(pipelineId)), () -> nextRequested.set(true));
             }

             assertBusy(() -> assertTrue(nextRequested.get()));
@@ -509,14 +497,13 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio

         while (nextRequested.get()) {
             nextRequested.set(false);
-            refCounted.incRef();
-            handler.addItems(List.of(indexRequest(index).setPipeline(pipelineId)), refCounted::decRef, () -> nextRequested.set(true));
+            handler.addItems(List.of(indexRequest(index, contextsToClose).setPipeline(pipelineId)), () -> nextRequested.set(true));
         }

         assertBusy(() -> assertTrue(nextRequested.get()));

         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-        handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+        handler.lastItems(List.of(indexRequest(index, contextsToClose)), future);

         BulkResponse bulkResponse = safeGet(future);
         assertTrue(bulkResponse.hasFailures());
@@ -529,6 +516,8 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
             assertTrue(item.isFailed());
             assertThat(item.getFailure().getCause().getCause(), instanceOf(EsRejectedExecutionException.class));
         }
+
+        assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));
     }

     private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) {
@@ -583,13 +572,13 @@ public boolean isForceExecution() {
         }
     }

     private BulkResponse executeBulk(long docs, String index, IncrementalBulkService.Handler handler, ExecutorService executorService) {
-        ConcurrentLinkedQueue<DocWriteRequest<?>> queue = new ConcurrentLinkedQueue<>();
+        List<IndexSource> contextsToClose = new ArrayList<>();
+        ConcurrentLinkedQueue<IndexRequest> queue = new ConcurrentLinkedQueue<>();
         for (int i = 0; i < docs; i++) {
-            IndexRequest indexRequest = indexRequest(index);
+            IndexRequest indexRequest = indexRequest(index, contextsToClose);
             queue.add(indexRequest);
         }

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
         Runnable r = new Runnable() {
@@ -602,10 +591,9 @@ public void run() {
                 }

                 if (queue.isEmpty()) {
-                    handler.lastItems(docs, refCounted::decRef, future);
+                    handler.lastItems(docs, future);
                 } else {
-                    refCounted.incRef();
-                    handler.addItems(docs, refCounted::decRef, () -> executorService.execute(this));
+                    handler.addItems(docs, () -> executorService.execute(this));
                 }
             }
         };
@@ -613,20 +601,26 @@ public void run() {
         executorService.execute(r);

         BulkResponse bulkResponse = future.actionGet();
-        assertFalse(refCounted.hasReferences());
+        assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));
         return bulkResponse;
     }

-    private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, String index) {
+    private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, ArrayList<IndexSource> contextsToClose, String index) {
         long total = 0;
         while (total < 512) {
-            IndexRequest indexRequest = indexRequest(index);
+            IndexRequest indexRequest = indexRequest(index, contextsToClose);
             requests.add(indexRequest);
             total += indexRequest.ramBytesUsed();
         }
         assertThat(total, lessThan(1024L));
     }

+    private static IndexRequest indexRequest(String index, List<IndexSource> contextsToClose) {
+        IndexRequest indexRequest = indexRequest(index);
+        contextsToClose.add(indexRequest.indexSource());
+        return indexRequest;
+    }
+
     private static IndexRequest indexRequest(String index) {
         IndexRequest indexRequest = new IndexRequest();
         indexRequest.index(index);
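Across IncrementalBulkIT, the recurring assertion `contextsToClose.stream().filter(c -> c.isClosed() == false).count() == 0` replaces the old `refCounted.hasReferences()` check: instead of one shared counter, every per-request source is recorded and individually verified closed. A toy version of that leak check, with a hypothetical TrackedSource standing in for IndexSource:

import java.util.ArrayList;
import java.util.List;

public class CloseTrackingSketch {

    static class TrackedSource {
        private boolean closed;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    public static void main(String[] args) {
        List<TrackedSource> contextsToClose = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            TrackedSource source = new TrackedSource();
            contextsToClose.add(source); // record every source as it is created
            source.close();              // the bulk path is expected to close each one
        }
        // The leak check: after the bulk response, no source may remain open.
        long leaked = contextsToClose.stream().filter(c -> c.isClosed() == false).count();
        if (leaked != 0) {
            throw new AssertionError(leaked + " sources leaked");
        }
        System.out.println("no leaks");
    }
}

Tracking each source individually also pinpoints which request leaked, which a single shared ref-count cannot do.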
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java
index 290f299df5a4c..632fe202b6ed8 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexSource;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -25,7 +26,6 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.VersionType;
@@ -704,14 +704,14 @@ public void testIncrementalBulkLowWatermarkSplitMetrics() throws Exception {

         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        ArrayList<IndexSource> contextsToClose = new ArrayList<>();
         AtomicBoolean nextPage = new AtomicBoolean(false);

         IndexRequest indexRequest = indexRequest(index);
         long total = indexRequest.ramBytesUsed();
         while (total < 2048) {
-            refCounted.incRef();
-            handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
+            contextsToClose.add(indexRequest.indexSource());
+            handler.addItems(List.of(indexRequest), () -> nextPage.set(true));
             assertTrue(nextPage.get());
             nextPage.set(false);
             indexRequest = indexRequest(index);
@@ -737,8 +737,9 @@ public void testIncrementalBulkLowWatermarkSplitMetrics() throws Exception {
             equalTo(0L)
         );

-        refCounted.incRef();
-        handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+        indexRequest = indexRequest(index);
+        contextsToClose.add(indexRequest.indexSource());
+        handler.addItems(List.of(indexRequest), () -> nextPage.set(true));

         assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
         assertBusy(() -> assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(1L)));
@@ -761,11 +762,13 @@ public void testIncrementalBulkLowWatermarkSplitMetrics() throws Exception {
         );

         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-        handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
+        IndexRequest lastRequest = indexRequest(index);
+        contextsToClose.add(lastRequest.indexSource());
+        handler.lastItems(List.of(lastRequest), future);

         BulkResponse bulkResponse = safeGet(future);
         assertNoFailures(bulkResponse);
-        assertFalse(refCounted.hasReferences());
+        assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));
     }

     // Borrowed this test from IncrementalBulkIT and added test for metrics to it
@@ -792,28 +795,26 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
             .orElseThrow();
         testTelemetryPlugin.resetMeter();

-        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        ArrayList<IndexSource> contextsToClose = new ArrayList<>();
         AtomicBoolean nextPage = new AtomicBoolean(false);

         ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
         for (int i = 0; i < 4; ++i) {
             ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
-            add512BRequests(requests, index);
+            add512BRequests(requests, contextsToClose, index);
             IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
             handlers.add(handler);
-            refCounted.incRef();
-            handler.addItems(requests, refCounted::decRef, () -> nextPage.set(true));
+            handler.addItems(requests, () -> nextPage.set(true));
             assertTrue(nextPage.get());
             nextPage.set(false);
         }

         // Test that a request smaller than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is not throttled
         ArrayList<DocWriteRequest<?>> requestsNoThrottle = new ArrayList<>();
-        add512BRequests(requestsNoThrottle, index);
+        add512BRequests(requestsNoThrottle, contextsToClose, index);
         IncrementalBulkService.Handler handlerNoThrottle = incrementalBulkService.newBulkRequest();
         handlers.add(handlerNoThrottle);
-        refCounted.incRef();
-        handlerNoThrottle.addItems(requestsNoThrottle, refCounted::decRef, () -> nextPage.set(true));
+        handlerNoThrottle.addItems(requestsNoThrottle, () -> nextPage.set(true));
         assertTrue(nextPage.get());
         nextPage.set(false);
         assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(0L));
@@ -836,14 +837,13 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {

         ArrayList<DocWriteRequest<?>> requestsThrottle = new ArrayList<>();
         // Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
-        add512BRequests(requestsThrottle, index);
-        add512BRequests(requestsThrottle, index);
+        add512BRequests(requestsThrottle, contextsToClose, index);
+        add512BRequests(requestsThrottle, contextsToClose, index);

         CountDownLatch finishLatch = new CountDownLatch(1);
         blockWriteCoordinationPool(threadPool, finishLatch);

         IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
-        refCounted.incRef();
-        handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
+        handlerThrottled.addItems(requestsThrottle, () -> nextPage.set(true));
         assertFalse(nextPage.get());

         finishLatch.countDown();
@@ -871,16 +871,16 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
         );

         for (IncrementalBulkService.Handler h : handlers) {
-            refCounted.incRef();
             PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-            h.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+            IndexRequest indexRequest = indexRequest(index);
+            contextsToClose.add(indexRequest.indexSource());
+            h.lastItems(List.of(indexRequest), future);
             BulkResponse bulkResponse = safeGet(future);
             assertNoFailures(bulkResponse);
         }

         assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
-        refCounted.decRef();
-        assertFalse(refCounted.hasReferences());
+        assertThat(contextsToClose.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));

         testTelemetryPlugin.collect();
     }
@@ -909,10 +909,11 @@ private static IndexRequest indexRequest(String index) {
         return indexRequest;
     }

-    private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, String index) {
+    private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, ArrayList<IndexSource> contextsToClose, String index) {
         long total = 0;
         while (total < 512) {
             IndexRequest indexRequest = indexRequest(index);
+            contextsToClose.add(indexRequest.indexSource());
             requests.add(indexRequest);
             total += indexRequest.ramBytesUsed();
         }
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/rest/action/document/RestBulkActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/rest/action/document/RestBulkActionIT.java
index d0b5ec4562903..8e35116102c14 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/rest/action/document/RestBulkActionIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/rest/action/document/RestBulkActionIT.java
@@ -25,6 +25,7 @@
 import static org.hamcrest.Matchers.not;

 public class RestBulkActionIT extends ESIntegTestCase {
+
     @Override
     protected boolean addMockHttpTransport() {
         return false;
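Before the BulkRequestParser changes below, it may help to see what sliceTrimmingCarriageReturn does: per its javadoc, for JSON the byte preceding the newline marker is checked, and a trailing '\r' is dropped so CRLF-delimited NDJSON parses the same as LF-delimited NDJSON. A standalone sketch of just the length computation (the helper name and byte[] signature here are illustrative, not the actual method):

import java.nio.charset.StandardCharsets;

public class TrimCarriageReturnSketch {

    // Given the start of a document and the index of the '\n' marker that ends it,
    // compute the slice length, dropping a trailing '\r' for CRLF line endings.
    static int sliceLength(byte[] data, int from, int nextMarker) {
        if (nextMarker > from && data[nextMarker - 1] == (byte) '\r') {
            return nextMarker - from - 1;
        }
        return nextMarker - from;
    }

    public static void main(String[] args) {
        byte[] crlf = "{\"field\":1}\r\n".getBytes(StandardCharsets.UTF_8);
        byte[] lf = "{\"field\":1}\n".getBytes(StandardCharsets.UTF_8);
        int crlfMarker = crlf.length - 1; // index of '\n'
        int lfMarker = lf.length - 1;
        // Both print {"field":1} -- the carriage return is trimmed for CRLF input.
        System.out.println(new String(crlf, 0, sliceLength(crlf, 0, crlfMarker), StandardCharsets.UTF_8));
        System.out.println(new String(lf, 0, sliceLength(lf, 0, lfMarker), StandardCharsets.UTF_8));
    }
}

The new `retain` flag in the real method then decides whether that slice takes its own reference (index sources, which outlive parsing) or borrows the caller's (update requests, consumed immediately).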
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
index 2f336566953ba..c8ea5476ee7d2 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
@@ -14,6 +14,7 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -107,11 +108,12 @@ private static int findNextMarker(byte marker, int from, BytesReference data, bo
      * Returns the sliced {@link BytesReference}. If the {@link XContentType} is JSON, the byte preceding the marker is checked to see
      * if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored
      */
-    private static BytesReference sliceTrimmingCarriageReturn(
-        BytesReference bytesReference,
+    private static ReleasableBytesReference sliceTrimmingCarriageReturn(
+        ReleasableBytesReference bytesReference,
         int from,
         int nextMarker,
-        XContentType xContentType
+        XContentType xContentType,
+        boolean retain
     ) {
         final int length;
         if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
@@ -119,7 +121,7 @@ private static BytesReference sliceTrimmingCarriageReturn(
         } else {
             length = nextMarker - from;
         }
-        return bytesReference.slice(from, length);
+        return retain ? bytesReference.retainedSlice(from, length) : bytesReference.slice(from, length);
     }

     /**
@@ -157,7 +159,12 @@ public void parse(
             deleteRequestConsumer
         );

-        incrementalParser.parse(data, true);
+        incrementalParser.parse(
+            data instanceof ReleasableBytesReference releasableBytesReference
+                ? releasableBytesReference
+                : ReleasableBytesReference.wrap(data),
+            true
+        );
     }

     public IncrementalParser incrementalParser(
@@ -251,7 +258,7 @@ private IncrementalParser(
         this.deleteRequestConsumer = deleteRequestConsumer;
     }

-    public int parse(BytesReference data, boolean lastData) throws IOException {
+    public int parse(ReleasableBytesReference data, boolean lastData) throws IOException {
         if (failure != null) {
             assert false : failure.getMessage();
             throw new IllegalStateException("Parser has already encountered exception", failure);
@@ -264,7 +271,7 @@ public int parse(BytesReference data, boolean lastData) throws IOException {
         }
     }

-    private int tryParse(BytesReference data, boolean lastData) throws IOException {
+    private int tryParse(ReleasableBytesReference data, boolean lastData) throws IOException {
         int from = 0;
         int consumed = 0;

@@ -523,16 +530,16 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
         return true;
     }

-    private void parseAndConsumeDocumentLine(BytesReference data, int from, int to) throws IOException {
+    private void parseAndConsumeDocumentLine(ReleasableBytesReference data, int from, int to) throws IOException {
         assert currentRequest != null && currentRequest instanceof DeleteRequest == false;
         if (currentRequest instanceof IndexRequest indexRequest) {
-            indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType), xContentType);
+            indexRequest.indexSource().source(sliceTrimmingCarriageReturn(data, from, to, xContentType, true), xContentType);
             indexRequestConsumer.accept(indexRequest, currentType);
         } else if (currentRequest instanceof UpdateRequest updateRequest) {
             try (
                 XContentParser sliceParser = createParser(
                     xContentType.xContent(),
-                    sliceTrimmingCarriageReturn(data, from, to, xContentType)
+                    sliceTrimmingCarriageReturn(data, from, to, xContentType, false)
                 )
             ) {
                 updateRequest.fromXContent(sliceParser);
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java
index b721a8f4f2b6b..f97920ea38a15 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java
@@ -154,15 +154,15 @@ public void updateWaitForChunkMetrics(long chunkWaitTimeInMillis) {
         chunkWaitTimeMillisHistogram.record(chunkWaitTimeInMillis);
     }

-    public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
+    public void addItems(List<DocWriteRequest<?>> items, Runnable nextItems) {
         assert closed == false;
         assert bulkInProgress == false;
         if (bulkActionLevelFailure != null) {
-            shortCircuitDueToTopLevelFailure(items, releasable);
+            shortCircuitDueToTopLevelFailure(items);
             nextItems.run();
         } else {
             assert bulkRequest != null;
-            if (internalAddItems(items, releasable)) {
+            if (internalAddItems(items)) {
                 Optional<Releasable> maybeSplit = incrementalOperation.maybeSplit();
                 if (maybeSplit.isPresent()) {
                     Releasable coordinating = maybeSplit.get();
@@ -200,14 +200,14 @@ public void onFailure(Exception e) {
         }
     }

-    public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
+    public void lastItems(List<DocWriteRequest<?>> items, ActionListener<BulkResponse> listener) {
         assert bulkInProgress == false;
         if (bulkActionLevelFailure != null) {
-            shortCircuitDueToTopLevelFailure(items, releasable);
+            shortCircuitDueToTopLevelFailure(items);
             errorResponse(listener);
         } else {
             assert bulkRequest != null;
-            if (internalAddItems(items, releasable)) {
+            if (internalAddItems(items)) {
                 Releasable coordinating = incrementalOperation.split();
                 final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
                 releasables.clear();
@@ -248,14 +248,14 @@ public void close() {
         }
     }

-    private void shortCircuitDueToTopLevelFailure(List<DocWriteRequest<?>> items, Releasable releasable) {
+    private void shortCircuitDueToTopLevelFailure(List<DocWriteRequest<?>> items) {
         assert releasables.isEmpty();
         assert incrementalOperation.currentOperationsSize() == 0;
         assert bulkRequest == null;
         if (globalFailure == false) {
             addItemLevelFailures(items);
         }
-        Releasables.close(releasable);
+        Releasables.close(items);
     }

     private void errorResponse(ActionListener<BulkResponse> listener) {
@@ -290,10 +290,10 @@ private void addItemLevelFailures(List<DocWriteRequest<?>> items) {
         responses.add(new BulkResponse(bulkItemResponses, 0, 0));
     }

-    private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
+    private boolean internalAddItems(List<DocWriteRequest<?>> items) {
         try {
             bulkRequest.add(items);
-            releasables.add(releasable);
+            releasables.addAll(items);
             long size = items.stream().mapToLong(Accountable::ramBytesUsed).sum();
             incrementalOperation.increment(items.size(), size);
             return true;
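The IndexSource changes below make the request's source an owned, releasable reference: close() now releases the bytes, and setting a new source releases the one it replaces (relevant when an ingest pipeline rewrites a document). A minimal sketch of that replace-and-release discipline, using a hypothetical SourceHolder rather than the ES class:

public class ReplaceAndReleaseSketch {

    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    static class SourceHolder implements Releasable {
        private Releasable source;

        // Setting a new source must release the one it replaces; otherwise an
        // ingest pipeline rewriting the document would leak the old bytes.
        void setSource(Releasable newSource) {
            Releasable toClose = this.source;
            this.source = newSource;
            if (toClose != null) {
                toClose.close();
            }
        }

        @Override
        public void close() {
            if (source != null) {
                source.close();
                source = null;
            }
        }
    }

    public static void main(String[] args) {
        SourceHolder holder = new SourceHolder();
        holder.setSource(() -> System.out.println("first source released"));
        // Replacing the source prints "first source released".
        holder.setSource(() -> System.out.println("second source released"));
        // Closing the holder prints "second source released".
        holder.close();
    }
}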
Releasables.close(source); source = null; contentType = null; } @@ -243,8 +250,14 @@ public void source(byte[] source, int offset, int length, XContentType contentTy } private void setSource(BytesReference source, XContentType contentType) { + setSource(ReleasableBytesReference.wrap(source), contentType); + } + + private void setSource(ReleasableBytesReference source, XContentType contentType) { assert isClosed == false; + Releasable toClose = this.source; this.source = source; this.contentType = contentType; + Releasables.close(toClose); } } diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java index ff8e68d462829..b7d567a89d3dd 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java @@ -84,6 +84,16 @@ public ReleasableBytesReference retain() { return this; } + /** + * Similar to {@link #retain} except that it retains the current instance and then returns a new instance with its own dedicated + * ref-count. This is primarily useful if you are splitting off a reference for a different purpose and want to separate the ref-counts + * for less contention or making the counts easier to reason about. + */ + public ReleasableBytesReference retainChild() { + refCounted.incRef(); + return new ReleasableBytesReference(delegate, refCounted::decRef); + } + /** * Same as {@link #slice} except that the slice is not guaranteed to share the same underlying reference count as this instance. * This method is equivalent to calling {@code .slice(from, length).retain()} but might be more efficient through the avoidance of diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 20a9fb7ed23aa..111d9b4d24148 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -18,12 +18,10 @@ import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.node.NodeClient; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.BaseRestHandler; @@ -113,11 +111,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request)); bulkRequest.requestParamsUsed(request.params().keySet()); - ReleasableBytesReference content = request.requiredContent(); try { bulkRequest.add( - content, + request.requiredContent(), defaultIndex, defaultRouting, defaultFetchSourceContext, @@ -130,12 +127,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC request.getRestApiVersion() ); } catch (Exception e) { + Releasables.close(bulkRequest.requests()); return channel -> new 
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
index 20a9fb7ed23aa..111d9b4d24148 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
@@ -18,12 +18,10 @@
 import org.elasticsearch.action.bulk.IncrementalBulkService;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -113,11 +111,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
             bulkRequest.setRefreshPolicy(request.param("refresh"));
             bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request));
             bulkRequest.requestParamsUsed(request.params().keySet());
-            ReleasableBytesReference content = request.requiredContent();

             try {
                 bulkRequest.add(
-                    content,
+                    request.requiredContent(),
                     defaultIndex,
                     defaultRouting,
                     defaultFetchSourceContext,
@@ -130,12 +127,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
                     request.getRestApiVersion()
                 );
             } catch (Exception e) {
+                Releasables.close(bulkRequest.requests());
                 return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
             }
-            return channel -> {
-                content.mustIncRef();
-                client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content));
-            };
+
+            // The actual bulk request items are mutable during the bulk process so we must create a copy
+            List<DocWriteRequest<?>> toClose = new ArrayList<>(bulkRequest.requests());
+            return channel -> client.bulk(
+                bulkRequest,
+                ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), () -> Releasables.close(toClose))
+            );
         } else {
             request.ensureContent();
             String waitForActiveShards = request.param("wait_for_active_shards");
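The comment in the hunk above explains the snapshot: the live `bulkRequest.requests()` list can be mutated while the bulk is in flight, so the release callback must capture a copy taken before dispatch. A small sketch of that defensive-copy-before-async-dispatch pattern, where `Item` and `submit` are hypothetical stand-ins for the Elasticsearch types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch: snapshot a mutable list before handing a release callback to an
// asynchronous operation, so completion releases exactly the items that were
// dispatched, not whatever the live list contains by then.
final class DispatchSketch {

    interface Item extends AutoCloseable {
        @Override
        void close();
    }

    static void dispatch(List<Item> liveRequests, Consumer<Runnable> submit) {
        // Copy now; closing over liveRequests directly could release the wrong
        // (or an already-cleared) set of items once the response arrives.
        List<Item> toClose = new ArrayList<>(liveRequests);
        submit.accept(() -> toClose.forEach(Item::close));
    }
}
```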
@@ -217,25 +218,33 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
                 return;
             }

-            final BytesReference data;
             int bytesConsumed;
             if (chunk.length() == 0) {
                 chunk.close();
                 bytesConsumed = 0;
             } else {
+                final ReleasableBytesReference data;
                 try {
                     handler.getIncrementalOperation().incrementUnparsedBytes(chunk.length());
                     unParsedChunks.add(chunk);

                     if (unParsedChunks.size() > 1) {
-                        data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
+                        ReleasableBytesReference[] components = unParsedChunks.toArray(new ReleasableBytesReference[0]);
+                        for (ReleasableBytesReference reference : components) {
+                            reference.incRef();
+                        }
+                        data = new ReleasableBytesReference(CompositeBytesReference.of(components), () -> Releasables.close(components));
                     } else {
-                        data = chunk;
+                        // We are creating a dedicated ref count for the application layer here. Otherwise, the leak tracking
+                        // infrastructure will be hit every time a source is released for each doc. We still have leak tracking since
+                        // the child references the parent. There is just a single reference instead of one for every doc.
+                        data = chunk.retainChild();
                     }

-                    bytesConsumed = parser.parse(data, isLast);
-                    handler.getIncrementalOperation().transferUnparsedBytesToParsed(bytesConsumed);
-
+                    try (ReleasableBytesReference toClose = data) {
+                        bytesConsumed = parser.parse(data, isLast);
+                        handler.getIncrementalOperation().transferUnparsedBytesToParsed(bytesConsumed);
+                    }
                 } catch (Exception e) {
                     shortCircuit();
                     new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
@@ -243,7 +252,8 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
                     return;
                 }
             }

-            final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
+            releaseConsumeBytes(bytesConsumed);
+
             if (isLast) {
                 assert unParsedChunks.isEmpty();
                 if (handler.getIncrementalOperation().totalParsedBytes() == 0) {
@@ -253,19 +263,18 @@
                     assert channel != null;
                     ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
                     items.clear();
-                    handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
+                    handler.lastItems(toPass, new RestRefCountedChunkedToXContentListener<>(channel));
                 }
                 handler.updateWaitForChunkMetrics(TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTimeInNanos));
                 totalChunkWaitTimeInNanos = 0L;
             } else if (items.isEmpty() == false) {
                 ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
                 items.clear();
-                handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
+                handler.addItems(toPass, () -> {
                     requestNextChunkTime = System.nanoTime();
                     request.contentStream().next();
                 });
             } else {
-                Releasables.close(releasables);
                 requestNextChunkTime = System.nanoTime();
                 request.contentStream().next();
             }
@@ -283,23 +292,24 @@ private void shortCircuit() {
             shortCircuited = true;
             Releasables.close(handler);
             Releasables.close(unParsedChunks);
+            Releasables.close(items);
+            items.clear();
             unParsedChunks.clear();
         }

-        private ArrayList<Releasable> accountParsing(int bytesConsumed) {
-            ArrayList<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
+        private void releaseConsumeBytes(int bytesConsumed) {
             while (bytesConsumed > 0) {
                 ReleasableBytesReference reference = unParsedChunks.removeFirst();
-                releasables.add(reference);
                 if (bytesConsumed >= reference.length()) {
                     bytesConsumed -= reference.length();
                 } else {
                     unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
                     bytesConsumed = 0;
                 }
+                reference.close();
             }
-            return releasables;
         }
+
     }

     @Override
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
index ce22ce65b2011..1799635193c36 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
@@ -157,7 +157,7 @@
                 indexRequest,
                 ActionListener.releaseAfter(
                     new RestToXContentListener<>(channel, DocWriteResponse::status, r -> r.getLocation(indexRequest.routing())),
-                    source
+                    indexRequest
                 )
            );
         };
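The new `releaseConsumeBytes` releases fully consumed chunks eagerly instead of collecting them into a list for a later callback; a partially consumed head chunk is replaced by a retained slice of its unconsumed tail before the original is closed. A standalone sketch of that accounting loop, with `Chunk` as a hypothetical stand-in for `ReleasableBytesReference`:

```java
import java.util.Deque;

// Sketch of the consumed-bytes accounting: fully consumed chunks are released
// immediately; a partially consumed head chunk is swapped for its retained,
// unconsumed tail so the head itself can be closed right away.
final class ChunkAccounting {

    interface Chunk {
        int length();
        Chunk retainedSlice(int from, int length); // the tail holds its own reference
        void close();
    }

    static void releaseConsumedBytes(Deque<Chunk> unparsed, int bytesConsumed) {
        while (bytesConsumed > 0) {
            Chunk head = unparsed.removeFirst();
            if (bytesConsumed >= head.length()) {
                bytesConsumed -= head.length();
            } else {
                // Put back only the unconsumed tail, which was retained above.
                unparsed.addFirst(head.retainedSlice(bytesConsumed, head.length() - bytesConsumed));
                bytesConsumed = 0;
            }
            head.close(); // safe: the tail slice, if any, carries its own retain
        }
    }
}
```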
diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv
index 4eb5140004ea6..266bfbbd3bf78 100644
--- a/server/src/main/resources/transport/upper_bounds/8.18.csv
+++ b/server/src/main/resources/transport/upper_bounds/8.18.csv
@@ -1 +1 @@
-initial_elasticsearch_8_18_6,8840008
+transform_check_for_dangling_tasks,8840011
diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv
index 476468b203875..3600b3f8c633a 100644
--- a/server/src/main/resources/transport/upper_bounds/8.19.csv
+++ b/server/src/main/resources/transport/upper_bounds/8.19.csv
@@ -1 +1 @@
-initial_elasticsearch_8_19_3,8841067
+transform_check_for_dangling_tasks,8841070
diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv
index f8f50cc6d7839..c11e6837bb813 100644
--- a/server/src/main/resources/transport/upper_bounds/9.0.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.0.csv
@@ -1 +1 @@
-initial_elasticsearch_9_0_6,9000015
+transform_check_for_dangling_tasks,9000018
diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv
index 5a65f2e578156..80b97d85f7511 100644
--- a/server/src/main/resources/transport/upper_bounds/9.1.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.1.csv
@@ -1 +1 @@
-initial_elasticsearch_9_1_4,9112007
+transform_check_for_dangling_tasks,9112009
diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv
index e24f914a1d1ca..2147eab66c207 100644
--- a/server/src/main/resources/transport/upper_bounds/9.2.csv
+++ b/server/src/main/resources/transport/upper_bounds/9.2.csv
@@ -1 +1 @@
-ml_inference_endpoint_cache,9157000
+initial_9.2.0,9185000
diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv
new file mode 100644
index 0000000000000..2147eab66c207
--- /dev/null
+++ b/server/src/main/resources/transport/upper_bounds/9.3.csv
@@ -0,0 +1 @@
+initial_9.2.0,9185000
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
index de2dc8f9ea0b5..b5f2697dd537b 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.test.ESTestCase;
@@ -34,10 +35,10 @@ public class BulkRequestParserTests extends ESTestCase {
         .toList();

     public void testParserCannotBeReusedAfterFailure() {
-        BytesArray request = new BytesArray("""
+        ReleasableBytesReference request = ReleasableBytesReference.wrap(new BytesArray("""
             { "index":{ }, "something": "unexpected" }
             {}
-            """);
+            """));

         BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current());
         BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser(
@@ -58,10 +59,10 @@ public void testParserCannotBeReusedAfterFailure() {
         IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, false));
         assertEquals("Malformed action/metadata line [1], expected END_OBJECT but found [FIELD_NAME]", ex.getMessage());

-        BytesArray valid = new BytesArray("""
+        ReleasableBytesReference valid = ReleasableBytesReference.wrap(new BytesArray("""
             { "index":{ "_id": "bar" } }
             {}
-            """);
+            """));
         expectThrows(AssertionError.class, () -> incrementalParser.parse(valid, false));
     }

@@ -86,7 +87,7 @@ public void testIncrementalParsing() throws IOException {
             deleteRequests::add
         );

-        BytesArray request = new BytesArray("""
+        ReleasableBytesReference request = new ReleasableBytesReference(new BytesArray("""
             { "index":{ "_id": "bar", "pipeline": "foo" } }
             { "field": "value"}
             { "index":{ "require_alias": false } }
             { "field": "value"}
             { "update":{ "_id": "bus", "require_alias": true } }
             { "doc": { "field": "value" }}
             { "delete":{ "_id": "baz" } }
             { "index": { } }
             { "field": "value"}
             { "delete":{ "_id": "bop" } }
-            """);
+            """), () -> {});

         int consumed = 0;
         for (int i = 0; i < request.length() - 1; ++i) {
@@ -106,9 +107,21 @@ public void testIncrementalParsing() throws IOException {
         consumed += incrementalParser.parse(request.slice(consumed, request.length() - consumed), true);
         assertThat(consumed, equalTo(request.length()));

+        request.decRef();
+
+        // 3 Index request retaining
+        assertTrue(request.hasReferences());
+
         assertThat(indexRequests.size(), equalTo(3));
         assertThat(updateRequests.size(), equalTo(1));
         assertThat(deleteRequests.size(), equalTo(2));
+
+        for (DocWriteRequest<?> req : indexRequests) {
+            req.close();
+        }
+
+        // Deletes and updates do not retain (upsert source is copied out opposed to sliced)
+        assertFalse(request.hasReferences());
     }

     public void testIndexRequest() throws IOException {
@@ -255,9 +268,12 @@ public void testBarfOnLackOfTrailingNewline() throws IOException {
         );

         // Should not throw because not last
-        incrementalParser.parse(request, false);
+        incrementalParser.parse(ReleasableBytesReference.wrap(request), false);

-        IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, true));
+        IllegalArgumentException e2 = expectThrows(
+            IllegalArgumentException.class,
+            () -> incrementalParser.parse(ReleasableBytesReference.wrap(request), true)
+        );
         assertEquals("The bulk request must be terminated by a newline [\\n]", e2.getMessage());
     }
diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java
index 8bf4ee5c9cf1c..042b5d6e33f38 100644
--- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java
@@ -24,7 +24,7 @@
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.rest.RestChannel;
@@ -256,15 +256,15 @@ public void next() {
             ) {

                 @Override
-                public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
-                    releasable.close();
+                public void addItems(List<DocWriteRequest<?>> items, Runnable nextItems) {
                     docs.addAll(items);
+                    Releasables.close(items);
                 }

                 @Override
-                public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
-                    releasable.close();
+                public void lastItems(List<DocWriteRequest<?>> items, ActionListener<BulkResponse> listener) {
                     docs.addAll(items);
+                    Releasables.close(items);
                     isLast.set(true);
                 }
             };
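The updated `testIncrementalParsing` encodes the leak check directly: after the caller drops its reference, the buffer must stay alive exactly as long as the index requests that sliced it, and reach zero once they close. A compact sketch of that test pattern, with `CountedBuffer` as a hypothetical stand-in for `ReleasableBytesReference`:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the leak-check pattern: hand the parser a counted buffer, drop the
// caller's reference, and assert the count only reaches zero once every
// retaining document has been closed. Run with -ea to enable the asserts.
final class LeakCheckSketch {

    static final class CountedBuffer {
        final AtomicInteger refs = new AtomicInteger(1);

        CountedBuffer retain() {
            refs.incrementAndGet();
            return this;
        }

        void decRef() {
            refs.decrementAndGet();
        }

        boolean hasReferences() {
            return refs.get() > 0;
        }
    }

    public static void main(String[] args) {
        CountedBuffer buffer = new CountedBuffer();
        CountedBuffer docSlice = buffer.retain(); // an index request slicing the source

        buffer.decRef(); // the caller is done with the buffer
        assert buffer.hasReferences() : "doc still retains the buffer";

        docSlice.decRef(); // closing the index request
        assert buffer.hasReferences() == false : "all references released";
    }
}
```

Note the asymmetry asserted in the real test: index requests retain the source by slicing it, while delete and update requests copy what they need, so only the index requests keep the buffer alive.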
diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java
index aef7b9089992b..80223eb5888f9 100644
--- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java
+++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -729,38 +730,39 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
         IndexSource indexSource = indexRequest.indexSource();
         int originalSourceSize = indexSource.byteLength();
-        BytesReference originalSource = indexSource.bytes();
-        if (useLegacyFormat) {
-            var newDocMap = indexSource.sourceAsMap();
-            for (var entry : inferenceFieldsMap.entrySet()) {
-                XContentMapValues.insertValue(entry.getKey(), newDocMap, entry.getValue());
+        try (ReleasableBytesReference originalSource = indexSource.retainedBytes()) {
+            if (useLegacyFormat) {
+                var newDocMap = indexSource.sourceAsMap();
+                for (var entry : inferenceFieldsMap.entrySet()) {
+                    XContentMapValues.insertValue(entry.getKey(), newDocMap, entry.getValue());
+                }
+                indexSource.source(newDocMap, indexSource.contentType());
+            } else {
+                try (XContentBuilder builder = XContentBuilder.builder(indexSource.contentType().xContent())) {
+                    appendSourceAndInferenceMetadata(builder, indexSource.bytes(), indexSource.contentType(), inferenceFieldsMap);
+                    indexSource.source(builder);
+                }
             }
-            indexSource.source(newDocMap, indexSource.contentType());
-        } else {
-            try (XContentBuilder builder = XContentBuilder.builder(indexSource.contentType().xContent())) {
-                appendSourceAndInferenceMetadata(builder, indexSource.bytes(), indexSource.contentType(), inferenceFieldsMap);
-                indexSource.source(builder);
+            long modifiedSourceSize = indexSource.byteLength();
+
+            // Add the indexing pressure from the source modifications.
+            // Don't increment operation count because we count one source update as one operation, and we already accounted for those
+            // in addFieldInferenceRequests.
+            try {
+                coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSourceSize);
+            } catch (EsRejectedExecutionException e) {
+                indexSource.source(originalSource.retain(), indexSource.contentType());
+                item.abort(
+                    item.index(),
+                    new InferenceException(
+                        "Unable to insert inference results into document ["
+                            + indexRequest.id()
+                            + "] due to memory pressure. Please retry the bulk request with fewer documents or smaller document sizes.",
+                        e
+                    )
+                );
             }
         }
-        long modifiedSourceSize = indexSource.byteLength();
-
-        // Add the indexing pressure from the source modifications.
-        // Don't increment operation count because we count one source update as one operation, and we already accounted for those
-        // in addFieldInferenceRequests.
-        try {
-            coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSourceSize);
-        } catch (EsRejectedExecutionException e) {
-            indexSource.source(originalSource, indexSource.contentType());
-            item.abort(
-                item.index(),
-                new InferenceException(
-                    "Unable to insert inference results into document ["
-                        + indexRequest.id()
-                        + "] due to memory pressure. Please retry the bulk request with fewer documents or smaller document sizes.",
-                    e
-                )
-            );
-        }
     }
 }
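The rewritten `applyInferenceResponses` wraps the whole source rewrite in a try-with-resources over `indexSource.retainedBytes()`: because replacing the source now releases the old buffer, the method must hold its own reference so it can reinstate the original document when indexing pressure rejects the larger one, with the extra `retain()` covering the close at the end of the block. A standalone sketch of that retain-before-modify pattern; `Source`, `Document`, and `rewriteWithBackstop` are hypothetical stand-ins, not the Elasticsearch API:

```java
import java.util.function.IntPredicate;

// Sketch: retain the original source for the duration of a rewrite so it can be
// reinstated if the enlarged document is rejected, then release the local
// reference when the block exits.
final class RestoreOnRejectionSketch {

    interface Source extends AutoCloseable {
        Source retain();
        @Override
        void close();
    }

    interface Document {
        Source retainedSource();      // caller must close the returned reference
        void replaceSource(Source s); // takes ownership, releases the old source
        int byteLength();
    }

    static void rewriteWithBackstop(Document doc, Runnable rewrite, IntPredicate admitExtraBytes) {
        int before = doc.byteLength();
        try (Source original = doc.retainedSource()) {
            rewrite.run(); // may replace (and release) the document's source
            if (admitExtraBytes.test(doc.byteLength() - before) == false) {
                // Rejected: put the original back. retain() is needed because the
                // try block still owns, and will close, its own reference on exit.
                doc.replaceSource(original.retain());
            }
        }
    }
}
```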