Skip to content

Commit b5e5ad8

Browse files
committed
Add support for source index in ISM Policy Rollup Action to help perform multi-tier rollups
Signed-off-by: Kshitij Tandon <[email protected]>
1 parent cad079c commit b5e5ad8

File tree

8 files changed

+1614
-15
lines changed

8 files changed

+1614
-15
lines changed

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import org.opensearch.indexmanagement.rollup.action.index.IndexRollupRequest
1818
import org.opensearch.indexmanagement.rollup.action.index.IndexRollupResponse
1919
import org.opensearch.indexmanagement.rollup.action.start.StartRollupAction
2020
import org.opensearch.indexmanagement.rollup.action.start.StartRollupRequest
21+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
2122
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
2223
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
2324
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
@@ -39,39 +40,110 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
3940
val previousRunRollupId = managedIndexMetadata.actionMetaData?.actionProperties?.rollupId
4041
val hasPreviousRollupAttemptFailed = managedIndexMetadata.actionMetaData?.actionProperties?.hasRollupFailed
4142

42-
// Creating a rollup job
43-
val rollup = action.ismRollup.toRollup(indexName, context.user)
44-
rollupId = rollup.id
45-
logger.info("Attempting to create a rollup job $rollupId for index $indexName")
46-
47-
val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE)
48-
4943
try {
44+
// Create a temporary rollup object for template resolution context.
45+
// This provides the rollup's source_index as {{ctx.source_index}} in templates.
46+
val tempRollup = action.ismRollup.toRollup(indexName, context.user)
47+
48+
// Resolve source_index template if provided, else use managed index name.
49+
// This enables patterns like:
50+
// - source_index: "{{ctx.index}}" -> resolves to the managed index name
51+
// - source_index: null -> defaults to the managed index name (backward compatible)
52+
val resolvedSourceIndex = if (action.ismRollup.sourceIndex != null) {
53+
RollupFieldValueExpressionResolver.resolve(
54+
tempRollup,
55+
action.ismRollup.sourceIndex,
56+
indexName,
57+
)
58+
} else {
59+
indexName
60+
}
61+
62+
// Resolve target_index template.
63+
// Common patterns:
64+
// - "rollup_{{ctx.index}}" -> "rollup_logs-2024-01"
65+
// - "rollup_tier2_{{ctx.index}}" -> "rollup_tier2_rollup_tier1_logs-2024-01"
66+
val resolvedTargetIndex = RollupFieldValueExpressionResolver.resolve(
67+
tempRollup,
68+
action.ismRollup.targetIndex,
69+
indexName,
70+
)
71+
72+
// Validate resolved indices to ensure they are valid and different.
73+
// This catches configuration errors early before attempting to create the rollup job.
74+
validateResolvedIndices(resolvedSourceIndex, resolvedTargetIndex)
75+
76+
logger.info(
77+
"Executing rollup from source [$resolvedSourceIndex] to target [$resolvedTargetIndex] " +
78+
"for managed index [$indexName]",
79+
)
80+
81+
// Create the final rollup job with resolved source_index and target_index.
82+
// The copy() ensures that template variables are replaced with actual index names
83+
// before the rollup job is persisted and executed.
84+
val rollup = action.ismRollup.toRollup(indexName, context.user)
85+
.copy(sourceIndex = resolvedSourceIndex, targetIndex = resolvedTargetIndex)
86+
rollupId = rollup.id
87+
logger.info("Attempting to create a rollup job $rollupId for index $indexName")
88+
89+
val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE)
5090
val response: IndexRollupResponse = context.client.suspendUntil { execute(IndexRollupAction.INSTANCE, indexRollupRequest, it) }
5191
logger.info("Received status ${response.status.status} on trying to create rollup job $rollupId")
5292

5393
stepStatus = StepStatus.COMPLETED
5494
info = mapOf("message" to getSuccessMessage(rollup.id, indexName))
95+
} catch (e: IllegalArgumentException) {
96+
val message = "Failed to validate resolved indices for rollup job"
97+
logger.error(message, e)
98+
stepStatus = StepStatus.FAILED
99+
info = mapOf("message" to message, "cause" to "${e.message}")
55100
} catch (e: VersionConflictEngineException) {
56-
val message = getFailedJobExistsMessage(rollup.id, indexName)
101+
val message = getFailedJobExistsMessage(rollupId ?: "unknown", indexName)
57102
logger.info(message)
58103
if (rollupId == previousRunRollupId && hasPreviousRollupAttemptFailed == true) {
59-
startRollupJob(rollup.id, context)
104+
startRollupJob(rollupId ?: "unknown", context)
60105
} else {
61106
stepStatus = StepStatus.COMPLETED
62107
info = mapOf("info" to message)
63108
}
64109
} catch (e: RemoteTransportException) {
65-
processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
110+
processFailure(rollupId ?: "unknown", indexName, ExceptionsHelper.unwrapCause(e) as Exception)
66111
} catch (e: OpenSearchException) {
67-
processFailure(rollup.id, indexName, e)
112+
processFailure(rollupId ?: "unknown", indexName, e)
68113
} catch (e: Exception) {
69-
processFailure(rollup.id, indexName, e)
114+
val message = "Failed to create rollup job"
115+
logger.error(message, e)
116+
stepStatus = StepStatus.FAILED
117+
info = mapOf("message" to message, "cause" to "${e.message}")
70118
}
71119

