Skip to content

Commit c4698c6

Browse files
committed
Introduce watermarks for indexing pressure backoff (#113912)
Currently we have a relatively basic decider about when to throttling indexing. This commit adds two levels of watermarks with configurable bulk size deciders. Additionally, adds additional settings to control primary, coordinating, and replica rejection limits.
1 parent 2046b49 commit c4698c6

File tree

13 files changed

+237
-56
lines changed

13 files changed

+237
-56
lines changed

modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public class KibanaThreadPoolIT extends ESIntegTestCase {
5757
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5858
return Settings.builder()
5959
.put(super.nodeSettings(nodeOrdinal, otherSettings))
60-
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
60+
.put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), "1KB")
61+
.put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), "1KB")
6162
.put("thread_pool.search.size", 1)
6263
.put("thread_pool.search.queue_size", 1)
6364
.put("thread_pool.write.size", 1)

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IndexingPressureRestIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public class IndexingPressureRestIT extends HttpSmokeTestCase {
4242
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
4343
return Settings.builder()
4444
.put(super.nodeSettings(nodeOrdinal, otherSettings))
45-
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
45+
.put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), "1KB")
46+
.put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), "1KB")
4647
.put(unboundedWriteQueue)
4748
.build();
4849
}

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import static org.hamcrest.Matchers.equalTo;
5252
import static org.hamcrest.Matchers.greaterThan;
5353
import static org.hamcrest.Matchers.instanceOf;
54+
import static org.hamcrest.Matchers.lessThan;
5455

5556
public class IncrementalBulkIT extends ESIntegTestCase {
5657

@@ -63,7 +64,10 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6364
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
6465
return Settings.builder()
6566
.put(super.nodeSettings(nodeOrdinal, otherSettings))
66-
.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B")
67+
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
68+
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
69+
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "2KB")
70+
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
6771
.build();
6872
}
6973

@@ -80,7 +84,7 @@ public void testSingleBulkRequest() {
8084
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
8185
handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
8286

83-
BulkResponse bulkResponse = future.actionGet();
87+
BulkResponse bulkResponse = safeGet(future);
8488
assertNoFailures(bulkResponse);
8589

8690
refresh(index);
@@ -143,7 +147,7 @@ public void testIndexingPressureRejection() {
143147
}
144148
}
145149

