Skip to content

Commit c63652c

Browse files
committed
Removing some unwanted code
Signed-off-by: Kshitij Tandon <[email protected]>
1 parent 8b1e28e commit c63652c

File tree

3 files changed

+134
-93
lines changed

3 files changed

+134
-93
lines changed

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin
3131
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
3232
import org.opensearch.indexmanagement.opensearchapi.parseWithType
3333
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
34+
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
3435
import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata
3536
import org.opensearch.indexmanagement.rollup.model.Rollup
3637
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
@@ -41,6 +42,7 @@ import org.opensearch.indexmanagement.util.NO_ID
4142
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
4243
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
4344
import org.opensearch.search.builder.SearchSourceBuilder
45+
import org.opensearch.search.fetch.subphase.FetchSourceContext
4446
import org.opensearch.search.sort.SortOrder
4547
import org.opensearch.transport.RemoteTransportException
4648
import org.opensearch.transport.client.Client
@@ -242,7 +244,9 @@ class RollupMetadataService(
242244
val dateHistogram = rollup.dimensions.first() as DateHistogram
243245
val dateField = dateHistogram.sourceField
244246

245-
val bypassMarker = "_rollup_internal_bypass_${org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_SIZE_CHECK}"
247+
// For multi-tier rollup, we would be querying a document on a rollup index
248+
// So we set this bypassMarker in fetchSource as a flag to help bypass the validation in RollupInterceptor
249+
val bypassMarker = "_rollup_internal_bypass_${RollupInterceptor.BYPASS_SIZE_CHECK}"
246250

247251
val searchRequest = SearchRequest(rollup.sourceIndex)
248252
.source(
@@ -251,16 +255,11 @@ class RollupMetadataService(
251255
.query(MatchAllQueryBuilder())
252256
.sort("$dateField.date_histogram", SortOrder.ASC)
253257
.trackTotalHits(false)
254-
.fetchSource(org.opensearch.search.fetch.subphase.FetchSourceContext(false, arrayOf(bypassMarker), emptyArray()))
258+
.fetchSource(FetchSourceContext(false, arrayOf(bypassMarker), emptyArray()))
255259
.docValueField("$dateField.date_histogram", DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT),
256260
)
257261
.allowPartialSearchResults(false)
258262

259-
// Set bypass in ThreadContext for multi-node support - skips all rollup validations
260-
// This is needed because we're querying a rollup index directly without aggregations
261-
// Set bypass via preference parameter for multi-node support - allows size > 0 for fetching earliest timestamp
262-
// searchRequest.preference("_rollup_bypass:${org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.BYPASS_SIZE_CHECK}")
263-
264263
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
265264

266265
if (response.hits.hits.isEmpty()) {

src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt

Lines changed: 12 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -66,55 +66,28 @@ class RollupInterceptor(
6666
@Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings)
6767

6868
companion object {
69-
/**
70-
* Thread-local bypass mechanism for internal operations that need to query rollup indices
71-
* without triggering interceptor validations.
72-
* Kept for backward compatibility with single-node clusters.
73-
*/
74-
private val bypassInterceptor = ThreadLocal<Int>()
75-
7669
/**
7770
* Bypass level that skips all rollup search validations and rewriting.
78-
* Used when the system needs to query rollup indices directly using composite aggregation
71+
* Used when the system needs to query rollup indices directly using composite aggregation.
72+
* Passed via FetchSourceContext marker for multi-node support.
7973
*/
8074
const val BYPASS_ROLLUP_SEARCH = 1
8175

8276
/**
8377
* Bypass level that allows non-zero size in rollup searches.
8478
* Used for internal operations like continuous rollup initialization that need to fetch
8579
* actual documents from rollup indices (e.g., getEarliestTimestampFromRollupIndex).
80+
* Passed via FetchSourceContext marker for multi-node support.
8681
*/
8782
const val BYPASS_SIZE_CHECK = 2
88-
89-
/**
90-
* Sets the bypass level for the current thread.
91-
* Must be followed by clearBypass() to avoid leaking the bypass state.
92-
*/
93-
fun setBypass(bypassLevel: Int) {
94-
bypassInterceptor.set(bypassLevel)
95-
}
96-
97-
/**
98-
* Clears the bypass level for the current thread.
99-
* Should always be called in a finally block after setBypass().
100-
*/
101-
fun clearBypass() {
102-
bypassInterceptor.remove()
103-
}
104-
105-
/**
106-
* Gets the current bypass level for the thread.
107-
* Returns 0 if no bypass is set.
108-
*/
109-
fun getBypassLevel(): Int = bypassInterceptor.get() ?: 0
11083
}
11184

11285
/**
11386
* Reads the bypass value from the request's FetchSourceContext.
11487
* Returns the bypass level if the marker is present in includes array, null otherwise.
11588
* This enables bypass mechanism to work in multi-node clusters.
11689
*/
117-
private fun getBypassFromFetchSource(request: ShardSearchRequest): Int? {
90+
internal fun getBypassFromFetchSource(request: ShardSearchRequest): Int? {
11891
val includes = request.source()?.fetchSource()?.includes()
11992

12093
// Look for our bypass marker in the includes array and extract the bypass level
@@ -145,35 +118,25 @@ class RollupInterceptor(
145118
): TransportRequestHandler<T> = object : TransportRequestHandler<T> {
146119
override fun messageReceived(request: T, channel: TransportChannel, task: Task) {
147120
if (searchEnabled && request is ShardSearchRequest) {
148-
logger.info("Idhr aaya hu 3")
149121
val index = request.shardId().indexName
150122
val isRollupIndex = isRollupIndex(index, clusterService.state())
151123
if (isRollupIndex) {
152-
// Check bypass from FetchSourceContext (multi-node) OR ThreadLocal (single-node/backward compat)
153-
val bypassFromFetchSource = getBypassFromFetchSource(request)
154-
val bypassFromThreadLocal = getBypassLevel()
124+
// Check bypass from FetchSourceContext for multi-node support
125+
val bypassLevel = getBypassFromFetchSource(request)
155126

156-
// Use FetchSourceContext value if present, otherwise fall back to ThreadLocal
157-
val effectiveBypass = bypassFromFetchSource ?: bypassFromThreadLocal
158-
159-
logger.debug(
160-
"RollupInterceptor bypass check - fetchSource: $bypassFromFetchSource, " +
161-
"threadLocal: $bypassFromThreadLocal, effective: $effectiveBypass",
162-
)
127+
logger.debug("RollupInterceptor bypass check - bypassLevel: $bypassLevel")
163128

164129
// BYPASS_ROLLUP_SEARCH: Skip all validations and query rewriting
165-
// Used for composite aggregation query which we do for retrieving the buckets
166-
// to write to target index during rollup of rolled up index
167-
if (effectiveBypass == BYPASS_ROLLUP_SEARCH) {
130+
// Used for composite aggregation queries when rolling up rollup indices
131+
if (bypassLevel == BYPASS_ROLLUP_SEARCH) {
168132
actualHandler.messageReceived(request, channel, task)
169133
return
170134
}
171135

172136
// BYPASS_SIZE_CHECK: Allow non-zero size for internal operations that need to
173-
// fetch documents (e.g., fetching earliest timestamp document for rolled index
174-
// in case of continuous rollup job). Normal rollup searches must have size=0
175-
// since they should only return aggregations
176-
if (effectiveBypass != BYPASS_SIZE_CHECK && request.source().size() != 0) {
137+
// fetch documents (e.g., fetching earliest timestamp document for continuous rollup initialization)
138+
// Normal rollup searches must have size=0 since they should only return aggregations
139+
if (bypassLevel != BYPASS_SIZE_CHECK && request.source().size() != 0) {
177140
throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}")
178141
}
179142

src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorTests.kt

Lines changed: 116 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,60 +5,139 @@
55

66
package org.opensearch.indexmanagement.rollup.interceptor
77

8+
import org.junit.Before
9+
import org.mockito.Mockito.mock
10+
import org.mockito.Mockito.`when`
11+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
12+
import org.opensearch.cluster.service.ClusterService
13+
import org.opensearch.common.settings.ClusterSettings
14+
import org.opensearch.common.settings.Settings
815
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.Companion.BYPASS_ROLLUP_SEARCH
916
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.Companion.BYPASS_SIZE_CHECK
10-
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.Companion.clearBypass
11-
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.Companion.getBypassLevel
12-
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor.Companion.setBypass
17+
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
18+
import org.opensearch.search.builder.SearchSourceBuilder
19+
import org.opensearch.search.fetch.subphase.FetchSourceContext
20+
import org.opensearch.search.internal.ShardSearchRequest
1321
import org.opensearch.test.OpenSearchTestCase
1422

1523
class RollupInterceptorTests : OpenSearchTestCase() {
1624

17-
fun `test setBypass and getBypassLevel for BYPASS_ROLLUP_SEARCH`() {
18-
setBypass(BYPASS_ROLLUP_SEARCH)
19-
assertEquals(BYPASS_ROLLUP_SEARCH, getBypassLevel())
20-
clearBypass()
25+
private lateinit var interceptor: RollupInterceptor
26+
27+
@Before
28+
fun setup() {
29+
interceptor = createInterceptor()
30+
}
31+
32+
fun `test getBypassFromFetchSource returns null when no source`() {
33+
val request = mock(ShardSearchRequest::class.java)
34+
`when`(request.source()).thenReturn(null)
35+
36+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
37+
38+
assertNull(bypassLevel)
2139
}
2240

23-
fun `test setBypass and getBypassLevel for BYPASS_SIZE_CHECK`() {
24-
setBypass(BYPASS_SIZE_CHECK)
25-
assertEquals(BYPASS_SIZE_CHECK, getBypassLevel())
26-
clearBypass()
41+
fun `test getBypassFromFetchSource returns null when no FetchSourceContext`() {
42+
val request = mock(ShardSearchRequest::class.java)
43+
val source = SearchSourceBuilder()
44+
45+
`when`(request.source()).thenReturn(source)
46+
47+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
48+
49+
assertNull(bypassLevel)
2750
}
2851

29-
fun `test getBypassLevel returns 0 when not set`() {
30-
clearBypass()
31-
assertEquals(0, getBypassLevel())
52+
fun `test getBypassFromFetchSource returns null when no includes array`() {
53+
val request = mock(ShardSearchRequest::class.java)
54+
val source = SearchSourceBuilder()
55+
source.fetchSource(FetchSourceContext(true))
56+
57+
`when`(request.source()).thenReturn(source)
58+
59+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
60+
61+
assertNull(bypassLevel)
3262
}
3363

34-
fun `test clearBypass resets bypass level`() {
35-
setBypass(BYPASS_ROLLUP_SEARCH)
36-
assertEquals(BYPASS_ROLLUP_SEARCH, getBypassLevel())
37-
clearBypass()
38-
assertEquals(0, getBypassLevel())
64+
fun `test getBypassFromFetchSource returns null when no bypass marker present`() {
65+
val request = mock(ShardSearchRequest::class.java)
66+
val source = SearchSourceBuilder()
67+
source.fetchSource(FetchSourceContext(false, arrayOf("field1", "field2"), emptyArray()))
68+
69+
`when`(request.source()).thenReturn(source)
70+
71+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
72+
73+
assertNull(bypassLevel)
3974
}
4075

41-
fun `test bypass is thread local`() {
42-
setBypass(BYPASS_ROLLUP_SEARCH)
43-
assertEquals(BYPASS_ROLLUP_SEARCH, getBypassLevel())
76+
fun `test getBypassFromFetchSource extracts BYPASS_ROLLUP_SEARCH correctly`() {
77+
val request = mock(ShardSearchRequest::class.java)
78+
val source = SearchSourceBuilder()
79+
source.fetchSource(FetchSourceContext(false, arrayOf("_rollup_internal_bypass_$BYPASS_ROLLUP_SEARCH"), emptyArray()))
4480

45-
val thread = Thread {
46-
assertEquals(0, getBypassLevel())
47-
setBypass(BYPASS_SIZE_CHECK)
48-
assertEquals(BYPASS_SIZE_CHECK, getBypassLevel())
49-
}
50-
thread.start()
51-
thread.join()
81+
`when`(request.source()).thenReturn(source)
5282

53-
assertEquals(BYPASS_ROLLUP_SEARCH, getBypassLevel())
54-
clearBypass()
83+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
84+
85+
assertEquals(BYPASS_ROLLUP_SEARCH, bypassLevel)
5586
}
5687

57-
fun `test multiple setBypass calls overwrite previous value`() {
58-
setBypass(BYPASS_ROLLUP_SEARCH)
59-
assertEquals(BYPASS_ROLLUP_SEARCH, getBypassLevel())
60-
setBypass(BYPASS_SIZE_CHECK)
61-
assertEquals(BYPASS_SIZE_CHECK, getBypassLevel())
62-
clearBypass()
88+
fun `test getBypassFromFetchSource extracts BYPASS_SIZE_CHECK correctly`() {
89+
val request = mock(ShardSearchRequest::class.java)
90+
val source = SearchSourceBuilder()
91+
source.fetchSource(FetchSourceContext(false, arrayOf("_rollup_internal_bypass_$BYPASS_SIZE_CHECK"), emptyArray()))
92+
93+
`when`(request.source()).thenReturn(source)
94+
95+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
96+
97+
assertEquals(BYPASS_SIZE_CHECK, bypassLevel)
98+
}
99+
100+
fun `test getBypassFromFetchSource finds marker among multiple includes`() {
101+
val request = mock(ShardSearchRequest::class.java)
102+
val source = SearchSourceBuilder()
103+
source.fetchSource(
104+
FetchSourceContext(false, arrayOf("field1", "_rollup_internal_bypass_$BYPASS_ROLLUP_SEARCH", "field2"), emptyArray()),
105+
)
106+
107+
`when`(request.source()).thenReturn(source)
108+
109+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
110+
111+
assertEquals(BYPASS_ROLLUP_SEARCH, bypassLevel)
112+
}
113+
114+
fun `test getBypassFromFetchSource returns null for invalid bypass marker`() {
115+
val request = mock(ShardSearchRequest::class.java)
116+
val source = SearchSourceBuilder()
117+
source.fetchSource(FetchSourceContext(false, arrayOf("_rollup_internal_bypass_invalid"), emptyArray()))
118+
119+
`when`(request.source()).thenReturn(source)
120+
121+
val bypassLevel = interceptor.getBypassFromFetchSource(request)
122+
123+
assertNull(bypassLevel)
124+
}
125+
126+
// Helper method to create interceptor instance
127+
private fun createInterceptor(): RollupInterceptor {
128+
val clusterService = mock(ClusterService::class.java)
129+
val clusterSettings = ClusterSettings(
130+
Settings.EMPTY,
131+
setOf(
132+
RollupSettings.ROLLUP_SEARCH_ENABLED,
133+
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
134+
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
135+
),
136+
)
137+
`when`(clusterService.clusterSettings).thenReturn(clusterSettings)
138+
139+
val settings = Settings.EMPTY
140+
val resolver = mock(IndexNameExpressionResolver::class.java)
141+
return RollupInterceptor(clusterService, settings, resolver)
63142
}
64143
}

0 commit comments

Comments
 (0)