Skip to content

Commit 6a6c27a

Browse files
committed
Review comments
1 parent dc16d64 commit 6a6c27a

File tree

8 files changed

+46
-99
lines changed

8 files changed

+46
-99
lines changed

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import org.elasticsearch.core.Releasables;
2323
import org.elasticsearch.core.TimeValue;
2424
import org.elasticsearch.index.IndexingPressure;
25-
import org.elasticsearch.rest.action.document.BulkOperationWaitForChunkMetrics;
25+
import org.elasticsearch.telemetry.metric.LongHistogram;
26+
import org.elasticsearch.telemetry.metric.MeterRegistry;
2627

2728
import java.util.ArrayList;
2829
import java.util.Collections;
@@ -34,6 +35,7 @@
3435
import static org.elasticsearch.common.settings.Setting.boolSetting;
3536

3637
public class IncrementalBulkService {
38+
public static final String CHUNK_WAIT_TIME_HISTOGRAM_NAME = "es.rest.incremental_bulk.wait_for_next_chunk.duration.histogram";
3739

3840
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
3941
"rest.incremental_bulk",
@@ -44,16 +46,18 @@ public class IncrementalBulkService {
4446
private final Client client;
4547
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
4648
private final IndexingPressure indexingPressure;
47-
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;
4849

49-
public IncrementalBulkService(
50-
Client client,
51-
IndexingPressure indexingPressure,
52-
BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics
53-
) {
50+
/* Capture in milliseconds because the APM histogram only has a range of 100,000 */
51+
private final LongHistogram chunkWaitTimeMillisHistogram;
52+
53+
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, MeterRegistry meterRegistry) {
5454
this.client = client;
5555
this.indexingPressure = indexingPressure;
56-
this.bulkOperationWaitForChunkMetrics = bulkOperationWaitForChunkMetrics;
56+
this.chunkWaitTimeMillisHistogram = meterRegistry.registerLongHistogram(
57+
CHUNK_WAIT_TIME_HISTOGRAM_NAME,
58+
"Total time in millis spent waiting for next chunk of a bulk request",
59+
"ms"
60+
);
5761
}
5862

5963
public Handler newBulkRequest() {
@@ -63,7 +67,7 @@ public Handler newBulkRequest() {
6367

6468
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
6569
ensureEnabled();
66-
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, bulkOperationWaitForChunkMetrics);
70+
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram);
6771
}
6872

6973
private void ensureEnabled() {
@@ -106,39 +110,38 @@ public static class Handler implements Releasable {
106110
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
107111
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
108112
private final IndexingPressure.Incremental incrementalOperation;
113+
// Ideally this should be in RestBulkAction, but it's harder to inject the metric registry there
114+
private final LongHistogram chunkWaitTimeMillisHistogram;
109115
private boolean closed = false;
110116
private boolean globalFailure = false;
111117
private boolean incrementalRequestSubmitted = false;
112118
private boolean bulkInProgress = false;
113119
private Exception bulkActionLevelFailure = null;
114120
private BulkRequest bulkRequest = null;
115-
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;
116121

117122
protected Handler(
118123
Client client,
119124
IndexingPressure indexingPressure,
120125
@Nullable String waitForActiveShards,
121126
@Nullable TimeValue timeout,
122127
@Nullable String refresh,
123-
@Nullable BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics
128+
LongHistogram chunkWaitTimeMillisHistogram
124129
) {
125130
this.client = client;
126131
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
127132
this.timeout = timeout;
128133
this.refresh = refresh;
129134
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
130-
this.bulkOperationWaitForChunkMetrics = bulkOperationWaitForChunkMetrics;
135+
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
131136
createNewBulkRequest(EMPTY_STATE);
132137
}
133138

134139
public IndexingPressure.Incremental getIncrementalOperation() {
135140
return incrementalOperation;
136141
}
137142

138-
public void updateWaitForChunkMetrics(long chunkWaitTimeCentis) {
139-
if (bulkOperationWaitForChunkMetrics != null) {
140-
bulkOperationWaitForChunkMetrics.recordTookTime(chunkWaitTimeCentis);
141-
}
143+
public void updateWaitForChunkMetrics(long chunkWaitTimeInMillis) {
144+
chunkWaitTimeMillisHistogram.record(chunkWaitTimeInMillis);
142145
}
143146

144147
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@
195195
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthIndicatorService;
196196
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthTracker;
197197
import org.elasticsearch.reservedstate.service.FileSettingsServiceProvider;
198-
import org.elasticsearch.rest.action.document.BulkOperationWaitForChunkMetrics;
199198
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
200199
import org.elasticsearch.script.ScriptModule;
201200
import org.elasticsearch.script.ScriptService;
@@ -941,9 +940,6 @@ public Map<String, String> queryFields() {
941940
);
942941

943942
final IndexingPressure indexingLimits = new IndexingPressure(settings);
944-
final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics = new BulkOperationWaitForChunkMetrics(
945-
telemetryProvider.getMeterRegistry()
946-
);
947943

948944
PluginServiceInstances pluginServices = new PluginServiceInstances(
949945
client,
@@ -1004,7 +1000,7 @@ public Map<String, String> queryFields() {
10041000
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
10051001
client,
10061002
indexingLimits,
1007-
bulkOperationWaitForChunkMetrics
1003+
telemetryProvider.getMeterRegistry()
10081004
);
10091005

10101006
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
@@ -1261,7 +1257,6 @@ public Map<String, String> queryFields() {
12611257
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
12621258
b.bind(IngestService.class).toInstance(ingestService);
12631259
b.bind(IndexingPressure.class).toInstance(indexingLimits);
1264-
b.bind(BulkOperationWaitForChunkMetrics.class).toInstance(bulkOperationWaitForChunkMetrics);
12651260
b.bind(IncrementalBulkService.class).toInstance(incrementalBulkService);
12661261
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
12671262
b.bind(MetaStateService.class).toInstance(metaStateService);

server/src/main/java/org/elasticsearch/rest/action/document/BulkOperationWaitForChunkMetrics.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
159159
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
160160

161161
private long requestNextChunkTime;
162-
private long totalChunkWaitTime = 0L;
162+
private long totalChunkWaitTimeInNanos = 0L;
163163

164164
ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
165165
this.request = request;
@@ -185,18 +185,19 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
185185
public void accept(RestChannel restChannel) {
186186
this.restChannel = restChannel;
187187
this.handler = handlerSupplier.get();
188-
request.contentStream().next();
189188
requestNextChunkTime = System.nanoTime();
189+
request.contentStream().next();
190190
}
191191

192192
@Override
193193
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
194194
assert handler != null;
195195
assert channel == restChannel;
196-
long elapsedTime = System.nanoTime() - requestNextChunkTime;
196+
long now = System.nanoTime();
197+
long elapsedTime = now - requestNextChunkTime;
197198
if (elapsedTime > 0) {
198-
totalChunkWaitTime += elapsedTime;
199-
requestNextChunkTime = 0L;
199+
totalChunkWaitTimeInNanos += elapsedTime;
200+
requestNextChunkTime = now;
200201
}
201202
if (shortCircuited) {
202203
chunk.close();
@@ -241,20 +242,19 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
241242
items.clear();
242243
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
243244
}
244-
totalChunkWaitTime = TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTime);
245-
handler.updateWaitForChunkMetrics(totalChunkWaitTime);
246-
totalChunkWaitTime = 0L;
245+
handler.updateWaitForChunkMetrics(TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTimeInNanos));
246+
totalChunkWaitTimeInNanos = 0L;
247247
} else if (items.isEmpty() == false) {
248248
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
249249
items.clear();
250250
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
251-
request.contentStream().next();
252251
requestNextChunkTime = System.nanoTime();
252+
request.contentStream().next();
253253
});
254254
} else {
255255
Releasables.close(releasables);
256-
request.contentStream().next();
257256
requestNextChunkTime = System.nanoTime();
257+
request.contentStream().next();
258258
}
259259
}
260260

