diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 8713eb023dad9..81ff1925182eb 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -22,6 +22,8 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.MeterRegistry; import java.util.ArrayList; import java.util.Collections; @@ -33,6 +35,7 @@ import static org.elasticsearch.common.settings.Setting.boolSetting; public class IncrementalBulkService { + public static final String CHUNK_WAIT_TIME_HISTOGRAM_NAME = "es.rest.incremental_bulk.wait_for_next_chunk.duration.histogram"; public static final Setting INCREMENTAL_BULK = boolSetting( "rest.incremental_bulk", @@ -44,9 +47,17 @@ public class IncrementalBulkService { private final AtomicBoolean enabledForTests = new AtomicBoolean(true); private final IndexingPressure indexingPressure; - public IncrementalBulkService(Client client, IndexingPressure indexingPressure) { + /* Capture in milliseconds because the APM histogram only has a range of 100,000 */ + private final LongHistogram chunkWaitTimeMillisHistogram; + + public IncrementalBulkService(Client client, IndexingPressure indexingPressure, MeterRegistry meterRegistry) { this.client = client; this.indexingPressure = indexingPressure; + this.chunkWaitTimeMillisHistogram = meterRegistry.registerLongHistogram( + CHUNK_WAIT_TIME_HISTOGRAM_NAME, + "Total time in millis spent waiting for next chunk of a bulk request", + "ms" + ); } public Handler newBulkRequest() { @@ -56,7 +67,7 @@ public Handler newBulkRequest() { public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { ensureEnabled(); - return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh); + return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram); } private void ensureEnabled() { @@ -99,6 +110,8 @@ public static class Handler implements Releasable { private final ArrayList releasables = new ArrayList<>(4); private final ArrayList responses = new ArrayList<>(2); private final IndexingPressure.Incremental incrementalOperation; + // Ideally this should be in RestBulkAction, but it's harder to inject the metric registry there + private final LongHistogram chunkWaitTimeMillisHistogram; private boolean closed = false; private boolean globalFailure = false; private boolean incrementalRequestSubmitted = false; @@ -111,13 +124,15 @@ protected Handler( IndexingPressure indexingPressure, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, - @Nullable String refresh + @Nullable String refresh, + LongHistogram chunkWaitTimeMillisHistogram ) { this.client = client; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false); + this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram; createNewBulkRequest(EMPTY_STATE); } @@ -125,6 +140,10 @@ public IndexingPressure.Incremental getIncrementalOperation() { return incrementalOperation; } + public void updateWaitForChunkMetrics(long chunkWaitTimeInMillis) { + chunkWaitTimeMillisHistogram.record(chunkWaitTimeInMillis); + } + public void addItems(List> items, Releasable releasable, Runnable nextItems) { assert closed == false; assert bulkInProgress == false; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 0c0e260fc16da..f4ec7f245e78b 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -997,7 +997,11 @@ public Map queryFields() { .map(TerminationHandlerProvider::handler); terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService( + client, + indexingLimits, + telemetryProvider.getMeterRegistry() + ); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); modules.bindToInstance(ResponseCollectorService.class, responseCollectorService); diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index ae6c549cd3019..f638367b85e76 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -165,6 +166,9 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); private final ArrayList> items = new ArrayList<>(4); + private long requestNextChunkTime; + private long totalChunkWaitTimeInNanos = 0L; + ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier handlerSupplier) { this.request = request; this.handlerSupplier = handlerSupplier; @@ -189,6 +193,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { public void accept(RestChannel restChannel) { this.restChannel = restChannel; this.handler = handlerSupplier.get(); + requestNextChunkTime = System.nanoTime(); request.contentStream().next(); } @@ -196,6 +201,12 @@ public void accept(RestChannel restChannel) { public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { assert handler != null; assert channel == restChannel; + long now = System.nanoTime(); + long elapsedTime = now - requestNextChunkTime; + if (elapsedTime > 0) { + totalChunkWaitTimeInNanos += elapsedTime; + requestNextChunkTime = now; + } if (shortCircuited) { chunk.close(); return; @@ -239,12 +250,18 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo items.clear(); handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); } + handler.updateWaitForChunkMetrics(TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTimeInNanos)); + totalChunkWaitTimeInNanos = 0L; } else if (items.isEmpty() == false) { ArrayList> toPass = new ArrayList<>(items); items.clear(); - handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next()); + handler.addItems(toPass, () -> Releasables.close(releasables), () -> { + requestNextChunkTime = System.nanoTime(); + request.contentStream().next(); + }); } else { Releasables.close(releasables); + requestNextChunkTime = System.nanoTime(); request.contentStream().next(); } } diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index 94e8b8b870f33..e7e0dbbc4fe18 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -130,7 +131,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() { List.of(), List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null), + new IncrementalBulkService(null, null, MeterRegistry.NOOP), TestProjectResolvers.alwaysThrow() ); actionModule.initRestHandlers(null, null); @@ -197,7 +198,7 @@ public String getName() { List.of(), List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null), + new IncrementalBulkService(null, null, MeterRegistry.NOOP), TestProjectResolvers.alwaysThrow() ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null)); @@ -257,7 +258,7 @@ public List getRestHandlers( List.of(), List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null), + new IncrementalBulkService(null, null, MeterRegistry.NOOP), TestProjectResolvers.alwaysThrow() ); actionModule.initRestHandlers(null, null); @@ -310,7 +311,7 @@ public void test3rdPartyHandlerIsNotInstalled() { List.of(), List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null), + new IncrementalBulkService(null, null, MeterRegistry.NOOP), TestProjectResolvers.alwaysThrow() ) ); @@ -354,7 +355,7 @@ public void test3rdPartyRestControllerIsNotInstalled() { List.of(), List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null), + new IncrementalBulkService(null, null, MeterRegistry.NOOP), TestProjectResolvers.alwaysThrow() ) ); diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index 0166dda1827a8..d8b6f0fc96ae8 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; @@ -1182,7 +1183,7 @@ public Collection getRestHeaders() { List.of(), List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null), + new IncrementalBulkService(null, null, MeterRegistry.NOOP), TestProjectResolvers.alwaysThrow() ); } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index b8edb8be6d88c..f4d601c7ad3b4 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpNodeClient; import org.elasticsearch.test.rest.FakeRestChannel; @@ -68,7 +69,7 @@ public void bulk(BulkRequest request, ActionListener listener) { new RestBulkAction( settings(IndexVersion.current()).build(), ClusterSettings.createBuiltInClusterSettings(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} @@ -104,7 +105,7 @@ public void bulk(BulkRequest request, ActionListener listener) { new RestBulkAction( settings(IndexVersion.current()).build(), ClusterSettings.createBuiltInClusterSettings(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -129,7 +130,7 @@ public void bulk(BulkRequest request, ActionListener listener) { new RestBulkAction( settings(IndexVersion.current()).build(), ClusterSettings.createBuiltInClusterSettings(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -153,7 +154,7 @@ public void bulk(BulkRequest request, ActionListener listener) { new RestBulkAction( settings(IndexVersion.current()).build(), ClusterSettings.createBuiltInClusterSettings(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -178,7 +179,7 @@ public void bulk(BulkRequest request, ActionListener listener) { new RestBulkAction( settings(IndexVersion.current()).build(), ClusterSettings.createBuiltInClusterSettings(), - new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class)) + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -233,7 +234,14 @@ public void next() { IndexingPressure indexingPressure = new IndexingPressure(Settings.EMPTY); RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(true, request, () -> { - return new IncrementalBulkService.Handler(null, indexingPressure, null, null, null) { + return new IncrementalBulkService.Handler( + null, + indexingPressure, + null, + null, + null, + MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME) + ) { @Override public void addItems(List> items, Releasable releasable, Runnable nextItems) { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index fd590630094f2..0faac6422cffb 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.script.ScriptService; import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.MockLog; @@ -947,7 +948,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc List.of(), List.of(), RestExtension.allowAll(), - new IncrementalBulkService(null, null), + new IncrementalBulkService(null, null, MeterRegistry.NOOP), TestProjectResolvers.alwaysThrow() ); actionModule.initRestHandlers(null, null);