@@ -441,6 +441,45 @@ public void testFailingDocumentRedirectsToFailureStore() throws Exception {
441441 assertThat (failedItem .getFailureStoreStatus (), equalTo (IndexDocFailureStoreStatus .USED ));
442442 }
443443
444+ /**
445+ * A bulk operation to a data stream with a failure store enabled should NOT redirect any documents that fail at a shard level to the
446+ * failure store if the failure store node feature is not on every node in the cluster
447+ */
448+ public void testFailingDocumentIgnoredByFailureStoreWhenFeatureIsDisabled () throws Exception {
449+ Assume .assumeTrue (DataStream .isFailureStoreFeatureFlagEnabled ());
450+
451+ // Requests that go to two separate shards
452+ BulkRequest bulkRequest = new BulkRequest ();
453+ bulkRequest .add (new IndexRequest (fsDataStreamName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
454+ bulkRequest .add (new IndexRequest (fsDataStreamName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
455+
456+ NodeClient client = getNodeClient (
457+ thatFailsDocuments (Map .of (new IndexAndId (ds2BackingIndex1 .getIndex ().getName (), "3" ), () -> new MapperException ("test" )))
458+ );
459+
460+ BulkResponse bulkItemResponses = safeAwait (
461+ l -> newBulkOperation (
462+ clusterState ,
463+ client ,
464+ bulkRequest ,
465+ new AtomicArray <>(bulkRequest .numberOfActions ()),
466+ mockObserver (clusterState ),
467+ l ,
468+ new FailureStoreDocumentConverter (),
469+ DataStreamFailureStoreSettings .create (ClusterSettings .createBuiltInClusterSettings ()),
470+ false
471+ ).run ()
472+ );
473+ assertThat (bulkItemResponses .hasFailures (), is (true ));
474+ BulkItemResponse failedItem = Arrays .stream (bulkItemResponses .getItems ())
475+ .filter (BulkItemResponse ::isFailed )
476+ .findFirst ()
477+ .orElseThrow (() -> new AssertionError ("Could not find redirected item" ));
478+ assertThat (failedItem .getFailure ().getCause (), is (instanceOf (MapperException .class )));
479+ assertThat (failedItem .getFailure ().getCause ().getMessage (), is (equalTo ("test" )));
480+ assertThat (failedItem .getFailureStoreStatus (), equalTo (IndexDocFailureStoreStatus .NOT_APPLICABLE_OR_UNKNOWN ));
481+ }
482+
444483 public void testFailingDocumentRedirectsToFailureStoreWhenEnabledByClusterSetting () {
445484 Assume .assumeTrue (DataStream .isFailureStoreFeatureFlagEnabled ());
446485
@@ -482,7 +521,8 @@ public void testFailingDocumentRedirectsToFailureStoreWhenEnabledByClusterSettin
482521 mockObserver (clusterState ),
483522 l ,
484523 new FailureStoreDocumentConverter (),
485- dataStreamFailureStoreSettings
524+ dataStreamFailureStoreSettings ,
525+ true
486526 ).run ()
487527 );
488528 assertThat (bulkItemResponsesUsingClusterSetting .hasFailures (), is (false ));
@@ -1175,7 +1215,8 @@ private BulkOperation newBulkOperation(
11751215 observer ,
11761216 listener ,
11771217 failureStoreDocumentConverter ,
1178- DataStreamFailureStoreSettings .create (ClusterSettings .createBuiltInClusterSettings ())
1218+ DataStreamFailureStoreSettings .create (ClusterSettings .createBuiltInClusterSettings ()),
1219+ true
11791220 );
11801221 }
11811222
@@ -1187,7 +1228,8 @@ private BulkOperation newBulkOperation(
11871228 ClusterStateObserver observer ,
11881229 ActionListener <BulkResponse > listener ,
11891230 FailureStoreDocumentConverter failureStoreDocumentConverter ,
1190- DataStreamFailureStoreSettings dataStreamFailureStoreSettings
1231+ DataStreamFailureStoreSettings dataStreamFailureStoreSettings ,
1232+ boolean failureStoreNodeFeatureEnabled
11911233 ) {
11921234 // Time provision
11931235 long timeZero = TimeUnit .MILLISECONDS .toNanos (randomMillisUpToYear9999 () - TimeUnit .DAYS .toMillis (1 ));
@@ -1222,7 +1264,7 @@ private BulkOperation newBulkOperation(
12221264 failureStoreDocumentConverter ,
12231265 FailureStoreMetrics .NOOP ,
12241266 dataStreamFailureStoreSettings ,
1225- true
1267+ failureStoreNodeFeatureEnabled
12261268 );
12271269 }
12281270
0 commit comments