Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.rest.action.document.BulkOperationWaitForChunkMetrics;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -43,10 +44,16 @@ public class IncrementalBulkService {
private final Client client;
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
private final IndexingPressure indexingPressure;
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;

public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what are the advantages of using a dedicated BulkOperationWaitForChunkMetrics object here? Maube just inject the MeterRegistry and declare the histogram metric in IncrementalBulkService would be simpler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Later on, calling updateWaitForChunkMetrics would update the metric directly instead of delegating to BulkOperationWaitForChunkMetrics too.

) {
this.client = client;
this.indexingPressure = indexingPressure;
this.bulkOperationWaitForChunkMetrics = bulkOperationWaitForChunkMetrics;
}

public Handler newBulkRequest() {
Expand All @@ -56,7 +63,7 @@ public Handler newBulkRequest() {

public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
ensureEnabled();
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, bulkOperationWaitForChunkMetrics);
}

private void ensureEnabled() {
Expand Down Expand Up @@ -105,26 +112,35 @@ public static class Handler implements Releasable {
private boolean bulkInProgress = false;
private Exception bulkActionLevelFailure = null;
private BulkRequest bulkRequest = null;
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we comment that the only reason this lives in this class is that it is simpler to inject here than into RestBulkAction?


protected Handler(
Client client,
IndexingPressure indexingPressure,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
@Nullable String refresh,
@Nullable BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics
) {
this.client = client;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
this.refresh = refresh;
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
this.bulkOperationWaitForChunkMetrics = bulkOperationWaitForChunkMetrics;
createNewBulkRequest(EMPTY_STATE);
}

public IndexingPressure.Incremental getIncrementalOperation() {
return incrementalOperation;
}

public void updateWaitForChunkMetrics(long chunkWaitTimeCentis) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like this?

Suggested change
public void updateWaitForChunkMetrics(long chunkWaitTimeCentis) {
public void recordWaitForNextChunkTime(long waitForNextChunkTimeInMillis) {

if (bulkOperationWaitForChunkMetrics != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can assert that it is not null instead here?

bulkOperationWaitForChunkMetrics.recordTookTime(chunkWaitTimeCentis);
}
}

public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
assert closed == false;
assert bulkInProgress == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthIndicatorService;
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthTracker;
import org.elasticsearch.reservedstate.service.FileSettingsServiceProvider;
import org.elasticsearch.rest.action.document.BulkOperationWaitForChunkMetrics;
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -940,6 +941,9 @@ public Map<String, String> queryFields() {
);

final IndexingPressure indexingLimits = new IndexingPressure(settings);
final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics = new BulkOperationWaitForChunkMetrics(
telemetryProvider.getMeterRegistry()
);

PluginServiceInstances pluginServices = new PluginServiceInstances(
client,
Expand Down Expand Up @@ -997,7 +1001,11 @@ public Map<String, String> queryFields() {
.map(TerminationHandlerProvider::handler);
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);

final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
client,
indexingLimits,
bulkOperationWaitForChunkMetrics
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would inject telemetryProvider.getMeterRegistry() directly

);

final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
modules.bindToInstance(ResponseCollectorService.class, responseCollectorService);
Expand Down Expand Up @@ -1253,6 +1261,7 @@ public Map<String, String> queryFields() {
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressure.class).toInstance(indexingLimits);
b.bind(BulkOperationWaitForChunkMetrics.class).toInstance(bulkOperationWaitForChunkMetrics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binding is necessary if the BulkOperationWaitForChunkMetrics is injected through Guice somewhere, I don't think it is the case?

b.bind(IncrementalBulkService.class).toInstance(incrementalBulkService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(MetaStateService.class).toInstance(metaStateService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.action.document;

import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

public class BulkOperationWaitForChunkMetrics {
public static final String CHUNK_WAIT_TIME_HISTOGRAM = "es.rest.wait.duration.histogram";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should mention incremental bulk request / chunking:
es.rest.incremental_bulk.wait_for_next_chunk.duration.histogram

(or something along those lines)


/* Capture in milliseconds because the APM histogram only has a range of 100,000 */
private final LongHistogram chunkWaitTimeMillisHistogram;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final LongHistogram chunkWaitTimeMillisHistogram;
private final LongHistogram chunkWaitTimeInMillisHistogram;


public BulkOperationWaitForChunkMetrics(MeterRegistry meterRegistry) {
this(
meterRegistry.registerLongHistogram(
CHUNK_WAIT_TIME_HISTOGRAM,
"Total time in millis spent waiting for next chunk of a bulk request",
"centis"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"centis"
"ms"

)
);
}

private BulkOperationWaitForChunkMetrics(LongHistogram chunkWaitTimeMillisHistogram) {
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
}

public long recordTookTime(long tookTime) {
chunkWaitTimeMillisHistogram.record(tookTime);
return tookTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand Down Expand Up @@ -157,6 +158,9 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);

private long requestNextChunkTime;
private long totalChunkWaitTime = 0L;

ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
this.request = request;
this.handlerSupplier = handlerSupplier;
Expand All @@ -182,12 +186,18 @@ public void accept(RestChannel restChannel) {
this.restChannel = restChannel;
this.handler = handlerSupplier.get();
request.contentStream().next();
requestNextChunkTime = System.nanoTime();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We often pass LongSupplier instead of the real time measurement method, it makes testing easier.

}

@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
assert handler != null;
assert channel == restChannel;
long elapsedTime = System.nanoTime() - requestNextChunkTime;
if (elapsedTime > 0) {
totalChunkWaitTime += elapsedTime;
requestNextChunkTime = 0L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we instead reset this to what System.nanoTime() gave above? 0 is not a special value and we do not seem to guard against it.

}
if (shortCircuited) {
chunk.close();
return;
Expand Down Expand Up @@ -231,13 +241,20 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
items.clear();
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
}
totalChunkWaitTime = TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTime);
handler.updateWaitForChunkMetrics(totalChunkWaitTime);
totalChunkWaitTime = 0L;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
totalChunkWaitTime = 0L;
totalChunkWaitTime = -1L;

and then assert totalChunkWaitTime>= 0L in the handleChunk method?

} else if (items.isEmpty() == false) {
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
request.contentStream().next();
requestNextChunkTime = System.nanoTime();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if request.contentStream().next(); immediately calls handleChunk if data is available? Otherwise I wonder if we want to capture the time before calling next.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should do it prior, seems more correct regardless.

});
} else {
Releasables.close(releasables);
request.contentStream().next();
requestNextChunkTime = System.nanoTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also set this before calling next here.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, null),
TestProjectResolvers.alwaysThrow()
);
actionModule.initRestHandlers(null, null);
Expand Down Expand Up @@ -197,7 +197,7 @@ public String getName() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, null),
TestProjectResolvers.alwaysThrow()
);
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
Expand Down Expand Up @@ -257,7 +257,7 @@ public List<RestHandler> getRestHandlers(
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, null),
TestProjectResolvers.alwaysThrow()
);
actionModule.initRestHandlers(null, null);
Expand Down Expand Up @@ -310,7 +310,7 @@ public void test3rdPartyHandlerIsNotInstalled() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, null),
TestProjectResolvers.alwaysThrow()
)
);
Expand Down Expand Up @@ -354,7 +354,7 @@ public void test3rdPartyRestControllerIsNotInstalled() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, null),
TestProjectResolvers.alwaysThrow()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ public Collection<RestHeaderDefinition> getRestHeaders() {
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, null),
TestProjectResolvers.alwaysThrow()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
params.put("pipeline", "timestamps");
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), mock(BulkOperationWaitForChunkMetrics.class))
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
{"index":{"_id":"1"}}
Expand Down Expand Up @@ -101,7 +101,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
{
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
mock(BulkOperationWaitForChunkMetrics.class)
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -125,7 +129,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
mock(BulkOperationWaitForChunkMetrics.class)
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -148,7 +156,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
mock(BulkOperationWaitForChunkMetrics.class)
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -172,7 +184,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
mock(BulkOperationWaitForChunkMetrics.class)
)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand Down Expand Up @@ -227,7 +243,7 @@ public void next() {

IndexingPressure indexingPressure = new IndexingPressure(Settings.EMPTY);
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(true, request, () -> {
return new IncrementalBulkService.Handler(null, indexingPressure, null, null, null) {
return new IncrementalBulkService.Handler(null, indexingPressure, null, null, null, null) {

@Override
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc
List.of(),
List.of(),
RestExtension.allowAll(),
new IncrementalBulkService(null, null),
new IncrementalBulkService(null, null, null),
TestProjectResolvers.alwaysThrow()
);
actionModule.initRestHandlers(null, null);
Expand Down