From 3ed3edb56843962750c0ca4e8098293930aa2109 Mon Sep 17 00:00:00 2001 From: Sanjay Kumar Date: Thu, 13 Jun 2024 17:48:33 +0530 Subject: [PATCH 1/5] Addition of Optional Parameter - allow_red_cluster to bypass ISM to run on a red cluster Signed-off-by: Sanjay Kumar --- .../ManagedIndexRunner.kt | 14 ++++-- .../indexstatemanagement/model/State.kt | 49 +++++++++++-------- .../mappings/opendistro-ism-config.json | 3 ++ 3 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index ea7ed3765..ef0d04393 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -250,11 +250,6 @@ object ManagedIndexRunner : @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth") private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) { logger.debug("Run job for index ${managedIndexConfig.index}") - // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls - if (clusterIsRed()) { - logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health") - return - } val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid) if (!getMetadataSuccess) { @@ -314,6 +309,15 @@ object ManagedIndexRunner : val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? 
= state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) + val allowRedCluster = state?.allowRedCluster + // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls + if (clusterIsRed()) { + if (allowRedCluster == false) { + logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health") + return + } + logger.warn("Warning: ISM is running on a red cluster") + } val stepContext = StepContext( managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt index e34ff41fd..ed06bdfdc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt @@ -24,9 +24,10 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde import java.io.IOException data class State( - val name: String, - val actions: List, - val transitions: List, + val name: String, + val allowRedCluster: Boolean, + val actions: List, + val transitions: List, ) : ToXContentObject, Writeable { init { require(name.isNotBlank()) { "State must contain a valid name" } @@ -43,21 +44,23 @@ data class State( override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder - .startObject() - .field(NAME_FIELD, name) - .startArray(ACTIONS_FIELD) - .also { actions.forEach { action -> action.toXContent(it, params) } } - .endArray() - .field(TRANSITIONS_FIELD, transitions.toTypedArray()) - .endObject() + .startObject() + .field(NAME_FIELD, name) + .field(ALLOW_RED_CLUSTER, allowRedCluster) + .startArray(ACTIONS_FIELD) + .also { actions.forEach { action -> 
action.toXContent(it, params) } } + .endArray() + .field(TRANSITIONS_FIELD, transitions.toTypedArray()) + .endObject() return builder } @Throws(IOException::class) constructor(sin: StreamInput) : this( - sin.readString(), - sin.readList { ISMActionsParser.instance.fromStreamInput(it) }, - sin.readList(::Transition), + sin.readString(), + sin.readBoolean(), + sin.readList { ISMActionsParser.instance.fromStreamInput(it) }, + sin.readList(::Transition), ) @Throws(IOException::class) @@ -68,8 +71,8 @@ data class State( } fun getActionToExecute( - managedIndexMetaData: ManagedIndexMetaData, - indexMetadataProvider: IndexMetadataProvider, + managedIndexMetaData: ManagedIndexMetaData, + indexMetadataProvider: IndexMetadataProvider, ): Action? { var actionConfig: Action? val actionMetaData = managedIndexMetaData.actionMetaData @@ -83,9 +86,9 @@ data class State( } else { // Get the current actionConfig that is in the ManagedIndexMetaData actionConfig = - this.actions.filterIndexed { index, config -> - index == actionMetaData.index && config.type == actionMetaData.name - }.firstOrNull() + this.actions.filterIndexed { index, config -> + index == actionMetaData.index && config.type == actionMetaData.name + }.firstOrNull() if (actionConfig == null) return null val stepMetaData = managedIndexMetaData.stepMetaData @@ -104,6 +107,7 @@ data class State( companion object { const val NAME_FIELD = "name" + const val ALLOW_RED_CLUSTER = "allow_red_cluster" const val ACTIONS_FIELD = "actions" const val TRANSITIONS_FIELD = "transitions" @@ -111,6 +115,7 @@ data class State( @Throws(IOException::class) fun parse(xcp: XContentParser): State { var name: String? 
= null + var allowRedCluster: Boolean = false val actions: MutableList = mutableListOf() val transitions: MutableList = mutableListOf() @@ -121,6 +126,7 @@ data class State( when (fieldName) { NAME_FIELD -> name = xcp.text() + ALLOW_RED_CLUSTER -> allowRedCluster = xcp.booleanValue() ACTIONS_FIELD -> { ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_ARRAY) { @@ -138,9 +144,10 @@ data class State( } return State( - name = requireNotNull(name) { "State name is null" }, - actions = actions.toList(), - transitions = transitions.toList(), + name = requireNotNull(name) { "State name is null" }, + allowRedCluster = allowRedCluster, + actions = actions.toList(), + transitions = transitions.toList(), ) } } diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 4c138a267..a36b917e8 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -140,6 +140,9 @@ "name": { "type": "keyword" }, + "allow_red_cluster": { + "type": "boolean" + }, "actions": { "type": "nested", "properties": { From f19c0b578dd8aab9adac0be2f49d9e6d9862ba57 Mon Sep 17 00:00:00 2001 From: Sanjay Kumar Date: Thu, 13 Jun 2024 17:55:29 +0530 Subject: [PATCH 2/5] Indentation Fix Signed-off-by: Sanjay Kumar --- .../indexstatemanagement/model/State.kt | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt index ed06bdfdc..631dd78e3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt @@ -24,10 +24,10 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde 
import java.io.IOException data class State( - val name: String, - val allowRedCluster: Boolean, - val actions: List, - val transitions: List, + val name: String, + val allowRedCluster: Boolean, + val actions: List, + val transitions: List, ) : ToXContentObject, Writeable { init { require(name.isNotBlank()) { "State must contain a valid name" } @@ -44,35 +44,36 @@ data class State( override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder - .startObject() - .field(NAME_FIELD, name) - .field(ALLOW_RED_CLUSTER, allowRedCluster) - .startArray(ACTIONS_FIELD) - .also { actions.forEach { action -> action.toXContent(it, params) } } - .endArray() - .field(TRANSITIONS_FIELD, transitions.toTypedArray()) - .endObject() + .startObject() + .field(NAME_FIELD, name) + .field(ALLOW_RED_CLUSTER, allowRedCluster) + .startArray(ACTIONS_FIELD) + .also { actions.forEach { action -> action.toXContent(it, params) } } + .endArray() + .field(TRANSITIONS_FIELD, transitions.toTypedArray()) + .endObject() return builder } @Throws(IOException::class) constructor(sin: StreamInput) : this( - sin.readString(), - sin.readBoolean(), - sin.readList { ISMActionsParser.instance.fromStreamInput(it) }, - sin.readList(::Transition), + sin.readString(), + sin.readBoolean(), + sin.readList { ISMActionsParser.instance.fromStreamInput(it) }, + sin.readList(::Transition), ) @Throws(IOException::class) override fun writeTo(out: StreamOutput) { out.writeString(name) + out.writeBoolean(allowRedCluster) out.writeList(actions) out.writeList(transitions) } fun getActionToExecute( - managedIndexMetaData: ManagedIndexMetaData, - indexMetadataProvider: IndexMetadataProvider, + managedIndexMetaData: ManagedIndexMetaData, + indexMetadataProvider: IndexMetadataProvider, ): Action? { var actionConfig: Action? 
val actionMetaData = managedIndexMetaData.actionMetaData @@ -86,9 +87,9 @@ data class State( } else { // Get the current actionConfig that is in the ManagedIndexMetaData actionConfig = - this.actions.filterIndexed { index, config -> - index == actionMetaData.index && config.type == actionMetaData.name - }.firstOrNull() + this.actions.filterIndexed { index, config -> + index == actionMetaData.index && config.type == actionMetaData.name + }.firstOrNull() if (actionConfig == null) return null val stepMetaData = managedIndexMetaData.stepMetaData @@ -144,10 +145,10 @@ data class State( } return State( - name = requireNotNull(name) { "State name is null" }, - allowRedCluster = allowRedCluster, - actions = actions.toList(), - transitions = transitions.toList(), + name = requireNotNull(name) { "State name is null" }, + allowRedCluster = allowRedCluster, + actions = actions.toList(), + transitions = transitions.toList(), ) } } From c7adb0b3f3378070ad01d36a3f98408608de3aa8 Mon Sep 17 00:00:00 2001 From: Sanjay Kumar Date: Mon, 24 Jun 2024 12:06:14 +0530 Subject: [PATCH 3/5] Overriding state with default values of allowredcluster Signed-off-by: Sanjay Kumar --- .../indexmanagement/indexstatemanagement/model/State.kt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt index 631dd78e3..8eb136735 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt @@ -29,6 +29,13 @@ data class State( val actions: List, val transitions: List, ) : ToXContentObject, Writeable { + + constructor( + name: String, + actions: List, + transitions: List, + ) : this(name, false, actions, transitions + init { require(name.isNotBlank()) { "State must contain a valid name" } var hasDelete = false From 
09773181a277de981d6372cb2ea6a35d9e1c60b4 Mon Sep 17 00:00:00 2001 From: Sanjay Kumar Date: Tue, 25 Jun 2024 17:22:01 +0530 Subject: [PATCH 4/5] Fix missing brackets Signed-off-by: Sanjay Kumar --- .../indexmanagement/indexstatemanagement/model/State.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt index 8eb136735..fb1c2ae18 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/State.kt @@ -31,10 +31,10 @@ data class State( ) : ToXContentObject, Writeable { constructor( - name: String, - actions: List, - transitions: List, - ) : this(name, false, actions, transitions + name: String, + actions: List, + transitions: List, + ) : this(name, false, actions, transitions) init { require(name.isNotBlank()) { "State must contain a valid name" } From ada089ec39296d23bbaf4cdb35b2b524ddc53b96 Mon Sep 17 00:00:00 2001 From: Sanjay Kumar Date: Fri, 5 Jul 2024 17:16:24 +0530 Subject: [PATCH 5/5] Addition of test case Signed-off-by: Sanjay Kumar --- .../IndexStateManagementRestTestCase.kt | 6 ++ .../runner/ManagedIndexRunnerIT.kt | 102 ++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index 21f27f5af..cd8e7f072 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -941,6 +941,12 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() 
assertEquals("Unable to delete snapshot", RestStatus.OK, response.restStatus()) } + protected fun isClusterGreen(timeout: String) { + val endpoint = "_cluster/health?wait_for_status=yellow&timeout=$timeout" + val response = client().makeRequest("GET", endpoint) + assertEquals("Cluster status check timed out", RestStatus.OK, response.restStatus()) + } + @Suppress("UNCHECKED_CAST") protected fun assertSnapshotExists( repository: String, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt index 428ce810d..d0ad49182 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt @@ -5,6 +5,11 @@ package org.opensearch.indexmanagement.indexstatemanagement.runner +import org.apache.hc.core5.http.ContentType +import org.apache.hc.core5.http.io.entity.StringEntity +import org.opensearch.client.Request +import org.opensearch.client.Response +import org.opensearch.core.rest.RestStatus import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase import org.opensearch.indexmanagement.indexstatemanagement.action.OpenAction @@ -224,4 +229,101 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { assertEquals("Failed to update ManagedIndexConfig jitter", newJitter, currJitter) } } + + fun `test runner on a red cluster with allow_red_cluster as false`() { + val indexName = "test-index-1" + val policyID = "test_red_cluster_policy" + val policy = + """ + {"policy":{"description":"Close indices older than 5m","default_state":"active","states":[{"name":"active","allow_red_cluster":"false", + 
"actions":[],"transitions":[{"state_name":"inactivestate","conditions":{"min_index_age":"5s"}}]},{"name":"inactivestate","allow_red_cluster":"false" + ,"actions":[{"delete":{}}],"transitions":[]}],"ism_template":{"index_patterns":["test-index"]}}} + """.trimIndent() + createPolicyJson(policy, policyID) + createIndex(indexName, policyID) + waitFor { assertIndexExists(indexName) } + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + val endpoint = "sim-red" + val jsonEntity = """{"settings":{"index.routing.allocation.require.size": "test"}}""" + val request = Request("PUT", endpoint) + request.entity = StringEntity(jsonEntity, ContentType.APPLICATION_JSON) + val response: Response = client().performRequest(request) + assertEquals("Failed to simulate red cluster", RestStatus.OK, response.restStatus()) + + // Change the start time so the job will trigger in 2 seconds. + // After the job, the index will be in "Active" State + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so the job will trigger in 2 seconds. + // Index Transitions to inactivestate state + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Wait for the index to settle in "inactivestate". + Thread.sleep(8000L) + + // Change the start time so the job will trigger in 2 seconds. 
+ updateManagedIndexConfigStartTime(managedIndexConfig) + + Thread.sleep(5000L) + + waitFor { assertIndexExists(indexName) } + + val deleteReq = Request("DELETE", endpoint) + val deleteRes: Response = client().performRequest(deleteReq) + assertEquals("Failed to delete Index $endpoint", RestStatus.OK, deleteRes.restStatus()) + isClusterGreen("30s") + } + + fun `test runner on a red cluster with allow_red_cluster as true`() { + val indexName = "test-index-2" + val policyID = "test_red_cluster_policy" + val policy = + """ + {"policy":{"description":"Close indices older than 5m","default_state":"active","states":[{"name":"active","allow_red_cluster":"true", + "actions":[],"transitions":[{"state_name":"inactivestate","conditions":{"min_index_age":"5s"}}]},{"name":"inactivestate","allow_red_cluster":"true" + ,"actions":[{"delete":{}}],"transitions":[]}],"ism_template":{"index_patterns":["test-index"]}}} + """.trimIndent() + createPolicyJson(policy, policyID) + createIndex(indexName, policyID) + waitFor { assertIndexExists(indexName) } + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + val endpoint = "sim-red" + val jsonEntity = """{"settings":{"index.routing.allocation.require.size": "test"}}""" + val request = Request("PUT", endpoint) + request.entity = StringEntity(jsonEntity, ContentType.APPLICATION_JSON) + val response: Response = client().performRequest(request) + assertEquals("Failed to simulate red cluster", RestStatus.OK, response.restStatus()) + + // Change the start time so the job will trigger in 2 seconds. + // After the job, the index will be in "Active" State + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so the job will trigger in 2 seconds. + // Index Transitions to inactivestate state + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Wait for the index to settle in "inactivestate". 
+ Thread.sleep(8000L) + + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Wait for the index deletion by the ISM job. + Thread.sleep(5000L) + + waitFor { assertIndexDoesNotExist(indexName) } + + val deleteReq = Request("DELETE", endpoint) + val deleteRes: Response = client().performRequest(deleteReq) + assertEquals("Failed to delete Index $endpoint", RestStatus.OK, deleteRes.restStatus()) + isClusterGreen("30s") + } }