
Commit 45d5990

Metrics for incremental bulk splits (#116765) (#117280)
Backport of PR #116765. Add metrics to track incremental bulk request splits due to indexing pressure. Resolves ES-9612
1 parent 1cd95b5 commit 45d5990
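
The registration code for the new counters lives in one of the commit's files that is not excerpted below. Purely as a hypothetical sketch (class name, wiring, and metric descriptions are illustrative, and it assumes the org.elasticsearch.telemetry.metric.MeterRegistry async-counter API), the two metrics asserted on by the tests could be exposed from the indexing-pressure stats roughly like this:

// Hypothetical sketch, not the registration code from this commit.
// Assumes MeterRegistry#registerLongAsyncCounter and the LongWithAttributes record.
import java.util.function.Supplier;

import org.elasticsearch.index.stats.IndexingPressureStats;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

class SplitMetricsRegistrationSketch {
    static void register(MeterRegistry registry, Supplier<IndexingPressureStats> stats) {
        // Async counters are sampled at collection time, so they simply read the
        // running totals that IndexingPressure keeps for watermark splits.
        registry.registerLongAsyncCounter(
            "es.indexing.coordinating.low_watermark_splits.total",
            "Total number of incremental bulk requests split at the low watermark",
            "operations",
            () -> new LongWithAttributes(stats.get().getLowWaterMarkSplits())
        );
        registry.registerLongAsyncCounter(
            "es.indexing.coordinating.high_watermark_splits.total",
            "Total number of incremental bulk requests split at the high watermark",
            "operations",
            () -> new LongWithAttributes(stats.get().getHighWaterMarkSplits())
        );
    }
}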

File tree

10 files changed (+390, -6 lines)

docs/changelog/116765.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 116765
+summary: Metrics for incremental bulk splits
+area: Distributed
+type: enhancement
+issues: []

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 11 additions & 1 deletion
@@ -66,7 +66,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
             .put(super.nodeSettings(nodeOrdinal, otherSettings))
             .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
             .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
-            .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "2KB")
+            .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "4KB")
             .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
             .build();
     }
@@ -162,6 +162,8 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {
 
         IndexRequest indexRequest = indexRequest(index);
         long total = indexRequest.ramBytesUsed();
+        long lowWaterMarkSplits = indexingPressure.stats().getLowWaterMarkSplits();
+        long highWaterMarkSplits = indexingPressure.stats().getHighWaterMarkSplits();
         while (total < 2048) {
             refCounted.incRef();
             handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
@@ -176,6 +178,8 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {
         handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
 
         assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
+        assertBusy(() -> assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits + 1)));
+        assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits));
 
         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
         handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
@@ -193,6 +197,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
         IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
         IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
         ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
+        long lowWaterMarkSplits = indexingPressure.stats().getLowWaterMarkSplits();
+        long highWaterMarkSplits = indexingPressure.stats().getHighWaterMarkSplits();
 
         AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
         AtomicBoolean nextPage = new AtomicBoolean(false);
@@ -218,6 +224,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
         handlerNoThrottle.addItems(requestsNoThrottle, refCounted::decRef, () -> nextPage.set(true));
         assertTrue(nextPage.get());
         nextPage.set(false);
+        assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits));
+        assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits));
 
         ArrayList<DocWriteRequest<?>> requestsThrottle = new ArrayList<>();
         // Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
@@ -236,6 +244,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
 
         // Wait until we are ready for the next page
         assertBusy(() -> assertTrue(nextPage.get()));
+        assertBusy(() -> assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits + 1)));
+        assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits));
 
         for (IncrementalBulkService.Handler h : handlers) {
             refCounted.incRef();
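
For context on what these assertions check: a simplified, hypothetical model of the split decision is sketched below (field names and the exact conditions are illustrative, not IndexingPressure's actual internals). The idea is that an incremental bulk is cut into a separate bulk once node-wide in-flight coordinating bytes cross a watermark while the bytes already buffered for the current bulk exceed the matching *_SIZE threshold, and each branch bumps its own counter, which is what getLowWaterMarkSplits() and getHighWaterMarkSplits() report.

import java.util.concurrent.atomic.LongAdder;

// Simplified, hypothetical model of the watermark split decision exercised by the
// tests above; the real logic lives in IndexingPressure and differs in detail.
class SplitDecisionSketch {
    // Thresholds mirror the test settings: 512B/2048B (low) and 4KB/1024B (high).
    private final long lowWatermark = 512;
    private final long lowWatermarkSize = 2048;
    private final long highWatermark = 4 * 1024;
    private final long highWatermarkSize = 1024;

    final LongAdder lowWaterMarkSplits = new LongAdder();
    final LongAdder highWaterMarkSplits = new LongAdder();

    boolean shouldSplit(long inFlightCoordinatingBytes, long bufferedBulkBytes) {
        if (inFlightCoordinatingBytes >= highWatermark && bufferedBulkBytes >= highWatermarkSize) {
            highWaterMarkSplits.increment();  // reported as getHighWaterMarkSplits()
            return true;
        }
        if (inFlightCoordinatingBytes >= lowWatermark && bufferedBulkBytes >= lowWatermarkSize) {
            lowWaterMarkSplits.increment();   // reported as getLowWaterMarkSplits()
            return true;
        }
        return false;
    }
}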

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java

Lines changed: 261 additions & 0 deletions
@@ -9,35 +9,48 @@
 
 package org.elasticsearch.monitor.metrics;
 
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.IncrementalBulkService;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.telemetry.Measurement;
 import org.elasticsearch.telemetry.TestTelemetryPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 import static org.elasticsearch.index.IndexingPressure.MAX_COORDINATING_BYTES;
 import static org.elasticsearch.index.IndexingPressure.MAX_PRIMARY_BYTES;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
 public class NodeIndexingMetricsIT extends ESIntegTestCase {
@@ -453,6 +466,211 @@ public void testPrimaryDocumentRejectionMetricsFluctuatingOverTime() throws Exception {
         }
     }
 
+    // Borrowed this test from IncrementalBulkIT and added test for metrics to it
+    public void testIncrementalBulkLowWatermarkSplitMetrics() throws Exception {
+        final String nodeName = internalCluster().startNode(
+            Settings.builder()
+                .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
+                .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
+                .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "4KB")
+                .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
+                .build()
+        );
+        ensureStableCluster(1);
+
+        String index = "test";
+        createIndex(index);
+
+        IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
+        IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
+        final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, nodeName)
+            .filterPlugins(TestTelemetryPlugin.class)
+            .findFirst()
+            .orElseThrow();
+        testTelemetryPlugin.resetMeter();
+
+        IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+
+        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        AtomicBoolean nextPage = new AtomicBoolean(false);
+
+        IndexRequest indexRequest = indexRequest(index);
+        long total = indexRequest.ramBytesUsed();
+        while (total < 2048) {
+            refCounted.incRef();
+            handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
+            assertTrue(nextPage.get());
+            nextPage.set(false);
+            indexRequest = indexRequest(index);
+            total += indexRequest.ramBytesUsed();
+        }
+
+        assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
+        assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(0L));
+
+        testTelemetryPlugin.collect();
+        assertThat(
+            getSingleRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.low_watermark_splits.total"
+            ).getLong(),
+            equalTo(0L)
+        );
+        assertThat(
+            getSingleRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.high_watermark_splits.total"
+            ).getLong(),
+            equalTo(0L)
+        );
+
+        refCounted.incRef();
+        handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
+
+        assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
+        assertBusy(() -> assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(1L)));
+        assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(0L));
+
+        testTelemetryPlugin.collect();
+        assertThat(
+            getLatestRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.low_watermark_splits.total"
+            ).getLong(),
+            equalTo(1L)
+        );
+        assertThat(
+            getLatestRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.high_watermark_splits.total"
+            ).getLong(),
+            equalTo(0L)
+        );
+
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+        handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
+
+        BulkResponse bulkResponse = safeGet(future);
+        assertNoFailures(bulkResponse);
+        assertFalse(refCounted.hasReferences());
+    }
+
+    // Borrowed this test from IncrementalBulkIT and added test for metrics to it
+    public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
+        final String nodeName = internalCluster().startNode(
+            Settings.builder()
+                .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
+                .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
+                .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "4KB")
+                .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
+                .build()
+        );
+        ensureStableCluster(1);
+
+        String index = "test";
+        createIndex(index);
+
+        IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
+        IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
+        ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
+        final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, nodeName)
+            .filterPlugins(TestTelemetryPlugin.class)
+            .findFirst()
+            .orElseThrow();
+        testTelemetryPlugin.resetMeter();
+
+        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
+        AtomicBoolean nextPage = new AtomicBoolean(false);
+
+        ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
+        for (int i = 0; i < 4; ++i) {
+            ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
+            add512BRequests(requests, index);
+            IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
+            handlers.add(handler);
+            refCounted.incRef();
+            handler.addItems(requests, refCounted::decRef, () -> nextPage.set(true));
+            assertTrue(nextPage.get());
+            nextPage.set(false);
+        }
+
+        // Test that a request smaller than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is not throttled
+        ArrayList<DocWriteRequest<?>> requestsNoThrottle = new ArrayList<>();
+        add512BRequests(requestsNoThrottle, index);
+        IncrementalBulkService.Handler handlerNoThrottle = incrementalBulkService.newBulkRequest();
+        handlers.add(handlerNoThrottle);
+        refCounted.incRef();
+        handlerNoThrottle.addItems(requestsNoThrottle, refCounted::decRef, () -> nextPage.set(true));
+        assertTrue(nextPage.get());
+        nextPage.set(false);
+        assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(0L));
+
+        testTelemetryPlugin.collect();
+        assertThat(
+            getSingleRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.low_watermark_splits.total"
+            ).getLong(),
+            equalTo(0L)
+        );
+        assertThat(
+            getSingleRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.high_watermark_splits.total"
+            ).getLong(),
+            equalTo(0L)
+        );
+
+        ArrayList<DocWriteRequest<?>> requestsThrottle = new ArrayList<>();
+        // Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
+        add512BRequests(requestsThrottle, index);
+        add512BRequests(requestsThrottle, index);
+
+        CountDownLatch finishLatch = new CountDownLatch(1);
+        blockWritePool(threadPool, finishLatch);
+        IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
+        refCounted.incRef();
+        handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
+        assertFalse(nextPage.get());
+        finishLatch.countDown();
+
+        handlers.add(handlerThrottled);
+
+        // Wait until we are ready for the next page
+        assertBusy(() -> assertTrue(nextPage.get()));
+        assertBusy(() -> assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(1L)));
+        assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(0L));
+
+        testTelemetryPlugin.collect();
+        assertThat(
+            getLatestRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.low_watermark_splits.total"
+            ).getLong(),
+            equalTo(0L)
+        );
+        assertThat(
+            getLatestRecordedMetric(
+                testTelemetryPlugin::getLongAsyncCounterMeasurement,
+                "es.indexing.coordinating.high_watermark_splits.total"
+            ).getLong(),
+            equalTo(1L)
+        );
+
+        for (IncrementalBulkService.Handler h : handlers) {
+            refCounted.incRef();
+            PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+            h.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+            BulkResponse bulkResponse = safeGet(future);
+            assertNoFailures(bulkResponse);
+        }
+
+        assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
+        refCounted.decRef();
+        assertFalse(refCounted.hasReferences());
+        testTelemetryPlugin.collect();
+    }
+
     private static Measurement getSingleRecordedMetric(Function<String, List<Measurement>> metricGetter, String name) {
         final List<Measurement> measurements = metricGetter.apply(name);
         assertFalse("Indexing metric is not recorded", measurements.isEmpty());
@@ -470,4 +688,47 @@ private static boolean doublesEquals(double expected, double actual) {
         final double eps = .0000001;
         return Math.abs(expected - actual) < eps;
     }
+
+    private static IndexRequest indexRequest(String index) {
+        IndexRequest indexRequest = new IndexRequest();
+        indexRequest.index(index);
+        indexRequest.source(Map.of("field", randomAlphaOfLength(10)));
+        return indexRequest;
+    }
+
+    private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, String index) {
+        long total = 0;
+        while (total < 512) {
+            IndexRequest indexRequest = indexRequest(index);
+            requests.add(indexRequest);
+            total += indexRequest.ramBytesUsed();
+        }
+        assertThat(total, lessThan(1024L));
+    }
+
+    private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
+        final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
+        final var startBarrier = new CyclicBarrier(threadCount + 1);
+        final var blockingTask = new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                fail(e);
+            }
+
+            @Override
+            protected void doRun() {
+                safeAwait(startBarrier);
+                safeAwait(finishLatch);
+            }
+
+            @Override
+            public boolean isForceExecution() {
+                return true;
+            }
+        };
+        for (int i = 0; i < threadCount; i++) {
+            threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
+        }
+        safeAwait(startBarrier);
+    }
 }

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
@@ -204,6 +204,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);
     public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0);
     public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0);
+    public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,
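
The new constant presumably gates wire serialization of the two split counters so that mixed-version clusters can still exchange indexing-pressure stats; the IndexingPressureStats serialization change itself is in a file not shown in this excerpt. A hypothetical sketch of the usual pattern (class and field names are illustrative):

import java.io.IOException;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

// Hypothetical sketch of transport-version-gated serialization; not the actual
// IndexingPressureStats code from this commit.
class SplitStatsWireSketch {
    long lowWaterMarkSplits;
    long highWaterMarkSplits;

    void writeTo(StreamOutput out) throws IOException {
        // Only send the new fields to nodes that understand them.
        if (out.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_THROTTLING_STATS)) {
            out.writeVLong(lowWaterMarkSplits);
            out.writeVLong(highWaterMarkSplits);
        }
    }

    void readFrom(StreamInput in) throws IOException {
        if (in.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_THROTTLING_STATS)) {
            lowWaterMarkSplits = in.readVLong();
            highWaterMarkSplits = in.readVLong();
        } else {
            lowWaterMarkSplits = 0;
            highWaterMarkSplits = 0;
        }
    }
}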
