Skip to content

Commit b49875e

Browse files
Kartik031299kbansal2
authored and committed
Add WarmerRefreshListener to NRTReplicationEngine to warm replica shards (opensearch-project#20650)
Signed-off-by: Kartik Bansal <kbansal2@atlassian.com> Co-authored-by: Kartik Bansal <kbansal2@atlassian.com> Signed-off-by: Deepti24 <chauhan.deepti24@gmail.com>
1 parent 1125c09 commit b49875e

File tree

6 files changed

+333
-32
lines changed

6 files changed

+333
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3737
- Introduce AdditionalCodecs and EnginePlugin::getAdditionalCodecs hook to allow additional Codec registration ([#20411](https://github.com/opensearch-project/OpenSearch/pull/20411))
3838
- Introduced strategy planner interfaces for indexing and deletion ([#20585](https://github.com/opensearch-project/OpenSearch/pull/20585))
3939
- Implement FieldMappingIngestionMessageMapper for pull-based ingestion ([#20729](https://github.com/opensearch-project/OpenSearch/pull/20729))
40+
- Added support for WarmerRefreshListener in NRTReplicationEngine to trigger the index warmer after segment replication on replica shards ([#20650](https://github.com/opensearch-project/OpenSearch/pull/20650))
4041

4142
### Changed
4243
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
12+
import org.opensearch.action.admin.indices.stats.ShardStats;
13+
import org.opensearch.index.warmer.WarmerStats;
14+
import org.opensearch.test.OpenSearchIntegTestCase;
15+
import org.junit.Before;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
21+
import static org.hamcrest.Matchers.greaterThan;
22+
23+
/**
24+
* Integration tests that verify index warming (e.g., eager global ordinals loading) works correctly
25+
* on NRT replica shards during segment replication. This validates the end-to-end flow where
26+
* index warmer gets triggered when new segments arrive on replicas via segment replication.
27+
*
28+
* <p>Uses a keyword field with {@code eager_global_ordinals: true}
29+
* to exercise the warming path during segment replication.</p>
30+
*/
31+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
32+
public class SegmentReplicationReplicaIndexWarmerIT extends SegmentReplicationBaseIT {
33+
// Runs before each test: starts a dedicated cluster-manager-only node.
// The class-level ClusterScope declares numDataNodes = 0, so each test starts its own data nodes.
@Before
34+
public void setup() {
35+
internalCluster().startClusterManagerOnlyNode();
36+
}
37+
38+
/**
39+
* Verifies that eager global ordinals are loaded on both primary and replica shards
40+
* after segment replication, by checking warmer invocation metrics from the index stats API.
41+
*
42+
* <p>This test ensures that the {@code WarmerRefreshListener} in {@code NRTReplicationEngine}
43+
* correctly invokes the {@code IndexWarmer} chain on replica shards, which in turn loads
44+
* global ordinals for the keyword field with {@code eager_global_ordinals: true}.</p>
45+
*/
46+
public void testEagerGlobalOrdinalsLoadedOnReplicaAfterSegmentReplication() throws Exception {
47+
// Start a single data node and create the index on it; only yellow health is awaited
// at this point because the second data node is started afterwards.
final String primaryNode = internalCluster().startDataOnlyNode();
48+
assertAcked(
49+
prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("category", "type=keyword,eager_global_ordinals=true")
50+
);
51+
ensureYellow(INDEX_NAME);
52+
// Second data node; the index is expected to reach green once it has joined.
final String replicaNode = internalCluster().startDataOnlyNode();
53+
ensureGreen(INDEX_NAME);
54+
55+
final int docCount = randomIntBetween(10, 20);
56+
// -1 turns off the intermediate refreshes inside indexTestDocuments.
indexTestDocuments(docCount, -1);
57+
// Snapshot the warmer totals before the explicit refresh so that the assertion below
// can detect an increase.
final Map<String, Long> warmerTotalBeforeRefresh = getWarmerTotalPerShardType(INDEX_NAME);
58+
refresh(INDEX_NAME);
59+
waitForSearchableDocs(docCount, primaryNode, replicaNode);
60+
61+
// Assert warmer invoked metrics from index stats API for both primary and replica shards
62+
assertBusy(() -> {
63+
// Stats are re-read on every assertBusy attempt and compared against the snapshot.
Map<String, Long> warmerTotalsAfterRefresh = getWarmerTotalPerShardType(INDEX_NAME);
64+
compareWarmerTotals(warmerTotalBeforeRefresh, warmerTotalsAfterRefresh);
65+
});
66+
}
67+
68+
/**
69+
* Verifies that warmer invocations continue on replica shards after a force merge on the primary,
70+
* followed by segment replication. This ensures warming is triggered on subsequent segment updates.
71+
*/
72+
public void testWarmerInvokedOnReplicaAfterForceMerge() throws Exception {
73+
// Same two-step startup as the test above: create the index with one data node (yellow),
// then add a second data node and wait for green.
final String primaryNode = internalCluster().startDataOnlyNode();
74+
assertAcked(
75+
prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("category", "type=keyword,eager_global_ordinals=true")
76+
);
77+
ensureYellow(INDEX_NAME);
78+
final String replicaNode = internalCluster().startDataOnlyNode();
79+
ensureGreen(INDEX_NAME);
80+
81+
// Index documents in batches to create multiple segments
82+
final int docCount = randomIntBetween(10, 20);
83+
// 3 is the refreshAfterDocs argument: indexTestDocuments refreshes whenever i % 3 == 0.
indexTestDocuments(docCount, 3);
84+
waitForSearchableDocs(docCount, primaryNode, replicaNode);
85+
86+
// Capture warmer stats before force merge
87+
final Map<String, Long> warmerTotalBeforeForceMerge = getWarmerTotalPerShardType(INDEX_NAME);
88+
89+
// Force merge down to a single segment (with flush). This call does not itself wait for
// segment replication; the assertBusy below polls until the replica total has increased.
90+
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
91+
92+
// Verify warmer was invoked again on replica after force merge replication
93+
assertBusy(() -> {
94+
final Map<String, Long> warmerTotalAfterForceMerge = getWarmerTotalPerShardType(INDEX_NAME);
95+
// true = onReplicaOnly: the primary's warmer total is not checked here.
compareWarmerTotals(warmerTotalBeforeForceMerge, warmerTotalAfterForceMerge, true);
96+
});
97+
}
98+
99+
// Sums WarmerStats.total() over all shards of the given index, split by shard role.
// The returned map always contains the keys "primary" and "replica" (both seeded with 0),
// so callers can look either key up without a null check.
private Map<String, Long> getWarmerTotalPerShardType(String indexName) {
100+
// clear() followed by setWarmer(true) requests only the warmer section of the stats.
IndicesStatsResponse shardStatsArray = client().admin().indices().prepareStats(indexName).clear().setWarmer(true).get();
101+
Map<String, Long> warmerTotals = new HashMap<>();
102+
warmerTotals.put("primary", 0L);
103+
warmerTotals.put("replica", 0L);
104+
for (ShardStats shardStats : shardStatsArray.getShards()) {
105+
WarmerStats warmerStats = shardStats.getStats().getWarmer();
106+
// Shards that report no warmer stats contribute nothing to the totals.
if (warmerStats != null) {
107+
String key = shardStats.getShardRouting().primary() ? "primary" : "replica";
108+
warmerTotals.merge(key, warmerStats.total(), Long::sum);
109+
}
110+
}
111+
return warmerTotals;
112+
}
113+
114+
// Indexes docCount documents into INDEX_NAME with ids "0".."docCount-1" and a single
// field "category" set to "value-<i>".
// refreshAfterDocs: pass -1 to never refresh; otherwise the index is refreshed after every
// document whose loop index i is a multiple of refreshAfterDocs (this includes i = 0).
// NOTE(review): only -1 is guarded; refreshAfterDocs == 0 would throw ArithmeticException
// on the modulo below — confirm no caller passes 0.
private void indexTestDocuments(int docCount, int refreshAfterDocs) {
115+
for (int i = 0; i < docCount; i++) {
116+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("category", "value-" + i).get();
117+
if (refreshAfterDocs != -1 && i % refreshAfterDocs == 0) {
118+
refresh(INDEX_NAME);
119+
}
120+
}
121+
}
122+
123+
// Convenience overload: checks every shard type present in `before`
// (delegates with onReplicaOnly = false).
private void compareWarmerTotals(Map<String, Long> before, Map<String, Long> after) {
124+
compareWarmerTotals(before, after, false);
125+
}
126+
127+
// Asserts that the warmer total in `after` is strictly greater than in `before` for each
// shard type key of `before`. When onReplicaOnly is true the "primary" key is skipped.
// Both maps are expected to contain the same keys: the values are unboxed to long, so a
// key missing from `after` would fail with a NullPointerException rather than an assertion.
private void compareWarmerTotals(Map<String, Long> before, Map<String, Long> after, boolean onReplicaOnly) {
128+
for (String shardType : before.keySet()) {
129+
if (onReplicaOnly && shardType.equals("primary")) {
130+
continue; // Skip primary if we're only checking replica warmer totals
131+
}
132+
long beforeTotal = before.get(shardType);
133+
long afterTotal = after.get(shardType);
134+
assertThat(
135+
"Warmer total for " + shardType + " should increase after segment replication",
136+
afterTotal,
137+
greaterThan(beforeTotal)
138+
);
139+
}
140+
}
141+
}

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.engine;
1010

11+
import org.apache.logging.log4j.Logger;
1112
import org.apache.lucene.index.DirectoryReader;
1213
import org.apache.lucene.index.IndexCommit;
1314
import org.apache.lucene.index.SegmentInfos;
@@ -43,6 +44,7 @@
4344
import java.util.Map;
4445
import java.util.Objects;
4546
import java.util.concurrent.CountDownLatch;
47+
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.concurrent.locks.Lock;
4749
import java.util.concurrent.locks.ReentrantLock;
4850
import java.util.function.BiFunction;
@@ -100,6 +102,9 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
100102
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
101103
this.readerManager.addListener(listener);
102104
}
105+
// Wire up a warmer listener to trigger index warming
106+
// when new segments arrive via segment replication
107+
this.readerManager.addListener(new WarmerRefreshListener(logger, isClosed, engineConfig, this.readerManager));
103108
final Map<String, String> userData = this.lastCommittedSegmentInfos.getUserData();
104109
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
105110
translogManagerRef = new WriteOnlyTranslogManager(
@@ -560,4 +565,56 @@ private DirectoryReader getDirectoryReader() throws IOException {
560565
Lucene.SOFT_DELETES_FIELD
561566
);
562567
}
568+
569+
/**
570+
* A {@link ReferenceManager.RefreshListener} that warms new segments when the reader is refreshed
571+
* during segment replication. This ensures index warming (e.g., loading global ordinals via
572+
* {@link Engine.Warmer}) occurs on NRT replica shards, consistent with the warming behavior
573+
* in {@link InternalEngine} used by NRT primary shards.
574+
*
575+
* @opensearch.internal
576+
*/
577+
static final class WarmerRefreshListener implements ReferenceManager.RefreshListener {
578+
// Taken from EngineConfig#getWarmer(); may be null, in which case afterRefresh does nothing.
private final Engine.Warmer warmer;
579+
private final Logger logger;
580+
// Consulted only to decide whether a warming failure is still worth logging.
private final AtomicBoolean isEngineClosed;
581+
// Source of the reader handed to the warmer after a refresh.
private final NRTReplicationReaderManager readerManager;
582+
583+
WarmerRefreshListener(
584+
Logger logger,
585+
AtomicBoolean isEngineClosed,
586+
EngineConfig engineConfig,
587+
NRTReplicationReaderManager readerManager
588+
) {
589+
this.warmer = engineConfig.getWarmer();
590+
this.logger = logger;
591+
this.isEngineClosed = isEngineClosed;
592+
this.readerManager = readerManager;
593+
}
594+
595+
// Nothing to prepare before a refresh.
@Override
596+
public void beforeRefresh() throws IOException {}
597+
598+
// Warms the current reader after a refresh. Failures are logged, never rethrown.
@Override
599+
public void afterRefresh(boolean didRefresh) {
600+
// Skip when the refresh did not take effect or no warmer is configured.
if (didRefresh && warmer != null) {
601+
try {
602+
// Acquire a reference to the current reader; it is released in the finally block below.
OpenSearchDirectoryReader reader = readerManager.acquire();
603+
try {
604+
warmer.warm(reader);
605+
} catch (Exception e) {
606+
// A warming failure is only logged, and only while the engine is not closed.
if (isEngineClosed.get() == false) {
607+
logger.warn("failed to warm reader replica", e);
608+
}
609+
} finally {
610+
readerManager.release(reader);
611+
}
612+
} catch (IOException e) {
613+
// This outer catch wraps both acquire() and release(); the message names only the acquire case.
if (isEngineClosed.get() == false) {
614+
logger.warn("failed to acquire reader for warming on replica", e);
615+
}
616+
}
617+
}
618+
}
619+
}
563620
}

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3477,7 +3477,8 @@ public void onFailedEngine(String reason, Exception e) {
34773477
globalCheckpoint::get,
34783478
retentionLeasesHolder::get,
34793479
new NoneCircuitBreakerService(),
3480-
eventListener
3480+
eventListener,
3481+
null
34813482
)
34823483
);
34833484

@@ -3578,7 +3579,8 @@ public void onFailedEngine(String reason, Exception e) {
35783579
globalCheckpoint::get,
35793580
retentionLeasesHolder::get,
35803581
new NoneCircuitBreakerService(),
3581-
eventListener
3582+
eventListener,
3583+
null
35823584
)
35833585
);
35843586

@@ -3672,7 +3674,8 @@ public void onFailedEngine(String reason, Exception e) {
36723674
globalCheckpoint::get,
36733675
retentionLeasesHolder::get,
36743676
new NoneCircuitBreakerService(),
3675-
eventListener
3677+
eventListener,
3678+
null
36763679
)
36773680
);
36783681

@@ -3769,7 +3772,8 @@ public void onFailedEngine(String reason, Exception e) {
37693772
globalCheckpoint::get,
37703773
retentionLeasesHolder::get,
37713774
new NoneCircuitBreakerService(),
3772-
eventListener
3775+
eventListener,
3776+
null
37733777
)
37743778
);
37753779

@@ -7216,7 +7220,18 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
72167220
Set<Long> existingSeqNos = new HashSet<>();
72177221
store = createStore();
72187222
engine = createEngine(
7219-
config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, retentionLeasesHolder::get)
7223+
config(
7224+
indexSettings,
7225+
store,
7226+
createTempDir(),
7227+
newMergePolicy(),
7228+
null,
7229+
null,
7230+
null,
7231+
globalCheckpoint::get,
7232+
retentionLeasesHolder::get,
7233+
new NoneCircuitBreakerService()
7234+
)
72207235
);
72217236
assertThat(engine.getMinRetainedSeqNo(), equalTo(0L));
72227237
long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo();

0 commit comments

Comments
 (0)