4242import org .elasticsearch .cluster .metadata .ProjectMetadata ;
4343import org .elasticsearch .cluster .metadata .Template ;
4444import org .elasticsearch .cluster .node .DiscoveryNode ;
45- import org .elasticsearch .cluster .project .ProjectResolver ;
4645import org .elasticsearch .cluster .project .TestProjectResolvers ;
4746import org .elasticsearch .cluster .service .ClusterService ;
4847import org .elasticsearch .common .settings .ClusterSettings ;
4948import org .elasticsearch .common .settings .Settings ;
50- import org .elasticsearch .common .streams .StreamType ;
51- import org .elasticsearch .common .streams .StreamsPermissionsUtils ;
5249import org .elasticsearch .common .util .concurrent .AtomicArray ;
5350import org .elasticsearch .common .util .concurrent .EsExecutors ;
5451import org .elasticsearch .common .util .concurrent .ThreadContext ;
6764
6865import java .io .IOException ;
6966import java .util .Arrays ;
70- import java .util .HashMap ;
7167import java .util .List ;
7268import java .util .Map ;
7369import java .util .concurrent .CountDownLatch ;
7773import java .util .function .BiConsumer ;
7874import java .util .function .Supplier ;
7975
80- import static org .hamcrest .CoreMatchers .containsStringIgnoringCase ;
8176import static org .hamcrest .CoreMatchers .equalTo ;
8277import static org .hamcrest .CoreMatchers .instanceOf ;
8378import static org .hamcrest .CoreMatchers .is ;
8479import static org .hamcrest .CoreMatchers .not ;
8580import static org .hamcrest .CoreMatchers .notNullValue ;
8681import static org .mockito .ArgumentMatchers .any ;
87- import static org .mockito .ArgumentMatchers .eq ;
8882import static org .mockito .Mockito .doAnswer ;
8983import static org .mockito .Mockito .doThrow ;
9084import static org .mockito .Mockito .mock ;
@@ -97,14 +91,10 @@ public class BulkOperationTests extends ESTestCase {
9791 private final long millis = randomMillisUpToYear9999 ();
9892 private final String indexName = "my_index" ;
9993 private final String dataStreamName = "my_data_stream" ;
100-
10194 private final String fsDataStreamName = "my_failure_store_data_stream" ;
10295 private final String fsRolloverDataStreamName = "my_failure_store_to_be_rolled_over_data_stream" ;
10396 private final String fsBySettingsDataStreamName = "my_failure_store_enabled_by_setting_data_stream" ;
10497
105- private final String logsStreamDsName = StreamType .LOGS .getStreamName ();
106- private final String logsChildStreamDsName = StreamType .LOGS .getStreamName () + ".child" ;
107-
10898 private final IndexMetadata indexMetadata = IndexMetadata .builder (indexName )
10999 .settings (
110100 Settings .builder ()
@@ -142,20 +132,6 @@ public class BulkOperationTests extends ESTestCase {
142132 .numberOfShards (1 )
143133 .build ();
144134
145- private final IndexMetadata streamBackingIndex = DataStreamTestHelper .createFailureStore (logsStreamDsName , 1 , millis )
146- .numberOfShards (1 )
147- .build ();
148- private final IndexMetadata streamFailureStore = DataStreamTestHelper .createFailureStore (logsStreamDsName , 1 , millis )
149- .numberOfShards (1 )
150- .build ();
151-
152- private final IndexMetadata streamChildBackingIndex = DataStreamTestHelper .createFailureStore (logsChildStreamDsName , 1 , millis )
153- .numberOfShards (1 )
154- .build ();
155- private final IndexMetadata streamChildFailureStore = DataStreamTestHelper .createFailureStore (logsChildStreamDsName , 1 , millis )
156- .numberOfShards (1 )
157- .build ();
158-
159135 private final DataStream dataStream1 = DataStreamTestHelper .newInstance (
160136 dataStreamName ,
161137 List .of (ds1BackingIndex1 .getIndex (), ds1BackingIndex2 .getIndex ())
@@ -178,18 +154,6 @@ public class BulkOperationTests extends ESTestCase {
178154 .setFailureIndices (DataStream .DataStreamIndices .failureIndicesBuilder (List .of (ds4FailureStore1 .getIndex ())).build ())
179155 .build ();
180156
181- private final DataStream logsDataStream = DataStream .builder (logsStreamDsName , List .of (streamChildBackingIndex .getIndex ()))
182- .setGeneration (1 )
183- .setDataStreamOptions (DataStreamOptions .EMPTY )
184- .setFailureIndices (DataStream .DataStreamIndices .failureIndicesBuilder (List .of (streamFailureStore .getIndex ())).build ())
185- .build ();
186-
187- private final DataStream logsChildDataStream = DataStream .builder (logsChildStreamDsName , List .of (streamChildBackingIndex .getIndex ()))
188- .setGeneration (1 )
189- .setDataStreamOptions (DataStreamOptions .EMPTY )
190- .setFailureIndices (DataStream .DataStreamIndices .failureIndicesBuilder (List .of (streamChildFailureStore .getIndex ())).build ())
191- .build ();
192-
193157 private final ProjectId projectId = randomProjectIdOrDefault ();
194158 private final ClusterState clusterState = ClusterState .builder (ClusterName .DEFAULT )
195159 .putProjectMetadata (
@@ -210,23 +174,28 @@ public class BulkOperationTests extends ESTestCase {
210174 .build ()
211175 )
212176 )
213- .indices (new HashMap <>() {
214- {
215- put (indexName , indexMetadata );
216- put (ds1BackingIndex1 .getIndex ().getName (), ds1BackingIndex1 );
217- put (ds1BackingIndex2 .getIndex ().getName (), ds1BackingIndex2 );
218- put (ds2BackingIndex1 .getIndex ().getName (), ds2BackingIndex1 );
219- put (ds2FailureStore1 .getIndex ().getName (), ds2FailureStore1 );
220- put (ds3BackingIndex1 .getIndex ().getName (), ds3BackingIndex1 );
221- put (ds3FailureStore1 .getIndex ().getName (), ds3FailureStore1 );
222- put (ds4BackingIndex1 .getIndex ().getName (), ds4BackingIndex1 );
223- put (ds4FailureStore1 .getIndex ().getName (), ds4FailureStore1 );
224- put (streamBackingIndex .getIndex ().getName (), streamBackingIndex );
225- put (streamFailureStore .getIndex ().getName (), streamFailureStore );
226- put (streamChildBackingIndex .getIndex ().getName (), streamChildBackingIndex );
227- put (streamChildFailureStore .getIndex ().getName (), streamChildFailureStore );
228- }
229- })
177+ .indices (
178+ Map .of (
179+ indexName ,
180+ indexMetadata ,
181+ ds1BackingIndex1 .getIndex ().getName (),
182+ ds1BackingIndex1 ,
183+ ds1BackingIndex2 .getIndex ().getName (),
184+ ds1BackingIndex2 ,
185+ ds2BackingIndex1 .getIndex ().getName (),
186+ ds2BackingIndex1 ,
187+ ds2FailureStore1 .getIndex ().getName (),
188+ ds2FailureStore1 ,
189+ ds3BackingIndex1 .getIndex ().getName (),
190+ ds3BackingIndex1 ,
191+ ds3FailureStore1 .getIndex ().getName (),
192+ ds3FailureStore1 ,
193+ ds4BackingIndex1 .getIndex ().getName (),
194+ ds4BackingIndex1 ,
195+ ds4FailureStore1 .getIndex ().getName (),
196+ ds4FailureStore1
197+ )
198+ )
230199 .dataStreams (
231200 Map .of (
232201 dataStreamName ,
@@ -236,11 +205,7 @@ public class BulkOperationTests extends ESTestCase {
236205 fsRolloverDataStreamName ,
237206 dataStream3 ,
238207 fsBySettingsDataStreamName ,
239- dataStream4 ,
240- logsStreamDsName ,
241- logsDataStream ,
242- logsChildStreamDsName ,
243- logsChildDataStream
208+ dataStream4
244209 ),
245210 Map .of ()
246211 )
@@ -250,18 +215,10 @@ public class BulkOperationTests extends ESTestCase {
250215
251216 private TestThreadPool threadPool ;
252217
253- private StreamsPermissionsUtils streamsPermissionsUtilsMock ;
254- private ProjectResolver projectResolverMock ;
255- private IndexNameExpressionResolver indexNameExpressionResolverMock ;
256-
257218 @ Before
258- public void setupTest () {
219+ public void setupThreadpool () {
259220 threadPool = new TestThreadPool (getClass ().getName ());
260221 threadPool .getThreadContext ().putHeader (Task .X_ELASTIC_PROJECT_ID_HTTP_HEADER , projectId .id ());
261- streamsPermissionsUtilsMock = mock (StreamsPermissionsUtils .class );
262- when (streamsPermissionsUtilsMock .streamTypeIsEnabled (any (), any ())).thenReturn (false );
263- projectResolverMock = mock (ProjectResolver .class );
264- indexNameExpressionResolverMock = mock (IndexNameExpressionResolver .class );
265222 }
266223
267224 @ After
@@ -731,8 +688,8 @@ public void testRetryableBlockAcceptsFailureStoreDocument() throws Exception {
731688 }
732689
733690 /**
734- * A bulk operation to a data stream with a failure store enabled may still partially fail if the redirected documents experience
735- * a shard-level failure while writing to the failure store indices .
691+ * A bulk operation to a data stream with a failure store enabled may still partially fail if the cluster is experiencing a
692+ * non-retryable block when the redirected documents would be sent to the shard-level action .
736693 */
737694 public void testBlockedClusterRejectsFailureStoreDocument () throws Exception {
738695 // Requests that go to two separate shards
@@ -989,83 +946,6 @@ public void testFailureWhileRollingOverFailureStore() throws Exception {
989946 assertThat (failedItem .getFailureStoreStatus (), equalTo (IndexDocFailureStoreStatus .FAILED ));
990947 }
991948
992- public void testIndexWriteSucceededWhenStreamsEnabled () throws Exception {
993- // Requests that go to two separate shards
994- BulkRequest bulkRequest = new BulkRequest ();
995- bulkRequest .add (new IndexRequest (indexName ).id ("1" ).source (Map .of ("key" , "val" )));
996-
997- NodeClient client = getNodeClient (acceptAllShardWrites ());
998-
999- when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (true );
1000-
1001- BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1002- assertThat (bulkItemResponses .hasFailures (), is (false ));
1003- }
1004-
1005- public void testLogsDatastreamWriteSucceededWhenStreamsEnabled () throws Exception {
1006- // Requests that go to two separate shards
1007- BulkRequest bulkRequest = new BulkRequest ();
1008- bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1009- bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1010-
1011- NodeClient client = getNodeClient (acceptAllShardWrites ());
1012-
1013- when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (true );
1014-
1015- BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1016- assertThat (bulkItemResponses .hasFailures (), is (false ));
1017- }
1018-
1019- public void testLogsDatastreamWriteSucceededWhenStreamsDisabled () throws Exception {
1020- // Requests that go to two separate shards
1021- BulkRequest bulkRequest = new BulkRequest ();
1022- bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1023- bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1024-
1025- NodeClient client = getNodeClient (acceptAllShardWrites ());
1026-
1027- when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (false );
1028-
1029- BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1030- assertThat (bulkItemResponses .hasFailures (), is (false ));
1031- }
1032-
1033- public void testLogsChildDatastreamWriteRejectedWhenStreamsEnabled () throws Exception {
1034- // Requests that go to two separate shards
1035- BulkRequest bulkRequest = new BulkRequest ();
1036- bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1037- bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1038-
1039- NodeClient client = getNodeClient (acceptAllShardWrites ());
1040-
1041- when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (true );
1042-
1043- BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1044- assertThat (bulkItemResponses .hasFailures (), is (true ));
1045-
1046- for (int i = 0 ; i < bulkItemResponses .getItems ().length ; i ++) {
1047- assertThat (bulkItemResponses .getItems ()[i ].getFailure ().getCause (), instanceOf (IllegalArgumentException .class ));
1048- assertThat (
1049- bulkItemResponses .getItems ()[i ].getFailure ().getCause ().getMessage (),
1050- is (containsStringIgnoringCase ("Writes to child stream [" + logsChildStreamDsName + "] are not allowed" ))
1051- );
1052- }
1053- }
1054-
1055- public void testLogsChildDatastreamWriteSucceededWhenStreamsDisabled () throws Exception {
1056- // Requests that go to two separate shards
1057- BulkRequest bulkRequest = new BulkRequest ();
1058- bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1059- bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1060-
1061- NodeClient client = getNodeClient (acceptAllShardWrites ());
1062-
1063- when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (false );
1064-
1065- BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1066- assertThat (bulkItemResponses .hasFailures (), is (false ));
1067- }
1068-
1069949 /**
1070950 * Throws an assertion error with the given message if the client operation executes
1071951 */
@@ -1359,8 +1239,7 @@ private BulkOperation newBulkOperation(
13591239 failureStoreDocumentConverter ,
13601240 FailureStoreMetrics .NOOP ,
13611241 dataStreamFailureStoreSettings ,
1362- failureStoreNodeFeatureEnabled ,
1363- streamsPermissionsUtilsMock
1242+ failureStoreNodeFeatureEnabled
13641243 );
13651244 }
13661245
0 commit comments