Skip to content

Commit 7e641a6

Browse files
committed
Add validation to the Start Replication API so that the call fails if any stale replication metadata exists, and ensure that the Stop Replication API removes all stale replication metadata from the cluster state when it is executed.
Signed-off-by: Mohit Kumar <mohitamg@amazon.com>
1 parent 80aa587 commit 7e641a6

File tree

9 files changed

+294
-893
lines changed

9 files changed

+294
-893
lines changed

src/main/kotlin/org/opensearch/replication/ReplicationEngine.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111

1212
package org.opensearch.replication
1313

14+
import org.opensearch.index.engine.DeletionStrategy
1415
import org.opensearch.index.engine.EngineConfig
16+
import org.opensearch.index.engine.IndexingStrategy
1517
import org.opensearch.index.engine.InternalEngine
1618
import org.opensearch.index.seqno.SequenceNumbers
1719

src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt

Lines changed: 6 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import org.opensearch.replication.ReplicationPlugin
1515
import org.opensearch.replication.action.setup.SetupChecksAction
1616
import org.opensearch.replication.action.setup.SetupChecksRequest
1717
import org.opensearch.replication.metadata.store.ReplicationContext
18-
import org.opensearch.replication.task.cleanup.StaleArtifactDetector
19-
import org.opensearch.replication.task.cleanup.TaskCleanupManager
2018
import org.opensearch.replication.util.SecurityContext
2119
import org.opensearch.replication.util.ValidationUtil
2220
import org.opensearch.replication.util.completeWith
@@ -54,9 +52,7 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
5452
actionFilters: ActionFilters,
5553
private val client : Client,
5654
private val environment: Environment,
57-
private val metadataCreateIndexService: MetadataCreateIndexService,
58-
private val staleArtifactDetector: StaleArtifactDetector,
59-
private val taskCleanupManager: TaskCleanupManager) :
55+
private val metadataCreateIndexService: MetadataCreateIndexService) :
6056
HandledTransportAction<ReplicateIndexRequest, ReplicateIndexResponse>(ReplicateIndexAction.NAME,
6157
transportService, actionFilters, ::ReplicateIndexRequest),
6258
CoroutineScope by GlobalScope {
@@ -69,10 +65,8 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
6965
launch(threadPool.coroutineContext()) {
7066
listener.completeWith {
7167
log.info("Setting-up replication for ${request.leaderAlias}:${request.leaderIndex} -> ${request.followerIndex}")
72-
73-
performPreOperationValidation(request.followerIndex)
74-
7568
val user = SecurityContext.fromSecurityThreadContext(threadPool.threadContext)
69+
7670
val followerReplContext = ReplicationContext(request.followerIndex,
7771
user?.overrideFgacRole(request.useRoles?.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE)))
7872
val leaderReplContext = ReplicationContext(request.leaderIndex,
@@ -99,12 +93,15 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
9993

10094
val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex)
10195
log.debug("Leader settings were fetched for ${request.leaderIndex} index.")
96+
10297
if (leaderSettings.keySet().contains(ReplicationPlugin.REPLICATED_INDEX_SETTING.key) and
10398
!leaderSettings.get(ReplicationPlugin.REPLICATED_INDEX_SETTING.key).isNullOrBlank()) {
10499
throw IllegalArgumentException("Cannot Replicate a Replicated Index ${request.leaderIndex}")
105100
}
101+
102+
// Soft deletes should be enabled for replication to work.
106103
if (!leaderSettings.getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.key, false)) {
107-
throw IllegalArgumentException("Cannot Replicate an index where ${IndexSettings.INDEX_SOFT_DELETES_SETTING.key} is disabled")
104+
throw IllegalArgumentException("Cannot Replicate an index where the setting ${IndexSettings.INDEX_SOFT_DELETES_SETTING.key} is disabled")
108105
}
109106

110107
// Disabling knn checks, as the new API call would require us to add roles in the security index, which would be a breaking change.
@@ -168,61 +165,4 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
168165
// and then the explicitly set ones to override the default settings.
169166
return Settings.builder().put(leaderDefaultSettings).put(leaderSettings).build()
170167
}
171-
172-
/**
173-
* This method distinguishes between active replication (RUNNING/PAUSED state)
174-
* and truly stale artifacts from failed operations. Active replication will cause this
175-
* method to throw an error instead of attempting cleanup.
176-
*/
177-
private suspend fun performPreOperationValidation(followerIndex: String) {
178-
// If the follower index already exists, skip this validation
179-
// The index existence check in TransportReplicateIndexClusterManagerNodeAction will handle it
180-
if (clusterService.state().metadata().hasIndex(followerIndex)) {
181-
return
182-
}
183-
184-
// First check if replication is already active for this index
185-
val replicationStateParams = org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex(
186-
clusterService,
187-
followerIndex
188-
)
189-
190-
if (replicationStateParams != null) {
191-
val currentState = replicationStateParams[org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE]
192-
193-
// If replication is RUNNING or PAUSED, it's active - don't cleanup, throw error
194-
if (currentState == org.opensearch.replication.metadata.ReplicationOverallState.RUNNING.name ||
195-
currentState == org.opensearch.replication.metadata.ReplicationOverallState.PAUSED.name) {
196-
throw IllegalStateException(
197-
"Replication is already active for index $followerIndex in $currentState state. " +
198-
"Cannot start replication again. Use resume API to restart paused replication."
199-
)
200-
}
201-
202-
// If state is STOPPED or FAILED, artifacts are stale and should be cleaned up
203-
log.info("Found replication metadata for $followerIndex in $currentState state - will cleanup stale artifacts")
204-
}
205-
206-
// Now safe to detect and cleanup truly stale artifacts
207-
val staleArtifactReport = staleArtifactDetector.detectStaleArtifacts(followerIndex)
208-
209-
if (!staleArtifactReport.hasStaleArtifacts) return
210-
211-
log.warn("Detected ${staleArtifactReport.artifacts.size} stale artifacts for $followerIndex")
212-
log.info("Attempting cleanup of stale artifacts")
213-
214-
val cleanupResult = taskCleanupManager.cleanupAllReplicationTasks(followerIndex)
215-
216-
if (!cleanupResult.success) {
217-
val failures = cleanupResult.failures.joinToString("; ") { "${it.component}: ${it.error}" }
218-
throw IllegalStateException(
219-
"Cannot start replication for $followerIndex due to conflicting stale tasks. " +
220-
"Cleanup failed: $failures. Please manually resolve these conflicts."
221-
)
222-
}
223-
224-
log.info("Cleaned up stale artifacts: ${cleanupResult.indexTasksRemoved} index tasks, " +
225-
"${cleanupResult.shardTasksRemoved} shard tasks, ${cleanupResult.retentionLeasesRemoved} leases, " +
226-
"${cleanupResult.persistentTasksRemoved} persistent tasks")
227-
}
228168
}

