Skip to content

Commit e67f291

Browse files
Add warmup phase for pull-based ingestion (#20526)
This PR introduces a warmup phase for pull-based ingestion that prevents shards from serving queries until they have caught up with the streaming source. This is analogous to how push-based replication waits for replicas to sync before serving.

Key changes:
- Add warmup settings: `index.ingestion_source.warmup.timeout` (default: -1, meaning warmup is disabled; any value >= 0 enables warmup with that timeout) and `index.ingestion_source.warmup.lag_threshold` (default: 100)
- Create WarmupConfig record to encapsulate warmup configuration
- Block shard in postRecovery() until warmup completes or times out
- Use CountDownLatch for thread-safe warmup blocking/signaling
- Encapsulate timeout handling in IngestionEngine.awaitWarmupComplete(), which logs a warning and proceeds when the timeout is reached

---------

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
1 parent 54d6d1f commit e67f291

File tree

12 files changed

+716
-26
lines changed

12 files changed

+716
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
- Add ref_path support for package-based hunspell dictionary loading ([#20840](https://github.com/opensearch-project/OpenSearch/pull/20840))
2525
- Add support for enabling pluggable data formats, starting with phase-1 of decoupling shard from engine, and introducing basic abstractions ([#20675](https://github.com/opensearch-project/OpenSearch/pull/20675))
2626

27+
- Add warmup phase to wait for lag to catch up in pull-based ingestion before serving ([#20526](https://github.com/opensearch-project/OpenSearch/pull/20526))
2728
### Changed
2829
- Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788))
2930
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,4 +1338,55 @@ public void testKafkaIngestionWithFieldMappingMapper_VariousConfigurations() thr
13381338
&& !docs.get("jkl").containsKey("expired");
13391339
});
13401340
}
1341+
1342+
public void testWarmupPhase() throws Exception {
1343+
// Step 1: Publish 10 messages before creating the index
1344+
for (int i = 0; i < 10; i++) {
1345+
produceData(Integer.toString(i), "name" + i, "25");
1346+
}
1347+
1348+
// Step 2: Start cluster
1349+
internalCluster().startClusterManagerOnlyNode();
1350+
final String nodeA = internalCluster().startDataOnlyNode();
1351+
1352+
// Step 3: Create index with warmup enabled, lag_threshold=0, and long timeout
1353+
createIndex(
1354+
indexName,
1355+
Settings.builder()
1356+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1357+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1358+
.put("ingestion_source.type", "kafka")
1359+
.put("ingestion_source.pointer.init.reset", "earliest")
1360+
.put("ingestion_source.param.topic", topicName)
1361+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
1362+
.put("ingestion_source.warmup.lag_threshold", 0)
1363+
.put("ingestion_source.warmup.timeout", "10m")
1364+
.put("ingestion_source.all_active", true)
1365+
.build(),
1366+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
1367+
);
1368+
1369+
ensureGreen(indexName);
1370+
1371+
// Step 4: Wait for poller to enter POLLING state (warmup complete)
1372+
waitForState(() -> {
1373+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
1374+
return ingestionState.getShardStates().length == 1
1375+
&& ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("polling");
1376+
});
1377+
1378+
// Step 5: Validate all 10 documents are searchable after warmup
1379+
// Refresh to make all docs visible, then assert directly (not waitForSearchableDocs)
1380+
// since all 10 docs should already be indexed during the warmup phase
1381+
client(nodeA).admin().indices().prepareRefresh(indexName).get();
1382+
long docCount = client(nodeA).prepareSearch(indexName).setSize(0).get().getHits().getTotalHits().value();
1383+
assertEquals("All 10 documents should be searchable immediately after warmup completes", 10L, docCount);
1384+
1385+
// Step 6: Verify stats
1386+
PollingIngestStats stats = client(nodeA).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
1387+
.getPollingIngestStats();
1388+
assertNotNull(stats);
1389+
assertEquals(10L, stats.getMessageProcessorStats().totalProcessedCount());
1390+
assertEquals(10L, stats.getConsumerStats().totalPolledCount());
1391+
}
13411392
}

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public void testPauseAndResumeAPIs() throws Exception {
140140
}
141141

142142
// This test validates shard initialization does not fail due to kafka connection errors.
143+
// Warmup is left at its default (disabled), since this test exercises error handling rather than warmup behavior.
143144
public void testShardInitializationUsingUnknownTopic() throws Exception {
144145
createIndexWithMappingSource(
145146
indexName,

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,32 @@ public Iterator<Setting<?>> settings() {
10311031
}, Property.IndexScope, Property.Final)
10321032
);
10331033

1034+
/**
1035+
* Defines the maximum time to wait for lag to catch up during the warmup phase of pull-based ingestion.
1036+
* A value of -1 means warmup is disabled (the default). A value {@code >= 0} enables warmup with that timeout.
* The same -1 ms value is passed as both the default and the minimum, so smaller values are not accepted.
* The setting is index-scoped and final.
1037+
*/
1038+
public static final String SETTING_INGESTION_SOURCE_WARMUP_TIMEOUT = "index.ingestion_source.warmup.timeout";
1039+
public static final Setting<TimeValue> INGESTION_SOURCE_WARMUP_TIMEOUT_SETTING = Setting.timeSetting(
1040+
SETTING_INGESTION_SOURCE_WARMUP_TIMEOUT,
1041+
TimeValue.timeValueMillis(-1),
1042+
TimeValue.timeValueMillis(-1),
1043+
Property.IndexScope,
1044+
Property.Final
1045+
);
1046+
1047+
/**
1048+
* Defines the acceptable pointer-based lag threshold. Warmup completes when lag is at or below this value.
1049+
* A value of 0 means fully caught up (no lag).
* Defaults to 100 with a minimum of 0. The setting is index-scoped and final.
* NOTE(review): the unit of the lag value (presumably messages/offsets behind the source) is not
* visible here - confirm against the stream poller.
1050+
*/
1051+
public static final String SETTING_INGESTION_SOURCE_WARMUP_LAG_THRESHOLD = "index.ingestion_source.warmup.lag_threshold";
1052+
public static final Setting<Long> INGESTION_SOURCE_WARMUP_LAG_THRESHOLD_SETTING = Setting.longSetting(
1053+
SETTING_INGESTION_SOURCE_WARMUP_LAG_THRESHOLD,
1054+
100L,
1055+
0L,
1056+
Property.IndexScope,
1057+
Property.Final
1058+
);
1059+
10341060
/**
10351061
* an internal index format description, allowing us to find out if this index is upgraded or needs upgrading
10361062
*/
@@ -1301,6 +1327,12 @@ public IngestionSource getIngestionSource() {
13011327
final IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.get(settings);
13021328
final Map<String, Object> mapperSettings = INGESTION_SOURCE_MAPPER_SETTINGS.getAsMap(settings);
13031329

1330+
// Warmup settings
1331+
final IngestionSource.WarmupConfig warmupConfig = new IngestionSource.WarmupConfig(
1332+
INGESTION_SOURCE_WARMUP_TIMEOUT_SETTING.get(settings),
1333+
INGESTION_SOURCE_WARMUP_LAG_THRESHOLD_SETTING.get(settings)
1334+
);
1335+
13041336
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
13051337
.setPointerInitReset(pointerInitReset)
13061338
.setErrorStrategy(errorStrategy)
@@ -1312,6 +1344,7 @@ public IngestionSource getIngestionSource() {
13121344
.setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval)
13131345
.setMapperType(mapperType)
13141346
.setMapperSettings(mapperSettings)
1347+
.setWarmupConfig(warmupConfig)
13151348
.build();
13161349
}
13171350
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
2828
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING;
2929
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT;
30+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_WARMUP_LAG_THRESHOLD_SETTING;
31+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_WARMUP_TIMEOUT_SETTING;
3032

3133
/**
3234
* Class encapsulating the configuration of an ingestion source.
@@ -45,6 +47,7 @@ public class IngestionSource {
4547
private final TimeValue pointerBasedLagUpdateInterval;
4648
private final IngestionMessageMapper.MapperType mapperType;
4749
private final Map<String, Object> mapperSettings;
50+
private final WarmupConfig warmupConfig;
4851

4952
private IngestionSource(
5053
String type,
@@ -58,7 +61,8 @@ private IngestionSource(
5861
boolean allActiveIngestion,
5962
TimeValue pointerBasedLagUpdateInterval,
6063
IngestionMessageMapper.MapperType mapperType,
61-
Map<String, Object> mapperSettings
64+
Map<String, Object> mapperSettings,
65+
WarmupConfig warmupConfig
6266
) {
6367
this.type = type;
6468
this.pointerInitReset = pointerInitReset;
@@ -72,6 +76,7 @@ private IngestionSource(
7276
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
7377
this.mapperType = mapperType;
7478
this.mapperSettings = mapperSettings != null ? Collections.unmodifiableMap(mapperSettings) : Collections.emptyMap();
79+
this.warmupConfig = warmupConfig;
7580
}
7681

7782
public String getType() {
@@ -122,6 +127,10 @@ public Map<String, Object> getMapperSettings() {
122127
return mapperSettings;
123128
}
124129

130+
/**
 * Returns the warmup configuration (timeout and lag threshold) held by this ingestion source,
 * exactly as it was supplied at construction.
 */
public WarmupConfig getWarmupConfig() {
131+
return warmupConfig;
132+
}
133+
125134
@Override
126135
public boolean equals(Object o) {
127136
if (this == o) return true;
@@ -138,7 +147,8 @@ public boolean equals(Object o) {
138147
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion)
139148
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval)
140149
&& Objects.equals(mapperType, ingestionSource.mapperType)
141-
&& Objects.equals(mapperSettings, ingestionSource.mapperSettings);
150+
&& Objects.equals(mapperSettings, ingestionSource.mapperSettings)
151+
&& Objects.equals(warmupConfig, ingestionSource.warmupConfig);
142152
}
143153

