Skip to content

Commit 9e19b85

Browse files
ankikumafcofdez
andauthored
Metrics to account for time spent waiting for next chunk (elastic#129469)
This PR addresses ES-12071. We want to collect metrics for the time that is spent waiting for the next chunk of a bulk request. This can help with diagnosing high bulk latency in case the latency is attributable to external factors such as network connection. Co-authored-by: Francisco Fernández Castaño <[email protected]>
1 parent 73a5ce2 commit 9e19b85

File tree

7 files changed

+69
-18
lines changed

7 files changed

+69
-18
lines changed

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.elasticsearch.core.Releasables;
2323
import org.elasticsearch.core.TimeValue;
2424
import org.elasticsearch.index.IndexingPressure;
25+
import org.elasticsearch.telemetry.metric.LongHistogram;
26+
import org.elasticsearch.telemetry.metric.MeterRegistry;
2527

2628
import java.util.ArrayList;
2729
import java.util.Collections;
@@ -33,6 +35,7 @@
3335
import static org.elasticsearch.common.settings.Setting.boolSetting;
3436

3537
public class IncrementalBulkService {
38+
public static final String CHUNK_WAIT_TIME_HISTOGRAM_NAME = "es.rest.incremental_bulk.wait_for_next_chunk.duration.histogram";
3639

3740
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
3841
"rest.incremental_bulk",
@@ -44,9 +47,17 @@ public class IncrementalBulkService {
4447
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
4548
private final IndexingPressure indexingPressure;
4649

47-
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
50+
/* Capture in milliseconds because the APM histogram only has a range of 100,000 */
51+
private final LongHistogram chunkWaitTimeMillisHistogram;
52+
53+
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, MeterRegistry meterRegistry) {
4854
this.client = client;
4955
this.indexingPressure = indexingPressure;
56+
this.chunkWaitTimeMillisHistogram = meterRegistry.registerLongHistogram(
57+
CHUNK_WAIT_TIME_HISTOGRAM_NAME,
58+
"Total time in millis spent waiting for next chunk of a bulk request",
59+
"ms"
60+
);
5061
}
5162

5263
public Handler newBulkRequest() {
@@ -56,7 +67,7 @@ public Handler newBulkRequest() {
5667

5768
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
5869
ensureEnabled();
59-
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
70+
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram);
6071
}
6172

6273
private void ensureEnabled() {
@@ -99,6 +110,8 @@ public static class Handler implements Releasable {
99110
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
100111
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
101112
private final IndexingPressure.Incremental incrementalOperation;
113+
// Ideally this should be in RestBulkAction, but it's harder to inject the metric registry there
114+
private final LongHistogram chunkWaitTimeMillisHistogram;
102115
private boolean closed = false;
103116
private boolean globalFailure = false;
104117
private boolean incrementalRequestSubmitted = false;
@@ -111,20 +124,26 @@ protected Handler(
111124
IndexingPressure indexingPressure,
112125
@Nullable String waitForActiveShards,
113126
@Nullable TimeValue timeout,
114-
@Nullable String refresh
127+
@Nullable String refresh,
128+
LongHistogram chunkWaitTimeMillisHistogram
115129
) {
116130
this.client = client;
117131
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
118132
this.timeout = timeout;
119133
this.refresh = refresh;
120134
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
135+
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
121136
createNewBulkRequest(EMPTY_STATE);
122137
}
123138

124139
public IndexingPressure.Incremental getIncrementalOperation() {
125140
return incrementalOperation;
126141
}
127142

143+
public void updateWaitForChunkMetrics(long chunkWaitTimeInMillis) {
144+
chunkWaitTimeMillisHistogram.record(chunkWaitTimeInMillis);
145+
}
146+
128147
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
129148
assert closed == false;
130149
assert bulkInProgress == false;

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -997,7 +997,11 @@ public Map<String, String> queryFields() {
997997
.map(TerminationHandlerProvider::handler);
998998
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
999999

1000-
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
1000+
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
1001+
client,
1002+
indexingLimits,
1003+
telemetryProvider.getMeterRegistry()
1004+
);
10011005

10021006
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
10031007
modules.bindToInstance(ResponseCollectorService.class, responseCollectorService);

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.ArrayList;
4343
import java.util.List;
4444
import java.util.Set;
45+
import java.util.concurrent.TimeUnit;
4546
import java.util.function.Supplier;
4647

4748
import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -165,6 +166,9 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
165166
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
166167
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
167168

169+
private long requestNextChunkTime;
170+
private long totalChunkWaitTimeInNanos = 0L;
171+
168172
ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
169173
this.request = request;
170174
this.handlerSupplier = handlerSupplier;
@@ -189,13 +193,20 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
189193
public void accept(RestChannel restChannel) {
190194
this.restChannel = restChannel;
191195
this.handler = handlerSupplier.get();
196+
requestNextChunkTime = System.nanoTime();
192197
request.contentStream().next();
193198
}
194199

195200
@Override
196201
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
197202
assert handler != null;
198203
assert channel == restChannel;
204+
long now = System.nanoTime();
205+
long elapsedTime = now - requestNextChunkTime;
206+
if (elapsedTime > 0) {
207+
totalChunkWaitTimeInNanos += elapsedTime;
208+
requestNextChunkTime = now;
209+
}
199210
if (shortCircuited) {
200211
chunk.close();
201212
return;
@@ -239,12 +250,18 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
239250
items.clear();
240251
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
241252
}
253+
handler.updateWaitForChunkMetrics(TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTimeInNanos));
254+
totalChunkWaitTimeInNanos = 0L;
242255
} else if (items.isEmpty() == false) {
243256
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
244257
items.clear();
245-
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
258+
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
259+
requestNextChunkTime = System.nanoTime();
260+
request.contentStream().next();
261+
});
246262
} else {
247263
Releasables.close(releasables);
264+
requestNextChunkTime = System.nanoTime();
248265
request.contentStream().next();
249266
}
250267
}

