Skip to content

Commit e5ecd63

Browse files
Tarun KishoreTarun-kishore
authored andcommitted
Added min_state_age condition check for ISM
Signed-off-by: Tarun-kishore <[email protected]>
1 parent 5429476 commit e5ecd63

File tree

4 files changed

+63
-3
lines changed

4 files changed

+63
-3
lines changed

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Transition.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,11 @@ data class Conditions(
8181
val cron: CronSchedule? = null,
8282
val rolloverAge: TimeValue? = null,
8383
val noAlias: Boolean? = null,
84+
val minStateAge: TimeValue? = null,
8485
) : ToXContentObject,
8586
Writeable {
8687
init {
87-
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge, noAlias)
88+
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge, noAlias, minStateAge)
8889
require(conditionsList.filterNotNull().size == 1) { "Cannot provide more than one Transition condition" }
8990

9091
// Validate doc count condition
@@ -102,6 +103,7 @@ data class Conditions(
102103
if (cron != null) builder.field(CRON_FIELD, cron)
103104
if (rolloverAge != null) builder.field(MIN_ROLLOVER_AGE_FIELD, rolloverAge.stringRep)
104105
if (noAlias != null) builder.field(NO_ALIAS_FIELD, noAlias)
106+
if (minStateAge != null) builder.field(MIN_STATE_AGE_FIELD, minStateAge.stringRep)
105107
return builder.endObject()
106108
}
107109

@@ -113,6 +115,7 @@ data class Conditions(
113115
cron = sin.readOptionalWriteable(::CronSchedule),
114116
rolloverAge = sin.readOptionalTimeValue(),
115117
noAlias = sin.readOptionalBoolean(),
118+
minStateAge = sin.readOptionalTimeValue(),
116119
)
117120

118121
@Throws(IOException::class)
@@ -123,6 +126,7 @@ data class Conditions(
123126
out.writeOptionalWriteable(cron)
124127
out.writeOptionalTimeValue(rolloverAge)
125128
out.writeOptionalBoolean(noAlias)
129+
out.writeOptionalTimeValue(minStateAge)
126130
}
127131

128132
companion object {
@@ -132,6 +136,7 @@ data class Conditions(
132136
const val CRON_FIELD = "cron"
133137
const val MIN_ROLLOVER_AGE_FIELD = "min_rollover_age"
134138
const val NO_ALIAS_FIELD = "no_alias"
139+
const val MIN_STATE_AGE_FIELD = "min_state_age"
135140

136141
@JvmStatic
137142
@Throws(IOException::class)
@@ -142,6 +147,7 @@ data class Conditions(
142147
var cron: CronSchedule? = null
143148
var rolloverAge: TimeValue? = null
144149
var noAlias: Boolean? = null
150+
var minStateAge: TimeValue? = null
145151

146152
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
147153
while (xcp.nextToken() != Token.END_OBJECT) {
@@ -155,11 +161,12 @@ data class Conditions(
155161
CRON_FIELD -> cron = ScheduleParser.parse(xcp) as? CronSchedule
156162
MIN_ROLLOVER_AGE_FIELD -> rolloverAge = TimeValue.parseTimeValue(xcp.text(), MIN_ROLLOVER_AGE_FIELD)
157163
NO_ALIAS_FIELD -> noAlias = xcp.booleanValue()
164+
MIN_STATE_AGE_FIELD -> minStateAge = TimeValue.parseTimeValue(xcp.text(), MIN_STATE_AGE_FIELD)
158165
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Conditions.")
159166
}
160167
}
161168

162-
return Conditions(indexAge, docCount, size, cron, rolloverAge, noAlias)
169+
return Conditions(indexAge, docCount, size, cron, rolloverAge, noAlias, minStateAge)
163170
}
164171
}
165172
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,11 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name)
101101

102102
// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
103103
val indexAliasesCount = indexMetadata?.aliases?.size ?: 0
104+
val stateStartTime = context.metadata.stateMetaData?.startTime
105+
val stateStartInstant = stateStartTime?.let { Instant.ofEpochMilli(it) }
104106
stateName =
105107
transitions.find {
106-
it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime, rolloverDate, indexAliasesCount)
108+
it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime, rolloverDate, indexAliasesCount, stateStartInstant)
107109
}?.stateName
108110
val message: String
109111
val stateName = stateName // shadowed on purpose to prevent var from changing

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ fun Transition.evaluateConditions(
190190
transitionStartTime: Instant,
191191
rolloverDate: Instant?,
192192
indexAliasesCount: Int? = null,
193+
stateStartTime: Instant? = null,
193194
): Boolean {
194195
// If there are no conditions, treat as always true
195196
if (this.conditions == null) return true
@@ -225,6 +226,11 @@ fun Transition.evaluateConditions(
225226
(!this.conditions.noAlias && indexAliasesCount > 0)
226227
}
227228

229+
if (this.conditions.minStateAge != null && stateStartTime != null) {
230+
val elapsed = System.currentTimeMillis() - stateStartTime.toEpochMilli()
231+
return elapsed >= this.conditions.minStateAge.millis
232+
}
233+
228234
// We should never reach this
229235
return false
230236
}

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,4 +230,49 @@ class TransitionActionIT : IndexStateManagementRestTestCase() {
230230
)
231231
}
232232
}
233+
234+
fun `test minStateAge transition occurs after elapsed time`() {
235+
val indexName = "${testIndexName}_min_state_age"
236+
val policyID = "${testIndexName}_min_state_age_policy"
237+
val secondStateName = "second"
238+
val states =
239+
listOf(
240+
State("first", listOf(), listOf(Transition(secondStateName, Conditions(minStateAge = TimeValue.timeValueSeconds(5))))),
241+
State(secondStateName, listOf(), listOf()),
242+
)
243+
val policy =
244+
Policy(
245+
id = policyID,
246+
description = "$testIndexName description",
247+
schemaVersion = 1L,
248+
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
249+
errorNotification = randomErrorNotification(),
250+
defaultState = states[0].name,
251+
states = states,
252+
)
253+
createPolicy(policy, policyID)
254+
createIndex(indexName, policyID)
255+
val managedIndexConfig = getExistingManagedIndexConfig(indexName)
256+
// Initialising policy
257+
updateManagedIndexConfigStartTime(managedIndexConfig)
258+
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
259+
// should not transition immediately
260+
updateManagedIndexConfigStartTime(managedIndexConfig)
261+
waitFor {
262+
assertEquals(
263+
AttemptTransitionStep.getEvaluatingMessage(indexName),
264+
getExplainManagedIndexMetaData(indexName).info?.get("message"),
265+
)
266+
}
267+
// Wait for min_state_age to elapse
268+
Thread.sleep(5500)
269+
updateManagedIndexConfigStartTime(managedIndexConfig)
270+
// Should transition now
271+
waitFor {
272+
assertEquals(
273+
AttemptTransitionStep.getSuccessMessage(indexName, secondStateName),
274+
getExplainManagedIndexMetaData(indexName).info?.get("message"),
275+
)
276+
}
277+
}
233278
}

0 commit comments

Comments
 (0)