Skip to content

Commit 8763cd5

Browse files
committed
first cut
1 parent 9de75e6 commit 8763cd5

File tree

8 files changed

+108
-18
lines changed

8 files changed

+108
-18
lines changed

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.core.Releasables;
2323
import org.elasticsearch.core.TimeValue;
2424
import org.elasticsearch.index.IndexingPressure;
25+
import org.elasticsearch.rest.action.document.BulkOperationWaitForChunkMetrics;
2526

2627
import java.util.ArrayList;
2728
import java.util.Collections;
@@ -43,10 +44,16 @@ public class IncrementalBulkService {
4344
private final Client client;
4445
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
4546
private final IndexingPressure indexingPressure;
47+
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;
4648

47-
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
49+
public IncrementalBulkService(
50+
Client client,
51+
IndexingPressure indexingPressure,
52+
BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics
53+
) {
4854
this.client = client;
4955
this.indexingPressure = indexingPressure;
56+
this.bulkOperationWaitForChunkMetrics = bulkOperationWaitForChunkMetrics;
5057
}
5158

5259
public Handler newBulkRequest() {
@@ -56,7 +63,7 @@ public Handler newBulkRequest() {
5663

5764
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
5865
ensureEnabled();
59-
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
66+
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, bulkOperationWaitForChunkMetrics);
6067
}
6168

6269
private void ensureEnabled() {
@@ -105,26 +112,33 @@ public static class Handler implements Releasable {
105112
private boolean bulkInProgress = false;
106113
private Exception bulkActionLevelFailure = null;
107114
private BulkRequest bulkRequest = null;
115+
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;
108116

109117
protected Handler(
110118
Client client,
111119
IndexingPressure indexingPressure,
112120
@Nullable String waitForActiveShards,
113121
@Nullable TimeValue timeout,
114-
@Nullable String refresh
122+
@Nullable String refresh,
123+
@Nullable BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics
115124
) {
116125
this.client = client;
117126
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
118127
this.timeout = timeout;
119128
this.refresh = refresh;
120129
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
130+
this.bulkOperationWaitForChunkMetrics = bulkOperationWaitForChunkMetrics;
121131
createNewBulkRequest(EMPTY_STATE);
122132
}
123133

124134
public IndexingPressure.Incremental getIncrementalOperation() {
125135
return incrementalOperation;
126136
}
127137

138+
public void updateWaitForChunkMetrics(long chunkWaitTimeCentis) {
139+
bulkOperationWaitForChunkMetrics.recordTookTime(chunkWaitTimeCentis);
140+
}
141+
128142
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
129143
assert closed == false;
130144
assert bulkInProgress == false;

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@
195195
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthIndicatorService;
196196
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthTracker;
197197
import org.elasticsearch.reservedstate.service.FileSettingsServiceProvider;
198+
import org.elasticsearch.rest.action.document.BulkOperationWaitForChunkMetrics;
198199
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
199200
import org.elasticsearch.script.ScriptModule;
200201
import org.elasticsearch.script.ScriptService;
@@ -933,6 +934,9 @@ public Map<String, String> queryFields() {
933934
);
934935

935936
final IndexingPressure indexingLimits = new IndexingPressure(settings);
937+
final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics = new BulkOperationWaitForChunkMetrics(
938+
telemetryProvider.getMeterRegistry()
939+
);
936940

937941
PluginServiceInstances pluginServices = new PluginServiceInstances(
938942
client,
@@ -990,7 +994,11 @@ public Map<String, String> queryFields() {
990994
.map(TerminationHandlerProvider::handler);
991995
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
992996

993-
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);
997+
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
998+
client,
999+
indexingLimits,
1000+
bulkOperationWaitForChunkMetrics
1001+
);
9941002

9951003
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
9961004
modules.bindToInstance(ResponseCollectorService.class, responseCollectorService);
@@ -1246,6 +1254,7 @@ public Map<String, String> queryFields() {
12461254
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
12471255
b.bind(IngestService.class).toInstance(ingestService);
12481256
b.bind(IndexingPressure.class).toInstance(indexingLimits);
1257+
b.bind(BulkOperationWaitForChunkMetrics.class).toInstance(bulkOperationWaitForChunkMetrics);
12491258
b.bind(IncrementalBulkService.class).toInstance(incrementalBulkService);
12501259
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
12511260
b.bind(MetaStateService.class).toInstance(metaStateService);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.rest.action.document;
11+
12+
import org.elasticsearch.telemetry.metric.LongHistogram;
13+
import org.elasticsearch.telemetry.metric.MeterRegistry;
14+
15+
public class BulkOperationWaitForChunkMetrics {
16+
public static final String CHUNK_WAIT_TIME_HISTOGRAM = "es.rest.wait.duration.histogram";
17+
18+
/* Capture in milliseconds because the APM histogram only has a range of 100,000 */
19+
private final LongHistogram chunkWaitTimeMillisHistogram;
20+
21+
public BulkOperationWaitForChunkMetrics(MeterRegistry meterRegistry) {
22+
this(meterRegistry.registerLongHistogram(CHUNK_WAIT_TIME_HISTOGRAM,
23+
"Total time in millis spent waiting for next chunk of a bulk request", "centis"));
24+
}
25+
26+
private BulkOperationWaitForChunkMetrics(LongHistogram chunkWaitTimeMillisHistogram) {
27+
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
28+
}
29+
30+
public long recordTookTime(long tookTime) {
31+
chunkWaitTimeMillisHistogram.record(tookTime);
32+
return tookTime;
33+
}
34+
}

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.ArrayList;
4242
import java.util.List;
4343
import java.util.Set;
44+
import java.util.concurrent.TimeUnit;
4445
import java.util.function.Supplier;
4546

4647
import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -157,6 +158,9 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
157158
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
158159
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
159160

161+
private long requestNextChunkTime;
162+
private long totalChunkWaitTime = 0L;
163+
160164
ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
161165
this.request = request;
162166
this.handlerSupplier = handlerSupplier;
@@ -182,12 +186,18 @@ public void accept(RestChannel restChannel) {
182186
this.restChannel = restChannel;
183187
this.handler = handlerSupplier.get();
184188
request.contentStream().next();
189+
requestNextChunkTime = System.nanoTime();
185190
}
186191

187192
@Override
188193
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
189194
assert handler != null;
190195
assert channel == restChannel;
196+
long elapsedTime = System.nanoTime() - requestNextChunkTime;
197+
if (elapsedTime > 0) {
198+
totalChunkWaitTime += elapsedTime;
199+
requestNextChunkTime = 0L;
200+
}
191201
if (shortCircuited) {
192202
chunk.close();
193203
return;
@@ -231,13 +241,20 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
231241
items.clear();
232242
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
233243
}
244+
totalChunkWaitTime = TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTime);
245+
handler.updateWaitForChunkMetrics(totalChunkWaitTime);
246+
totalChunkWaitTime = 0L;
234247
} else if (items.isEmpty() == false) {
235248
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
236249
items.clear();
237-
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
250+
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
251+
request.contentStream().next();
252+
requestNextChunkTime = System.nanoTime();
253+
});
238254
} else {
239255
Releasables.close(releasables);
240256
request.contentStream().next();
257+
requestNextChunkTime = System.nanoTime();
241258
}
242259
}
243260

