5555import java .util .concurrent .ExecutionException ;
5656
5757import static org .elasticsearch .cluster .metadata .DataStreamTestHelper .backingIndexEqualTo ;
58+ import static org .elasticsearch .cluster .metadata .DataStreamTestHelper .dataStreamIndexEqualTo ;
5859import static org .elasticsearch .cluster .metadata .MetadataIndexTemplateService .DEFAULT_TIMESTAMP_FIELD ;
5960import static org .elasticsearch .xpack .security .support .SecuritySystemIndices .SECURITY_MAIN_ALIAS ;
6061import static org .hamcrest .Matchers .allOf ;
61- import static org .hamcrest .Matchers .anEmptyMap ;
6262import static org .hamcrest .Matchers .anyOf ;
6363import static org .hamcrest .Matchers .containsString ;
6464import static org .hamcrest .Matchers .equalTo ;
@@ -95,31 +95,35 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
9595
9696 public void testRolloverLifecycleAndForceMergeAuthorized () throws Exception {
9797 String dataStreamName = randomDataStreamName ();
98- // empty lifecycle contains the default rollover
99- prepareDataStreamAndIndex (dataStreamName , DataStreamLifecycle . Template . DATA_DEFAULT );
98+ // with failure store and empty lifecycle contains the default rollover
99+ prepareDataStreamAndIndex (dataStreamName , null );
100100
101- assertBusy (() -> {
102- assertNoAuthzErrors ();
103- List <Index > backingIndices = getDataStreamBackingIndices (dataStreamName );
104- assertThat (backingIndices .size (), equalTo (2 ));
105- String backingIndex = backingIndices .get (0 ).getName ();
106- assertThat (backingIndex , backingIndexEqualTo (dataStreamName , 1 ));
107- String writeIndex = backingIndices .get (1 ).getName ();
108- assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
109- });
101+ List <String > backingIndices = waitForDataStreamBackingIndices (dataStreamName , 2 );
102+ String backingIndex = backingIndices .get (0 );
103+ assertThat (backingIndex , backingIndexEqualTo (dataStreamName , 1 ));
104+ String writeIndex = backingIndices .get (1 );
105+ assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
106+
107+ // initialise the failure store
108+ indexFailedDoc (dataStreamName );
109+ List <String > failureIndices = waitForDataStreamIndices (dataStreamName , 2 , true );
110+ String firstFailureIndex = failureIndices .get (0 );
111+ assertThat (firstFailureIndex , dataStreamIndexEqualTo (dataStreamName , 3 , true ));
112+ String secondFailureIndexGen = failureIndices .get (1 );
113+ assertThat (secondFailureIndexGen , dataStreamIndexEqualTo (dataStreamName , 4 , true ));
114+
115+ assertNoAuthzErrors ();
110116 // Index another doc to force another rollover and trigger an attempted force-merge. The force-merge may be a noop under
111117 // the hood but for authz purposes this doesn't matter, it only matters that the force-merge API was called
112118 indexDoc (dataStreamName );
113- assertBusy (() -> {
114- assertNoAuthzErrors ();
115- List <Index > backingIndices = getDataStreamBackingIndices (dataStreamName );
116- assertThat (backingIndices .size (), equalTo (3 ));
117- });
119+
120+ waitForDataStreamBackingIndices (dataStreamName , 3 );
121+ assertNoAuthzErrors ();
118122 }
119123
120124 public void testRolloverAndRetentionAuthorized () throws Exception {
121125 String dataStreamName = randomDataStreamName ();
122- prepareDataStreamAndIndex (dataStreamName , DataStreamLifecycle . builder (). dataRetention ( TimeValue .ZERO ). buildTemplate () );
126+ prepareDataStreamAndIndex (dataStreamName , TimeValue .ZERO );
123127
124128 assertBusy (() -> {
125129 assertNoAuthzErrors ();
@@ -130,16 +134,33 @@ public void testRolloverAndRetentionAuthorized() throws Exception {
130134 String writeIndex = backingIndices .get (0 ).getName ();
131135 assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
132136 });
137+
138+ // test failure store too, we index the failure later to have predictable generation suffixes
139+ indexFailedDoc (dataStreamName );
140+ assertBusy (() -> {
141+ assertNoAuthzErrors ();
142+ List <String > failureIndices = getDataStreamBackingIndexNames (dataStreamName , true );
143+ assertThat (failureIndices .size (), equalTo (1 ));
144+ // we expect the data stream to have only one failure index, with generation 4
145+ // as generation 3 would've been deleted by the data stream lifecycle given the lifecycle configuration
146+ String writeIndex = failureIndices .get (0 );
147+ assertThat (writeIndex , dataStreamIndexEqualTo (dataStreamName , 4 , true ));
148+ });
133149 }
134150
135151 public void testUnauthorized () throws Exception {
136152 // this is an example index pattern for a system index that the data stream lifecycle does not have access for. Data stream
137153 // lifecycle will therefore fail at runtime with an authz exception
138- prepareDataStreamAndIndex (SECURITY_MAIN_ALIAS , DataStreamLifecycle .Template .DATA_DEFAULT );
154+ prepareDataStreamAndIndex (SECURITY_MAIN_ALIAS , null );
155+ indexFailedDoc (SECURITY_MAIN_ALIAS );
139156
140157 assertBusy (() -> {
141158 Map <String , String > indicesAndErrors = collectErrorsFromStoreAsMap ();
142- assertThat (indicesAndErrors , is (not (anEmptyMap ())));
159+ // Both the backing and failures indices should have errors
160+ assertThat (indicesAndErrors .size (), is (2 ));
161+ for (String index : indicesAndErrors .keySet ()) {
162+ assertThat (index , anyOf (containsString (DataStream .BACKING_INDEX_PREFIX ), containsString (DataStream .FAILURE_STORE_PREFIX )));
163+ }
143164 assertThat (
144165 indicesAndErrors .values (),
145166 hasItem (allOf (containsString ("security_exception" ), containsString ("unauthorized for user [_data_stream_lifecycle]" )))
@@ -160,6 +181,18 @@ public void testRolloverAndRetentionWithSystemDataStreamAuthorized() throws Exce
160181 String writeIndex = backingIndices .get (0 ).getName ();
161182 assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
162183 });
184+
185+ // test failure store too, we index the failure later to have predictable generation suffixes
186+ indexFailedDoc (dataStreamName );
187+ assertBusy (() -> {
188+ assertNoAuthzErrors ();
189+ List <String > failureIndices = getDataStreamBackingIndexNames (dataStreamName , true );
190+ assertThat (failureIndices .size (), equalTo (1 ));
191+ // we expect the data stream to have only one backing index, the write one, with generation 2
192+ // as generation 1 would've been deleted by the data stream lifecycle given the lifecycle configuration
193+ String writeIndex = failureIndices .get (0 );
194+ assertThat (writeIndex , dataStreamIndexEqualTo (dataStreamName , 4 , true ));
195+ });
163196 }
164197
165198 private static String randomDataStreamName () {
@@ -183,9 +216,22 @@ private Map<String, String> collectErrorsFromStoreAsMap() {
183216 return indicesAndErrors ;
184217 }
185218
186- private void prepareDataStreamAndIndex (String dataStreamName , DataStreamLifecycle .Template lifecycle ) throws IOException ,
187- InterruptedException , ExecutionException {
188- putComposableIndexTemplate ("id1" , null , List .of (dataStreamName + "*" ), null , null , lifecycle );
219+ private void prepareDataStreamAndIndex (String dataStreamName , TimeValue retention ) throws IOException , InterruptedException ,
220+ ExecutionException {
221+ var dataLifecycle = retention == null
222+ ? DataStreamLifecycle .Template .DATA_DEFAULT
223+ : new DataStreamLifecycle .Template (true , retention , null );
224+ putComposableIndexTemplate ("id1" , """
225+ {
226+ "properties": {
227+ "@timestamp" : {
228+ "type": "date"
229+ },
230+ "count": {
231+ "type": "long"
232+ }
233+ }
234+ }""" , List .of (dataStreamName + "*" ), null , null , dataLifecycle );
189235 CreateDataStreamAction .Request createDataStreamRequest = new CreateDataStreamAction .Request (
190236 TEST_REQUEST_TIMEOUT ,
191237 TEST_REQUEST_TIMEOUT ,
@@ -224,7 +270,7 @@ private static void putComposableIndexTemplate(
224270 List <String > patterns ,
225271 @ Nullable Settings settings ,
226272 @ Nullable Map <String , Object > metadata ,
227- @ Nullable DataStreamLifecycle .Template lifecycle
273+ @ Nullable DataStreamLifecycle .Template dataLifecycle
228274 ) throws IOException {
229275 TransportPutComposableIndexTemplateAction .Request request = new TransportPutComposableIndexTemplateAction .Request (id );
230276 request .indexTemplate (
@@ -234,7 +280,8 @@ private static void putComposableIndexTemplate(
234280 Template .builder ()
235281 .settings (settings )
236282 .mappings (mappings == null ? null : CompressedXContent .fromJSON (mappings ))
237- .lifecycle (lifecycle )
283+ .lifecycle (dataLifecycle )
284+ .dataStreamOptions (new DataStreamOptions .Template (new DataStreamFailureStore .Template (true )))
238285 )
239286 .metadata (metadata )
240287 .dataStreamTemplate (new ComposableIndexTemplate .DataStreamTemplate ())
@@ -261,6 +308,27 @@ private static void indexDoc(String dataStream) {
261308 indicesAdmin ().refresh (new RefreshRequest (dataStream )).actionGet ();
262309 }
263310
311+ private static void indexFailedDoc (String dataStream ) {
312+ BulkRequest bulkRequest = new BulkRequest ();
313+ String value = DateFieldMapper .DEFAULT_DATE_TIME_FORMATTER .formatMillis (System .currentTimeMillis ());
314+ bulkRequest .add (
315+ new IndexRequest (dataStream ).opType (DocWriteRequest .OpType .CREATE )
316+ .source (
317+ String .format (Locale .ROOT , "{\" %s\" :\" %s\" ,\" count\" :\" not-a-number\" }" , DEFAULT_TIMESTAMP_FIELD , value ),
318+ XContentType .JSON
319+ )
320+ );
321+ BulkResponse bulkResponse = client ().bulk (bulkRequest ).actionGet ();
322+ assertThat (bulkResponse .getItems ().length , equalTo (1 ));
323+ String backingIndexPrefix = DataStream .FAILURE_STORE_PREFIX + dataStream ;
324+ for (BulkItemResponse itemResponse : bulkResponse ) {
325+ assertThat (itemResponse .getFailureMessage (), nullValue ());
326+ assertThat (itemResponse .status (), equalTo (RestStatus .CREATED ));
327+ assertThat (itemResponse .getIndex (), startsWith (backingIndexPrefix ));
328+ }
329+ indicesAdmin ().refresh (new RefreshRequest (dataStream )).actionGet ();
330+ }
331+
264332 public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {
265333
266334 static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results" ;
0 commit comments