@@ -18,6 +18,7 @@ import org.opensearch.indexmanagement.rollup.action.index.IndexRollupRequest
1818import org.opensearch.indexmanagement.rollup.action.index.IndexRollupResponse
1919import org.opensearch.indexmanagement.rollup.action.start.StartRollupAction
2020import org.opensearch.indexmanagement.rollup.action.start.StartRollupRequest
21+ import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
2122import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
2223import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
2324import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
@@ -39,39 +40,110 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
3940 val previousRunRollupId = managedIndexMetadata.actionMetaData?.actionProperties?.rollupId
4041 val hasPreviousRollupAttemptFailed = managedIndexMetadata.actionMetaData?.actionProperties?.hasRollupFailed
4142
42- // Creating a rollup job
43- val rollup = action.ismRollup.toRollup(indexName, context.user)
44- rollupId = rollup.id
45- logger.info(" Attempting to create a rollup job $rollupId for index $indexName " )
46-
47- val indexRollupRequest = IndexRollupRequest (rollup, WriteRequest .RefreshPolicy .IMMEDIATE )
48-
4943 try {
44+ // Create a temporary rollup object for template resolution context.
45+ // This provides the rollup's source_index as {{ctx.source_index}} in templates.
46+ val tempRollup = action.ismRollup.toRollup(indexName, context.user)
47+
48+ // Resolve source_index template if provided, else use managed index name.
49+ // This enables patterns like:
50+ // - source_index: "{{ctx.index}}" -> resolves to the managed index name
51+ // - source_index: null -> defaults to the managed index name (backward compatible)
52+ val resolvedSourceIndex = if (action.ismRollup.sourceIndex != null ) {
53+ RollupFieldValueExpressionResolver .resolve(
54+ tempRollup,
55+ action.ismRollup.sourceIndex,
56+ indexName,
57+ )
58+ } else {
59+ indexName
60+ }
61+
62+ // Resolve target_index template.
63+ // Common patterns:
64+ // - "rollup_{{ctx.index}}" -> "rollup_logs-2024-01"
65+ // - "rollup_tier2_{{ctx.index}}" -> "rollup_tier2_rollup_tier1_logs-2024-01"
66+ val resolvedTargetIndex = RollupFieldValueExpressionResolver .resolve(
67+ tempRollup,
68+ action.ismRollup.targetIndex,
69+ indexName,
70+ )
71+
72+ // Validate resolved indices to ensure they are valid and different.
73+ // This catches configuration errors early before attempting to create the rollup job.
74+ validateResolvedIndices(resolvedSourceIndex, resolvedTargetIndex)
75+
76+ logger.info(
77+ " Executing rollup from source [$resolvedSourceIndex ] to target [$resolvedTargetIndex ] " +
78+ " for managed index [$indexName ]" ,
79+ )
80+
81+ // Create the final rollup job with resolved source_index and target_index.
82+ // The copy() ensures that template variables are replaced with actual index names
83+ // before the rollup job is persisted and executed.
84+ val rollup = action.ismRollup.toRollup(indexName, context.user)
85+ .copy(sourceIndex = resolvedSourceIndex, targetIndex = resolvedTargetIndex)
86+ rollupId = rollup.id
87+ logger.info(" Attempting to create a rollup job $rollupId for index $indexName " )
88+
89+ val indexRollupRequest = IndexRollupRequest (rollup, WriteRequest .RefreshPolicy .IMMEDIATE )
5090 val response: IndexRollupResponse = context.client.suspendUntil { execute(IndexRollupAction .INSTANCE , indexRollupRequest, it) }
5191 logger.info(" Received status ${response.status.status} on trying to create rollup job $rollupId " )
5292
5393 stepStatus = StepStatus .COMPLETED
5494 info = mapOf (" message" to getSuccessMessage(rollup.id, indexName))
95+ } catch (e: IllegalArgumentException ) {
96+ val message = " Failed to validate resolved indices for rollup job"
97+ logger.error(message, e)
98+ stepStatus = StepStatus .FAILED
99+ info = mapOf (" message" to message, " cause" to " ${e.message} " )
55100 } catch (e: VersionConflictEngineException ) {
56- val message = getFailedJobExistsMessage(rollup.id , indexName)
101+ val message = getFailedJobExistsMessage(rollupId ? : " unknown " , indexName)
57102 logger.info(message)
58103 if (rollupId == previousRunRollupId && hasPreviousRollupAttemptFailed == true ) {
59- startRollupJob(rollup.id , context)
104+ startRollupJob(rollupId ? : " unknown " , context)
60105 } else {
61106 stepStatus = StepStatus .COMPLETED
62107 info = mapOf (" info" to message)
63108 }
64109 } catch (e: RemoteTransportException ) {
65- processFailure(rollup.id , indexName, ExceptionsHelper .unwrapCause(e) as Exception )
110+ processFailure(rollupId ? : " unknown " , indexName, ExceptionsHelper .unwrapCause(e) as Exception )
66111 } catch (e: OpenSearchException ) {
67- processFailure(rollup.id , indexName, e)
112+ processFailure(rollupId ? : " unknown " , indexName, e)
68113 } catch (e: Exception ) {
69- processFailure(rollup.id, indexName, e)
114+ val message = " Failed to create rollup job"
115+ logger.error(message, e)
116+ stepStatus = StepStatus .FAILED
117+ info = mapOf (" message" to message, " cause" to " ${e.message} " )
70118 }
71119
72120 return this
73121 }
74122
123+ /* *
124+ * Validates that resolved source and target indices are valid and different.
125+ *
126+ * This validation ensures that:
127+ * 1. The resolved source_index is not empty or whitespace-only
128+ * 2. The resolved target_index is not empty or whitespace-only
129+ * 3. The source and target indices are different (prevents self-rollup)
130+ *
131+ * @param sourceIndex The resolved source index name (after template resolution)
132+ * @param targetIndex The resolved target index name (after template resolution)
133+ * @throws IllegalArgumentException if any validation rule fails, with a descriptive error message
134+ */
135+ private fun validateResolvedIndices (sourceIndex : String , targetIndex : String ) {
136+ require(sourceIndex.isNotBlank()) {
137+ " Resolved source_index cannot be empty"
138+ }
139+ require(targetIndex.isNotBlank()) {
140+ " Resolved target_index cannot be empty"
141+ }
142+ require(sourceIndex != targetIndex) {
143+ " Source and target indices must be different: $sourceIndex "
144+ }
145+ }
146+
75147 fun processFailure (rollupId : String , indexName : String , e : Exception ) {
76148 val message = getFailedMessage(rollupId, indexName)
77149 logger.error(message, e)
0 commit comments