server/src/test/java/org/elasticsearch/action/ActionModuleTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.tasks.Task;
4343
import org.elasticsearch.tasks.TaskManager;
4444
import org.elasticsearch.telemetry.TelemetryProvider;
45+
import org.elasticsearch.telemetry.metric.MeterRegistry;
4546
import org.elasticsearch.test.ESTestCase;
4647
import org.elasticsearch.threadpool.TestThreadPool;
4748
import org.elasticsearch.threadpool.ThreadPool;
@@ -130,7 +131,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() {
130131
List.of(),
131132
List.of(),
132133
RestExtension.allowAll(),
133-
new IncrementalBulkService(null, null),
134+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
134135
TestProjectResolvers.alwaysThrow()
135136
);
136137
actionModule.initRestHandlers(null, null);
@@ -197,7 +198,7 @@ public String getName() {
197198
List.of(),
198199
List.of(),
199200
RestExtension.allowAll(),
200-
new IncrementalBulkService(null, null),
201+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
201202
TestProjectResolvers.alwaysThrow()
202203
);
203204
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
@@ -257,7 +258,7 @@ public List<RestHandler> getRestHandlers(
257258
List.of(),
258259
List.of(),
259260
RestExtension.allowAll(),
260-
new IncrementalBulkService(null, null),
261+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
261262
TestProjectResolvers.alwaysThrow()
262263
);
263264
actionModule.initRestHandlers(null, null);
@@ -310,7 +311,7 @@ public void test3rdPartyHandlerIsNotInstalled() {
310311
List.of(),
311312
List.of(),
312313
RestExtension.allowAll(),
313-
new IncrementalBulkService(null, null),
314+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
314315
TestProjectResolvers.alwaysThrow()
315316
)
316317
);
@@ -354,7 +355,7 @@ public void test3rdPartyRestControllerIsNotInstalled() {
354355
List.of(),
355356
List.of(),
356357
RestExtension.allowAll(),
357-
new IncrementalBulkService(null, null),
358+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
358359
TestProjectResolvers.alwaysThrow()
359360
)
360361
);