144154
@Override
@@ -155,7 +165,8 @@ public int hashCode() {
155165
allActiveIngestion,
156166
pointerBasedLagUpdateInterval,
157167
mapperType,
158-
mapperSettings
168+
mapperSettings,
169+
warmupConfig
159170
);
160171
}
161172

@@ -190,6 +201,8 @@ public String toString() {
190201
+ '\''
191202
+ ", mapperSettings="
192203
+ mapperSettings
204+
+ ", warmupConfig="
205+
+ warmupConfig
193206
+ '}';
194207
}
195208

@@ -233,6 +246,21 @@ public String toString() {
233246
}
234247
}
235248

249+
/**
250+
* Record encapsulating the warmup configuration for pull-based ingestion.
251+
* When warmup is enabled (timeout >= 0), shards will wait for lag to catch up before serving queries
252+
* after node restart or shard relocation. A timeout of -1 means warmup is disabled.
253+
*/
254+
@PublicApi(since = "3.6.0")
255+
public record WarmupConfig(TimeValue timeout, long lagThreshold) {
256+
/**
257+
* Returns true if warmup is enabled (timeout >= 0).
258+
*/
259+
public boolean isEnabled() {
260+
return timeout.millis() >= 0;
261+
}
262+
}
263+
236264
/**
237265
* Builder for {@link IngestionSource}.
238266
*
@@ -253,6 +281,9 @@ public static class Builder {
253281
);
254282
private IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.getDefault(Settings.EMPTY);
255283
private Map<String, Object> mapperSettings = new HashMap<>();
284+
// Warmup configuration
285+
private TimeValue warmupTimeout = INGESTION_SOURCE_WARMUP_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
286+
private long warmupLagThreshold = INGESTION_SOURCE_WARMUP_LAG_THRESHOLD_SETTING.getDefault(Settings.EMPTY);
256287

257288
public Builder(String type) {
258289
this.type = type;
@@ -269,6 +300,10 @@ public Builder(IngestionSource ingestionSource) {
269300
this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval;
270301
this.mapperType = ingestionSource.mapperType;
271302
this.mapperSettings = new HashMap<>(ingestionSource.mapperSettings);
303+
// Copy warmup config
304+
WarmupConfig wc = ingestionSource.warmupConfig;
305+
this.warmupTimeout = wc.timeout();
306+
this.warmupLagThreshold = wc.lagThreshold();
272307
}
273308

274309
public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
@@ -331,7 +366,24 @@ public Builder setMapperSettings(Map<String, Object> mapperSettings) {
331366
return this;
332367
}
333368

369+
/**
 * Sets the warmup timeout that {@code build()} places into the resulting {@code WarmupConfig}.
 *
 * @param warmupTimeout the warmup timeout to use
 * @return this builder, for chaining
 */
