Skip to content

Commit 92c9cc7

Browse files
committed
Add thread pool for write coordination
This change adds a thread pool for write coordination to ensure that bulk coordination does not get stuck on an overloaded primary node.
1 parent b24bb35 commit 92c9cc7

File tree

4 files changed

+47
-16
lines changed

4 files changed

+47
-16
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
6868
private final IngestService ingestService;
6969
private final IngestActionForwarder ingestForwarder;
7070
protected final LongSupplier relativeTimeNanosProvider;
71+
protected final Executor coordinationExecutor;
7172
protected final Executor writeExecutor;
7273
protected final Executor systemWriteExecutor;
7374
private final ActionType<BulkResponse> bulkAction;
@@ -92,6 +93,7 @@ public TransportAbstractBulkAction(
9293
this.indexingPressure = indexingPressure;
9394
this.systemIndices = systemIndices;
9495
this.projectResolver = projectResolver;
96+
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
9597
this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
9698
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
9799
this.ingestForwarder = new IngestActionForwarder(transportService);
@@ -106,8 +108,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
106108
* This is called on the Transport thread so we can check the indexing
107109
* memory pressure *quickly* but we don't want to keep the transport
108110
* thread busy. Then, as soon as we have the indexing pressure in we fork
109-
* to one of the write thread pools. We do this because juggling the
110-
* bulk request can get expensive for a few reasons:
111+
* to the write coordination thread pool. We do this because
112+
* juggling the bulk request can get expensive for a few reasons:
111113
* 1. Figuring out which shard should receive a bulk request might require
112114
* parsing the _source.
113115
* 2. When dispatching the sub-requests to shards we may have to compress
@@ -131,14 +133,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
131133
releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
132134
}
133135
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
134-
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
135-
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
136+
// Use coordinationExecutor for dispatching coordination tasks
137+
ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
136138
}
137139

138140
private void ensureClusterStateThenForkAndExecute(
139141
Task task,
140142
BulkRequest bulkRequest,
141143
Executor executor,
144+
boolean isOnlySystem,
142145
ActionListener<BulkResponse> releasingListener
143146
) {
144147
final ClusterState initialState = clusterService.state();
@@ -159,7 +162,7 @@ private void ensureClusterStateThenForkAndExecute(
159162
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
160163
@Override
161164
public void onNewClusterState(ClusterState state) {
162-
forkAndExecute(task, bulkRequest, executor, releasingListener);
165+
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
163166
}
164167

165168
@Override
@@ -173,21 +176,32 @@ public void onTimeout(TimeValue timeout) {
173176
}
174177
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
175178
} else {
176-
forkAndExecute(task, bulkRequest, executor, releasingListener);
179+
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
177180
}
178181
}
179182

180-
private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
183+
private void forkAndExecute(
184+
Task task,
185+
BulkRequest bulkRequest,
186+
Executor executor,
187+
boolean isOnlySystem,
188+
ActionListener<BulkResponse> releasingListener
189+
) {
181190
executor.execute(new ActionRunnable<>(releasingListener) {
182191
@Override
183192
protected void doRun() throws IOException {
184-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
193+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
185194
}
186195
});
187196
}
188197

189-
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
190-
throws IOException {
198+
private boolean applyPipelines(
199+
Task task,
200+
BulkRequest bulkRequest,
201+
Executor executor,
202+
boolean isOnlySystem,
203+
ActionListener<BulkResponse> listener
204+
) throws IOException {
191205
boolean hasIndexRequestsWithPipelines = false;
192206
ClusterState state = clusterService.state();
193207
ProjectId projectId = projectResolver.getProjectId();
@@ -276,7 +290,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
276290
assert arePipelinesResolved : bulkRequest;
277291
}
278292
if (clusterService.localNode().isIngestNode()) {
279-
processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
293+
processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
280294
} else {
281295
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
282296
}
@@ -290,6 +304,7 @@ private void processBulkIndexIngestRequest(
290304
Task task,
291305
BulkRequest original,
292306
Executor executor,
307+
boolean isOnlySystem,
293308
ProjectMetadata metadata,
294309
ActionListener<BulkResponse> listener
295310
) {
@@ -323,20 +338,21 @@ private void processBulkIndexIngestRequest(
323338
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
324339
@Override
325340
protected void doRun() throws IOException {
326-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
341+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
327342
}
328343

329344
@Override
330345
public boolean isForceExecution() {
331-
// If we fork back to a write thread we **not** should fail, because tp queue is full.
346+
// If we fork back to a coordination thread we should **not** fail because the tp queue is full.
332347
// (Otherwise the work done during ingest will be lost)
333348
// It is okay to force execution here. Throttling of write requests happens prior to
334349
// ingest when a node receives a bulk request.
335350
return true;
336351
}
337352
};
338353
// If a processor went async and returned a response on a different thread then
339-
// before we continue the bulk request we should fork back on a write thread:
354+
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
355+
// coordination steps on the write thread.
340356
if (originalThread == Thread.currentThread()) {
341357
runnable.run();
342358
} else {
@@ -345,7 +361,8 @@ public boolean isForceExecution() {
345361
}
346362
}
347363
},
348-
executor
364+
// Use the appropriate write executor for actual ingest processing
365+
isOnlySystem ? systemWriteExecutor : writeExecutor
349366
);
350367
}
351368

@@ -401,10 +418,11 @@ private void applyPipelinesAndDoInternalExecute(
401418
Task task,
402419
BulkRequest bulkRequest,
403420
Executor executor,
421+
boolean isOnlySystem,
404422
ActionListener<BulkResponse> listener
405423
) throws IOException {
406424
final long relativeStartTimeNanos = relativeTimeNanos();
407-
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
425+
if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
408426
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
409427
}
410428
}

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
3838
ThreadPool.Names.GENERIC,
3939
new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)
4040
);
41+
result.put(
42+
ThreadPool.Names.WRITE_COORDINATION,
43+
new FixedExecutorBuilder(
44+
settings,
45+
ThreadPool.Names.WRITE_COORDINATION,
46+
allocatedProcessors,
47+
10000,
48+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
49+
)
50+
);
4151
result.put(
4252
ThreadPool.Names.WRITE,
4353
new FixedExecutorBuilder(

server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ protected static String settingsKey(final String prefix, final String key) {
4141

4242
protected static int applyHardSizeLimit(final Settings settings, final String name) {
4343
if (name.equals("bulk")
44+
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
4445
|| name.equals(ThreadPool.Names.WRITE)
4546
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
4647
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public static class Names {
113113
public static final String GET = "get";
114114
public static final String ANALYZE = "analyze";
115115
public static final String WRITE = "write";
116+
public static final String WRITE_COORDINATION = "write_coordination";
116117
public static final String SEARCH = "search";
117118
public static final String SEARCH_COORDINATION = "search_coordination";
118119
public static final String AUTO_COMPLETE = "auto_complete";
@@ -186,6 +187,7 @@ public static ThreadPoolType fromType(String type) {
186187
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
187188
entry(Names.GET, ThreadPoolType.FIXED),
188189
entry(Names.ANALYZE, ThreadPoolType.FIXED),
190+
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
189191
entry(Names.WRITE, ThreadPoolType.FIXED),
190192
entry(Names.SEARCH, ThreadPoolType.FIXED),
191193
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),

0 commit comments

Comments
 (0)