146-
public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
150+
public void testIncrementalBulkLowWatermarkBackOff() throws Exception {
147151
String index = "test";
148152
createIndex(index);
149153

@@ -158,7 +162,7 @@ public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
158162

159163
IndexRequest indexRequest = indexRequest(index);
160164
long total = indexRequest.ramBytesUsed();
161-
while (total < 512) {
165+
while (total < 2048) {
162166
refCounted.incRef();
163167
handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
164168
assertTrue(nextPage.get());
@@ -176,11 +180,73 @@ public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
176180
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
177181
handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
178182

179-
BulkResponse bulkResponse = future.actionGet();
183+
BulkResponse bulkResponse = safeGet(future);
180184
assertNoFailures(bulkResponse);
181185
assertFalse(refCounted.hasReferences());
182186
}
183187

188+
public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
189+
String index = "test";
190+
createIndex(index);
191+
192+
String nodeName = internalCluster().getRandomNodeName();
193+
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
194+
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
195+
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
196+
197+
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
198+
AtomicBoolean nextPage = new AtomicBoolean(false);
199+
200+
ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
201+
for (int i = 0; i < 4; ++i) {
202+
ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
203+
add512BRequests(requests, index);
204+
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
205+
handlers.add(handler);
206+
refCounted.incRef();
207+
handler.addItems(requests, refCounted::decRef, () -> nextPage.set(true));
208+
assertTrue(nextPage.get());
209+
nextPage.set(false);
210+
}
211+
212+
// Test that a request smaller than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is not throttled
213+
ArrayList<DocWriteRequest<?>> requestsNoThrottle = new ArrayList<>();
214+
add512BRequests(requestsNoThrottle, index);
215+
IncrementalBulkService.Handler handlerNoThrottle = incrementalBulkService.newBulkRequest();
216+
handlers.add(handlerNoThrottle);
217+
refCounted.incRef();
218+
handlerNoThrottle.addItems(requestsNoThrottle, refCounted::decRef, () -> nextPage.set(true));
219+
assertTrue(nextPage.get());
220+
nextPage.set(false);
221+
222+
ArrayList<DocWriteRequest<?>> requestsThrottle = new ArrayList<>();
223+
// Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
224+
add512BRequests(requestsThrottle, index);
225+
add512BRequests(requestsThrottle, index);
226+
227+
CountDownLatch finishLatch = new CountDownLatch(1);
228+
blockWritePool(threadPool, finishLatch);
229+
IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
230+
refCounted.incRef();
231+
handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
232+
assertFalse(nextPage.get());
233+
finishLatch.countDown();
234+
235+
handlers.add(handlerThrottled);
236+
237+
for (IncrementalBulkService.Handler h : handlers) {
238+
refCounted.incRef();
239+
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
240+
h.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
241+
BulkResponse bulkResponse = safeGet(future);
242+
assertNoFailures(bulkResponse);
243+
}
244+
245+
assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
246+
refCounted.decRef();
247+
assertFalse(refCounted.hasReferences());
248+
}
249+
184250
public void testMultipleBulkPartsWithBackoff() {
185251
ExecutorService executorService = Executors.newFixedThreadPool(1);
186252

@@ -279,7 +345,7 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except
279345
}
280346

281347
// Should not throw because some succeeded
282-
BulkResponse bulkResponse = future.actionGet();
348+
BulkResponse bulkResponse = safeGet(future);
283349

284350
assertTrue(bulkResponse.hasFailures());
285351
BulkItemResponse[] items = bulkResponse.getItems();
@@ -347,7 +413,7 @@ public void testShortCircuitShardLevelFailure() throws Exception {
347413
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
348414
handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
349415

350-
BulkResponse bulkResponse = future.actionGet();
416+
BulkResponse bulkResponse = safeGet(future);
351417
assertTrue(bulkResponse.hasFailures());
352418
for (int i = 0; i < hits.get(); ++i) {
353419
assertFalse(bulkResponse.getItems()[i].isFailed());
@@ -440,7 +506,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
440506
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
441507
handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
442508

443-
BulkResponse bulkResponse = future.actionGet();
509+
BulkResponse bulkResponse = safeGet(future);
444510
assertTrue(bulkResponse.hasFailures());
445511
for (int i = 0; i < hits.get(); ++i) {
446512
assertFalse(bulkResponse.getItems()[i].isFailed());
@@ -539,6 +605,16 @@ public void run() {
539605
return bulkResponse;
540606
}
541607

608+
private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, String index) {
609+
long total = 0;
610+
while (total < 512) {
611+
IndexRequest indexRequest = indexRequest(index);
612+
requests.add(indexRequest);
613+
total += indexRequest.ramBytesUsed();
614+
}
615+
assertThat(total, lessThan(1024L));
616+
}
617+
542618
private static IndexRequest indexRequest(String index) {
543619
IndexRequest indexRequest = new IndexRequest();
544620
indexRequest.index(index);

server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception {
248248
final long bulkRequestSize = bulkRequest.ramBytesUsed();
249249
final long bulkShardRequestSize = totalRequestSize;
250250
restartNodesWithSettings(
251-
Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), (long) (bulkShardRequestSize * 1.5) + "B").build()
251+
Settings.builder().put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), (long) (bulkShardRequestSize * 1.5) + "B").build()
252252
);
253253

254254
assertAcked(prepareCreate(INDEX_NAME, indexSettings(1, 1)));
@@ -312,7 +312,7 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
312312
}
313313
final long bulkShardRequestSize = totalRequestSize;
314314
restartNodesWithSettings(
315-
Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), (long) (bulkShardRequestSize * 1.5) + "B").build()
315+
Settings.builder().put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), (long) (bulkShardRequestSize * 1.5) + "B").build()
316316
);
317317

318318
assertAcked(prepareCreate(INDEX_NAME, indexSettings(1, 1)));
@@ -358,7 +358,12 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception {
358358
}
359359

