
Commit 6bd72e2

Adding support for multi-tier rollups in ISM
Signed-off-by: Kshitij Tandon <[email protected]>
1 parent 3cf62f0 commit 6bd72e2

File tree

13 files changed: +1915 / -37 lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 1 addition & 1 deletion
@@ -427,7 +427,7 @@ class IndexManagementPlugin :
         .registerMapperService(RollupMapperService(client, clusterService, indexNameExpressionResolver))
         .registerIndexer(RollupIndexer(settings, clusterService, client))
         .registerSearcher(RollupSearchService(settings, clusterService, client))
-        .registerMetadataServices(RollupMetadataService(client, xContentRegistry))
+        .registerMetadataServices(RollupMetadataService(client, xContentRegistry, clusterService))
         .registerConsumers()
         .registerClusterConfigurationProvider(skipFlag)
     indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt

Lines changed: 7 additions & 0 deletions
@@ -293,6 +293,13 @@ class RollupMapperService(

         val indexMappingSource = indexTypeMappings.sourceAsMap

+        // TODO: Check if this is required or we want to fail in case of rollup on rolled-up empty indices
+        // // If source is a rollup index with no properties (no data rolled up yet), skip field validation
+        // if (isSourceRollupIndex && !indexMappingSource.containsKey("properties")) {
+        //     logger.info("Source rollup index [$index] has no properties yet, skipping field validation")
+        //     return RollupJobValidationResult.Valid
+        // }
+
         val issues = mutableSetOf<String>()
         // Validate source fields in dimensions
         rollup.dimensions.forEach { dimension ->

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

Lines changed: 75 additions & 1 deletion
@@ -36,6 +36,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
 import org.opensearch.indexmanagement.rollup.model.RollupMetadata
 import org.opensearch.indexmanagement.rollup.model.RollupStats
 import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT
+import org.opensearch.indexmanagement.rollup.util.isRollupIndex
 import org.opensearch.indexmanagement.util.NO_ID
 import org.opensearch.search.aggregations.bucket.composite.InternalComposite
 import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
@@ -48,7 +49,11 @@ import java.time.Instant
 // TODO: Wrap client calls in retry for transient failures
 // Service that handles CRUD operations for rollup metadata
 @Suppress("TooManyFunctions")
-class RollupMetadataService(val client: Client, val xContentRegistry: NamedXContentRegistry) {
+class RollupMetadataService(
+    val client: Client,
+    val xContentRegistry: NamedXContentRegistry,
+    val clusterService: org.opensearch.cluster.service.ClusterService,
+) {
     private val logger = LogManager.getLogger(javaClass)

     // If the job does not have a metadataID then we need to initialize the first metadata
@@ -178,6 +183,12 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
     @Throws(Exception::class)
     private suspend fun getInitialStartTime(rollup: Rollup): StartingTimeResult {
         try {
+            // Check if the source is a rollup index and use the appropriate method
+            val isSourceRollupIndex = isRollupIndex(rollup.sourceIndex, clusterService.state())
+            if (isSourceRollupIndex) {
+                // Use a sorted doc-value fetch for rollup indices (RollupInterceptor normally blocks size > 0)
+                return getEarliestTimestampFromRollupIndex(rollup)
+            }
             // Rollup requires the first dimension to be the date histogram
             val dateHistogram = rollup.dimensions.first() as DateHistogram
             val searchSourceBuilder =
@@ -219,6 +230,69 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
         }
     }

+    /**
+     * Get the earliest timestamp from a rollup index by finding the minimum value of the date histogram field.
+     * This is used to determine the starting point for continuous rollups on rollup indices.
+     * Uses sort instead of aggregation to avoid rollup interceptor validation.
+     */
+    @Suppress("ReturnCount")
+    @Throws(Exception::class)
+    private suspend fun getEarliestTimestampFromRollupIndex(rollup: Rollup): StartingTimeResult {
+        try {
+            val dateHistogram = rollup.dimensions.first() as DateHistogram
+            val dateField = dateHistogram.sourceField
+
+            logger.debug("Getting earliest timestamp from rollup index [${rollup.sourceIndex}] for rollup job [${rollup.id}]")
+
+            val searchRequest = SearchRequest(rollup.sourceIndex)
+                .source(
+                    SearchSourceBuilder()
+                        .size(1)
+                        .query(MatchAllQueryBuilder())
+                        .sort("$dateField.date_histogram", SortOrder.ASC)
+                        .trackTotalHits(false)
+                        .fetchSource(false)
+                        .docValueField("$dateField.date_histogram", DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT),
+                )
+                .allowPartialSearchResults(false)
+
+            // Set BYPASS_SIZE_CHECK to allow size=1 when querying the rollup index to get the earliest timestamp.
+            // This is needed for continuous rollup initialization on rollup indices (multi-tier rollup).
+            org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.setBypass(
+                org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_SIZE_CHECK,
+            )
+            try {
+                val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
+
+                if (response.hits.hits.isEmpty()) {
+                    return StartingTimeResult.NoDocumentsFound
+                }
+
+                logger.debug("Found earliest document in rollup index [${rollup.sourceIndex}]")
+
+                // In rollup indices, date histogram fields are named as "field.date_histogram"
+                val rollupDateField = "$dateField.date_histogram"
+                val firstHitTimestampAsString: String =
+                    response.hits.hits.first().field(rollupDateField).getValue<String>()
+                        ?: return StartingTimeResult.NoDocumentsFound
+
+                val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
+                val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
+                return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
+            } finally {
+                org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.clearBypass()
+            }
+        } catch (e: RemoteTransportException) {
+            val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
+            logger.error("Error when getting earliest timestamp from rollup index for rollup [{}]: {}", rollup.id, unwrappedException)
+            return StartingTimeResult.Failure(unwrappedException)
+        } catch (e: Exception) {
+            // TODO: Catching general exceptions for now, can make more granular
+            logger.error("Error when getting earliest timestamp from rollup index for rollup [{}]: {}", rollup.id, e)
+            return StartingTimeResult.Failure(e)
+        }
+    }
+
     /**
      * Return time rounded down to the nearest unit of time the interval is based on.
      * This should map to the equivalent bucket a document with the given timestamp would fall into for the date histogram.
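The added method sorts on the "$dateField.date_histogram" doc value, parses the first hit's timestamp, and rounds it down via getRoundedTime before using it as the continuous-rollup start. A minimal illustration of that rounding step, assuming a fixed 1h interval and using only java.time (the snippet below is illustrative and not part of the commit):

import java.time.Instant
import java.time.temporal.ChronoUnit

// Illustrative only: mirrors the bucket rounding this change relies on, for an assumed fixed 1h interval.
// The real job uses getRoundedTime(epochMillis, dateHistogram) with the job's own interval.
fun main() {
    val earliest = Instant.parse("2024-03-01T10:37:12.000Z") // parsed from the "$dateField.date_histogram" doc value
    val bucketStart = earliest.truncatedTo(ChronoUnit.HOURS) // round down to the start of the owning 1h bucket
    println(bucketStart) // 2024-03-01T10:00:00Z
}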

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt

Lines changed: 92 additions & 1 deletion
@@ -35,6 +35,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupStats
 import org.opensearch.indexmanagement.rollup.model.incrementStats
 import org.opensearch.indexmanagement.rollup.model.mergeStats
 import org.opensearch.indexmanagement.rollup.settings.RollupSettings
+import org.opensearch.indexmanagement.rollup.util.getDateHistogram
+import org.opensearch.indexmanagement.rollup.util.getRollupJobs
+import org.opensearch.indexmanagement.rollup.util.isRollupIndex
 import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
 import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
 import org.opensearch.indexmanagement.util.renewLockForScheduledJob
@@ -44,6 +47,8 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter
 import org.opensearch.jobscheduler.spi.ScheduledJobRunner
 import org.opensearch.script.ScriptService
 import org.opensearch.search.aggregations.bucket.composite.InternalComposite
+import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
+import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval
 import org.opensearch.threadpool.ThreadPool
 import org.opensearch.transport.client.Client

@@ -266,7 +271,16 @@ object RollupRunner :
                 withClosableContext(
                     IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, job.user),
                 ) {
-                    rollupSearchService.executeCompositeSearch(updatableJob, metadata)
+                    // Set this bypass because the composite aggregation request already performs the required
+                    // aggregation rewriting, so it does not need to go through the interceptor's rewriting.
+                    org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.setBypass(
+                        org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_ROLLUP_SEARCH,
+                    )
+                    try {
+                        rollupSearchService.executeCompositeSearch(updatableJob, metadata, clusterService)
+                    } finally {
+                        org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.clearBypass()
+                    }
                 }
             val rollupResult =
                 when (rollupSearchResult) {
@@ -412,6 +426,15 @@ object RollupRunner :
                     else -> return@withClosableContext sourceIndexValidationResult
                 }

+                // Additional validation for rollup-on-rollup scenarios
+                if (isRollupIndex(job.sourceIndex, clusterService.state())) {
+                    when (val rollupValidationResult = validateRollupOnRollup(job)) {
+                        is RollupJobValidationResult.Valid -> {
+                        } // No action taken when valid
+                        else -> return@withClosableContext rollupValidationResult
+                    }
+                }
+
                 // we validate target index only if there is metadata document in the rollup
                 if (metadata != null) {
                     logger.debug("Attempting to create/validate target index [${job.targetIndex}] for rollup job [${job.id}]")
@@ -470,6 +493,74 @@ object RollupRunner :
             }
         }
     }
+
+    @Suppress("ReturnCount")
+    private fun validateRollupOnRollup(job: Rollup): RollupJobValidationResult {
+        val sourceRollupJobs = clusterService.state().metadata.index(job.sourceIndex).getRollupJobs()
+        if (sourceRollupJobs == null) {
+            return RollupJobValidationResult.Invalid("Source rollup index has no rollup jobs")
+        }
+
+        val sourceJob = sourceRollupJobs.first()
+        val targetDateHistogram = job.getDateHistogram()
+        val sourceDateHistogram = sourceJob.getDateHistogram()
+
+        // Validate interval alignment
+        val sourceInterval = sourceDateHistogram.fixedInterval ?: sourceDateHistogram.calendarInterval!!
+        val targetInterval = targetDateHistogram.fixedInterval ?: targetDateHistogram.calendarInterval!!
+        val intervalValid = validateIntervalAlignment(sourceInterval, targetInterval)
+        if (!intervalValid) {
+            return RollupJobValidationResult.Invalid(
+                "Target interval [$targetInterval] must be an exact multiple of source interval [$sourceInterval]",
+            )
+        }
+
+        // Validate source field compatibility
+        val sourceDimensionFields = sourceJob.dimensions.map { it.sourceField }.toSet()
+        val sourceMetricFields = sourceJob.metrics.map { it.sourceField }.toSet()
+        val targetDimensionFields = job.dimensions.map { it.sourceField }.toSet()
+        val targetMetricFields = job.metrics.map { it.sourceField }.toSet()
+
+        val invalidDimensionFields = targetDimensionFields - sourceDimensionFields
+        val invalidMetricFields = targetMetricFields - sourceMetricFields
+
+        return when {
+            invalidDimensionFields.isNotEmpty() -> RollupJobValidationResult.Invalid(
+                "Cannot rollup on dimension fields $invalidDimensionFields that don't exist in source rollup",
+            )
+            invalidMetricFields.isNotEmpty() -> RollupJobValidationResult.Invalid(
+                "Cannot rollup on metric fields $invalidMetricFields that don't exist in source rollup",
+            )
+            else -> {
+                // Validate metric compatibility
+                val sourceMetrics = sourceJob.metrics.flatMap { it.metrics }.map { it.type.type }.toSet()
+                val targetMetrics = job.metrics.flatMap { it.metrics }.map { it.type.type }.toSet()
+                val unsupportedMetrics = targetMetrics - sourceMetrics
+
+                if (unsupportedMetrics.isNotEmpty()) {
+                    RollupJobValidationResult.Invalid(
+                        "Target rollup requests metrics $unsupportedMetrics that are not available in source rollup",
+                    )
+                } else {
+                    RollupJobValidationResult.Valid
+                }
+            }
+        }
+    }
+
+    private fun validateIntervalAlignment(sourceInterval: String, targetInterval: String): Boolean = try {
+        val sourceMillis = parseIntervalToMillis(sourceInterval)
+        val targetMillis = parseIntervalToMillis(targetInterval)
+        targetMillis % sourceMillis == 0L && targetMillis > sourceMillis
+    } catch (e: Exception) {
+        true // Let it through and fail later with a better error
+    }
+
+    private fun parseIntervalToMillis(interval: String): Long = if (DateHistogramAggregationBuilder.DATE_FIELD_UNITS.containsKey(interval)) {
+        DateHistogramInterval(interval).estimateMillis()
+    } else {
+        TimeValue.parseTimeValue(interval, "parseIntervalToMillis").millis
+    }
 }

 sealed class RollupJobResult {
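The interval-alignment rule above requires the target tier's interval to be a strict, exact multiple of the source tier's interval. A small sketch of that arithmetic with hard-coded millisecond values (plain Kotlin standing in for the DateHistogramInterval/TimeValue parsing the commit actually uses; names are illustrative):

// Minimal sketch of the check in validateIntervalAlignment, with pre-parsed millisecond values.
fun isAligned(sourceMillis: Long, targetMillis: Long): Boolean =
    targetMillis % sourceMillis == 0L && targetMillis > sourceMillis

fun main() {
    val oneHour = 3_600_000L
    val oneDay = 24 * oneHour
    val ninetyMinutes = 90 * 60_000L
    println(isAligned(oneHour, oneDay))        // true: a 1h tier can feed a 1d tier
    println(isAligned(oneHour, ninetyMinutes)) // false: 90m is not an exact multiple of 1h
    println(isAligned(oneHour, oneHour))       // false: equal intervals are rejected as well
}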

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt

Lines changed: 2 additions & 2 deletions
@@ -99,7 +99,7 @@ class RollupSearchService(
    }

    @Suppress("ComplexMethod")
-    suspend fun executeCompositeSearch(job: Rollup, metadata: RollupMetadata): RollupSearchResult = try {
+    suspend fun executeCompositeSearch(job: Rollup, metadata: RollupMetadata, clusterService: ClusterService): RollupSearchResult = try {
        var retryCount = 0
        RollupSearchResult.Success(
            retrySearchPolicy.retry(logger) {
@@ -113,7 +113,7 @@ class RollupSearchService(
                )
            }

-            val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata)
+            val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata, clusterService.state())
            val cancelTimeoutTimeValue = TimeValue.timeValueMinutes(getCancelAfterTimeInterval(cancelAfterTimeInterval.minutes))
            searchRequest.cancelAfterTimeInterval = cancelTimeoutTimeValue

src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt

Lines changed: 57 additions & 1 deletion
@@ -65,6 +65,49 @@ class RollupInterceptor(

    @Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings)

+    companion object {
+        /**
+         * Thread-local bypass mechanism for internal operations that need to query rollup indices
+         * without triggering interceptor validations.
+         */
+        private val bypassInterceptor = ThreadLocal<Int>()
+
+        /**
+         * Bypass level that skips all rollup search validations and rewriting.
+         * Used when the system needs to query rollup indices directly using composite aggregation.
+         */
+        const val BYPASS_ROLLUP_SEARCH = 1
+
+        /**
+         * Bypass level that allows non-zero size in rollup searches.
+         * Used for internal operations like continuous rollup initialization that need to fetch
+         * actual documents from rollup indices (e.g., getEarliestTimestampFromRollupIndex).
+         */
+        const val BYPASS_SIZE_CHECK = 2
+
+        /**
+         * Sets the bypass level for the current thread.
+         * Must be followed by clearBypass() to avoid leaking the bypass state.
+         */
+        fun setBypass(bypassLevel: Int) {
+            bypassInterceptor.set(bypassLevel)
+        }
+
+        /**
+         * Clears the bypass level for the current thread.
+         * Should always be called in a finally block after setBypass().
+         */
+        fun clearBypass() {
+            bypassInterceptor.remove()
+        }
+
+        /**
+         * Gets the current bypass level for the thread.
+         * Returns 0 if no bypass is set.
+         */
+        fun getBypassLevel(): Int = bypassInterceptor.get() ?: 0
+    }
+
    init {
        clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) {
            searchEnabled = it
@@ -89,7 +132,20 @@ class RollupInterceptor(
                val index = request.shardId().indexName
                val isRollupIndex = isRollupIndex(index, clusterService.state())
                if (isRollupIndex) {
-                    if (request.source().size() != 0) {
+                    val bypassLevel = getBypassLevel()
+                    // BYPASS_ROLLUP_SEARCH: Skip all validations and query rewriting.
+                    // Used for the composite aggregation query that retrieves the buckets to
+                    // write to the target index when rolling up an already rolled-up index.
+                    if (bypassLevel == BYPASS_ROLLUP_SEARCH) {
+                        actualHandler.messageReceived(request, channel, task)
+                        return
+                    }
+
+                    // BYPASS_SIZE_CHECK: Allow non-zero size for internal operations that need to
+                    // fetch documents (e.g., fetching the earliest timestamp document from a rolled-up
+                    // index for a continuous rollup job). Normal rollup searches must have size=0
+                    // since they should only return aggregations.
+                    if (bypassLevel != BYPASS_SIZE_CHECK && request.source().size() != 0) {
                        throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}")
                    }
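The bypass introduced in this file is a per-thread flag, so every setBypass call must be paired with clearBypass in a finally block before the thread is reused. A self-contained sketch of that call pattern, using a stand-in object rather than the real RollupInterceptor companion (names mirror the commit but the object itself is illustrative):

// Stand-in for the interceptor's thread-local bypass, for illustration only.
object BypassFlag {
    const val BYPASS_SIZE_CHECK = 2
    private val level = ThreadLocal<Int>()
    fun setBypass(value: Int) = level.set(value)
    fun clearBypass() = level.remove()
    fun getBypassLevel(): Int = level.get() ?: 0
}

fun main() {
    BypassFlag.setBypass(BypassFlag.BYPASS_SIZE_CHECK)
    try {
        // ... issue the internal size=1 search against the rollup index here ...
        check(BypassFlag.getBypassLevel() == BypassFlag.BYPASS_SIZE_CHECK) // bypass visible only on this thread
    } finally {
        BypassFlag.clearBypass() // always clear, or pooled threads would see a stale bypass on later requests
    }
    check(BypassFlag.getBypassLevel() == 0)
}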
