Skip to content

Commit 115d67e

Browse files
committed
Using flag in place of thread context for internal search calls
Signed-off-by: Kshitij Tandon <[email protected]>
1 parent 96eb1d6 commit 115d67e

File tree

4 files changed

+60
-37
lines changed

4 files changed

+60
-37
lines changed

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ class RollupMetadataService(
242242
val dateHistogram = rollup.dimensions.first() as DateHistogram
243243
val dateField = dateHistogram.sourceField
244244

245-
logger.info("Idhr se ja rha hu mai tumhe kya 2")
245+
val bypassMarker = "_rollup_internal_bypass_${org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_SIZE_CHECK}"
246246

247247
val searchRequest = SearchRequest(rollup.sourceIndex)
248248
.source(
@@ -251,37 +251,31 @@ class RollupMetadataService(
251251
.query(MatchAllQueryBuilder())
252252
.sort("$dateField.date_histogram", SortOrder.ASC)
253253
.trackTotalHits(false)
254-
.fetchSource(false)
254+
.fetchSource(org.opensearch.search.fetch.subphase.FetchSourceContext(false, arrayOf(bypassMarker), emptyArray()))
255255
.docValueField("$dateField.date_histogram", DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT),
256256
)
257257
.allowPartialSearchResults(false)
258258

259-
// Set BYPASS_SIZE_CHECK to allow size=1 when querying rollup index to get earliest timestamp
260-
// This is needed for continuous rollup initialization on rollup indices (multi-tier rollup)
261-
org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.setBypass(
262-
org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_SIZE_CHECK,
263-
)
264-
try {
265-
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
259+
// Bypass level is passed via the marker in the FetchSourceContext includes (set above) for multi-node support - it lifts the size=0 check
260+
// This is needed because we're querying a rollup index directly without aggregations
261+
// Alternative approach (left disabled below): set bypass via preference parameter for multi-node support - allows size > 0 for fetching earliest timestamp
262+
// searchRequest.preference("_rollup_bypass:${org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_SIZE_CHECK}")
266263

267-
if (response.hits.hits.isEmpty()) {
268-
return StartingTimeResult.NoDocumentsFound
269-
}
264+
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
270265

271-
logger.info("Idhr se ja rha hu mai tumhe kya")
266+
if (response.hits.hits.isEmpty()) {
267+
return StartingTimeResult.NoDocumentsFound
268+
}
272269

273-
// In rollup indices, date histogram fields are named as "field.date_histogram"
274-
val rollupDateField = "$dateField.date_histogram"
275-
val firstHitTimestampAsString: String =
276-
response.hits.hits.first().field(rollupDateField).getValue<String>()
277-
?: return StartingTimeResult.NoDocumentsFound
270+
// In rollup indices, date histogram fields are named as "field.date_histogram"
271+
val rollupDateField = "$dateField.date_histogram"
272+
val firstHitTimestampAsString: String =
273+
response.hits.hits.first().field(rollupDateField).getValue<String>()
274+
?: return StartingTimeResult.NoDocumentsFound
278275

279-
val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
280-
val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
281-
return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
282-
} finally {
283-
org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.clearBypass()
284-
}
276+
val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
277+
val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
278+
return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
285279
} catch (e: RemoteTransportException) {
286280
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
287281
logger.error("Error when getting earliest timestamp from rollup index for rollup [{}]: {}", rollup.id, unwrappedException)

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -271,16 +271,7 @@ object RollupRunner :
271271
withClosableContext(
272272
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, job.user),
273273
) {
274-
// Need to set this bypass as we are already doing the required aggregation re-writing in the composite aggregation
275-
// hence do not need to go through the interceptor for aggregation rewriting
276-
org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.setBypass(
277-
org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_ROLLUP_SEARCH,
278-
)
279-
try {
280-
rollupSearchService.executeCompositeSearch(updatableJob, metadata, clusterService)
281-
} finally {
282-
org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.clearBypass()
283-
}
274+
rollupSearchService.executeCompositeSearch(updatableJob, metadata, clusterService)
284275
}
285276
val rollupResult =
286277
when (rollupSearchResult) {

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupSearchService.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ import org.opensearch.core.action.ActionListener
1919
import org.opensearch.core.common.breaker.CircuitBreakingException
2020
import org.opensearch.indexmanagement.opensearchapi.retry
2121
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
22+
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
2223
import org.opensearch.indexmanagement.rollup.model.Rollup
2324
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
2425
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES
2526
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_COUNT
2627
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_MILLIS
2728
import org.opensearch.indexmanagement.rollup.util.getRollupSearchRequest
2829
import org.opensearch.search.aggregations.MultiBucketConsumerService
30+
import org.opensearch.search.fetch.subphase.FetchSourceContext
2931
import org.opensearch.transport.RemoteTransportException
3032
import org.opensearch.transport.client.Client
3133
import java.time.Instant
@@ -117,6 +119,14 @@ class RollupSearchService(
117119
val cancelTimeoutTimeValue = TimeValue.timeValueMinutes(getCancelAfterTimeInterval(cancelAfterTimeInterval.minutes))
118120
searchRequest.cancelAfterTimeInterval = cancelTimeoutTimeValue
119121

122+
// Set bypass flag via FetchSourceContext for multi-node support
123+
// Using fetchSource=false with marker in includes to avoid affecting actual source fetching
124+
val bypassMarker = "_rollup_internal_bypass_${RollupInterceptor.BYPASS_ROLLUP_SEARCH}"
125+
126+
searchRequest.source().fetchSource(
127+
FetchSourceContext(false, arrayOf(bypassMarker), emptyArray()),
128+
)
129+
120130
search(searchRequest, listener)
121131
}
122132
},

src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class RollupInterceptor(
6969
/**
7070
* Thread-local bypass mechanism for internal operations that need to query rollup indices
7171
* without triggering interceptor validations.
72+
* Kept for backward compatibility with single-node clusters.
7273
*/
7374
private val bypassInterceptor = ThreadLocal<Int>()
7475

@@ -108,6 +109,21 @@ class RollupInterceptor(
108109
fun getBypassLevel(): Int = bypassInterceptor.get() ?: 0
109110
}
110111

112+
/**
 * Extracts the bypass level encoded in the request's FetchSourceContext, if any.
 *
 * Internal rollup searches tag themselves by adding an include entry of the form
 * `_rollup_internal_bypass_<level>`. The first include carrying that prefix is taken
 * and the text after the prefix is parsed as an integer. Carrying the level on the
 * request itself is intended to let the bypass work in multi-node clusters.
 *
 * NOTE(review): includes are part of the search request, so presumably any caller could
 * supply this marker - confirm that this is acceptable.
 *
 * @param request shard-level search request to inspect
 * @return the parsed bypass level, or null when the request has no source, no fetch-source
 * context, no includes, no marker entry, or a marker suffix that is not an integer
 */
private fun getBypassFromFetchSource(request: ShardSearchRequest): Int? {
    val markerPrefix = "_rollup_internal_bypass_"

    // Any missing link in the chain means the request carries no marker at all.
    val includePatterns = request.source()?.fetchSource()?.includes() ?: return null

    // Only the first prefixed entry is considered.
    val marker = includePatterns.firstOrNull { it.startsWith(markerPrefix) } ?: return null

    return marker.removePrefix(markerPrefix).toIntOrNull()
}
126+
111127
init {
112128
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) {
113129
searchEnabled = it
@@ -129,14 +145,26 @@ class RollupInterceptor(
129145
): TransportRequestHandler<T> = object : TransportRequestHandler<T> {
130146
override fun messageReceived(request: T, channel: TransportChannel, task: Task) {
131147
if (searchEnabled && request is ShardSearchRequest) {
148+
logger.info("Idhr aaya hu 3")
132149
val index = request.shardId().indexName
133150
val isRollupIndex = isRollupIndex(index, clusterService.state())
134151
if (isRollupIndex) {
135-
val bypassLevel = getBypassLevel()
152+
// Check bypass from FetchSourceContext (multi-node) OR ThreadLocal (single-node/backward compat)
153+
val bypassFromFetchSource = getBypassFromFetchSource(request)
154+
val bypassFromThreadLocal = getBypassLevel()
155+
156+
// Use FetchSourceContext value if present, otherwise fall back to ThreadLocal
157+
val effectiveBypass = bypassFromFetchSource ?: bypassFromThreadLocal
158+
159+
logger.debug(
160+
"RollupInterceptor bypass check - fetchSource: $bypassFromFetchSource, " +
161+
"threadLocal: $bypassFromThreadLocal, effective: $effectiveBypass",
162+
)
163+
136164
// BYPASS_ROLLUP_SEARCH: Skip all validations and query rewriting
137165
// Used for composite aggregation query which we do for retrieving the buckets
138166
// to write to target index during rollup of rolled up index
139-
if (bypassLevel == BYPASS_ROLLUP_SEARCH) {
167+
if (effectiveBypass == BYPASS_ROLLUP_SEARCH) {
140168
actualHandler.messageReceived(request, channel, task)
141169
return
142170
}
@@ -145,7 +173,7 @@ class RollupInterceptor(
145173
// fetch documents (e.g., fetching earliest timestamp document for rolled index
146174
// in case of continuous rollup job). Normal rollup searches must have size=0
147175
// since they should only return aggregations
148-
if (bypassLevel != BYPASS_SIZE_CHECK && request.source().size() != 0) {
176+
if (effectiveBypass != BYPASS_SIZE_CHECK && request.source().size() != 0) {
149177
throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}")
150178
}
151179

0 commit comments

Comments
 (0)