72120
return this
73121
}
74122

123+
/**
124+
* Validates that resolved source and target indices are valid and different.
125+
*
126+
* This validation ensures that:
127+
* 1. The resolved source_index is not empty or whitespace-only
128+
* 2. The resolved target_index is not empty or whitespace-only
129+
* 3. The source and target indices are different (prevents self-rollup)
130+
*
131+
* @param sourceIndex The resolved source index name (after template resolution)
132+
* @param targetIndex The resolved target index name (after template resolution)
133+
* @throws IllegalArgumentException if any validation rule fails, with a descriptive error message
134+
*/
135+
private fun validateResolvedIndices(sourceIndex: String, targetIndex: String) {
136+
require(sourceIndex.isNotBlank()) {
137+
"Resolved source_index cannot be empty"
138+
}
139+
require(targetIndex.isNotBlank()) {
140+
"Resolved target_index cannot be empty"
141+
}
142+
require(sourceIndex != targetIndex) {
143+
"Source and target indices must be different: $sourceIndex"
144+
}
145+
}
146+
75147
fun processFailure(rollupId: String, indexName: String, e: Exception) {
76148
val message = getFailedMessage(rollupId, indexName)
77149
logger.error(message, e)

src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ data class ISMRollup(
3535
val pageSize: Int,
3636
val dimensions: List<Dimension>,
3737
val metrics: List<RollupMetrics>,
38+
val sourceIndex: String? = null,
3839
) : ToXContentObject,
3940
Writeable {
4041
// TODO: This can be moved to a common place, since this is shared between Rollup and ISMRollup
@@ -58,6 +59,9 @@ data class ISMRollup(
5859
.field(Rollup.PAGE_SIZE_FIELD, pageSize)
5960
.field(Rollup.DIMENSIONS_FIELD, dimensions)
6061
.field(Rollup.METRICS_FIELD, metrics)
62+
if (sourceIndex != null) {
63+
builder.field(Rollup.SOURCE_INDEX_FIELD, sourceIndex)
64+
}
6165
if (targetIndexSettings != null) {
6266
builder.startObject(Rollup.TARGET_INDEX_SETTINGS_FIELD)
6367
targetIndexSettings.toXContent(builder, params)
@@ -67,8 +71,20 @@ data class ISMRollup(
6771
return builder
6872
}
6973

74+
/**
75+
* Converts ISMRollup configuration to a Rollup job.
76+
*
77+
* Source index resolution priority:
78+
* 1. If ISMRollup.sourceIndex is set, use it (for multi-tier rollups)
79+
* 2. Otherwise, use the sourceIndex parameter (managed index from ISM context)
80+
*
81+
* @param sourceIndex The managed index from ISM context (fallback if ISMRollup.sourceIndex is null)
82+
* @param user Optional user context for the rollup job
83+
* @return Rollup job configuration
84+
*/
7085
fun toRollup(sourceIndex: String, user: User? = null): Rollup {
71-
val id = sourceIndex + toString()
86+
val resolvedSourceIndex = this.sourceIndex ?: sourceIndex
87+
val id = resolvedSourceIndex + toString()
7288
val currentTime = Instant.now()
7389
return Rollup(
7490
id = DigestUtils.sha1Hex(id),
@@ -80,7 +96,7 @@ data class ISMRollup(
8096
jobLastUpdatedTime = currentTime,
8197
jobEnabledTime = currentTime,
8298
description = this.description,
83-
sourceIndex = sourceIndex,
99+
sourceIndex = resolvedSourceIndex,
84100
targetIndex = this.targetIndex,
85101
targetIndexSettings = this.targetIndexSettings,
86102
metadataID = null,
@@ -120,6 +136,11 @@ data class ISMRollup(
120136
dimensionsList.toList()
121137
},
122138
metrics = sin.readList(::RollupMetrics),
139+
sourceIndex = if (sin.version.onOrAfter(Version.V_3_0_0) && sin.readBoolean()) {
140+
sin.readString()
141+
} else {
142+
null
143+
},
123144
)
124145

125146
override fun toString(): String {
@@ -159,6 +180,10 @@ data class ISMRollup(
159180
}
160181
}
161182
out.writeCollection(metrics)
183+
if (out.version.onOrAfter(Version.V_3_0_0)) {
184+
out.writeBoolean(sourceIndex != null)
185+
if (sourceIndex != null) out.writeString(sourceIndex)
186+
}
162187
}
163188

164189
companion object {
@@ -170,6 +195,7 @@ data class ISMRollup(
170195
): ISMRollup {
171196
var description = ""
172197
var targetIndex = ""
198+
var sourceIndex: String? = null
173199
var targetIndexSettings: Settings? = null
174200
var pageSize = 0
175201
val dimensions = mutableListOf<Dimension>()
@@ -184,6 +210,7 @@ data class ISMRollup(
184210
when (fieldName) {
185211
Rollup.DESCRIPTION_FIELD -> description = xcp.text()
186212
Rollup.TARGET_INDEX_FIELD -> targetIndex = xcp.text()
213+
Rollup.SOURCE_INDEX_FIELD -> sourceIndex = xcp.text()
187214
Rollup.TARGET_INDEX_SETTINGS_FIELD -> {
188215
XContentParserUtils.ensureExpectedToken(
189216
XContentParser.Token.START_OBJECT,
@@ -224,6 +251,7 @@ data class ISMRollup(
224251
metrics = metrics,
225252
targetIndex = targetIndex,
226253
targetIndexSettings = targetIndexSettings,
254+
sourceIndex = sourceIndex,
227255
)
228256
}
229257
}

src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ object RollupFieldValueExpressionResolver {
2424
private lateinit var clusterService: ClusterService
2525
lateinit var indexAliasUtils: IndexAliasUtils
2626

27+
/**
28+
* Resolves template variables in a field value using the rollup object as context.
29+
* This method is kept for backward compatibility and only supports {{ctx.source_index}}.
30+
*
31+
* @param rollup The rollup object providing context for template resolution
32+
* @param fieldValue The field value that may contain template variables (e.g., "{{ctx.source_index}}")
33+
* @return The resolved field value with variables replaced, or the original value if resolution
34+
* produces an empty/null result
35+
*/
2736
fun resolve(rollup: Rollup, fieldValue: String): String {
2837
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())
2938

@@ -44,6 +53,48 @@ object RollupFieldValueExpressionResolver {
4453
return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
4554
}
4655

56+
/**
57+
* Resolves template variables in a field value using the rollup object and managed index name as context.
58+
* This is the primary method used by ISM rollup actions to resolve source_index and target_index templates.
59+
*
60+
* This method extends the basic resolve() by adding the managed index name to the template context,
61+
* enabling the use of {{ctx.index}} in addition to {{ctx.source_index}}.
62+
*
63+
* @param rollup The rollup object providing context for template resolution. The rollup's source_index
64+
* field is made available as {{ctx.source_index}} in templates.
65+
* @param fieldValue The field value that may contain Mustache template variables. Common patterns:
66+
* - "{{ctx.index}}" - resolves to the managed index name
67+
* - "{{ctx.source_index}}" - resolves to the rollup's source index
68+
* - "rollup_{{ctx.index}}" - adds a prefix to the managed index name
69+
* - Literal values without templates are returned unchanged
70+
* @param managedIndexName The name of the index being managed by ISM. This is the index to which
71+
* the ISM policy is applied, and it's made available as {{ctx.index}} in templates.
72+
* For multi-tier rollups, this would be the output of the previous tier.
73+
* @return The resolved field value with all template variables replaced. If the resolved value is an alias,
74+
* it's automatically resolved to the write index name. If resolution produces an empty or null result,
75+
* the original fieldValue is returned unchanged.
76+
*/
77+
fun resolve(rollup: Rollup, fieldValue: String, managedIndexName: String): String {
78+
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())
79+
80+
val contextMap =
81+
rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
82+
.toMap()
83+
.filterKeys { key -> key in validTopContextFields }
84+
.plus("index" to managedIndexName)
85+
86+
var compiledValue =
87+
scriptService.compile(script, TemplateScript.CONTEXT)
88+
.newInstance(script.params + mapOf("ctx" to contextMap))
89+
.execute()
90+
91+
if (indexAliasUtils.isAlias(compiledValue)) {
92+
compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue)
93+
}
94+
95+
return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
96+
}
97+
4798
fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
4899
this.scriptService = scriptService
49100
this.clusterService = clusterService

src/main/resources/mappings/opendistro-ism-config.json

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 24
3+
"schema_version": 25
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -374,6 +374,15 @@
374374
"description": {
375375
"type": "text"
376376
},
377+
"source_index": {
378+
"type": "text",
379+
"fields": {
380+
"keyword": {
381+
"type": "keyword",
382+
"ignore_above": 256
383+
}
384+
}
385+
},
377386
"target_index": {
378387
"type": "text",
379388
"fields": {

0 commit comments

Comments
 (0)