server/src/test/java/org/elasticsearch/action/ActionModuleTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.tasks.Task;
4343
import org.elasticsearch.tasks.TaskManager;
4444
import org.elasticsearch.telemetry.TelemetryProvider;
45+
import org.elasticsearch.telemetry.metric.MeterRegistry;
4546
import org.elasticsearch.test.ESTestCase;
4647
import org.elasticsearch.threadpool.TestThreadPool;
4748
import org.elasticsearch.threadpool.ThreadPool;
@@ -130,7 +131,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() {
130131
List.of(),
131132
List.of(),
132133
RestExtension.allowAll(),
133-
new IncrementalBulkService(null, null, null),
134+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
134135
TestProjectResolvers.alwaysThrow()
135136
);
136137
actionModule.initRestHandlers(null, null);
@@ -197,7 +198,7 @@ public String getName() {
197198
List.of(),
198199
List.of(),
199200
RestExtension.allowAll(),
200-
new IncrementalBulkService(null, null, null),
201+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
201202
TestProjectResolvers.alwaysThrow()
202203
);
203204
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
@@ -257,7 +258,7 @@ public List<RestHandler> getRestHandlers(
257258
List.of(),
258259
List.of(),
259260
RestExtension.allowAll(),
260-
new IncrementalBulkService(null, null, null),
261+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
261262
TestProjectResolvers.alwaysThrow()
262263
);
263264
actionModule.initRestHandlers(null, null);
@@ -310,7 +311,7 @@ public void test3rdPartyHandlerIsNotInstalled() {
310311
List.of(),
311312
List.of(),
312313
RestExtension.allowAll(),
313-
new IncrementalBulkService(null, null, null),
314+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
314315
TestProjectResolvers.alwaysThrow()
315316
)
316317
);
@@ -354,7 +355,7 @@ public void test3rdPartyRestControllerIsNotInstalled() {
354355
List.of(),
355356
List.of(),
356357
RestExtension.allowAll(),
357-
new IncrementalBulkService(null, null, null),
358+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
358359
TestProjectResolvers.alwaysThrow()
359360
)
360361
);

