Skip to content

Commit b068ab2

Browse files
authored
Adding BulkProcessor2.addWithBackpressure() (#94599)
This commit adds a method called addWithBackpressure() to BulkProcessor2. This method is similar to the add() method that already exists, except that it blocks the calling thread until there is room to add the request.
1 parent 6da721e commit b068ab2

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor2.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626
import java.util.Objects;
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.concurrent.atomic.AtomicLong;
29+
import java.util.concurrent.locks.Condition;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
2932
import java.util.function.BiConsumer;
33+
import java.util.function.Supplier;
3034

3135
/**
3236
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
@@ -206,6 +210,13 @@ public static Builder builder(
206210
*/
207211
private final Object mutex = new Object();
208212

213+
/*
214+
* This Lock and Condition are used to throttle calls to bulkProcessor.addWithBackpressure when the bulkProcessor already has too many
215+
* bytes in flight and cannot accept more data until already-in-flight requests complete.
216+
*/
217+
private final Lock backpressureLock = new ReentrantLock();
218+
private final Condition backpressureNotRequiredCondition = backpressureLock.newCondition();
219+
209220
BulkProcessor2(
210221
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
211222
int maxNumberOfRetries,
@@ -276,6 +287,21 @@ public BulkProcessor2 add(IndexRequest request) throws EsRejectedExecutionExcept
276287
return add((DocWriteRequest<?>) request);
277288
}
278289

290+
/**
291+
* This method is similar to {@link #add(IndexRequest) add()}, except that if adding the approximate size in bytes of the request to
292+
* totalBytesInFlight would exceed maxBytesInFlight then this method will block until the request can be added without exceeding
293+
* maxBytesInFlight (or until shouldAbort returns false). This method should be used carefully, ideally from a single thread. This is
294+
* because calling it with multiple threads would either hang up all the threads in the pool (in the case of a bounded thread pool) or
295+
* effectively create an unbounded queue (in the case of an unbounded thread pool).
296+
* @param request The request to add to a batch to be consumed
297+
* @param shouldAbort If this returns true then this method bails out with an EsRejectedExecutionException
298+
* @return this BulkProcessor2
299+
* @throws EsRejectedExecutionException if shouldAbort returns true before the request has been added to a batch
300+
*/
301+
public BulkProcessor2 addWithBackpressure(IndexRequest request, Supplier<Boolean> shouldAbort) throws EsRejectedExecutionException {
302+
return addWithBackpressure((DocWriteRequest<?>) request, shouldAbort);
303+
}
304+
279305
/**
280306
* Adds an {@link DeleteRequest} to the list of actions to execute.
281307
* @throws EsRejectedExecutionException if adding the approximate size in bytes of the request to totalBytesInFlight would exceed
@@ -295,6 +321,44 @@ private BulkProcessor2 add(DocWriteRequest<?> request) throws EsRejectedExecutio
295321
return this;
296322
}
297323

324+
private BulkProcessor2 addWithBackpressure(DocWriteRequest<?> request, Supplier<Boolean> shouldAbort)
325+
throws EsRejectedExecutionException {
326+
/*
327+
* We want this method to block until the bulkProcessor accepts the request. Otherwise, the subsequent calls to this method will
328+
* likely continue rejecting. BulkProcessor2.add does not have the ability to exert backpressure.
329+
* So we have to catch the EsRejectedExecutionException that is thrown when it already has too many bytes in flight. We then
330+
* wait until some bulk has been completed, reducing the amount of data in flight and (probably) making room for this request.
331+
*/
332+
boolean successfullyAdded = false;
333+
while (successfullyAdded == false) {
334+
if (shouldAbort.get()) {
335+
throw new EsRejectedExecutionException("Rejecting request because bulk add has been cancelled by the caller");
336+
}
337+
try {
338+
add(request);
339+
successfullyAdded = true;
340+
} catch (EsRejectedExecutionException e) {
341+
logger.trace("Attempt to add request to batch rejected because too many bytes are in flight already. Will try again.");
342+
/*
343+
* Note: It is possible that signalAll was called between the call to add above and acquiring this lock.
344+
* But in that case, either we wait 500ms, or another batch completes and another call to signalAll wakes us up.
345+
* Either way is preferable to requiring this lock for the whole try/catch block. That is why we ignore the
346+
* result of the call to await() -- either way we are going to try calling bulkProcessor.add() again.
347+
*/
348+
backpressureLock.lock();
349+
try {
350+
backpressureNotRequiredCondition.await(500, TimeUnit.MILLISECONDS);
351+
} catch (InterruptedException ex) {
352+
Thread.currentThread().interrupt();
353+
throw new RuntimeException(ex);
354+
} finally {
355+
backpressureLock.unlock();
356+
}
357+
}
358+
}
359+
return this;
360+
}
361+
298362
/*
299363
* Exposed for unit testing
300364
*/
@@ -393,22 +457,39 @@ private void execute(BulkRequest bulkRequest, long executionId) {
393457
@Override
394458
public void onResponse(BulkResponse response) {
395459
totalBytesInFlight.addAndGet(-1 * bulkRequest.estimatedSizeInBytes());
460+
maybeNoLongerInExcessofMaxBytesInFlight();
396461
listener.afterBulk(executionId, bulkRequest, response);
397462
}
398463

399464
@Override
400465
public void onFailure(Exception e) {
401466
totalBytesInFlight.addAndGet(-1 * bulkRequest.estimatedSizeInBytes());
467+
maybeNoLongerInExcessofMaxBytesInFlight();
402468
listener.afterBulk(executionId, bulkRequest, e);
403469
}
404470
});
405471
} catch (Exception e) {
406472
logger.warn(() -> "Failed to execute bulk request " + executionId + ".", e);
407473
totalBytesInFlight.addAndGet(-1 * bulkRequest.estimatedSizeInBytes());
474+
maybeNoLongerInExcessofMaxBytesInFlight();
408475
listener.afterBulk(executionId, bulkRequest, e);
409476
}
410477
}
411478

