Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -33,6 +35,7 @@
import static org.elasticsearch.common.settings.Setting.boolSetting;

public class IncrementalBulkService {
public static final String CHUNK_WAIT_TIME_HISTOGRAM_NAME = "es.rest.incremental_bulk.wait_for_next_chunk.duration.histogram";

public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
"rest.incremental_bulk",
Expand All @@ -44,9 +47,17 @@ public class IncrementalBulkService {
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
private final IndexingPressure indexingPressure;

public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
/* Capture in milliseconds because the APM histogram only has a range of 100,000 */
private final LongHistogram chunkWaitTimeMillisHistogram;

public IncrementalBulkService(Client client, IndexingPressure indexingPressure, MeterRegistry meterRegistry) {
this.client = client;
this.indexingPressure = indexingPressure;
this.chunkWaitTimeMillisHistogram = meterRegistry.registerLongHistogram(
CHUNK_WAIT_TIME_HISTOGRAM_NAME,
"Total time in millis spent waiting for next chunk of a bulk request",
"ms"
);
}

public Handler newBulkRequest() {
Expand All @@ -56,7 +67,7 @@ public Handler newBulkRequest() {

public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
ensureEnabled();
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram);
}

private void ensureEnabled() {
Expand Down Expand Up @@ -99,6 +110,8 @@ public static class Handler implements Releasable {
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
private final IndexingPressure.Incremental incrementalOperation;
// Ideally this should be in RestBulkAction, but it's harder to inject the metric registry there
private final LongHistogram chunkWaitTimeMillisHistogram;
private boolean closed = false;
private boolean globalFailure = false;
private boolean incrementalRequestSubmitted = false;
Expand All @@ -111,20 +124,26 @@ protected Handler(
IndexingPressure indexingPressure,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
@Nullable String refresh,
LongHistogram chunkWaitTimeMillisHistogram
) {
this.client = client;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
this.refresh = refresh;
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
createNewBulkRequest(EMPTY_STATE);
}

/**
 * Returns the incremental indexing-pressure operation owned by this handler.
 * The instance is the one obtained from
 * {@code indexingPressure.startIncrementalCoordinating(0, 0, false)} when the handler was constructed.
 */
public IndexingPressure.Incremental getIncrementalOperation() {
return incrementalOperation;
}

/**
 * Records a chunk wait time in the histogram supplied to this handler at construction.
 * The service registers that histogram with unit {@code "ms"}; the REST chunk handler calls this
 * with its accumulated wait time converted from nanoseconds to milliseconds.
 *
 * @param chunkWaitTimeInMillis time spent waiting for request chunks, in milliseconds
 */
public void updateWaitForChunkMetrics(long chunkWaitTimeInMillis) {
// Recorded as-is; no range or sign validation is done here.
chunkWaitTimeMillisHistogram.record(chunkWaitTimeInMillis);
}

public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
assert closed == false;
assert bulkInProgress == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,11 @@ public Map<String, String> queryFields() {
.map(TerminationHandlerProvider::handler);
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);

final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
client,
indexingLimits,
telemetryProvider.getMeterRegistry()
);

final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
modules.bindToInstance(ResponseCollectorService.class, responseCollectorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand Down Expand Up @@ -157,6 +158,9 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);

private long requestNextChunkTime;
private long totalChunkWaitTimeInNanos = 0L;

ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
this.request = request;
this.handlerSupplier = handlerSupplier;
Expand All @@ -181,13 +185,20 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
public void accept(RestChannel restChannel) {
this.restChannel = restChannel;
this.handler = handlerSupplier.get();
requestNextChunkTime = System.nanoTime();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We often pass a LongSupplier instead of calling the real time-measurement method directly; it makes testing easier.

request.contentStream().next();
}

@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
assert handler != null;
assert channel == restChannel;
long now = System.nanoTime();
long elapsedTime = now - requestNextChunkTime;
if (elapsedTime > 0) {
totalChunkWaitTimeInNanos += elapsedTime;
requestNextChunkTime = now;
}
if (shortCircuited) {
chunk.close();
return;
Expand Down Expand Up @@ -231,12 +242,18 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
items.clear();
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
}
handler.updateWaitForChunkMetrics(TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTimeInNanos));
totalChunkWaitTimeInNanos = 0L;
} else if (items.isEmpty() == false) {
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
requestNextChunkTime = System.nanoTime();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if request.contentStream().next(); immediately calls handleChunk if data is available? Otherwise I wonder if we want to capture the time before calling next.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should capture the time before calling next(); that seems more correct regardless.

request.contentStream().next();
});
} else {
Releasables.close(releasables);
requestNextChunkTime = System.nanoTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also set this before calling next here.

request.contentStream().next();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -130,7 +131,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
TestProjectResolvers.alwaysThrow()
);
actionModule.initRestHandlers(null, null);
Expand Down Expand Up @@ -197,7 +198,7 @@ public String getName() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
TestProjectResolvers.alwaysThrow()
);
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
Expand Down Expand Up @@ -257,7 +258,7 @@ public List<RestHandler> getRestHandlers(
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
TestProjectResolvers.alwaysThrow()
);
actionModule.initRestHandlers(null, null);
Expand Down Expand Up @@ -310,7 +311,7 @@ public void test3rdPartyHandlerIsNotInstalled() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
TestProjectResolvers.alwaysThrow()
)
);
Expand Down Expand Up @@ -354,7 +355,7 @@ public void test3rdPartyRestControllerIsNotInstalled() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
TestProjectResolvers.alwaysThrow()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
Expand Down Expand Up @@ -1183,7 +1184,7 @@ public Collection<RestHeaderDefinition> getRestHeaders() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
TestProjectResolvers.alwaysThrow()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpNodeClient;
import org.elasticsearch.test.rest.FakeRestChannel;
Expand Down Expand Up @@ -66,7 +67,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
params.put("pipeline", "timestamps");
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
{"index":{"_id":"1"}}
Expand Down Expand Up @@ -101,7 +102,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
{
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -125,7 +126,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -148,7 +149,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -172,7 +173,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand Down Expand Up @@ -227,7 +228,14 @@ public void next() {

IndexingPressure indexingPressure = new IndexingPressure(Settings.EMPTY);
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(true, request, () -> {
return new IncrementalBulkService.Handler(null, indexingPressure, null, null, null) {
return new IncrementalBulkService.Handler(
null,
indexingPressure,
null,
null,
null,
MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME)
) {

@Override
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.MockLog;
Expand Down Expand Up @@ -947,7 +948,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
TestProjectResolvers.alwaysThrow()
);
actionModule.initRestHandlers(null, null);
Expand Down