360360
public void testWritesWillSucceedIfBelowThreshold() throws Exception {
361-
restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1MB").build());
361+
restartNodesWithSettings(
362+
Settings.builder()
363+
.put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), "1MB")
364+
.put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), "1MB")
365+
.build()
366+
);
362367
assertAcked(prepareCreate(INDEX_NAME, indexSettings(1, 1)));
363368
ensureGreen(INDEX_NAME);
364369

server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private static Releasable fullyAllocatePrimaryIndexingCapacityOnNode(String targ
9292
return internalCluster().getInstance(IndexingPressure.class, targetNode)
9393
.markPrimaryOperationStarted(
9494
1,
95-
IndexingPressure.MAX_INDEXING_BYTES.get(internalCluster().getInstance(Settings.class, targetNode)).getBytes() + 1,
95+
IndexingPressure.MAX_PRIMARY_BYTES.get(internalCluster().getInstance(Settings.class, targetNode)).getBytes() + 1,
9696
true
9797
);
9898
}

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
import java.util.Map;
3333
import java.util.function.Function;
3434

35-
import static org.elasticsearch.index.IndexingPressure.MAX_INDEXING_BYTES;
35+
import static org.elasticsearch.index.IndexingPressure.MAX_COORDINATING_BYTES;
36+
import static org.elasticsearch.index.IndexingPressure.MAX_PRIMARY_BYTES;
3637
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3738
import static org.hamcrest.Matchers.equalTo;
3839
import static org.hamcrest.Matchers.greaterThan;
@@ -198,7 +199,7 @@ public void testNodeIndexingMetricsArePublishing() {
198199
public void testCoordinatingRejectionMetricsArePublishing() {
199200

200201
// lower Indexing Pressure limits to trigger coordinating rejections
201-
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_INDEXING_BYTES.getKey(), "1KB"));
202+
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), "1KB"));
202203
ensureStableCluster(1);
203204

204205
final TestTelemetryPlugin plugin = internalCluster().getInstance(PluginsService.class, dataNode)
@@ -239,7 +240,7 @@ public void testCoordinatingRejectionMetricsArePublishing() {
239240
public void testCoordinatingRejectionMetricsSpiking() throws Exception {
240241

241242
// lower Indexing Pressure limits to trigger coordinating rejections
242-
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_INDEXING_BYTES.getKey(), "1KB"));
243+
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), "1KB"));
243244
ensureStableCluster(1);
244245

245246
final TestTelemetryPlugin plugin = internalCluster().getInstance(PluginsService.class, dataNode)
@@ -308,10 +309,10 @@ public void testCoordinatingRejectionMetricsSpiking() throws Exception {
308309
public void testPrimaryDocumentRejectionMetricsArePublishing() {
309310

310311
// setting low Indexing Pressure limits to trigger primary rejections
311-
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_INDEXING_BYTES.getKey(), "2KB").build());
312+
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_PRIMARY_BYTES.getKey(), "2KB").build());
312313
// setting high Indexing Pressure limits to pass coordinating checks
313314
final String coordinatingNode = internalCluster().startCoordinatingOnlyNode(
314-
Settings.builder().put(MAX_INDEXING_BYTES.getKey(), "10MB").build()
315+
Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), "10MB").build()
315316
);
316317
ensureStableCluster(2);
317318