server/src/test/java/org/elasticsearch/action/ActionModuleTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() {
130130
List.of(),
131131
List.of(),
132132
RestExtension.allowAll(),
133-
new IncrementalBulkService(null, null),
133+
new IncrementalBulkService(null, null, null),
134134
TestProjectResolvers.alwaysThrow()
135135
);
136136
actionModule.initRestHandlers(null, null);
@@ -197,7 +197,7 @@ public String getName() {
197197
List.of(),
198198
List.of(),
199199
RestExtension.allowAll(),
200-
new IncrementalBulkService(null, null),
200+
new IncrementalBulkService(null, null, null),
201201
TestProjectResolvers.alwaysThrow()
202202
);
203203
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
@@ -257,7 +257,7 @@ public List<RestHandler> getRestHandlers(
257257
List.of(),
258258
List.of(),
259259
RestExtension.allowAll(),
260-
new IncrementalBulkService(null, null),
260+
new IncrementalBulkService(null, null, null),
261261
TestProjectResolvers.alwaysThrow()
262262
);
263263
actionModule.initRestHandlers(null, null);
@@ -310,7 +310,7 @@ public void test3rdPartyHandlerIsNotInstalled() {
310310
List.of(),
311311
List.of(),
312312
RestExtension.allowAll(),
313-
new IncrementalBulkService(null, null),
313+
new IncrementalBulkService(null, null, null),
314314
TestProjectResolvers.alwaysThrow()
315315
)
316316
);
@@ -354,7 +354,7 @@ public void test3rdPartyRestControllerIsNotInstalled() {
354354
List.of(),
355355
List.of(),
356356
RestExtension.allowAll(),
357-
new IncrementalBulkService(null, null),
357+
new IncrementalBulkService(null, null, null),
358358
TestProjectResolvers.alwaysThrow()
359359
)
360360
);

