Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ class IndexManagementPlugin :
.registerMapperService(RollupMapperService(client, clusterService, indexNameExpressionResolver))
.registerIndexer(RollupIndexer(settings, clusterService, client))
.registerSearcher(RollupSearchService(settings, clusterService, client))
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
.registerMetadataServices(RollupMetadataService(client, xContentRegistry, clusterService))
.registerConsumers()
.registerClusterConfigurationProvider(skipFlag)
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.indexmanagement.rollup.action.index.IndexRollupRequest
import org.opensearch.indexmanagement.rollup.action.index.IndexRollupResponse
import org.opensearch.indexmanagement.rollup.action.start.StartRollupAction
import org.opensearch.indexmanagement.rollup.action.start.StartRollupRequest
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -39,39 +40,100 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
val previousRunRollupId = managedIndexMetadata.actionMetaData?.actionProperties?.rollupId
val hasPreviousRollupAttemptFailed = managedIndexMetadata.actionMetaData?.actionProperties?.hasRollupFailed

// Creating a rollup job
val rollup = action.ismRollup.toRollup(indexName, context.user)
rollupId = rollup.id
logger.info("Attempting to create a rollup job $rollupId for index $indexName")

val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE)

try {
// Create a temporary rollup object for template resolution context.
// This provides the rollup's source_index as {{ctx.source_index}} in templates.
val tempRollup = action.ismRollup.toRollup(indexName, context.user)

// Resolve source_index template if provided, else use managed index name.
// This enables patterns like:
// - source_index: "{{ctx.index}}" -> resolves to the managed index name
// - source_index: null -> defaults to the managed index name (backward compatible)
val resolvedSourceIndex = if (action.ismRollup.sourceIndex != null) {
RollupFieldValueExpressionResolver.resolve(
tempRollup,
action.ismRollup.sourceIndex,
indexName,
)
} else {
indexName
}

// Resolve target_index template.
val resolvedTargetIndex = RollupFieldValueExpressionResolver.resolve(
tempRollup,
action.ismRollup.targetIndex,
indexName,
)

// Validate resolved indices to ensure they are valid and different.
// This catches configuration errors early before attempting to create the rollup job.
validateResolvedIndices(resolvedSourceIndex, resolvedTargetIndex)

logger.info(
"Executing rollup from source [$resolvedSourceIndex] to target [$resolvedTargetIndex] " +
"for managed index [$indexName]",
)

// Create the final rollup job with resolved source_index and target_index.
val rollup = action.ismRollup.toRollup(indexName, context.user)
.copy(sourceIndex = resolvedSourceIndex, targetIndex = resolvedTargetIndex)
rollupId = rollup.id
logger.info("Attempting to create a rollup job $rollupId for index $indexName")

val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE)
val response: IndexRollupResponse = context.client.suspendUntil { execute(IndexRollupAction.INSTANCE, indexRollupRequest, it) }
logger.info("Received status ${response.status.status} on trying to create rollup job $rollupId")

stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(rollup.id, indexName))
} catch (e: IllegalArgumentException) {
val message = "Failed to validate resolved indices for rollup job"
logger.error(message, e)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message, "cause" to "${e.message}")
} catch (e: VersionConflictEngineException) {
val message = getFailedJobExistsMessage(rollup.id, indexName)
val message = getFailedJobExistsMessage(rollupId ?: "unknown", indexName)
logger.info(message)
if (rollupId == previousRunRollupId && hasPreviousRollupAttemptFailed == true) {
startRollupJob(rollup.id, context)
startRollupJob(rollupId ?: "unknown", context)
} else {
stepStatus = StepStatus.COMPLETED
info = mapOf("info" to message)
}
} catch (e: RemoteTransportException) {
processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
processFailure(rollupId ?: "unknown", indexName, ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: OpenSearchException) {
processFailure(rollup.id, indexName, e)
processFailure(rollupId ?: "unknown", indexName, e)
} catch (e: Exception) {
processFailure(rollup.id, indexName, e)
val message = "Failed to create rollup job"
logger.error(message, e)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message, "cause" to "${e.message}")
}

return this
}

/**
 * Validates that resolved source and target indices are valid and different.
 *
 * @param sourceIndex The resolved source index name (after template resolution)
 * @param targetIndex The resolved target index name (after template resolution)
 * @throws IllegalArgumentException if any validation rule fails, with a descriptive error message
 */
private fun validateResolvedIndices(sourceIndex: String, targetIndex: String) {
    // Guard clauses: fail fast with a descriptive message on the first violated rule.
    if (sourceIndex.isBlank()) {
        throw IllegalArgumentException("Resolved source_index cannot be empty")
    }
    if (targetIndex.isBlank()) {
        throw IllegalArgumentException("Resolved target_index cannot be empty")
    }
    // Writing rollup output back into its own source would loop; reject identical names.
    if (sourceIndex == targetIndex) {
        throw IllegalArgumentException("Source and target indices must be different: $sourceIndex")
    }
}

