4747import org .elasticsearch .cluster .service .ClusterService ;
4848import org .elasticsearch .common .settings .ClusterSettings ;
4949import org .elasticsearch .common .settings .Settings ;
50+ import org .elasticsearch .common .streams .StreamType ;
5051import org .elasticsearch .common .streams .StreamsPermissionsUtils ;
5152import org .elasticsearch .common .util .concurrent .AtomicArray ;
5253import org .elasticsearch .common .util .concurrent .EsExecutors ;
6667
6768import java .io .IOException ;
6869import java .util .Arrays ;
70+ import java .util .HashMap ;
6971import java .util .List ;
7072import java .util .Map ;
7173import java .util .concurrent .CountDownLatch ;
7577import java .util .function .BiConsumer ;
7678import java .util .function .Supplier ;
7779
80+ import static org .hamcrest .CoreMatchers .containsStringIgnoringCase ;
7881import static org .hamcrest .CoreMatchers .equalTo ;
7982import static org .hamcrest .CoreMatchers .instanceOf ;
8083import static org .hamcrest .CoreMatchers .is ;
8184import static org .hamcrest .CoreMatchers .not ;
8285import static org .hamcrest .CoreMatchers .notNullValue ;
8386import static org .mockito .ArgumentMatchers .any ;
87+ import static org .mockito .ArgumentMatchers .eq ;
8488import static org .mockito .Mockito .doAnswer ;
8589import static org .mockito .Mockito .doThrow ;
8690import static org .mockito .Mockito .mock ;
@@ -93,10 +97,14 @@ public class BulkOperationTests extends ESTestCase {
9397 private final long millis = randomMillisUpToYear9999 ();
9498 private final String indexName = "my_index" ;
9599 private final String dataStreamName = "my_data_stream" ;
100+
96101 private final String fsDataStreamName = "my_failure_store_data_stream" ;
97102 private final String fsRolloverDataStreamName = "my_failure_store_to_be_rolled_over_data_stream" ;
98103 private final String fsBySettingsDataStreamName = "my_failure_store_enabled_by_setting_data_stream" ;
99104
105+ private final String logsStreamDsName = StreamType .LOGS .getStreamName ();
106+ private final String logsChildStreamDsName = StreamType .LOGS .getStreamName () + ".child" ;
107+
100108 private final IndexMetadata indexMetadata = IndexMetadata .builder (indexName )
101109 .settings (
102110 Settings .builder ()
@@ -134,6 +142,20 @@ public class BulkOperationTests extends ESTestCase {
134142 .numberOfShards (1 )
135143 .build ();
136144
145+ private final IndexMetadata streamBackingIndex = DataStreamTestHelper .createFailureStore (logsStreamDsName , 1 , millis )
146+ .numberOfShards (1 )
147+ .build ();
148+ private final IndexMetadata streamFailureStore = DataStreamTestHelper .createFailureStore (logsStreamDsName , 1 , millis )
149+ .numberOfShards (1 )
150+ .build ();
151+
152+ private final IndexMetadata streamChildBackingIndex = DataStreamTestHelper .createFailureStore (logsChildStreamDsName , 1 , millis )
153+ .numberOfShards (1 )
154+ .build ();
155+ private final IndexMetadata streamChildFailureStore = DataStreamTestHelper .createFailureStore (logsChildStreamDsName , 1 , millis )
156+ .numberOfShards (1 )
157+ .build ();
158+
137159 private final DataStream dataStream1 = DataStreamTestHelper .newInstance (
138160 dataStreamName ,
139161 List .of (ds1BackingIndex1 .getIndex (), ds1BackingIndex2 .getIndex ())
@@ -156,6 +178,18 @@ public class BulkOperationTests extends ESTestCase {
156178 .setFailureIndices (DataStream .DataStreamIndices .failureIndicesBuilder (List .of (ds4FailureStore1 .getIndex ())).build ())
157179 .build ();
158180
181+ private final DataStream logsDataStream = DataStream .builder (logsStreamDsName , List .of (streamChildBackingIndex .getIndex ()))
182+ .setGeneration (1 )
183+ .setDataStreamOptions (DataStreamOptions .EMPTY )
184+ .setFailureIndices (DataStream .DataStreamIndices .failureIndicesBuilder (List .of (streamFailureStore .getIndex ())).build ())
185+ .build ();
186+
187+ private final DataStream logsChildDataStream = DataStream .builder (logsChildStreamDsName , List .of (streamChildBackingIndex .getIndex ()))
188+ .setGeneration (1 )
189+ .setDataStreamOptions (DataStreamOptions .EMPTY )
190+ .setFailureIndices (DataStream .DataStreamIndices .failureIndicesBuilder (List .of (streamChildFailureStore .getIndex ())).build ())
191+ .build ();
192+
159193 private final ProjectId projectId = randomProjectIdOrDefault ();
160194 private final ClusterState clusterState = ClusterState .builder (ClusterName .DEFAULT )
161195 .putProjectMetadata (
@@ -176,28 +210,23 @@ public class BulkOperationTests extends ESTestCase {
176210 .build ()
177211 )
178212 )
179- .indices (
180- Map .of (
181- indexName ,
182- indexMetadata ,
183- ds1BackingIndex1 .getIndex ().getName (),
184- ds1BackingIndex1 ,
185- ds1BackingIndex2 .getIndex ().getName (),
186- ds1BackingIndex2 ,
187- ds2BackingIndex1 .getIndex ().getName (),
188- ds2BackingIndex1 ,
189- ds2FailureStore1 .getIndex ().getName (),
190- ds2FailureStore1 ,
191- ds3BackingIndex1 .getIndex ().getName (),
192- ds3BackingIndex1 ,
193- ds3FailureStore1 .getIndex ().getName (),
194- ds3FailureStore1 ,
195- ds4BackingIndex1 .getIndex ().getName (),
196- ds4BackingIndex1 ,
197- ds4FailureStore1 .getIndex ().getName (),
198- ds4FailureStore1
199- )
200- )
213+ .indices (new HashMap <>() {
214+ {
215+ put (indexName , indexMetadata );
216+ put (ds1BackingIndex1 .getIndex ().getName (), ds1BackingIndex1 );
217+ put (ds1BackingIndex2 .getIndex ().getName (), ds1BackingIndex2 );
218+ put (ds2BackingIndex1 .getIndex ().getName (), ds2BackingIndex1 );
219+ put (ds2FailureStore1 .getIndex ().getName (), ds2FailureStore1 );
220+ put (ds3BackingIndex1 .getIndex ().getName (), ds3BackingIndex1 );
221+ put (ds3FailureStore1 .getIndex ().getName (), ds3FailureStore1 );
222+ put (ds4BackingIndex1 .getIndex ().getName (), ds4BackingIndex1 );
223+ put (ds4FailureStore1 .getIndex ().getName (), ds4FailureStore1 );
224+ put (streamBackingIndex .getIndex ().getName (), streamBackingIndex );
225+ put (streamFailureStore .getIndex ().getName (), streamFailureStore );
226+ put (streamChildBackingIndex .getIndex ().getName (), streamChildBackingIndex );
227+ put (streamChildFailureStore .getIndex ().getName (), streamChildFailureStore );
228+ }
229+ })
201230 .dataStreams (
202231 Map .of (
203232 dataStreamName ,
@@ -207,7 +236,11 @@ public class BulkOperationTests extends ESTestCase {
207236 fsRolloverDataStreamName ,
208237 dataStream3 ,
209238 fsBySettingsDataStreamName ,
210- dataStream4
239+ dataStream4 ,
240+ logsStreamDsName ,
241+ logsDataStream ,
242+ logsChildStreamDsName ,
243+ logsChildDataStream
211244 ),
212245 Map .of ()
213246 )
@@ -956,6 +989,83 @@ public void testFailureWhileRollingOverFailureStore() throws Exception {
956989 assertThat (failedItem .getFailureStoreStatus (), equalTo (IndexDocFailureStoreStatus .FAILED ));
957990 }
958991
992+ public void testIndexWriteSucceededWhenStreamsEnabled () throws Exception {
993+ // Requests that go to two separate shards
994+ BulkRequest bulkRequest = new BulkRequest ();
995+ bulkRequest .add (new IndexRequest (indexName ).id ("1" ).source (Map .of ("key" , "val" )));
996+
997+ NodeClient client = getNodeClient (acceptAllShardWrites ());
998+
999+ when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (true );
1000+
1001+ BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1002+ assertThat (bulkItemResponses .hasFailures (), is (false ));
1003+ }
1004+
1005+ public void testLogsDatastreamWriteSucceededWhenStreamsEnabled () throws Exception {
1006+ // Requests that go to two separate shards
1007+ BulkRequest bulkRequest = new BulkRequest ();
1008+ bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1009+ bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1010+
1011+ NodeClient client = getNodeClient (acceptAllShardWrites ());
1012+
1013+ when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (true );
1014+
1015+ BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1016+ assertThat (bulkItemResponses .hasFailures (), is (false ));
1017+ }
1018+
1019+ public void testLogsDatastreamWriteSucceededWhenStreamsDisabled () throws Exception {
1020+ // Requests that go to two separate shards
1021+ BulkRequest bulkRequest = new BulkRequest ();
1022+ bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1023+ bulkRequest .add (new IndexRequest (logsStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1024+
1025+ NodeClient client = getNodeClient (acceptAllShardWrites ());
1026+
1027+ when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (false );
1028+
1029+ BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1030+ assertThat (bulkItemResponses .hasFailures (), is (false ));
1031+ }
1032+
1033+ public void testLogsChildDatastreamWriteRejectedWhenStreamsEnabled () throws Exception {
1034+ // Requests that go to two separate shards
1035+ BulkRequest bulkRequest = new BulkRequest ();
1036+ bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1037+ bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1038+
1039+ NodeClient client = getNodeClient (acceptAllShardWrites ());
1040+
1041+ when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (true );
1042+
1043+ BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1044+ assertThat (bulkItemResponses .hasFailures (), is (true ));
1045+
1046+ for (int i = 0 ; i < bulkItemResponses .getItems ().length ; i ++) {
1047+ assertThat (bulkItemResponses .getItems ()[i ].getFailure ().getCause (), instanceOf (IllegalArgumentException .class ));
1048+ assertThat (
1049+ bulkItemResponses .getItems ()[i ].getFailure ().getCause ().getMessage (),
1050+ is (containsStringIgnoringCase ("Writes to child stream [" + logsChildStreamDsName + "] are not allowed" ))
1051+ );
1052+ }
1053+ }
1054+
1055+ public void testLogsChildDatastreamWriteSucceededWhenStreamsDisabled () throws Exception {
1056+ // Requests that go to two separate shards
1057+ BulkRequest bulkRequest = new BulkRequest ();
1058+ bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("1" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1059+ bulkRequest .add (new IndexRequest (logsChildStreamDsName ).id ("3" ).source (Map .of ("key" , "val" )).opType (DocWriteRequest .OpType .CREATE ));
1060+
1061+ NodeClient client = getNodeClient (acceptAllShardWrites ());
1062+
1063+ when (streamsPermissionsUtilsMock .streamTypeIsEnabled (eq (StreamType .LOGS ), any ())).thenReturn (false );
1064+
1065+ BulkResponse bulkItemResponses = safeAwait (l -> newBulkOperation (client , bulkRequest , l ).run ());
1066+ assertThat (bulkItemResponses .hasFailures (), is (false ));
1067+ }
1068+
9591069 /**
9601070 * Throws an assertion error with the given message if the client operation executes
9611071 */
0 commit comments