@@ -532,7 +532,7 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
}

private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
Contributor:

Rename to:

Suggested change
private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) {

final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax();
final var startBarrier = new CyclicBarrier(threadCount + 1);
final var blockingTask = new AbstractRunnable() {
@Override
@@ -552,13 +552,13 @@ public boolean isForceExecution() {
}
};
for (int i = 0; i < threadCount; i++) {
threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask);
}
safeAwait(startBarrier);
}

private static void fillWriteQueue(ThreadPool threadPool) {
Contributor:

Should we rename to:

Suggested change
private static void fillWriteQueue(ThreadPool threadPool) {
private static void fillWriteCoordinationQueue(ThreadPool threadPool) {

final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles());
final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getQueueSize().singles());
final var queueFilled = new AtomicBoolean(false);
final var queueFillingTask = new AbstractRunnable() {
@Override
@@ -577,7 +577,7 @@ public boolean isForceExecution() {
}
};
for (int i = 0; i < queueSize; i++) {
threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask);
threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(queueFillingTask);
}
queueFilled.set(true);
}
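
For readers skimming the partially elided hunks above: the first helper parks a task on every write_coordination thread, and the second then fills the pool's queue so that any further submission is rejected. A condensed sketch of that pattern, assuming the elided runnable bodies simply await the start barrier and the finish latch (the real helpers use AbstractRunnable so isForceExecution() can be overridden):

// Illustrative sketch only, not the PR's code; the elided doRun() bodies are assumed
// to block on the barrier/latch exactly as shown here.
final var info = threadPool.info(ThreadPool.Names.WRITE_COORDINATION);
final var threadCount = info.getMax();
final var queueSize = Math.toIntExact(info.getQueueSize().singles());
final var executor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
final var startBarrier = new CyclicBarrier(threadCount + 1);
for (int i = 0; i < threadCount; i++) {
    executor.execute(() -> {
        safeAwait(startBarrier);   // signal that this pool thread is now occupied
        safeAwait(finishLatch);    // park until the test counts the latch down
    });
}
safeAwait(startBarrier);           // returns once every write_coordination thread is blocked
for (int i = 0; i < queueSize; i++) {
    executor.execute(() -> {});    // fill the queue; the next submission will be rejected
}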
@@ -68,6 +68,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
private final IngestService ingestService;
private final IngestActionForwarder ingestForwarder;
protected final LongSupplier relativeTimeNanosProvider;
protected final Executor coordinationExecutor;
protected final Executor writeExecutor;
protected final Executor systemWriteExecutor;
private final ActionType<BulkResponse> bulkAction;
@@ -92,6 +93,7 @@ public TransportAbstractBulkAction(
this.indexingPressure = indexingPressure;
this.systemIndices = systemIndices;
this.projectResolver = projectResolver;
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
this.ingestForwarder = new IngestActionForwarder(transportService);
@@ -106,8 +108,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
* This is called on the Transport thread so we can check the indexing
* memory pressure *quickly* but we don't want to keep the transport
* thread busy. Then, as soon as we have the indexing pressure in we fork
* to one of the write thread pools. We do this because juggling the
* bulk request can get expensive for a few reasons:
* to the coordination thread pool for coordination tasks. We do this because
* juggling the bulk request can get expensive for a few reasons:
* 1. Figuring out which shard should receive a bulk request might require
* parsing the _source.
* 2. When dispatching the sub-requests to shards we may have to compress
@@ -131,14 +133,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
}
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
// Use coordinationExecutor for dispatching coordination tasks
ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
}

private void ensureClusterStateThenForkAndExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> releasingListener
) {
final ClusterState initialState = clusterService.state();
@@ -159,7 +162,7 @@ private void ensureClusterStateThenForkAndExecute(
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
forkAndExecute(task, bulkRequest, executor, releasingListener);
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
}

@Override
@@ -173,21 +176,32 @@ public void onTimeout(TimeValue timeout) {
}
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
} else {
forkAndExecute(task, bulkRequest, executor, releasingListener);
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
}
}

private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
private void forkAndExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> releasingListener
) {
executor.execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
}
});
}

private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
private boolean applyPipelines(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> listener
) throws IOException {
boolean hasIndexRequestsWithPipelines = false;
ClusterState state = clusterService.state();
ProjectId projectId = projectResolver.getProjectId();
@@ -276,7 +290,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
assert arePipelinesResolved : bulkRequest;
}
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
} else {
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
}
@@ -290,6 +304,7 @@ private void processBulkIndexIngestRequest(
Task task,
BulkRequest original,
Executor executor,
boolean isOnlySystem,
ProjectMetadata metadata,
ActionListener<BulkResponse> listener
) {
@@ -323,20 +338,21 @@ private void processBulkIndexIngestRequest(
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
}

@Override
public boolean isForceExecution() {
// If we fork back to a write thread we **not** should fail, because tp queue is full.
// If we fork back to a coordination thread we should **not** fail because the tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
};
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a write thread:
// before we continue the bulk request we should fork back onto a coordination thread. Otherwise it is fine to perform
// coordination steps on the write thread.
if (originalThread == Thread.currentThread()) {
runnable.run();
} else {
@@ -345,7 +361,8 @@ public boolean isForceExecution() {
}
}
},
executor
// Use the appropriate write executor for actual ingest processing
isOnlySystem ? systemWriteExecutor : writeExecutor
Contributor:

I am ok with this, but it seems like for any case where we have ingest processing, we would then still have the coordination happen behind any local write work. At least the PR then avoids some of the wait roundtrips.

We could also decide to have both a system-write-coordination pool and a write-coordination pool and use those here? We can look at this in follow-ups ofc.

Contributor (Author):

> We could also decide to have both a system-write-coordination pool and a write-coordination pool and use those here? We can look at this in follow-ups ofc.

Yes I would like to move ingest work to a non-WRITE thread pool in a follow-up as I think there might be a few things to discuss.

);
}
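
If the follow-up floated in the discussion above were pursued, the thread pool registration later in this PR would presumably gain a second, system-flavoured entry along these lines (purely hypothetical; neither the name system_write_coordination nor this builder exists in the change):

// Hypothetical follow-up only, mirroring the WRITE_COORDINATION builder added in this PR.
result.put(
    "system_write_coordination",
    new FixedExecutorBuilder(
        settings,
        "system_write_coordination",
        allocatedProcessors,
        10000,
        EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
    )
);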

@@ -401,10 +418,11 @@ private void applyPipelinesAndDoInternalExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> listener
) throws IOException {
final long relativeStartTimeNanos = relativeTimeNanos();
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
}
}
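
Taken together, the changes in this file split the bulk path across two pools: coordination always forks to write_coordination, while ingest processing still runs on a write pool. A condensed, illustrative view of the hand-offs (coordinateBulk and runIngest are hypothetical stand-ins for the private methods above, not real identifiers):

// Illustrative only: which executor each stage of the new flow lands on.
Executor coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
Executor writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
Executor systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);

// doExecute: accounts indexing pressure on the transport thread, then forks coordination
// work (cluster state checks, pipeline resolution, shard dispatch) to write_coordination.
coordinationExecutor.execute(() -> coordinateBulk(task, bulkRequest, isOnlySystem, listener));

// processBulkIndexIngestRequest: actual ingest processing still goes to a write pool,
// chosen by whether the request only touches system indices; once ingest completes on a
// different thread, the continuation forks back onto the coordination executor.
Executor ingestExecutor = isOnlySystem ? systemWriteExecutor : writeExecutor;
ingestExecutor.execute(() -> runIngest(task, bulkRequest, listener));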
@@ -38,6 +38,16 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.GENERIC,
new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)
);
result.put(
ThreadPool.Names.WRITE_COORDINATION,
new FixedExecutorBuilder(
settings,
ThreadPool.Names.WRITE_COORDINATION,
allocatedProcessors,
10000,
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
)
);
result.put(
ThreadPool.Names.WRITE,
new FixedExecutorBuilder(
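
As registered above, the new pool is fixed-size with one thread per allocated processor and a queue of 10000. Assuming it follows the same settings convention as the existing fixed write pool (the keys are not shown in this diff), it could be tuned via node settings:

// Assumed setting keys, mirroring thread_pool.write.size / thread_pool.write.queue_size.
Settings nodeSettings = Settings.builder()
    .put("thread_pool.write_coordination.size", 4)
    .put("thread_pool.write_coordination.queue_size", 5000)
    .build();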
@@ -41,6 +41,7 @@ protected static String settingsKey(final String prefix, final String key) {

protected static int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals("bulk")
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
|| name.equals(ThreadPool.Names.WRITE)
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {
@@ -113,6 +113,7 @@ public static class Names {
public static final String GET = "get";
public static final String ANALYZE = "analyze";
public static final String WRITE = "write";
public static final String WRITE_COORDINATION = "write_coordination";
public static final String SEARCH = "search";
public static final String SEARCH_COORDINATION = "search_coordination";
public static final String AUTO_COMPLETE = "auto_complete";
@@ -186,6 +187,7 @@ public static ThreadPoolType fromType(String type) {
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
entry(Names.GET, ThreadPoolType.FIXED),
entry(Names.ANALYZE, ThreadPoolType.FIXED),
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
entry(Names.WRITE, ThreadPoolType.FIXED),
entry(Names.SEARCH, ThreadPoolType.FIXED),
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),
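
With the name constant and the FIXED mapping in place, the new pool is looked up like any other executor. A minimal usage sketch using identifiers that appear in this diff (ThreadPool.Info as the return type of info(...) is an assumption):

// Minimal usage sketch; coordination work runs here instead of competing with indexing.
ExecutorService writeCoordination = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
ThreadPool.Info poolInfo = threadPool.info(ThreadPool.Names.WRITE_COORDINATION);
int maxThreads = poolInfo.getMax();   // defaults to the allocated processor count per the builder above
writeCoordination.execute(() -> {
    // dispatch a coordination task onto the new pool
});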
@@ -107,6 +107,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE);
private FeatureService mockFeatureService;

private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");

@@ -293,6 +294,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
threadPool = mock(ThreadPool.class);
when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
MockitoAnnotations.openMocks(this);
@@ -410,9 +410,9 @@ public void testOnlySystem() throws IOException {
assertFalse(TransportBulkAction.isOnlySystem(buildBulkStreamRequest(mixed), indicesLookup, systemIndices));
}

private void blockWriteThreadPool(CountDownLatch blockingLatch) {
private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) {
assertThat(blockingLatch.getCount(), greaterThan(0L));
final var executor = threadPool.executor(ThreadPool.Names.WRITE);
final var executor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
// Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full.
expectThrows(EsRejectedExecutionException.class, () -> {
// noinspection InfiniteLoopStatement
@@ -427,7 +427,7 @@ public void testRejectCoordination() {

final var blockingLatch = new CountDownLatch(1);
try {
blockWriteThreadPool(blockingLatch);
blockWriteCoordinationThreadPool(blockingLatch);
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
expectThrows(EsRejectedExecutionException.class, future);
@@ -442,7 +442,7 @@ public void testRejectionAfterCreateIndexIsPropagated() {
bulkAction.failIndexCreationException = randomBoolean() ? new ResourceAlreadyExistsException("index already exists") : null;
final var blockingLatch = new CountDownLatch(1);
try {
bulkAction.beforeIndexCreation = () -> blockWriteThreadPool(blockingLatch);
bulkAction.beforeIndexCreation = () -> blockWriteCoordinationThreadPool(blockingLatch);
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
expectThrows(EsRejectedExecutionException.class, future);