Skip to content

Commit 3660d86

Browse files
authored
Fork the sending of file chunks during recovery (#74164)
Today if sending file chunks is CPU-bound (e.g. when using compression) then we tend to concentrate all that work onto relatively few threads, even if `indices.recovery.max_concurrent_file_chunks` is increased. With this commit we fork the transmission of each chunk onto its own thread so that the CPU-bound work can happen in parallel.
1 parent a18b1cc commit 3660d86

File tree

3 files changed

+17
-10
lines changed

3 files changed

+17
-10
lines changed

docs/reference/modules/indices/recovery.asciidoc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,12 @@ You can use the following _expert_ setting to manage resources for peer
5757
recoveries.
5858

5959
`indices.recovery.max_concurrent_file_chunks`::
60-
(<<cluster-update-settings,Dynamic>>, Expert) Number of file chunk requests
61-
sent in parallel for each recovery. Defaults to `2`.
60+
(<<cluster-update-settings,Dynamic>>, Expert) Number of file chunks sent in
61+
parallel for each recovery. Defaults to `2`.
6262
+
6363
You can increase the value of this setting when the recovery of a single shard
64-
is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`.
64+
is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`,
65+
up to a maximum of `8`.
6566

6667
`indices.recovery.max_concurrent_operations`::
6768
(<<cluster-update-settings,Dynamic>>, Expert) Number of operations sent

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class RecoverySettings {
8080
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
8181
*/
8282
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
83-
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);
83+
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 8, Property.Dynamic, Property.NodeScope);
8484

8585
/**
8686
* Controls the maximum number of operation chunk requests that can be sent concurrently from the source node to the target node.

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.Version;
1717
import org.elasticsearch.action.ActionListener;
1818
import org.elasticsearch.action.ActionListenerResponseHandler;
19+
import org.elasticsearch.action.ActionRunnable;
1920
import org.elasticsearch.action.support.RetryableAction;
2021
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.common.breaker.CircuitBreakingException;
@@ -202,12 +203,17 @@ public void writeFileChunk(StoreFileMetadata fileMetadata, long position, Releas
202203
final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(
203204
recoveryId, requestSeqNo, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos);
204205
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
205-
executeRetryableAction(
206-
action,
207-
request,
208-
fileChunkRequestOptions,
209-
ActionListener.runBefore(listener.map(r -> null), request::decRef),
210-
reader);
206+
207+
// Fork the actual sending onto a separate thread so we can send them concurrently even if CPU-bound (e.g. using compression).
208+
// The AsyncIOProcessor and MultiFileWriter both concentrate their work onto fewer threads if possible, but once we have
209+
// chunks to send we want to increase parallelism again.
210+
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
211+
executeRetryableAction(
212+
action,
213+
request,
214+
fileChunkRequestOptions,
215+
ActionListener.runBefore(l.map(r -> null), request::decRef),
216+
reader)));
211217
}
212218

213219
@Override

0 commit comments

Comments
 (0)