479+
/*
480+
* This method is to be called whenever we deduct from totalBytesInFlight, signalling that it's worth retrying any requests blocked in
481+
* addWithBackpressure.
482+
*/
483+
private void maybeNoLongerInExcessofMaxBytesInFlight() {
484+
// Signal in case any rejected docs were waiting for the space in bulkProcessor in addWithBackPressure
485+
backpressureLock.lock();
486+
try {
487+
backpressureNotRequiredCondition.signalAll();
488+
} finally {
489+
backpressureLock.unlock();
490+
}
491+
}
492+
412493
private void execute() {
413494
assert Thread.holdsLock(mutex);
414495
final BulkRequest bulkRequest = this.bulkRequestUnderConstruction;

server/src/test/java/org/elasticsearch/action/bulk/BulkProcessor2Tests.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.Before;
3131

3232
import java.util.ArrayList;
33+
import java.util.Collections;
3334
import java.util.List;
3435
import java.util.Locale;
3536
import java.util.concurrent.CountDownLatch;
@@ -38,6 +39,7 @@
3839
import java.util.concurrent.Future;
3940
import java.util.concurrent.ScheduledExecutorService;
4041
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicBoolean;
4143
import java.util.concurrent.atomic.AtomicInteger;
4244
import java.util.concurrent.atomic.AtomicReference;
4345
import java.util.function.BiConsumer;
@@ -408,6 +410,128 @@ public void testRejections() throws Exception {
408410
consumerExecutor.shutdown();
409411
}
410412

413+
public void testAddWithNoBackpressureThrowsException() throws InterruptedException {
414+
/*
415+
* This tests that if we reduce the configured batch size and max bytes in flight then we can force add() to throw
416+
* EsRejectedExecutionExceptions.
417+
*/
418+
BulkResponse bulkResponse = new BulkResponse(
419+
new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
420+
0
421+
);
422+
final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
423+
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
424+
try {
425+
Thread.sleep(50);
426+
} catch (InterruptedException e) {
427+
throw new RuntimeException(e);
428+
}
429+
listener.onResponse(bulkResponse);
430+
});
431+
};
432+
int numberOfRequests = 100;
433+
final AtomicBoolean haveSeenRejections = new AtomicBoolean(false);
434+
final BulkProcessor2.Listener listener = new BulkProcessor2.Listener() {
435+
436+
@Override
437+
public void beforeBulk(long executionId, BulkRequest request) {}
438+
439+
@Override
440+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
441+
442+
@Override
443+
public void afterBulk(long executionId, BulkRequest request, Exception failure) {
444+
fail("afterBulk should not return exception");
445+
}
446+
};
447+
BulkProcessor2 bulkProcessor = BulkProcessor2.builder(consumer, listener, threadPool)
448+
.setBulkSize(ByteSizeValue.ofBytes(512))
449+
.setMaxBytesInFlight(ByteSizeValue.ofBytes(1024))
450+
.setFlushInterval(TimeValue.timeValueMillis(1))
451+
.build();
452+
try {
453+
for (int i = 0; i < numberOfRequests; i++) {
454+
bulkProcessor.add(new IndexRequest().source(Collections.singletonMap("this_is_a_key" + i, "this_is_a_value" + i)));
455+
}
456+
} catch (EsRejectedExecutionException e) {
457+
// We are throwing more data at the processor than the max bytes in flight setting can handle
458+
haveSeenRejections.set(true);
459+
} finally {
460+
bulkProcessor.awaitClose(1, TimeUnit.SECONDS);
461+
}
462+
assertThat(haveSeenRejections.get(), equalTo(true));
463+
}
464+
465+
public void testAddWithBackpressure() throws InterruptedException {
466+
/*
467+
* This test reduces the configured batch size and max bytes in flight so that using add() would throw
468+
* EsRejectedExecutionExceptions (see testAddWithNoBackpressureThrowsException). But instead this test uses addWithBackpressure
469+
* so that it blocks the calling thread until requests can be added.
470+
*/
471+
BulkResponse bulkResponse = new BulkResponse(
472+
new BulkItemResponse[] { BulkItemResponse.success(0, randomFrom(DocWriteRequest.OpType.values()), mockResponse()) },
473+
0
474+
);
475+
final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
476+
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
477+
try {
478+
Thread.sleep(50);
479+
} catch (InterruptedException e) {
480+
throw new RuntimeException(e);
481+
}
482+
listener.onResponse(bulkResponse);
483+
});
484+
};
485+
int numberOfRequests = 100;
486+
final CountDownLatch countDownLatch = new CountDownLatch(numberOfRequests);
487+
final BulkProcessor2.Listener listener = new BulkProcessor2.Listener() {
488+
489+
@Override
490+
public void beforeBulk(long executionId, BulkRequest request) {}
491+
492+
@Override
493+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
494+
for (int i = 0; i < request.requests.size(); i++) {
495+
countDownLatch.countDown();
496+
}
497+
}
498+
499+
@Override
500+
public void afterBulk(long executionId, BulkRequest request, Exception failure) {
501+
fail("afterBulk should not return exception");
502+
}
503+
};
504+
BulkProcessor2 bulkProcessor = BulkProcessor2.builder(consumer, listener, threadPool)
505+
.setBulkSize(ByteSizeValue.ofBytes(512))
506+
.setMaxBytesInFlight(ByteSizeValue.ofBytes(1024))
507+
.setFlushInterval(TimeValue.timeValueMillis(1))
508+
.build();
509+
AtomicBoolean abort = new AtomicBoolean(false);
510+
final AtomicBoolean haveSeenRejections = new AtomicBoolean(false);
511+
try {
512+
for (int i = 0; i < numberOfRequests; i++) {
513+
bulkProcessor.addWithBackpressure(
514+
new IndexRequest().source(Collections.singletonMap("this_is_a_key" + i, "this_is_a_value" + i)),
515+
abort::get
516+
);
517+
}
518+
assertTrue(countDownLatch.await(5, TimeUnit.MINUTES));
519+
assertThat(bulkProcessor.getTotalBytesInFlight(), equalTo(0L));
520+
abort.set(true);
521+
try {
522+
bulkProcessor.addWithBackpressure(
523+
new IndexRequest().source(Collections.singletonMap("this_is_a_key", "this_is_a_value")),
524+
abort::get
525+
);
526+
} catch (EsRejectedExecutionException e) {
527+
haveSeenRejections.set(true);
528+
}
529+
} finally {
530+
bulkProcessor.awaitClose(1, TimeUnit.SECONDS);
531+
}
532+
assertThat(haveSeenRejections.get(), equalTo(true));
533+
}
534+
411535
private BulkProcessor2.Listener emptyListener() {
412536
return new BulkProcessor2.Listener() {
413537
@Override

0 commit comments

Comments
 (0)