src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ import org.opensearch.common.settings.IndexScopedSettings
4545
import org.opensearch.index.IndexNotFoundException
4646
import org.opensearch.persistent.PersistentTasksService
4747
import org.opensearch.replication.ReplicationPlugin
48+
import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
49+
import org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex
50+
import org.opensearch.replication.task.cleanup.InlineStaleTaskCleaner
4851
import org.opensearch.replication.util.stackTraceToString
4952
import org.opensearch.repositories.RepositoriesService
5053
import org.opensearch.core.rest.RestStatus
@@ -91,6 +94,36 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
9194
// for each shard. If that takes too long we can start the task asynchronously and return the response first.
9295
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
9396
try {
97+
// Validate that no replication metadata (active or stale) exists for this index
98+
val replicationStateParams = getReplicationStateParamsForIndex(clusterService, replicateIndexReq.followerIndex)
99+
if (replicationStateParams != null) {
100+
val currentState = replicationStateParams[REPLICATION_LAST_KNOWN_OVERALL_STATE]
101+
if (currentState == ReplicationOverallState.RUNNING.name ||
102+
currentState == ReplicationOverallState.PAUSED.name) {
103+
throw IllegalStateException(
104+
"Replication is already active for index ${replicateIndexReq.followerIndex} in $currentState state. " +
105+
"Cannot start replication again. Use resume API to restart paused replication."
106+
)
107+
}
108+
// STOPPED or FAILED state indicates stale metadata
109+
throw IllegalStateException(
110+
"Stale replication metadata exists for index ${replicateIndexReq.followerIndex} in $currentState state. " +
111+
"Please run the Stop Replication API to clean up before starting replication."
112+
)
113+
}
114+
115+
// Clean up any stale persistent tasks before creating new ones
116+
try {
117+
val cleaner = InlineStaleTaskCleaner(clusterService, nodeClient)
118+
val removed = cleaner.removeStaleTasksForIndex(replicateIndexReq.followerIndex)
119+
if (removed > 0) {
120+
log.info("Cleaned up $removed stale task(s) for index ${replicateIndexReq.followerIndex} before starting replication")
121+
}
122+
} catch (e: Exception) {
123+
log.error("Failed to clean up stale tasks for index ${replicateIndexReq.followerIndex}: ${e.message}", e)
124+
throw e
125+
}
126+
94127
if(clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START)) {
95128
log.debug("Replication cannot be started as " +
96129
"start block(${ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START}) is set")

0 commit comments

Comments
 (0)