Skip to content

Commit 3688680

Browse files
Introduce concurrent translog recovery to accelerate segment replication primary promotion (#20251)
* support concurrent translog recovery Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * refactor Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * refactor test Signed-off-by: guojialiang <[email protected]> * add translogConcurrentRecoverySemaphore Signed-off-by: guojialiang <[email protected]> * use FutureUtils.cancel Signed-off-by: guojialiang <[email protected]> * use FutureUtils.cancel Signed-off-by: guojialiang <[email protected]> * Limit the maximum value of batch to 1 million Signed-off-by: guojialiang <[email protected]> * add UT Signed-off-by: guojialiang <[email protected]> * simplify snapshot close and fix test Signed-off-by: guojialiang <[email protected]> * add change log Signed-off-by: guojialiang <[email protected]> * Add comments Signed-off-by: guojialiang <[email protected]> --------- Signed-off-by: guojialiang <[email protected]>
1 parent d864716 commit 3688680

File tree

8 files changed

+255
-67
lines changed

8 files changed

+255
-67
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Add tracing support for StreamingRestChannel ([#20361](https://github.com/opensearch-project/OpenSearch/pull/20361))
2020
- Introduce new libs/netty4 module to share common implementation between netty-based plugins and modules (transport-netty4, transport-reactor-netty4) ([#20447](https://github.com/opensearch-project/OpenSearch/pull/20447))
2121
- Add validation to make crypto store settings immutable ([#20123](https://github.com/opensearch-project/OpenSearch/pull/20123))
22+
- Introduce concurrent translog recovery to accelerate segment replication primary promotion ([#20251](https://github.com/opensearch-project/OpenSearch/pull/20251))
2223
- Update to `almalinux:10` ([#20482](https://github.com/opensearch-project/OpenSearch/pull/20482))
2324

2425
### Changed

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ public void apply(Settings value, Settings current, Settings previous) {
346346
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
347347
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
348348
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
349+
RecoverySettings.INDICES_TRANSLOG_CONCURRENT_RECOVERY_ENABLE,
350+
RecoverySettings.INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE,
349351
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
350352
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
351353
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import org.opensearch.common.util.concurrent.AbstractRunnable;
104104
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
105105
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
106+
import org.opensearch.common.util.concurrent.FutureUtils;
106107
import org.opensearch.common.util.concurrent.RunOnce;
107108
import org.opensearch.common.util.concurrent.ThreadContext;
108109
import org.opensearch.common.util.io.IOUtils;
@@ -238,8 +239,12 @@
238239
import java.util.Objects;
239240
import java.util.Optional;
240241
import java.util.Set;
242+
import java.util.concurrent.CompletionService;
241243
import java.util.concurrent.CopyOnWriteArrayList;
242244
import java.util.concurrent.CountDownLatch;
245+
import java.util.concurrent.ExecutorCompletionService;
246+
import java.util.concurrent.Future;
247+
import java.util.concurrent.Semaphore;
243248
import java.util.concurrent.TimeUnit;
244249
import java.util.concurrent.TimeoutException;
245250
import java.util.concurrent.atomic.AtomicBoolean;
@@ -395,6 +400,9 @@ Runnable getGlobalCheckpointSyncer() {
395400
private final Set<MergedSegmentCheckpoint> pendingMergedSegmentCheckpoints = Sets.newConcurrentHashSet();
396401
private final MergedSegmentTransferTracker mergedSegmentTransferTracker;
397402

403+
// Limits the number of concurrent translog recovery batches. The semaphore is static, so its 1000 permits are shared by
// every IndexShard instance in this JVM. A recovery requests one permit per batch without blocking; when not enough
// permits are available, that recovery falls back to the serial path.
404+
private static final Semaphore translogConcurrentRecoverySemaphore = new Semaphore(1000);
405+
398406
@InternalApi
399407
public IndexShard(
400408
final ShardRouting shardRouting,
@@ -5274,14 +5282,87 @@ public void close() throws IOException {
52745282
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
52755283
onNewEngine(newEngineReference.get());
52765284
}
5277-
final TranslogRecoveryRunner translogRunner = (snapshot) -> runTranslogRecovery(
5278-
newEngineReference.get(),
5279-
snapshot,
5280-
Engine.Operation.Origin.LOCAL_RESET,
5281-
() -> {
5282-
// TODO: add a dedicate recovery stats for the reset translog
5285+
// Translog recovery runner used while resetting the engine. It replays translog operations into the new engine
// either serially (using the supplied snapshot) or concurrently (splitting the sequence-number range above the
// engine's processed local checkpoint into batches that run on the TRANSLOG_RECOVERY thread pool).
// Returns the number of recovered operations; the concurrent path throws IOException if any batch fails.
final TranslogRecoveryRunner translogRunner = (snapshot) -> {
5286+
long startTime = System.currentTimeMillis();
5287+
Engine engine = newEngineReference.get();
5288+
assert null != engine;
5289+
int translogRecoveryOperations;
5290+
int totalOperations = snapshot.totalOperations();
5291+
int batchSize = recoverySettings.getTranslogConcurrentRecoveryBatchSize();
5292+
long localCheckpoint = engine.getProcessedLocalCheckpoint();
5293+
// Ceiling division: number of batches of batchSize needed to cover totalOperations.
// NOTE(review): this is int arithmetic, so the sum could overflow if totalOperations is close to
// Integer.MAX_VALUE - confirm that is not reachable.
final int batches = (totalOperations + batchSize - 1) / batchSize;
5294+
// Concurrent recovery is used only when the setting is enabled, the index is segment-replication enabled or on a
// remote node, there is more than one batch of work (totalOperations > batchSize), and one permit per batch can be
// acquired without blocking. The tryAcquire call is evaluated last, so permits are only taken when every other
// condition already holds.
5295+
boolean isConcurrentRecovery = recoverySettings.isTranslogConcurrentRecoveryEnable()
5296+
&& indexSettings.isSegRepEnabledOrRemoteNode()
5297+
&& totalOperations > batchSize
5298+
&& translogConcurrentRecoverySemaphore.tryAcquire(batches);
5299+
if (isConcurrentRecovery) {
5300+
List<Future<Integer>> translogRecoveryFutureList = new ArrayList<>();
5301+
try {
5302+
// Since the translog does not change at this time, it is safe to re-partition the translog snapshot here.
// The supplied snapshot is only used for its operation count on this path; each task opens its own snapshot.
5303+
CompletionService<Integer> completionService = new ExecutorCompletionService<>(
5304+
threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)
5305+
);
5306+
for (int i = 0; i < batches; i++) {
5307+
// Batch i covers sequence numbers [start, end], beginning right after the processed local checkpoint.
// The (long) cast keeps i * batchSize from overflowing int.
// NOTE(review): batches is derived from the operation count while the ranges are sequence-number ranges;
// presumably operations above the checkpoint are expected to be roughly contiguous - confirm.
long start = localCheckpoint + 1 + (long) i * batchSize;
5308+
// The last batch is open-ended so that every remaining operation is included.
long end = (i == batches - 1) ? Long.MAX_VALUE : start + batchSize - 1;
5309+
translogRecoveryFutureList.add(completionService.submit(() -> {
5310+
// Each task opens its own changes snapshot for its range and closes it when the task finishes.
try (Translog.Snapshot translogSnapshot = engine.translogManager().newChangesSnapshot(start, end, false)) {
5311+
return runTranslogRecovery(engine, translogSnapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
5312+
// TODO: add dedicated recovery stats for the reset translog
5313+
});
5314+
}
5315+
}));
5316+
}
5317+
// Collect results in completion order. The first failure is remembered; later failures are attached to it
// as suppressed exceptions.
Exception exception = null;
5318+
int totalRecovered = 0;
5319+
for (int i = 0; i < batches; i++) {
5320+
try {
5321+
if (exception != null) {
5322+
// A batch has failed: try to cancel every submitted task and wait on those that report not
// cancelled, presumably so that no task is still running when this method returns.
// NOTE(review): if get() throws here, the break below is skipped and the catch records the
// exception as suppressed, so the next loop iteration repeats this cancel/get pass (and may
// record the same failure again) - confirm this is intended.
for (Future<Integer> translogRecoveryFuture : translogRecoveryFutureList) {
5323+
FutureUtils.cancel(translogRecoveryFuture);
5324+
if (false == translogRecoveryFuture.isCancelled()) {
5325+
translogRecoveryFuture.get();
5326+
}
5327+
}
5328+
break;
5329+
}
5330+
// take() blocks until the next task completes; get() rethrows that task's failure, if any.
totalRecovered += completionService.take().get();
5331+
} catch (Exception e) {
5332+
if (exception == null) {
5333+
exception = e;
5334+
} else {
5335+
exception.addSuppressed(e);
5336+
}
5337+
}
5338+
}
5339+
if (exception != null) {
5340+
throw new IOException("Failed to concurrent recovery translog", exception);
5341+
}
5342+
translogRecoveryOperations = totalRecovered;
5343+
} finally {
5344+
// Always return the permits taken by tryAcquire(batches), on success and on failure.
translogConcurrentRecoverySemaphore.release(batches);
5345+
}
5346+
} else {
5347+
// Serial path: replay the supplied snapshot on the calling thread.
translogRecoveryOperations = runTranslogRecovery(
5348+
newEngineReference.get(),
5349+
snapshot,
5350+
Engine.Operation.Origin.LOCAL_RESET,
5351+
() -> {
5352+
// TODO: add dedicated recovery stats for the reset translog
5353+
}
5354+
);
52835355
}
5284-
);
5356+
5357+
logger.info(
5358+
"translog recovery complete, isConcurrentRecovery {}, cost {}ms, totalRecovered {}",
5359+
isConcurrentRecovery,
5360+
System.currentTimeMillis() - startTime,
5361+
translogRecoveryOperations
5362+
);
5363+
5364+
return translogRecoveryOperations;
5365+
};
52855366

52865367
// When the new engine is created, translogs are synced from the remote store onto local. Since the remote store is the source
52875368
// of truth for the translog, we replay all translogs that exist locally. Otherwise, recoverUpto happens up to the global checkpoint.

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,26 @@ public class RecoverySettings {
230230
Property.NodeScope
231231
);
232232

233+
// Dynamic, node-scoped switch that enables concurrent translog recovery. Defaults to false (disabled).
public static final Setting<Boolean> INDICES_TRANSLOG_CONCURRENT_RECOVERY_ENABLE = Setting.boolSetting(
234+
"indices.translog_concurrent_recovery.enable",
235+
false,
236+
Property.Dynamic,
237+
Property.NodeScope
238+
);
239+
240+
// Batch size used to split translog recovery into concurrent tasks (default 500000, minimum 10000, maximum 1000000).
// The maximum is capped at 1 million to avoid excessive memory usage by the bitset in LocalCheckpointTracker when
241+
// translog operations are applied out of order during concurrent recovery.
242+
// In the worst case, with 1000 concurrent recovery tasks each recovering 1 million translog operations, the bitset
243+
// occupies approximately 125MB.
244+
public static final Setting<Integer> INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE = Setting.intSetting(
245+
"indices.translog_concurrent_recovery.batch_size",
246+
500000,
247+
10000,
248+
1000000,
249+
Property.Dynamic,
250+
Property.NodeScope
251+
);
252+
233253
private volatile ByteSizeValue mergedSegmentWarmerMinSegmentSizeThreshold;
234254
private volatile ByteSizeValue recoveryMaxBytesPerSec;
235255
private volatile ByteSizeValue replicationMaxBytesPerSec;
@@ -252,6 +272,9 @@ public class RecoverySettings {
252272
private volatile TimeValue internalRemoteUploadTimeout;
253273
private volatile TimeValue mergedSegmentReplicationTimeout;
254274

275+
// Current value of indices.translog_concurrent_recovery.enable; volatile because it is updated through a settings
// update consumer and read via its getter.
private volatile boolean isTranslogConcurrentRecoveryEnable;
276+
// Current value of indices.translog_concurrent_recovery.batch_size; volatile for the same reason.
private volatile int translogConcurrentRecoveryBatchSize;
277+
255278
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
256279
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
257280
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
@@ -286,6 +309,9 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
286309
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
287310
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);
288311

312+
this.isTranslogConcurrentRecoveryEnable = INDICES_TRANSLOG_CONCURRENT_RECOVERY_ENABLE.get(settings);
313+
this.translogConcurrentRecoveryBatchSize = INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE.get(settings);
314+
289315
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
290316
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
291317
clusterSettings.addSettingsUpdateConsumer(
@@ -324,6 +350,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
324350
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
325351
this::setInternalActionRetryTimeout
326352
);
353+
clusterSettings.addSettingsUpdateConsumer(INDICES_TRANSLOG_CONCURRENT_RECOVERY_ENABLE, this::setTranslogConcurrentRecoveryEnable);
354+
clusterSettings.addSettingsUpdateConsumer(
355+
INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE,
356+
this::setTranslogConcurrentRecoveryBatchSize
357+
);
327358
}
328359

329360
private void setMergedSegmentWarmerMinSegmentSizeThreshold(ByteSizeValue value) {
@@ -493,4 +524,19 @@ public void setIndicesMergedSegmentReplicationWarmerEnabled(boolean mergedSegmen
493524
this.mergedSegmentReplicationWarmerEnabled = mergedSegmentReplicationWarmerEnabled;
494525
}
495526

527+
/**
 * Returns whether concurrent translog recovery is currently enabled.
 *
 * @return the current value of the {@code isTranslogConcurrentRecoveryEnable} field
 */
public boolean isTranslogConcurrentRecoveryEnable() {
528+
return isTranslogConcurrentRecoveryEnable;
529+
}
530+
531+
/**
 * Stores a new value for the concurrent translog recovery switch. Registered in the constructor as the update
 * consumer for {@code INDICES_TRANSLOG_CONCURRENT_RECOVERY_ENABLE}.
 *
 * @param translogConcurrentRecoveryEnable the new value of the setting
 */
private void setTranslogConcurrentRecoveryEnable(boolean translogConcurrentRecoveryEnable) {
532+
isTranslogConcurrentRecoveryEnable = translogConcurrentRecoveryEnable;
533+
}
534+
535+
/**
 * Returns the batch size currently configured for concurrent translog recovery.
 *
 * @return the current value of the {@code translogConcurrentRecoveryBatchSize} field
 */
public int getTranslogConcurrentRecoveryBatchSize() {
536+
return translogConcurrentRecoveryBatchSize;
537+
}
538+
539+
/**
 * Stores a new batch size for concurrent translog recovery. Registered in the constructor as the update consumer
 * for {@code INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE}.
 *
 * @param translogConcurrentRecoveryBatchSize the new value of the setting
 */
private void setTranslogConcurrentRecoveryBatchSize(int translogConcurrentRecoveryBatchSize) {
540+
this.translogConcurrentRecoveryBatchSize = translogConcurrentRecoveryBatchSize;
541+
}
496542
}

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
9999
public static class Names {
100100
public static final String SAME = "same";
101101
public static final String GENERIC = "generic";
102+
public static final String TRANSLOG_RECOVERY = "translog_recovery";
102103
@Deprecated
103104
public static final String LISTENER = "listener";
104105
public static final String GET = "get";
@@ -179,6 +180,7 @@ public static ThreadPoolType fromType(String type) {
179180
HashMap<String, ThreadPoolType> map = new HashMap<>();
180181
map.put(Names.SAME, ThreadPoolType.DIRECT);
181182
map.put(Names.GENERIC, ThreadPoolType.SCALING);
183+
map.put(Names.TRANSLOG_RECOVERY, ThreadPoolType.FIXED);
182184
map.put(Names.LISTENER, ThreadPoolType.FIXED);
183185
map.put(Names.GET, ThreadPoolType.FIXED);
184186
map.put(Names.ANALYZE, ThreadPoolType.FIXED);
@@ -258,6 +260,7 @@ public ThreadPool(
258260
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
259261
final int snapshotDeletionPoolMax = boundedBy(4 * allocatedProcessors, 64, 256);
260262
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
263+
builders.put(Names.TRANSLOG_RECOVERY, new FixedExecutorBuilder(settings, Names.TRANSLOG_RECOVERY, allocatedProcessors, -1));
261264
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000));
262265
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
263266
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));

0 commit comments

Comments
 (0)