Skip to content

Commit 431e275

Browse files
authored
[Failure store] Support failure store for system data streams (elastic#126585) (elastic#126639)
In this PR we add support for the failure store for system data streams. Specifically: - We pass the system descriptor so the failure index can be created based on that. - We extend the tests to ensure it works - We remove a guard we had but I wasn't able to test it because it only gets triggered if the data stream gets created right after a failure in the ingest pipeline, and I didn't see how to add one (yet). - We extend the system data stream migration to ensure this is also working. (cherry picked from commit 78ac5d5) # Conflicts: # x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java
1 parent a0e939b commit 431e275

File tree

6 files changed

+200
-36
lines changed

6 files changed

+200
-36
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2626
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate;
27+
import org.elasticsearch.cluster.metadata.DataStream;
28+
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
29+
import org.elasticsearch.cluster.metadata.DataStreamOptions;
2730
import org.elasticsearch.cluster.metadata.Template;
2831
import org.elasticsearch.cluster.service.ClusterService;
2932
import org.elasticsearch.common.Strings;
3033
import org.elasticsearch.common.compress.CompressedXContent;
3134
import org.elasticsearch.common.network.NetworkModule;
3235
import org.elasticsearch.common.settings.Settings;
3336
import org.elasticsearch.common.xcontent.XContentHelper;
37+
import org.elasticsearch.index.mapper.DateFieldMapper;
38+
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
3439
import org.elasticsearch.indices.ExecutorNames;
3540
import org.elasticsearch.indices.SystemDataStreamDescriptor;
3641
import org.elasticsearch.indices.SystemDataStreamDescriptor.Type;
@@ -46,12 +51,15 @@
4651
import java.util.ArrayList;
4752
import java.util.Collection;
4853
import java.util.List;
54+
import java.util.Locale;
4955
import java.util.Map;
5056
import java.util.stream.Collectors;
5157

58+
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
5259
import static org.hamcrest.Matchers.containsString;
5360
import static org.hamcrest.Matchers.equalTo;
5461
import static org.hamcrest.Matchers.is;
62+
import static org.hamcrest.Matchers.startsWith;
5563

5664
public class SystemDataStreamIT extends ESIntegTestCase {
5765

@@ -60,6 +68,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6068
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
6169
plugins.add(DataStreamsPlugin.class);
6270
plugins.add(TestSystemDataStreamPlugin.class);
71+
plugins.add(MapperExtrasPlugin.class);
6372
return plugins;
6473
}
6574

@@ -167,6 +176,63 @@ public void testDataStreamStats() throws Exception {
167176
}
168177
}
169178

