Skip to content

Commit ad4c7f7

Browse files
update ingestion state on all-active shards on receiving cluster state update (opensearch-project#19407)
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 7abef8c commit ad4c7f7

File tree

3 files changed

+72
-5
lines changed

3 files changed

+72
-5
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.action.admin.indices.stats.IndexStats;
1717
import org.opensearch.action.admin.indices.stats.ShardStats;
1818
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
19+
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
1920
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
2021
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
2122
import org.opensearch.action.search.SearchResponse;
@@ -254,8 +255,7 @@ public void testMultiThreadedWrites() throws Exception {
254255
}
255256

256257
public void testAllActiveIngestion() throws Exception {
257-
// Create pull-based index in default replication mode (docrep) and publish some messages
258-
258+
// Create all-active pull-based index
259259
internalCluster().startClusterManagerOnlyNode();
260260
final String nodeA = internalCluster().startDataOnlyNode();
261261
for (int i = 0; i < 10; i++) {
@@ -374,7 +374,7 @@ public void testAllActiveIngestion() throws Exception {
374374
}
375375

376376
public void testReplicaPromotionOnAllActiveIngestion() throws Exception {
377-
// Create pull-based index in default replication mode (docrep) and publish some messages
377+
// Create all-active pull-based index
378378
internalCluster().startClusterManagerOnlyNode();
379379
final String nodeA = internalCluster().startDataOnlyNode();
380380
for (int i = 0; i < 10; i++) {
@@ -425,7 +425,7 @@ public void testReplicaPromotionOnAllActiveIngestion() throws Exception {
425425
}
426426

427427
public void testSnapshotRestoreOnAllActiveIngestion() throws Exception {
428-
// Create pull-based index in default replication mode (docrep) and publish some messages
428+
// Create all-active pull-based index
429429
internalCluster().startClusterManagerOnlyNode();
430430
final String nodeA = internalCluster().startDataOnlyNode();
431431
final String nodeB = internalCluster().startDataOnlyNode();
@@ -505,6 +505,68 @@ public void testSnapshotRestoreOnAllActiveIngestion() throws Exception {
505505
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
506506
}
507507

508+
public void testResetPollerInAllActiveIngestion() throws Exception {
509+
// Create all-active pull-based index
510+
internalCluster().startClusterManagerOnlyNode();
511+
final String nodeA = internalCluster().startDataOnlyNode();
512+
final String nodeB = internalCluster().startDataOnlyNode();
513+
for (int i = 0; i < 10; i++) {
514+
produceData(Integer.toString(i), "name" + i, "30");
515+
}
516+
517+
createIndex(
518+
indexName,
519+
Settings.builder()
520+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
521+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
522+
.put("ingestion_source.type", "kafka")
523+
.put("ingestion_source.param.topic", topicName)
524+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
525+
.put("ingestion_source.pointer.init.reset", "earliest")
526+
.put("ingestion_source.all_active", true)
527+
.build(),
528+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
529+
);
530+
531+
ensureGreen(indexName);
532+
waitForSearchableDocs(10, List.of(nodeA, nodeB));
533+
534+
// pause ingestion
535+
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
536+
assertTrue(pauseResponse.isAcknowledged());
537+
assertTrue(pauseResponse.isShardsAcknowledged());
538+
waitForState(() -> {
539+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
540+
return ingestionState.getShardStates().length == 2
541+
&& ingestionState.getFailedShards() == 0
542+
&& Arrays.stream(ingestionState.getShardStates())
543+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
544+
});
545+
546+
// reset to offset=2 and resume ingestion
547+
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "2");
548+
assertTrue(resumeResponse.isAcknowledged());
549+
assertTrue(resumeResponse.isShardsAcknowledged());
550+
waitForState(() -> {
551+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
552+
return ingestionState.getShardStates().length == 2
553+
&& Arrays.stream(ingestionState.getShardStates())
554+
.allMatch(
555+
state -> state.isPollerPaused() == false
556+
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
557+
);
558+
});
559+
560+
// validate there are 8 duplicate messages encountered after reset
561+
waitForState(() -> {
562+
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
563+
assertNotNull(shardTypeToStats.get("primary"));
564+
assertNotNull(shardTypeToStats.get("replica"));
565+
return shardTypeToStats.get("primary").getConsumerStats().totalDuplicateMessageSkippedCount() == 8
566+
&& shardTypeToStats.get("replica").getConsumerStats().totalDuplicateMessageSkippedCount() == 8;
567+
});
568+
}
569+
508570
// returns PollingIngestStats for single primary and single replica
509571
private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplica(String indexName) {
510572
IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,10 @@ public boolean useIngestionSource() {
12231223
return ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType));
12241224
}
12251225

1226+
public boolean isAllActiveIngestionEnabled() {
1227+
return INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
1228+
}
1229+
12261230
public IngestionStatus getIngestionStatus() {
12271231
return ingestionStatus;
12281232
}

server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,8 @@ private void updateShard(
782782
*/
783783
private void updateShardIngestionState(Shard shard, IndexMetadata indexMetadata, ShardRouting shardRouting) {
784784
try {
785-
if (indexMetadata.useIngestionSource() && shardRouting.primary()) {
785+
boolean isPrimaryOrAllActiveShard = shardRouting.primary() || indexMetadata.isAllActiveIngestionEnabled();
786+
if (indexMetadata.useIngestionSource() && isPrimaryOrAllActiveShard) {
786787
shard.updateShardIngestionState(indexMetadata);
787788
}
788789
} catch (Exception e) {

0 commit comments

Comments
 (0)