server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.rest.RestStatus;
4444
import org.elasticsearch.tasks.Task;
4545
import org.elasticsearch.telemetry.TelemetryProvider;
46+
import org.elasticsearch.telemetry.metric.MeterRegistry;
4647
import org.elasticsearch.telemetry.tracing.Tracer;
4748
import org.elasticsearch.test.ESTestCase;
4849
import org.elasticsearch.test.MockLog;
@@ -1183,7 +1184,7 @@ public Collection<RestHeaderDefinition> getRestHeaders() {
11831184
List.of(),
11841185
List.of(),
11851186
RestExtension.allowAll(),
1186-
new IncrementalBulkService(null, null, null),
1187+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
11871188
TestProjectResolvers.alwaysThrow()
11881189
);
11891190
}

server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.index.IndexingPressure;
2828
import org.elasticsearch.rest.RestChannel;
2929
import org.elasticsearch.rest.RestRequest;
30+
import org.elasticsearch.telemetry.metric.MeterRegistry;
3031
import org.elasticsearch.test.ESTestCase;
3132
import org.elasticsearch.test.client.NoOpNodeClient;
3233
import org.elasticsearch.test.rest.FakeRestChannel;
@@ -66,7 +67,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
6667
params.put("pipeline", "timestamps");
6768
new RestBulkAction(
6869
settings(IndexVersion.current()).build(),
69-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), mock(BulkOperationWaitForChunkMetrics.class))
70+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
7071
).handleRequest(
7172
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
7273
{"index":{"_id":"1"}}
@@ -101,11 +102,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
101102
{
102103
new RestBulkAction(
103104
settings(IndexVersion.current()).build(),
104-
new IncrementalBulkService(
105-
mock(Client.class),
106-
mock(IndexingPressure.class),
107-
mock(BulkOperationWaitForChunkMetrics.class)
108-
)
105+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
109106
).handleRequest(
110107
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
111108
.withParams(params)
@@ -129,11 +126,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
129126
bulkCalled.set(false);
130127
new RestBulkAction(
131128
settings(IndexVersion.current()).build(),
132-
new IncrementalBulkService(
133-
mock(Client.class),
134-
mock(IndexingPressure.class),
135-
mock(BulkOperationWaitForChunkMetrics.class)
136-
)
129+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
137130
).handleRequest(
138131
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
139132
.withParams(params)
@@ -156,11 +149,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
156149
bulkCalled.set(false);
157150
new RestBulkAction(
158151
settings(IndexVersion.current()).build(),
159-
new IncrementalBulkService(
160-
mock(Client.class),
161-
mock(IndexingPressure.class),
162-
mock(BulkOperationWaitForChunkMetrics.class)
163-
)
152+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
164153
).handleRequest(
165154
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
166155
.withParams(params)
@@ -184,11 +173,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
184173
bulkCalled.set(false);
185174
new RestBulkAction(
186175
settings(IndexVersion.current()).build(),
187-
new IncrementalBulkService(
188-
mock(Client.class),
189-
mock(IndexingPressure.class),
190-
mock(BulkOperationWaitForChunkMetrics.class)
191-
)
176+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
192177
).handleRequest(
193178
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
194179
.withParams(params)

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.elasticsearch.rest.RestRequest;
6464
import org.elasticsearch.script.ScriptService;
6565
import org.elasticsearch.telemetry.TelemetryProvider;
66+
import org.elasticsearch.telemetry.metric.MeterRegistry;
6667
import org.elasticsearch.test.ESTestCase;
6768
import org.elasticsearch.test.IndexSettingsModule;
6869
import org.elasticsearch.test.MockLog;
@@ -947,7 +948,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc
947948
List.of(),
948949
List.of(),
949950
RestExtension.allowAll(),
950-
new IncrementalBulkService(null, null, null),
951+
new IncrementalBulkService(null, null, MeterRegistry.NOOP),
951952
TestProjectResolvers.alwaysThrow()
952953
);
953954
actionModule.initRestHandlers(null, null);

0 commit comments

Comments
 (0)