Skip to content

Commit f75852f

Browse files
author
Kamal Nayan
committed
# This is a combination of 4 commits.
# This is the 1st commit message: Removed unnecessary user notifications for version conflict exceptions in Snapshot Management Signed-off-by: Kamal Nayan <askkamal@amazon.com> # This is the commit message #2: Unwrapped the exception Signed-off-by: Kamal Nayan <askkamal@amazon.com> # This is the commit message #3: Added the unit tests and updated the code Signed-off-by: Kamal Nayan <askkamal@amazon.com> # This is the commit message #4: Updated the code for minor fixes and nits Signed-off-by: Kamal Nayan <askkamal@amazon.com>
1 parent a36dc82 commit f75852f

File tree

2 files changed

+88
-1
lines changed

2 files changed

+88
-1
lines changed

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine
77

88
import org.apache.logging.log4j.LogManager
99
import org.apache.logging.log4j.Logger
10+
import org.opensearch.ExceptionsHelper
1011
import org.opensearch.action.bulk.BackoffPolicy
1112
import org.opensearch.common.settings.Settings
1213
import org.opensearch.common.unit.TimeValue
1314
import org.opensearch.commons.ConfigConstants
15+
import org.opensearch.index.engine.VersionConflictEngineException
1416
import org.opensearch.indexmanagement.IndexManagementIndices
1517
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
1618
import org.opensearch.indexmanagement.opensearchapi.retry
@@ -215,6 +217,14 @@ class SMStateMachine(
215217
metadata = md
216218
}
217219
} catch (ex: Exception) {
220+
val unwrappedException = ExceptionsHelper.unwrapCause(ex) as Exception
221+
if (unwrappedException is VersionConflictEngineException) {
222+
// Don't throw the exception
223+
// TODO: Extract seqNo on VersionConflictException and retry updateMetadata with updated seqNo.
224+
log.error("Version conflict exception while updating metadata.", ex)
225+
return
226+
}
227+
218228
val smEx = SnapshotManagementException(ExceptionKey.METADATA_INDEXING_FAILURE, ex)
219229
log.error(smEx.message, ex)
220230
throw smEx
@@ -223,7 +233,7 @@ class SMStateMachine(
223233
// TODO SM save a copy to history
224234
}
225235

226-
private val updateMetaDataRetryPolicy =
236+
val updateMetaDataRetryPolicy =
227237
BackoffPolicy.exponentialBackoff(
228238
TimeValue.timeValueMillis(EXPONENTIAL_BACKOFF_MILLIS), MAX_NUMBER_OF_RETRIES,
229239
)

src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachineTests.kt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,23 @@
66
package org.opensearch.indexmanagement.snapshotmanagement.engine
77

88
import com.nhaarman.mockitokotlin2.argumentCaptor
9+
import com.nhaarman.mockitokotlin2.doAnswer
10+
import com.nhaarman.mockitokotlin2.doReturn
11+
import com.nhaarman.mockitokotlin2.mock
912
import com.nhaarman.mockitokotlin2.spy
1013
import com.nhaarman.mockitokotlin2.times
1114
import com.nhaarman.mockitokotlin2.verify
1215
import kotlinx.coroutines.runBlocking
16+
import org.apache.logging.log4j.LogManager
17+
import org.apache.logging.log4j.Logger
18+
import org.opensearch.OpenSearchException
19+
import org.opensearch.action.bulk.BackoffPolicy
1320
import org.opensearch.common.unit.TimeValue
21+
import org.opensearch.core.index.shard.ShardId
22+
import org.opensearch.index.engine.VersionConflictEngineException
1423
import org.opensearch.indexmanagement.MocksTestCase
24+
import org.opensearch.indexmanagement.opensearchapi.retry
25+
import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementException
1526
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState
1627
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.creationTransitions
1728
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.deletionTransitions
@@ -230,4 +241,70 @@ open class SMStateMachineTests : MocksTestCase() {
230241
assertEquals(1, firstValue.policyPrimaryTerm)
231242
}
232243
}
244+
245+
fun `test updateMetadata handles VersionConflictEngineException gracefully`() = runBlocking {
246+
val initialMetadata = randomSMMetadata(
247+
policySeqNo = 0,
248+
policyPrimaryTerm = 0,
249+
)
250+
val smPolicy = randomSMPolicy(
251+
seqNo = 1,
252+
primaryTerm = 1,
253+
)
254+
val updatedMetadata = randomSMMetadata(
255+
policySeqNo = 1,
256+
policyPrimaryTerm = 1,
257+
)
258+
259+
val mockBackoffPolicy = mock<BackoffPolicy>()
260+
val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager))
261+
doReturn(mockBackoffPolicy).`when`(stateMachineSpy).updateMetaDataRetryPolicy
262+
val logger: Logger = LogManager.getLogger(javaClass)
263+
264+
doAnswer { throw VersionConflictEngineException(ShardId("index", "_na_", 1), "test", "message") }
265+
.`when`(mockBackoffPolicy)
266+
.retry(logger) { true }
267+
268+
// Verify VersionConflictEngineException is handled gracefully
269+
try {
270+
stateMachineSpy.updateMetadata(updatedMetadata)
271+
} catch (e: Exception) {
272+
fail("VersionConflictEngineException should be handled without throwing: ${e.message}")
273+
}
274+
}
275+
276+
fun `test updateMetadata throws SnapshotManagementException for other exceptions`() = runBlocking {
277+
val initialMetadata = randomSMMetadata(
278+
policySeqNo = 0,
279+
policyPrimaryTerm = 0,
280+
)
281+
val smPolicy = randomSMPolicy(
282+
seqNo = 1,
283+
primaryTerm = 1,
284+
)
285+
val updatedMetadata = randomSMMetadata(
286+
policySeqNo = 1,
287+
policyPrimaryTerm = 1,
288+
)
289+
290+
val mockBackoffPolicy = mock<BackoffPolicy>()
291+
val stateMachineSpy = spy(SMStateMachine(client, smPolicy, initialMetadata, settings, threadPool, indicesManager))
292+
doReturn(mockBackoffPolicy).`when`(stateMachineSpy).updateMetaDataRetryPolicy
293+
val logger: Logger = LogManager.getLogger(javaClass)
294+
295+
val openSearchException = OpenSearchException("Test exception")
296+
doAnswer { throw openSearchException }
297+
.`when`(mockBackoffPolicy)
298+
.retry(logger) { true }
299+
300+
// Verify OpenSearchException is wrapped in SnapshotManagementException
301+
val thrownException = assertThrows(SnapshotManagementException::class.java) {
302+
runBlocking {
303+
stateMachineSpy.updateMetadata(updatedMetadata)
304+
}
305+
}
306+
307+
// Verify exception type and cause
308+
assertTrue(thrownException.cause is OpenSearchException)
309+
}
233310
}

0 commit comments

Comments
 (0)