server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.rest.RestStatus;
4444
import org.elasticsearch.tasks.Task;
4545
import org.elasticsearch.telemetry.TelemetryProvider;
46+
import org.elasticsearch.telemetry.metric.MeterRegistry;
4647
import org.elasticsearch.telemetry.tracing.Tracer;
4748
import org.elasticsearch.test.ESTestCase;
4849
import org.elasticsearch.test.MockLog;
@@ -1182,7 +1183,7 @@ public Collection<RestHeaderDefinition> getRestHeaders() {
11821183
List.of(),
11831184
List.of(),
11841185
RestExtension.allowAll(),
1185-
new IncrementalBulkService(null, null),
1186+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
11861187
TestProjectResolvers.alwaysThrow()
11871188
);
11881189
}

server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.index.IndexingPressure;
2929
import org.elasticsearch.rest.RestChannel;
3030
import org.elasticsearch.rest.RestRequest;
31+
import org.elasticsearch.telemetry.metric.MeterRegistry;
3132
import org.elasticsearch.test.ESTestCase;
3233
import org.elasticsearch.test.client.NoOpNodeClient;
3334
import org.elasticsearch.test.rest.FakeRestChannel;
@@ -68,7 +69,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
6869
new RestBulkAction(
6970
settings(IndexVersion.current()).build(),
7071
ClusterSettings.createBuiltInClusterSettings(),
71-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
72+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
7273
).handleRequest(
7374
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
7475
{"index":{"_id":"1"}}
@@ -104,7 +105,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
104105
new RestBulkAction(
105106
settings(IndexVersion.current()).build(),
106107
ClusterSettings.createBuiltInClusterSettings(),
107-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
108+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
108109
).handleRequest(
109110
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
110111
.withParams(params)
@@ -129,7 +130,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
129130
new RestBulkAction(
130131
settings(IndexVersion.current()).build(),
131132
ClusterSettings.createBuiltInClusterSettings(),
132-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
133+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
133134
).handleRequest(
134135
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
135136
.withParams(params)
@@ -153,7 +154,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
153154
new RestBulkAction(
154155
settings(IndexVersion.current()).build(),
155156
ClusterSettings.createBuiltInClusterSettings(),
156-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
157+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
157158
).handleRequest(
158159
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
159160
.withParams(params)
@@ -178,7 +179,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
178179
new RestBulkAction(
179180
settings(IndexVersion.current()).build(),
180181
ClusterSettings.createBuiltInClusterSettings(),
181-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
182+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
182183
).handleRequest(
183184
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
184185
.withParams(params)
@@ -233,7 +234,14 @@ public void next() {
233234

234235
IndexingPressure indexingPressure = new IndexingPressure(Settings.EMPTY);
235236
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(true, request, () -> {
236-
return new IncrementalBulkService.Handler(null, indexingPressure, null, null, null) {
237+
return new IncrementalBulkService.Handler(
238+
null,
239+
indexingPressure,
240+
null,
241+
null,
242+
null,
243+
MeterRegistry.NOOP.getLongHistogram(IncrementalBulkService.CHUNK_WAIT_TIME_HISTOGRAM_NAME)
244+
) {
237245

238246
@Override
239247
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.elasticsearch.rest.RestRequest;
6464
import org.elasticsearch.script.ScriptService;
6565
import org.elasticsearch.telemetry.TelemetryProvider;
66+
import org.elasticsearch.telemetry.metric.MeterRegistry;
6667
import org.elasticsearch.test.ESTestCase;
6768
import org.elasticsearch.test.IndexSettingsModule;
6869
import org.elasticsearch.test.MockLog;
@@ -947,7 +948,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc
947948
List.of(),
948949
List.of(),
949950
RestExtension.allowAll(),
950-
new IncrementalBulkService(null, null),
951+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
951952
TestProjectResolvers.alwaysThrow()
952953
);
953954
actionModule.initRestHandlers(null, null);

0 commit comments

Comments
 (0)