Skip to content

Commit cb9b06f

Browse files
committed
Change
1 parent 6ca7bb9 commit cb9b06f

File tree

3 files changed

+21
-33
lines changed

3 files changed

+21
-33
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,11 @@ public boolean isForceExecution() {
353353
// If a processor went async and returned a response on a different thread then
354354
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
355355
// coordination steps on the write thread
356-
runOrDispatch(executor, originalThread, runnable);
356+
if (originalThread == Thread.currentThread()) {
357+
runnable.run();
358+
} else {
359+
executor.execute(runnable);
360+
}
357361
}
358362
}
359363
},
@@ -362,17 +366,6 @@ public boolean isForceExecution() {
362366
);
363367
}
364368

365-
/**
366-
* If the work is still on the original thread run the ActionRunnable synchronously. Otherwise, dispatch to the provided executor.
367-
*/
368-
protected static void runOrDispatch(Executor executor, Thread originalThread, ActionRunnable<BulkResponse> bulkRunnable) {
369-
if (originalThread == Thread.currentThread()) {
370-
bulkRunnable.run();
371-
} else {
372-
executor.execute(bulkRunnable);
373-
}
374-
}
375-
376369
/**
377370
* Determines if an index name is associated with either an existing data stream or a template
378371
* for one that has the failure store enabled.

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -385,26 +385,19 @@ protected void createMissingIndicesAndIndexData(
385385
Map<String, Exception> indicesExceptions = new ConcurrentHashMap<>();
386386
Map<String, Exception> dataStreamExceptions = new ConcurrentHashMap<>();
387387
Map<String, Exception> failureStoreExceptions = new ConcurrentHashMap<>();
388-
389-
Thread originalThread = Thread.currentThread();
390-
Runnable executeBulkRunnable = () -> {
391-
ActionRunnable<BulkResponse> bulkRunnable = new ActionRunnable<>(listener) {
392-
@Override
393-
protected void doRun() {
394-
failRequestsWhenPrerequisiteActionFailed(
395-
indicesExceptions,
396-
dataStreamExceptions,
397-
failureStoreExceptions,
398-
bulkRequest,
399-
responses
400-
);
401-
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses);
402-
}
403-
};
404-
// If we performed some async action and ended up on a different thread dispatch back to the coordination thread pool.
405-
// Otherwise, just run the action since we are on the same thread.
406-
runOrDispatch(executor, originalThread, bulkRunnable);
407-
};
388+
Runnable executeBulkRunnable = () -> executor.execute(new ActionRunnable<>(listener) {
389+
@Override
390+
protected void doRun() {
391+
failRequestsWhenPrerequisiteActionFailed(
392+
indicesExceptions,
393+
dataStreamExceptions,
394+
failureStoreExceptions,
395+
bulkRequest,
396+
responses
397+
);
398+
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses);
399+
}
400+
});
408401
try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) {
409402
createIndices(indicesToAutoCreate, refs, indicesExceptions);
410403
rollOverDataStreams(bulkRequest, dataStreamsToBeRolledOver, false, refs, dataStreamExceptions);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,9 @@ public void testDispatchesToWriteCoordinationThreadPoolOnce() {
436436
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
437437
future.actionGet();
438438
stats = threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION)).findAny().get();
439-
assertThat(stats.completed(), equalTo(1L));
439+
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the index
440+
// is created.
441+
assertThat(stats.completed(), equalTo(2L));
440442
}
441443

442444
public void testRejectCoordination() {

0 commit comments

Comments
 (0)