5656import java .util .concurrent .ExecutionException ;
5757
5858import static org .elasticsearch .cluster .metadata .DataStreamTestHelper .backingIndexEqualTo ;
59+ import static org .elasticsearch .cluster .metadata .DataStreamTestHelper .dataStreamIndexEqualTo ;
5960import static org .elasticsearch .cluster .metadata .MetadataIndexTemplateService .DEFAULT_TIMESTAMP_FIELD ;
6061import static org .elasticsearch .xpack .security .support .SecuritySystemIndices .SECURITY_MAIN_ALIAS ;
6162import static org .hamcrest .Matchers .allOf ;
62- import static org .hamcrest .Matchers .anEmptyMap ;
6363import static org .hamcrest .Matchers .anyOf ;
6464import static org .hamcrest .Matchers .containsString ;
6565import static org .hamcrest .Matchers .equalTo ;
@@ -96,15 +96,23 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
9696
9797 public void testRolloverLifecycleAndForceMergeAuthorized () throws Exception {
9898 String dataStreamName = randomDataStreamName ();
99- // empty lifecycle contains the default rollover
100- prepareDataStreamAndIndex (dataStreamName , DataStreamLifecycle . Template . DATA_DEFAULT );
99+ // with failure store and empty lifecycle contains the default rollover
100+ prepareDataStreamAndIndex (dataStreamName , null );
101101
102102 List <String > backingIndices = waitForDataStreamBackingIndices (dataStreamName , 2 );
103103 String backingIndex = backingIndices .get (0 );
104104 assertThat (backingIndex , backingIndexEqualTo (dataStreamName , 1 ));
105105 String writeIndex = backingIndices .get (1 );
106106 assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
107107
108+ // initialise the failure store
109+ indexFailedDoc (dataStreamName );
110+ List <String > failureIndices = waitForDataStreamIndices (dataStreamName , 2 , true );
111+ String firstFailureIndex = failureIndices .get (0 );
112+ assertThat (firstFailureIndex , dataStreamIndexEqualTo (dataStreamName , 3 , true ));
113+ String secondFailureIndexGen = failureIndices .get (1 );
114+ assertThat (secondFailureIndexGen , dataStreamIndexEqualTo (dataStreamName , 4 , true ));
115+
108116 assertNoAuthzErrors ();
109117 // Index another doc to force another rollover and trigger an attempted force-merge. The force-merge may be a noop under
110118 // the hood but for authz purposes this doesn't matter, it only matters that the force-merge API was called
@@ -116,7 +124,7 @@ public void testRolloverLifecycleAndForceMergeAuthorized() throws Exception {
116124
117125 public void testRolloverAndRetentionAuthorized () throws Exception {
118126 String dataStreamName = randomDataStreamName ();
119- prepareDataStreamAndIndex (dataStreamName , DataStreamLifecycle . builder (). dataRetention ( TimeValue .ZERO ). buildTemplate () );
127+ prepareDataStreamAndIndex (dataStreamName , TimeValue .ZERO );
120128
121129 assertBusy (() -> {
122130 assertNoAuthzErrors ();
@@ -127,16 +135,33 @@ public void testRolloverAndRetentionAuthorized() throws Exception {
127135 String writeIndex = backingIndices .get (0 ).getName ();
128136 assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
129137 });
138+
139+ // test failure store too, we index the failure later to have predictable generation suffixes
140+ indexFailedDoc (dataStreamName );
141+ assertBusy (() -> {
142+ assertNoAuthzErrors ();
143+ List <String > failureIndices = getDataStreamBackingIndexNames (dataStreamName , true );
144+ assertThat (failureIndices .size (), equalTo (1 ));
145+ // we expect the data stream to have only one failure index, with generation 4
146+ // as generation 3 would've been deleted by the data stream lifecycle given the lifecycle configuration
147+ String writeIndex = failureIndices .get (0 );
148+ assertThat (writeIndex , dataStreamIndexEqualTo (dataStreamName , 4 , true ));
149+ });
130150 }
131151
132152 public void testUnauthorized () throws Exception {
133153 // this is an example index pattern for a system index that the data stream lifecycle does not have access for. Data stream
134154 // lifecycle will therefore fail at runtime with an authz exception
135- prepareDataStreamAndIndex (SECURITY_MAIN_ALIAS , DataStreamLifecycle .Template .DATA_DEFAULT );
155+ prepareDataStreamAndIndex (SECURITY_MAIN_ALIAS , null );
156+ indexFailedDoc (SECURITY_MAIN_ALIAS );
136157
137158 assertBusy (() -> {
138159 Map <String , String > indicesAndErrors = collectErrorsFromStoreAsMap ();
139- assertThat (indicesAndErrors , is (not (anEmptyMap ())));
160+ // Both the backing and failures indices should have errors
161+ assertThat (indicesAndErrors .size (), is (2 ));
162+ for (String index : indicesAndErrors .keySet ()) {
163+ assertThat (index , anyOf (containsString (DataStream .BACKING_INDEX_PREFIX ), containsString (DataStream .FAILURE_STORE_PREFIX )));
164+ }
140165 assertThat (
141166 indicesAndErrors .values (),
142167 hasItem (allOf (containsString ("security_exception" ), containsString ("unauthorized for user [_data_stream_lifecycle]" )))
@@ -157,6 +182,18 @@ public void testRolloverAndRetentionWithSystemDataStreamAuthorized() throws Exce
157182 String writeIndex = backingIndices .get (0 ).getName ();
158183 assertThat (writeIndex , backingIndexEqualTo (dataStreamName , 2 ));
159184 });
185+
186+ // test failure store too, we index the failure later to have predictable generation suffixes
187+ indexFailedDoc (dataStreamName );
188+ assertBusy (() -> {
189+ assertNoAuthzErrors ();
190+ List <String > failureIndices = getDataStreamBackingIndexNames (dataStreamName , true );
191+ assertThat (failureIndices .size (), equalTo (1 ));
192+ // we expect the data stream to have only one backing index, the write one, with generation 2
193+ // as generation 1 would've been deleted by the data stream lifecycle given the lifecycle configuration
194+ String writeIndex = failureIndices .get (0 );
195+ assertThat (writeIndex , dataStreamIndexEqualTo (dataStreamName , 4 , true ));
196+ });
160197 }
161198
162199 private static String randomDataStreamName () {
@@ -180,9 +217,22 @@ private Map<String, String> collectErrorsFromStoreAsMap() {
180217 return indicesAndErrors ;
181218 }
182219
183- private void prepareDataStreamAndIndex (String dataStreamName , DataStreamLifecycle .Template lifecycle ) throws IOException ,
184- InterruptedException , ExecutionException {
185- putComposableIndexTemplate ("id1" , null , List .of (dataStreamName + "*" ), null , null , lifecycle );
220+ private void prepareDataStreamAndIndex (String dataStreamName , TimeValue retention ) throws IOException , InterruptedException ,
221+ ExecutionException {
222+ var dataLifecycle = retention == null
223+ ? DataStreamLifecycle .Template .DATA_DEFAULT
224+ : new DataStreamLifecycle .Template (true , retention , null );
225+ putComposableIndexTemplate ("id1" , """
226+ {
227+ "properties": {
228+ "@timestamp" : {
229+ "type": "date"
230+ },
231+ "count": {
232+ "type": "long"
233+ }
234+ }
235+ }""" , List .of (dataStreamName + "*" ), null , null , dataLifecycle );
186236 CreateDataStreamAction .Request createDataStreamRequest = new CreateDataStreamAction .Request (
187237 TEST_REQUEST_TIMEOUT ,
188238 TEST_REQUEST_TIMEOUT ,
@@ -221,7 +271,7 @@ private static void putComposableIndexTemplate(
221271 List <String > patterns ,
222272 @ Nullable Settings settings ,
223273 @ Nullable Map <String , Object > metadata ,
224- @ Nullable DataStreamLifecycle .Template lifecycle
274+ @ Nullable DataStreamLifecycle .Template dataLifecycle
225275 ) throws IOException {
226276 TransportPutComposableIndexTemplateAction .Request request = new TransportPutComposableIndexTemplateAction .Request (id );
227277 request .indexTemplate (
@@ -231,7 +281,8 @@ private static void putComposableIndexTemplate(
231281 Template .builder ()
232282 .settings (settings )
233283 .mappings (mappings == null ? null : CompressedXContent .fromJSON (mappings ))
234- .lifecycle (lifecycle )
284+ .lifecycle (dataLifecycle )
285+ .dataStreamOptions (new DataStreamOptions .Template (new DataStreamFailureStore .Template (true )))
235286 )
236287 .metadata (metadata )
237288 .dataStreamTemplate (new ComposableIndexTemplate .DataStreamTemplate ())
@@ -258,6 +309,27 @@ private static void indexDoc(String dataStream) {
258309 indicesAdmin ().refresh (new RefreshRequest (dataStream )).actionGet ();
259310 }
260311
312+ private static void indexFailedDoc (String dataStream ) {
313+ BulkRequest bulkRequest = new BulkRequest ();
314+ String value = DateFieldMapper .DEFAULT_DATE_TIME_FORMATTER .formatMillis (System .currentTimeMillis ());
315+ bulkRequest .add (
316+ new IndexRequest (dataStream ).opType (DocWriteRequest .OpType .CREATE )
317+ .source (
318+ String .format (Locale .ROOT , "{\" %s\" :\" %s\" ,\" count\" :\" not-a-number\" }" , DEFAULT_TIMESTAMP_FIELD , value ),
319+ XContentType .JSON
320+ )
321+ );
322+ BulkResponse bulkResponse = client ().bulk (bulkRequest ).actionGet ();
323+ assertThat (bulkResponse .getItems ().length , equalTo (1 ));
324+ String backingIndexPrefix = DataStream .FAILURE_STORE_PREFIX + dataStream ;
325+ for (BulkItemResponse itemResponse : bulkResponse ) {
326+ assertThat (itemResponse .getFailureMessage (), nullValue ());
327+ assertThat (itemResponse .status (), equalTo (RestStatus .CREATED ));
328+ assertThat (itemResponse .getIndex (), startsWith (backingIndexPrefix ));
329+ }
330+ indicesAdmin ().refresh (new RefreshRequest (dataStream )).actionGet ();
331+ }
332+
261333 public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {
262334
263335 static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results" ;
0 commit comments