@@ -68,6 +68,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
6868 private final IngestService ingestService ;
6969 private final IngestActionForwarder ingestForwarder ;
7070 protected final LongSupplier relativeTimeNanosProvider ;
71+ protected final Executor coordinationExecutor ;
7172 protected final Executor writeExecutor ;
7273 protected final Executor systemWriteExecutor ;
7374 private final ActionType <BulkResponse > bulkAction ;
@@ -92,6 +93,7 @@ public TransportAbstractBulkAction(
9293 this .indexingPressure = indexingPressure ;
9394 this .systemIndices = systemIndices ;
9495 this .projectResolver = projectResolver ;
96+ this .coordinationExecutor = threadPool .executor (ThreadPool .Names .WRITE_COORDINATION );
9597 this .writeExecutor = threadPool .executor (ThreadPool .Names .WRITE );
9698 this .systemWriteExecutor = threadPool .executor (ThreadPool .Names .SYSTEM_WRITE );
9799 this .ingestForwarder = new IngestActionForwarder (transportService );
@@ -106,8 +108,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
106108 * This is called on the Transport thread so we can check the indexing
107109 * memory pressure *quickly* but we don't want to keep the transport
108110 * thread busy. Then, as soon as we have the indexing pressure in we fork
109- * to one of the write thread pools . We do this because juggling the
110- * bulk request can get expensive for a few reasons:
111+ * to the coordinator thread pool for coordination tasks . We do this because
112+ * juggling the bulk request can get expensive for a few reasons:
111113 * 1. Figuring out which shard should receive a bulk request might require
112114 * parsing the _source.
113115 * 2. When dispatching the sub-requests to shards we may have to compress
@@ -131,14 +133,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
131133 releasable = indexingPressure .markCoordinatingOperationStarted (indexingOps , indexingBytes , isOnlySystem );
132134 }
133135 final ActionListener <BulkResponse > releasingListener = ActionListener .runBefore (listener , releasable ::close );
134- final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor ;
135- ensureClusterStateThenForkAndExecute (task , bulkRequest , executor , releasingListener );
136+ // Use coordinationExecutor for dispatching coordination tasks
137+ ensureClusterStateThenForkAndExecute (task , bulkRequest , coordinationExecutor , isOnlySystem , releasingListener );
136138 }
137139
138140 private void ensureClusterStateThenForkAndExecute (
139141 Task task ,
140142 BulkRequest bulkRequest ,
141143 Executor executor ,
144+ boolean isOnlySystem ,
142145 ActionListener <BulkResponse > releasingListener
143146 ) {
144147 final ClusterState initialState = clusterService .state ();
@@ -160,7 +163,7 @@ private void ensureClusterStateThenForkAndExecute(
160163 clusterStateObserver .waitForNextChange (new ClusterStateObserver .Listener () {
161164 @ Override
162165 public void onNewClusterState (ClusterState state ) {
163- forkAndExecute (task , bulkRequest , executor , releasingListener );
166+ forkAndExecute (task , bulkRequest , executor , isOnlySystem , releasingListener );
164167 }
165168
166169 @ Override
@@ -174,21 +177,32 @@ public void onTimeout(TimeValue timeout) {
174177 }
175178 }, newState -> false == newState .blocks ().hasGlobalBlockWithLevel (projectId , ClusterBlockLevel .WRITE ));
176179 } else {
177- forkAndExecute (task , bulkRequest , executor , releasingListener );
180+ forkAndExecute (task , bulkRequest , executor , isOnlySystem , releasingListener );
178181 }
179182 }
180183
181- private void forkAndExecute (Task task , BulkRequest bulkRequest , Executor executor , ActionListener <BulkResponse > releasingListener ) {
184+ private void forkAndExecute (
185+ Task task ,
186+ BulkRequest bulkRequest ,
187+ Executor executor ,
188+ boolean isOnlySystem ,
189+ ActionListener <BulkResponse > releasingListener
190+ ) {
182191 executor .execute (new ActionRunnable <>(releasingListener ) {
183192 @ Override
184193 protected void doRun () throws IOException {
185- applyPipelinesAndDoInternalExecute (task , bulkRequest , executor , releasingListener );
194+ applyPipelinesAndDoInternalExecute (task , bulkRequest , executor , isOnlySystem , releasingListener );
186195 }
187196 });
188197 }
189198
190- private boolean applyPipelines (Task task , BulkRequest bulkRequest , Executor executor , ActionListener <BulkResponse > listener )
191- throws IOException {
199+ private boolean applyPipelines (
200+ Task task ,
201+ BulkRequest bulkRequest ,
202+ Executor executor ,
203+ boolean isOnlySystem ,
204+ ActionListener <BulkResponse > listener
205+ ) throws IOException {
192206 boolean hasIndexRequestsWithPipelines = false ;
193207 ClusterState state = clusterService .state ();
194208 ProjectId projectId = projectResolver .getProjectId ();
@@ -277,7 +291,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
277291 assert arePipelinesResolved : bulkRequest ;
278292 }
279293 if (clusterService .localNode ().isIngestNode ()) {
280- processBulkIndexIngestRequest (task , bulkRequest , executor , project , l );
294+ processBulkIndexIngestRequest (task , bulkRequest , executor , isOnlySystem , project , l );
281295 } else {
282296 ingestForwarder .forwardIngestRequest (bulkAction , bulkRequest , l );
283297 }
@@ -291,6 +305,7 @@ private void processBulkIndexIngestRequest(
291305 Task task ,
292306 BulkRequest original ,
293307 Executor executor ,
308+ boolean isOnlySystem ,
294309 ProjectMetadata metadata ,
295310 ActionListener <BulkResponse > listener
296311 ) {
@@ -324,20 +339,21 @@ private void processBulkIndexIngestRequest(
324339 ActionRunnable <BulkResponse > runnable = new ActionRunnable <>(actionListener ) {
325340 @ Override
326341 protected void doRun () throws IOException {
327- applyPipelinesAndDoInternalExecute (task , bulkRequest , executor , actionListener );
342+ applyPipelinesAndDoInternalExecute (task , bulkRequest , executor , isOnlySystem , actionListener );
328343 }
329344
330345 @ Override
331346 public boolean isForceExecution () {
332- // If we fork back to a write thread we **not** should fail, because tp queue is full.
347+ // If we fork back to a coordination thread we **not** should fail, because tp queue is full.
333348 // (Otherwise the work done during ingest will be lost)
334349 // It is okay to force execution here. Throttling of write requests happens prior to
335350 // ingest when a node receives a bulk request.
336351 return true ;
337352 }
338353 };
339354 // If a processor went async and returned a response on a different thread then
340- // before we continue the bulk request we should fork back on a write thread:
355+ // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
356+ // coordination steps on the write thread
341357 if (originalThread == Thread .currentThread ()) {
342358 runnable .run ();
343359 } else {
@@ -346,7 +362,8 @@ public boolean isForceExecution() {
346362 }
347363 }
348364 },
349- executor
365+ // Use the appropriate write executor for actual ingest processing
366+ isOnlySystem ? systemWriteExecutor : writeExecutor
350367 );
351368 }
352369
@@ -402,10 +419,11 @@ private void applyPipelinesAndDoInternalExecute(
402419 Task task ,
403420 BulkRequest bulkRequest ,
404421 Executor executor ,
422+ boolean isOnlySystem ,
405423 ActionListener <BulkResponse > listener
406424 ) throws IOException {
407425 final long relativeStartTimeNanos = relativeTimeNanos ();
408- if (applyPipelines (task , bulkRequest , executor , listener ) == false ) {
426+ if (applyPipelines (task , bulkRequest , executor , isOnlySystem , listener ) == false ) {
409427 doInternalExecute (task , bulkRequest , executor , listener , relativeStartTimeNanos );
410428 }
411429 }
0 commit comments