fun processFailure(rollupId: String, indexName: String, e: Exception) {
val message = getFailedMessage(rollupId, indexName)
logger.error(message, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ class RollupMapperService(

val indexMappingSource = indexTypeMappings.sourceAsMap

// TODO: Check if this is required or we want to fail in case of rollup on rolled up empty indices
// // If source is a rollup index with no properties (no data rolled up yet), skip field validation
// if (isSourceRollupIndex && !indexMappingSource.containsKey("properties")) {
// logger.info("Source rollup index [$index] has no properties yet, skipping field validation")
// return RollupJobValidationResult.Valid
// }

val issues = mutableSetOf<String>()
// Validate source fields in dimensions
rollup.dimensions.forEach { dimension ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@ import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortOrder
import org.opensearch.transport.RemoteTransportException
import org.opensearch.transport.client.Client
Expand All @@ -48,7 +51,11 @@ import java.time.Instant
// TODO: Wrap client calls in retry for transient failures
// Service that handles CRUD operations for rollup metadata
@Suppress("TooManyFunctions")
class RollupMetadataService(val client: Client, val xContentRegistry: NamedXContentRegistry) {
class RollupMetadataService(
val client: Client,
val xContentRegistry: NamedXContentRegistry,
val clusterService: org.opensearch.cluster.service.ClusterService,
) {
private val logger = LogManager.getLogger(javaClass)

// If the job does not have a metadataID then we need to initialize the first metadata
Expand Down Expand Up @@ -178,6 +185,12 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
@Throws(Exception::class)
private suspend fun getInitialStartTime(rollup: Rollup): StartingTimeResult {
try {
// Check if source is a rollup index and use appropriate method
val isSourceRollupIndex = isRollupIndex(rollup.sourceIndex, clusterService.state())
if (isSourceRollupIndex) {
// Use min aggregation for rollup indices (RollupInterceptor blocks size > 0)
return getEarliestTimestampFromRollupIndex(rollup)
}
// Rollup requires the first dimension to be the date histogram
val dateHistogram = rollup.dimensions.first() as DateHistogram
val searchSourceBuilder =
Expand Down Expand Up @@ -219,6 +232,60 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
}
}

/**
 * Get the earliest timestamp from a rollup index by finding the minimum value of the date histogram field.
 * This is used to determine the starting point for continuous rollups on rollup indices.
 * Uses a sorted size-1 search instead of an aggregation to avoid rollup interceptor validation.
 *
 * @param rollup the rollup job whose source index is itself a rollup index
 * @return [StartingTimeResult.Success] with the rounded earliest bucket time,
 *         [StartingTimeResult.NoDocumentsFound] when the index has no usable documents,
 *         or [StartingTimeResult.Failure] when the search fails
 */
@Suppress("ReturnCount")
@Throws(Exception::class)
private suspend fun getEarliestTimestampFromRollupIndex(rollup: Rollup): StartingTimeResult {
    try {
        // Rollup contract elsewhere in this service: the first dimension is the date histogram.
        val dateHistogram = rollup.dimensions.first() as DateHistogram
        val dateField = dateHistogram.sourceField

        // In rollup indices, date histogram fields are stored as "<field>.date_histogram".
        val rollupDateField = "$dateField.date_histogram"

        // For multi-tier rollup we are querying a document on a rollup index, so we set this
        // bypassMarker in fetchSource as a flag to help bypass the validation in RollupInterceptor.
        val bypassMarker = "_rollup_internal_bypass_${RollupInterceptor.BYPASS_SIZE_CHECK}"

        val searchRequest = SearchRequest(rollup.sourceIndex)
            .source(
                SearchSourceBuilder()
                    .size(1)
                    .query(MatchAllQueryBuilder())
                    .sort(rollupDateField, SortOrder.ASC)
                    .trackTotalHits(false)
                    .fetchSource(FetchSourceContext(false, arrayOf(bypassMarker), emptyArray()))
                    .docValueField(rollupDateField, DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT),
            )
            .allowPartialSearchResults(false)

        val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }

        if (response.hits.hits.isEmpty()) {
            return StartingTimeResult.NoDocumentsFound
        }

        // field() returns null when the doc-value field is absent on the hit, so guard it with a
        // safe call: a hit without the date field maps to NoDocumentsFound instead of throwing a
        // NullPointerException that would be misreported as a generic search failure below.
        val firstHitTimestampAsString: String =
            response.hits.hits.first().field(rollupDateField)?.getValue<String>()
                ?: return StartingTimeResult.NoDocumentsFound

        // Parse the strict_date_optional_time string back to epoch millis, then round down to the
        // bucket boundary the date histogram would place it in.
        val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
        val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
        return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
    } catch (e: RemoteTransportException) {
        val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
        logger.error("Error when getting earliest timestamp from rollup index for rollup [{}]: {}", rollup.id, unwrappedException)
        return StartingTimeResult.Failure(unwrappedException)
    } catch (e: Exception) {
        // TODO: Catching general exceptions for now, can make more granular
        logger.error("Error when getting earliest timestamp from rollup index for rollup [{}]: {}", rollup.id, e)
        return StartingTimeResult.Failure(e)
    }
}

/**
* Return time rounded down to the nearest unit of time the interval is based on.
* This should map to the equivalent bucket a document with the given timestamp would fall into for the date histogram.
Expand Down
Loading