public Builder setWarmupTimeout(TimeValue warmupTimeout) {
370+
this.warmupTimeout = warmupTimeout;
371+
return this;
372+
}
373+
374+
/**
 * Sets the warmup lag threshold that {@code build()} places into the resulting {@code WarmupConfig}.
 *
 * @param warmupLagThreshold the lag threshold to use
 * @return this builder, for chaining
 */
public Builder setWarmupLagThreshold(long warmupLagThreshold) {
375+
this.warmupLagThreshold = warmupLagThreshold;
376+
return this;
377+
}
378+
379+
public Builder setWarmupConfig(WarmupConfig warmupConfig) {
380+
this.warmupTimeout = warmupConfig.timeout();
381+
this.warmupLagThreshold = warmupConfig.lagThreshold();
382+
return this;
383+
}
384+
334385
public IngestionSource build() {
386+
WarmupConfig warmupConfig = new WarmupConfig(warmupTimeout, warmupLagThreshold);
335387
return new IngestionSource(
336388
type,
337389
pointerInitReset,
@@ -344,7 +396,8 @@ public IngestionSource build() {
344396
allActiveIngestion,
345397
pointerBasedLagUpdateInterval,
346398
mapperType,
347-
mapperSettings
399+
mapperSettings,
400+
warmupConfig
348401
);
349402
}
350403

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
292292
IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING,
293293
IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING,
294294
IndexMetadata.INGESTION_SOURCE_MAPPER_SETTINGS,
295+
IndexMetadata.INGESTION_SOURCE_WARMUP_TIMEOUT_SETTING,
296+
IndexMetadata.INGESTION_SOURCE_WARMUP_LAG_THRESHOLD_SETTING,
295297

296298
// Settings for search replica
297299
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ private void initializeStreamPoller(
155155
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
156156
.mapperType(ingestionSource.getMapperType())
157157
.mapperSettings(ingestionSource.getMapperSettings())
158+
.warmupConfig(ingestionSource.getWarmupConfig())
158159
.build();
159160
registerStreamPollerListener();
160161

@@ -662,4 +663,25 @@ public ShardIngestionState getIngestionState() {
662663
shardPointer != null ? shardPointer.toString() : ""
663664
);
664665
}
666+
667+
/**
668+
* Block until warmup is complete or timeout occurs.
669+
* This method handles all warmup logic internally. On timeout, always logs a warning and proceeds.
670+
*
671+
* @throws InterruptedException if the thread is interrupted while waiting
672+
*/
673+
public void awaitWarmupComplete() throws InterruptedException {
674+
IngestionSource ingestionSource = engineConfig.getIndexSettings().getIndexMetadata().getIngestionSource();
675+
if (ingestionSource == null || !ingestionSource.getWarmupConfig().isEnabled() || streamPoller.isPaused()) {
676+
return;
677+
}
678+
679+
long timeoutMs = ingestionSource.getWarmupConfig().timeout().millis();
680+
boolean completed = streamPoller.awaitWarmupComplete(timeoutMs);
681+
682+
if (!completed) {
683+
logger.warn("Ingestion warmup timed out for shard after {}ms, proceeding with potentially stale data.", timeoutMs);
684+
}
685+
}
686+
665687
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2487,6 +2487,10 @@ public void postRecovery(String reason) throws IndexShardStartedException, Index
24872487
// responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener
24882488
// and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard).
24892489
getIndexer().refresh("post_recovery");
2490+
2491+
// Wait for ingestion warmup if enabled (pull-based ingestion only)
2492+
handlePullBasedIngestionWarmup(getIndexer());
2493+
24902494
synchronized (mutex) {
24912495
if (state == IndexShardState.CLOSED) {
24922496
throw new IndexShardClosedException(shardId);
@@ -2500,6 +2504,29 @@ public void postRecovery(String reason) throws IndexShardStartedException, Index
25002504
}
25012505
}
25022506

2507+
/**
2508+
* Handles warmup for pull-based ingestion (PBI) engines.
2509+
* When warmup is enabled, this method blocks until the shard has caught up with the streaming source
2510+
* or until the configured timeout is reached.
2511+
*
2512+
* @param indexer the indexer to check for warmup
2513+
*/
2514+
private void handlePullBasedIngestionWarmup(Indexer indexer) {
2515+
if (!(indexer instanceof EngineBackedIndexer)) {
2516+
return;
2517+
}
2518+
Engine engine = ((EngineBackedIndexer) indexer).getEngine();
2519+
if (engine instanceof IngestionEngine) {
2520+
IngestionEngine ingestionEngine = (IngestionEngine) engine;
2521+
try {
2522+
ingestionEngine.awaitWarmupComplete();
2523+
} catch (InterruptedException e) {
2524+
Thread.currentThread().interrupt();
2525+
throw new OpenSearchException("Interrupted waiting for ingestion warmup", e);
2526+
}
2527+
}
2528+
}
2529+
25032530
/**
25042531
* called before starting to copy index files over
25052532
*/

0 commit comments

Comments
 (0)