@@ -375,10 +376,10 @@ public void testPrimaryDocumentRejectionMetricsArePublishing() {
375376
public void testPrimaryDocumentRejectionMetricsFluctuatingOverTime() throws Exception {
376377

377378
// setting low Indexing Pressure limits to trigger primary rejections
378-
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_INDEXING_BYTES.getKey(), "4KB").build());
379+
final String dataNode = internalCluster().startNode(Settings.builder().put(MAX_PRIMARY_BYTES.getKey(), "4KB").build());
379380
// setting high Indexing Pressure limits to pass coordinating checks
380381
final String coordinatingNode = internalCluster().startCoordinatingOnlyNode(
381-
Settings.builder().put(MAX_INDEXING_BYTES.getKey(), "100MB").build()
382+
Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), "100MB").build()
382383
);
383384
ensureStableCluster(2);
384385

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public static class Handler implements Releasable {
107107
private boolean incrementalRequestSubmitted = false;
108108
private ThreadContext.StoredContext requestContext;
109109
private Exception bulkActionLevelFailure = null;
110+
private long currentBulkSize = 0L;
110111
private BulkRequest bulkRequest = null;
111112

112113
protected Handler(
@@ -172,7 +173,7 @@ public void onFailure(Exception e) {
172173
}
173174

174175
private boolean shouldBackOff() {
175-
return indexingPressure.shouldSplitBulks();
176+
return indexingPressure.shouldSplitBulk(currentBulkSize);
176177
}
177178

178179
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
@@ -235,6 +236,7 @@ private void errorResponse(ActionListener<BulkResponse> listener) {
235236

236237
private void handleBulkSuccess(BulkResponse bulkResponse) {
237238
responses.add(bulkResponse);
239+
currentBulkSize = 0L;
238240
bulkRequest = null;
239241
}
240242

@@ -243,6 +245,7 @@ private void handleBulkFailure(boolean isFirstRequest, Exception e) {
243245
globalFailure = isFirstRequest;
244246
bulkActionLevelFailure = e;
245247
addItemLevelFailures(bulkRequest.requests());
248+
currentBulkSize = 0;
246249
bulkRequest = null;
247250
}
248251

@@ -261,13 +264,9 @@ private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable rele
261264
try {
262265
bulkRequest.add(items);
263266
releasables.add(releasable);
264-
releasables.add(
265-
indexingPressure.markCoordinatingOperationStarted(
266-
items.size(),
267-
items.stream().mapToLong(Accountable::ramBytesUsed).sum(),
268-
false
269-
)
270-
);
267+
long size = items.stream().mapToLong(Accountable::ramBytesUsed).sum();
268+
releasables.add(indexingPressure.markCoordinatingOperationStarted(items.size(), size, false));
269+
currentBulkSize += size;
271270
return true;
272271
} catch (EsRejectedExecutionException e) {
273272
handleBulkFailure(incrementalRequestSubmitted == false, e);
@@ -278,6 +277,8 @@ private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable rele
278277
}
279278

280279
private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {
280+
assert currentBulkSize == 0L;
281+
assert bulkRequest == null;
281282
bulkRequest = new BulkRequest();
282283
bulkRequest.incrementalState(incrementalState);
283284

@@ -292,12 +293,6 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
292293
}
293294
}
294295

295-
private void releaseCurrentReferences() {
296-
bulkRequest = null;
297-
releasables.forEach(Releasable::close);
298-
releasables.clear();
299-
}
300-
301296
private BulkResponse combineResponses() {
302297
long tookInMillis = 0;
303298
long ingestTookInMillis = 0;

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,14 @@ public void apply(Settings value, Settings current, Settings previous) {
562562
FsHealthService.REFRESH_INTERVAL_SETTING,
563563
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
564564
IndexingPressure.MAX_INDEXING_BYTES,
565+
IndexingPressure.MAX_COORDINATING_BYTES,
566+
IndexingPressure.MAX_PRIMARY_BYTES,
567+
IndexingPressure.MAX_REPLICA_BYTES,
565568
IndexingPressure.SPLIT_BULK_THRESHOLD,
569+
IndexingPressure.SPLIT_BULK_HIGH_WATERMARK,
570+
IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE,
571+
IndexingPressure.SPLIT_BULK_LOW_WATERMARK,
572+
IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE,
566573
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
567574
DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
568575
CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,

server/src/main/java/org/elasticsearch/common/settings/Setting.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1685,6 +1685,10 @@ public static Setting<ByteSizeValue> memorySizeSetting(String key, ByteSizeValue
16851685
return memorySizeSetting(key, defaultValue.toString(), properties);
16861686
}
16871687

1688+
public static Setting<ByteSizeValue> memorySizeSetting(String key, Setting<ByteSizeValue> fallbackSetting, Property... properties) {
1689+
return new Setting<>(key, fallbackSetting, (s) -> MemorySizeValue.parseBytesSizeValueOrHeapRatio(s, key), properties);
1690+
}
1691+
16881692
/**
16891693
* Creates a setting which specifies a memory size. This can either be
16901694
* specified as an absolute bytes value or as a percentage of the heap

0 commit comments

Comments
 (0)