179+
public void testSystemDataStreamWithFailureStore() throws Exception {
180+
String dataStreamName = ".test-failure-store";
181+
RequestOptions productHeader = RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build();
182+
try (RestClient restClient = createRestClient()) {
183+
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
184+
indexRequest.setOptions(productHeader);
185+
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
186+
indexRequest.setJsonEntity(
187+
String.format(Locale.ROOT, "{\"%s\":\"%s\",\"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value)
188+
);
189+
190+
Response indexResponse = restClient.performRequest(indexRequest);
191+
assertThat(indexResponse.getStatusLine().getStatusCode(), is(201));
192+
Map<String, Object> responseMap = XContentHelper.convertToMap(
193+
XContentType.JSON.xContent(),
194+
EntityUtils.toString(indexResponse.getEntity()),
195+
false
196+
);
197+
assertThat(responseMap.get("result"), equalTo("created"));
198+
assertThat((String) responseMap.get("_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));
199+
assertThat(responseMap.get("failure_store"), equalTo("used"));
200+
201+
// Rollover
202+
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "::failures/_rollover");
203+
rolloverRequest.setOptions(productHeader);
204+
Response rolloverResponse = restClient.performRequest(rolloverRequest);
205+
assertThat(rolloverResponse.getStatusLine().getStatusCode(), is(200));
206+
responseMap = XContentHelper.convertToMap(
207+
XContentType.JSON.xContent(),
208+
EntityUtils.toString(rolloverResponse.getEntity()),
209+
false
210+
);
211+
assertThat(responseMap.get("acknowledged"), equalTo(true));
212+
assertThat(responseMap.get("rolled_over"), equalTo(true));
213+
assertThat((String) responseMap.get("new_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));
214+
215+
// Edit data stream options
216+
Request editOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
217+
editOptionsRequest.setJsonEntity("{\"failure_store\":{\"enabled\":\"false\"}}");
218+
editOptionsRequest.setOptions(productHeader);
219+
Response editOptionsResponse = restClient.performRequest(editOptionsRequest);
220+
assertThat(editOptionsResponse.getStatusLine().getStatusCode(), is(200));
221+
responseMap = XContentHelper.convertToMap(
222+
XContentType.JSON.xContent(),
223+
EntityUtils.toString(editOptionsResponse.getEntity()),
224+
false
225+
);
226+
assertThat(responseMap.get("acknowledged"), equalTo(true));
227+
228+
// delete
229+
Request deleteRequest = new Request("DELETE", "/_data_stream/" + dataStreamName);
230+
deleteRequest.setOptions(productHeader);
231+
Response deleteResponse = restClient.performRequest(deleteRequest);
232+
assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
233+
}
234+
}
235+
170236
@SuppressWarnings("unchecked")
171237
public void testSystemDataStreamReadWrite() throws Exception {
172238
try (RestClient restClient = createRestClient()) {
@@ -325,6 +391,30 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
325391
List.of("product"),
326392
"product",
327393
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
394+
),
395+
new SystemDataStreamDescriptor(
396+
".test-failure-store",
397+
"system data stream test with failure store",
398+
Type.EXTERNAL,
399+
ComposableIndexTemplate.builder()
400+
.indexPatterns(List.of(".test-failure-store"))
401+
.template(Template.builder().mappings(new CompressedXContent("""
402+
{
403+
"properties": {
404+
"@timestamp" : {
405+
"type": "date"
406+
},
407+
"count": {
408+
"type": "long"
409+
}
410+
}
411+
}""")).dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true))))
412+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
413+
.build(),
414+
Map.of(),
415+
List.of("product"),
416+
"product",
417+
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
328418
)
329419
);
330420
} catch (IOException e) {

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ private RolloverResult rolloverDataStream(
343343
now.toEpochMilli(),
344344
dataStreamName,
345345
templateV2,
346+
systemDataStreamDescriptor,
346347
newWriteIndexName,
347348
(builder, indexMetadata) -> builder.put(dataStream.rolloverFailureStore(indexMetadata.getIndex(), newGeneration))
348349
);

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,6 @@ static ClusterState createDataStream(
270270
// responsibility to check that before setting.
271271
IndexMetadata failureStoreIndex = null;
272272
if (initializeFailureStore) {
273-
if (isSystem) {
274-
throw new IllegalArgumentException("Failure stores are not supported on system data streams");
275-
}
276273
String failureStoreIndexName = DataStream.getDefaultFailureStoreName(dataStreamName, initialGeneration, request.startTime());
277274
currentState = createFailureStoreIndex(
278275
metadataCreateIndexService,
@@ -282,6 +279,7 @@ static ClusterState createDataStream(
282279
request.startTime(),
283280
dataStreamName,
284281
template,
282+
systemDataStreamDescriptor,
285283
failureStoreIndexName,
286284
null
287285
);
@@ -409,6 +407,7 @@ public static ClusterState createFailureStoreIndex(
409407
long nameResolvedInstant,
410408
String dataStreamName,
411409
ComposableIndexTemplate template,
410+
SystemDataStreamDescriptor systemDataStreamDescriptor,
412411
String failureStoreIndexName,
413412
@Nullable BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer
414413
) throws Exception {
@@ -427,7 +426,8 @@ public static ClusterState createFailureStoreIndex(
427426
.performReroute(false)
428427
.setMatchingTemplate(template)
429428
.settings(indexSettings)
430-
.isFailureIndex(true);
429+
.isFailureIndex(true)
430+
.systemDataStreamDescriptor(systemDataStreamDescriptor);
431431

432432
try {
433433
currentState = metadataCreateIndexService.applyCreateIndexRequest(

x-pack/plugin/migrate/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dependencies {
2626

2727
internalClusterTestImplementation project(path: ':modules:lang-painless')
2828
internalClusterTestImplementation project(path: ':modules:lang-painless:spi')
29+
internalClusterTestImplementation project(path: ':modules:mapper-extras')
2930
}
3031

3132
addQaCheckDependencies(project)

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/system_indices/action/SystemDataStreamMigrationIT.java

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.elasticsearch.cluster.metadata.Metadata;
2222
import org.elasticsearch.cluster.metadata.Template;
2323
import org.elasticsearch.common.Strings;
24+
import org.elasticsearch.common.compress.CompressedXContent;
2425
import org.elasticsearch.common.settings.Settings;
2526
import org.elasticsearch.datastreams.DataStreamsPlugin;
2627
import org.elasticsearch.index.Index;
2728
import org.elasticsearch.index.IndexVersion;
29+
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
2830
import org.elasticsearch.indices.ExecutorNames;
2931
import org.elasticsearch.indices.SystemDataStreamDescriptor;
3032
import org.elasticsearch.plugins.ActionPlugin;
@@ -33,6 +35,8 @@
3335
import org.elasticsearch.test.ESIntegTestCase;
3436
import org.junit.After;
3537

38+
import java.io.IOException;
39+
import java.io.UncheckedIOException;
3640
import java.util.ArrayList;
3741
import java.util.Collection;
3842
import java.util.List;
@@ -52,23 +56,38 @@ public class SystemDataStreamMigrationIT extends AbstractFeatureMigrationIntegTe
5256
);
5357

5458
private static SystemDataStreamDescriptor createSystemDataStreamDescriptor(IndexVersion indexVersion) {
55-
return new SystemDataStreamDescriptor(
56-
TEST_DATA_STREAM_NAME,
57-
"system data stream test",
58-
SystemDataStreamDescriptor.Type.EXTERNAL,
59-
ComposableIndexTemplate.builder()
60-
.template(
61-
Template.builder()
62-
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
63-
.settings(indexSettings(indexVersion, 1, 0))
64-
)
65-
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
66-
.build(),
67-
Map.of(),
68-
List.of("product"),
69-
ORIGIN,
70-
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
71-
);
59+
try {
60+
return new SystemDataStreamDescriptor(
61+
TEST_DATA_STREAM_NAME,
62+
"system data stream test",
63+
SystemDataStreamDescriptor.Type.EXTERNAL,
64+
ComposableIndexTemplate.builder()
65+
.template(
66+
Template.builder()
67+
.mappings(new CompressedXContent("""
68+
{
69+
"properties": {
70+
"@timestamp" : {
71+
"type": "date"
72+
},
73+
"count": {
74+
"type": "long"
75+
}
76+
}
77+
}"""))
78+
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
79+
.settings(indexSettings(indexVersion, 1, 0))
80+
)
81+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
82+
.build(),
83+
Map.of(),
84+
List.of("product"),
85+
ORIGIN,
86+
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
87+
);
88+
} catch (IOException e) {
89+
throw new UncheckedIOException(e);
90+
}
7291
}
7392

7493
@Override
@@ -87,6 +106,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
87106
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
88107
plugins.add(DataStreamsPlugin.class);
89108
plugins.add(DataStreamTestPlugin.class);
109+
plugins.add(MapperExtrasPlugin.class);
90110
return plugins;
91111
}
92112

@@ -110,6 +130,20 @@ private static void indexDocsToDataStream(String dataStreamName) {
110130

111131
BulkResponse actionGet = bulkBuilder.get();
112132
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
133+
134+
// Index docs to failure store too
135+
bulkBuilder = client().prepareBulk();
136+
for (int i = 0; i < INDEX_DOC_COUNT; i++) {
137+
IndexRequestBuilder requestBuilder = ESIntegTestCase.prepareIndex(dataStreamName)
138+
.setId(Integer.toString(i))
139+
.setRequireDataStream(true)
140+
.setOpType(DocWriteRequest.OpType.CREATE)
141+
.setSource(DataStream.TIMESTAMP_FIELD_NAME, 1741271969000L, "count", "not-a-number");
142+
bulkBuilder.add(requestBuilder);
143+
}
144+
145+
actionGet = bulkBuilder.get();
146+
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
113147
}
114148

115149
public void testMigrateSystemDataStream() throws Exception {
@@ -136,6 +170,16 @@ public void testMigrateSystemDataStream() throws Exception {
136170
assertThat(indexMetadata.isSystem(), is(true));
137171
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
138172
}
173+
174+
// Migrate action does not migrate the failure store indices
175+
// here we check that they are preserved.
176+
List<Index> failureIndices = dataStream.getFailureIndices();
177+
assertThat(failureIndices, hasSize(1));
178+
for (Index failureIndex : failureIndices) {
179+
IndexMetadata indexMetadata = finalMetadata.index(failureIndex);
180+
assertThat(indexMetadata.isSystem(), is(true));
181+
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
182+
}
139183
}
140184

141185
public void testMigrationRestartAfterFailure() throws Exception {

x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.elasticsearch.action.index.IndexRequest;
2020
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2121
import org.elasticsearch.cluster.metadata.DataStream;
22+
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
2223
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
24+
import org.elasticsearch.cluster.metadata.DataStreamOptions;
2325
import org.elasticsearch.cluster.metadata.Template;
2426
import org.elasticsearch.common.compress.CompressedXContent;
2527
import org.elasticsearch.common.settings.Settings;
@@ -265,21 +267,47 @@ public static class SystemDataStreamTestPlugin extends Plugin implements SystemI
265267

266268
@Override
267269
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
268-
return List.of(
269-
new SystemDataStreamDescriptor(
270-
SYSTEM_DATA_STREAM_NAME,
271-
"a system data stream for testing",
272-
SystemDataStreamDescriptor.Type.EXTERNAL,
273-
ComposableIndexTemplate.builder()
274-
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
275-
.template(Template.builder().lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO)))
276-
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
277-
.build(),
278-
Map.of(),
279-
Collections.singletonList("test"),
280-
"test",
281-
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
282-
)
270+
try {
271+
return List.of(
272+
new SystemDataStreamDescriptor(
273+
SYSTEM_DATA_STREAM_NAME,
274+
"a system data stream for testing",
275+
SystemDataStreamDescriptor.Type.EXTERNAL,
276+
ComposableIndexTemplate.builder()
277+
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
278+
.template(
279+
Template.builder()
280+
.mappings(new CompressedXContent("""
281+
{
282+
"properties": {
283+
"@timestamp" : {
284+
"type": "date"
285+
},
286+
"count": {
287+
"type": "long"
288+
}
289+
}
290+
}"""))
291+
.lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO))
292+
.dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)))
293+
)
294+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
295+
.build(),
296+
Map.of(),
297+
Collections.singletonList("test"),
298+
"test",
299+
new ExecutorNames(
300+
ThreadPool.Names.SYSTEM_CRITICAL_READ,
301+
ThreadPool.Names.SYSTEM_READ,
302+
ThreadPool.Names.SYSTEM_WRITE
303+
)
304+
)
305+
);
306+
} catch (IOException e) {
307+
fail(e.getMessage());
308+
}
309+
throw new IllegalStateException(
310+
"Something went wrong, it should have either returned the descriptor or it should have thrown an assertion error"
283311
);
284312
}
285313

0 commit comments

Comments
 (0)