server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1183,7 +1183,7 @@ public Collection<RestHeaderDefinition> getRestHeaders() {
11831183
List.of(),
11841184
List.of(),
11851185
RestExtension.allowAll(),
1186-
new IncrementalBulkService(null, null),
1186+
new IncrementalBulkService(null, null, null),
11871187
TestProjectResolvers.alwaysThrow()
11881188
);
11891189
}

server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
6666
params.put("pipeline", "timestamps");
6767
new RestBulkAction(
6868
settings(IndexVersion.current()).build(),
69-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
69+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), mock(BulkOperationWaitForChunkMetrics.class))
7070
).handleRequest(
7171
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
7272
{"index":{"_id":"1"}}
@@ -101,7 +101,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
101101
{
102102
new RestBulkAction(
103103
settings(IndexVersion.current()).build(),
104-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
104+
new IncrementalBulkService(
105+
mock(Client.class),
106+
mock(IndexingPressure.class),
107+
mock(BulkOperationWaitForChunkMetrics.class)
108+
)
105109
).handleRequest(
106110
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
107111
.withParams(params)
@@ -125,7 +129,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
125129
bulkCalled.set(false);
126130
new RestBulkAction(
127131
settings(IndexVersion.current()).build(),
128-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
132+
new IncrementalBulkService(
133+
mock(Client.class),
134+
mock(IndexingPressure.class),
135+
mock(BulkOperationWaitForChunkMetrics.class)
136+
)
129137
).handleRequest(
130138
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
131139
.withParams(params)
@@ -148,7 +156,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
148156
bulkCalled.set(false);
149157
new RestBulkAction(
150158
settings(IndexVersion.current()).build(),
151-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
159+
new IncrementalBulkService(
160+
mock(Client.class),
161+
mock(IndexingPressure.class),
162+
mock(BulkOperationWaitForChunkMetrics.class)
163+
)
152164
).handleRequest(
153165
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
154166
.withParams(params)
@@ -172,7 +184,11 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
172184
bulkCalled.set(false);
173185
new RestBulkAction(
174186
settings(IndexVersion.current()).build(),
175-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class))
187+
new IncrementalBulkService(
188+
mock(Client.class),
189+
mock(IndexingPressure.class),
190+
mock(BulkOperationWaitForChunkMetrics.class)
191+
)
176192
).handleRequest(
177193
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
178194
.withParams(params)
@@ -227,7 +243,7 @@ public void next() {
227243

228244
IndexingPressure indexingPressure = new IndexingPressure(Settings.EMPTY);
229245
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(true, request, () -> {
230-
return new IncrementalBulkService.Handler(null, indexingPressure, null, null, null) {
246+
return new IncrementalBulkService.Handler(null, indexingPressure, null, null, null, null) {
231247

232248
@Override
233249
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc
945945
List.of(),
946946
List.of(),
947947
RestExtension.allowAll(),
948-
new IncrementalBulkService(null, null),
948+
new IncrementalBulkService(null, null, null),
949949
TestProjectResolvers.alwaysThrow()
950950
);
951951
actionModule.initRestHandlers(null, null);

0 commit comments

Comments
 (0)