Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
Expand All @@ -33,6 +36,8 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemDataStreamDescriptor.Type;
Expand All @@ -48,12 +53,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

public class SystemDataStreamIT extends ESIntegTestCase {

Expand All @@ -62,6 +70,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(DataStreamsPlugin.class);
plugins.add(TestSystemDataStreamPlugin.class);
plugins.add(MapperExtrasPlugin.class);
return plugins;
}

Expand Down Expand Up @@ -169,6 +178,63 @@ public void testDataStreamStats() throws Exception {
}
}

public void testSystemDataStreamWithFailureStore() throws Exception {
String dataStreamName = ".test-failure-store";
RequestOptions productHeader = RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build();
try (RestClient restClient = createRestClient()) {
Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
indexRequest.setOptions(productHeader);
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
indexRequest.setJsonEntity(
String.format(Locale.ROOT, "{\"%s\":\"%s\",\"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value)
);

Response indexResponse = restClient.performRequest(indexRequest);
assertThat(indexResponse.getStatusLine().getStatusCode(), is(201));
Map<String, Object> responseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(indexResponse.getEntity()),
false
);
assertThat(responseMap.get("result"), equalTo("created"));
assertThat((String) responseMap.get("_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));
assertThat(responseMap.get("failure_store"), equalTo("used"));

// Rollover
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "::failures/_rollover");
rolloverRequest.setOptions(productHeader);
Response rolloverResponse = restClient.performRequest(rolloverRequest);
assertThat(rolloverResponse.getStatusLine().getStatusCode(), is(200));
responseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(rolloverResponse.getEntity()),
false
);
assertThat(responseMap.get("acknowledged"), equalTo(true));
assertThat(responseMap.get("rolled_over"), equalTo(true));
assertThat((String) responseMap.get("new_index"), startsWith(DataStream.FAILURE_STORE_PREFIX));

// Edit data stream options
Request editOptionsRequest = new Request("PUT", "/_data_stream/" + dataStreamName + "/_options");
editOptionsRequest.setJsonEntity("{\"failure_store\":{\"enabled\":\"false\"}}");
editOptionsRequest.setOptions(productHeader);
Response editOptionsResponse = restClient.performRequest(editOptionsRequest);
assertThat(editOptionsResponse.getStatusLine().getStatusCode(), is(200));
responseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(editOptionsResponse.getEntity()),
false
);
assertThat(responseMap.get("acknowledged"), equalTo(true));

// delete
Request deleteRequest = new Request("DELETE", "/_data_stream/" + dataStreamName);
deleteRequest.setOptions(productHeader);
Response deleteResponse = restClient.performRequest(deleteRequest);
assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200));
}
}

@SuppressWarnings("unchecked")
public void testSystemDataStreamReadWrite() throws Exception {
try (RestClient restClient = createRestClient()) {
Expand Down Expand Up @@ -328,6 +394,30 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
),
new SystemDataStreamDescriptor(
".test-failure-store",
"system data stream test with failure store",
Type.EXTERNAL,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(".test-failure-store"))
.template(Template.builder().mappings(new CompressedXContent("""
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}""")).dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true))))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
List.of("product"),
"product",
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
)
);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ private RolloverResult rolloverDataStream(
now.toEpochMilli(),
dataStreamName,
templateV2,
systemDataStreamDescriptor,
newWriteIndexName,
(builder, indexMetadata) -> builder.put(dataStream.rolloverFailureStore(indexMetadata.getIndex(), newGeneration))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,6 @@ static ClusterState createDataStream(
// responsibility to check that before setting.
IndexMetadata failureStoreIndex = null;
if (initializeFailureStore) {
if (isSystem) {
throw new IllegalArgumentException("Failure stores are not supported on system data streams");
}
String failureStoreIndexName = DataStream.getDefaultFailureStoreName(dataStreamName, initialGeneration, request.startTime());
currentState = createFailureStoreIndex(
metadataCreateIndexService,
Expand All @@ -291,6 +288,7 @@ static ClusterState createDataStream(
request.startTime(),
dataStreamName,
template,
systemDataStreamDescriptor,
failureStoreIndexName,
null
);
Expand Down Expand Up @@ -420,6 +418,7 @@ public static ClusterState createFailureStoreIndex(
long nameResolvedInstant,
String dataStreamName,
ComposableIndexTemplate template,
SystemDataStreamDescriptor systemDataStreamDescriptor,
String failureStoreIndexName,
@Nullable BiConsumer<ProjectMetadata.Builder, IndexMetadata> metadataTransformer
) throws Exception {
Expand All @@ -439,7 +438,8 @@ public static ClusterState createFailureStoreIndex(
.performReroute(false)
.setMatchingTemplate(template)
.settings(indexSettings)
.isFailureIndex(true);
.isFailureIndex(true)
.systemDataStreamDescriptor(systemDataStreamDescriptor);

try {
currentState = metadataCreateIndexService.applyCreateIndexRequest(
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/migrate/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {

internalClusterTestImplementation project(path: ':modules:lang-painless')
internalClusterTestImplementation project(path: ':modules:lang-painless:spi')
internalClusterTestImplementation project(path: ':modules:mapper-extras')
}

addQaCheckDependencies(project)
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.plugins.ActionPlugin;
Expand All @@ -33,6 +35,8 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -52,23 +56,38 @@ public class SystemDataStreamMigrationIT extends AbstractFeatureMigrationIntegTe
);

private static SystemDataStreamDescriptor createSystemDataStreamDescriptor(IndexVersion indexVersion) {
return new SystemDataStreamDescriptor(
TEST_DATA_STREAM_NAME,
"system data stream test",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.template(
Template.builder()
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
.settings(indexSettings(indexVersion, 1, 0))
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
List.of("product"),
ORIGIN,
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
);
try {
return new SystemDataStreamDescriptor(
TEST_DATA_STREAM_NAME,
"system data stream test",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.template(
Template.builder()
.mappings(new CompressedXContent("""
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}"""))
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true))
.settings(indexSettings(indexVersion, 1, 0))
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
List.of("product"),
ORIGIN,
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
Expand All @@ -87,6 +106,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(DataStreamsPlugin.class);
plugins.add(DataStreamTestPlugin.class);
plugins.add(MapperExtrasPlugin.class);
return plugins;
}

Expand All @@ -110,6 +130,20 @@ private static void indexDocsToDataStream(String dataStreamName) {

BulkResponse actionGet = bulkBuilder.get();
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));

// Index docs to failure store too
bulkBuilder = client().prepareBulk();
for (int i = 0; i < INDEX_DOC_COUNT; i++) {
IndexRequestBuilder requestBuilder = ESIntegTestCase.prepareIndex(dataStreamName)
.setId(Integer.toString(i))
.setRequireDataStream(true)
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(DataStream.TIMESTAMP_FIELD_NAME, 1741271969000L, "count", "not-a-number");
bulkBuilder.add(requestBuilder);
}

actionGet = bulkBuilder.get();
assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
}

public void testMigrateSystemDataStream() throws Exception {
Expand All @@ -136,6 +170,14 @@ public void testMigrateSystemDataStream() throws Exception {
assertThat(indexMetadata.isSystem(), is(true));
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
}

List<Index> failureIndices = dataStream.getFailureIndices();
assertThat(failureIndices, hasSize(failureIndices.size()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion always passes, did you mean something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:face_palm: yes I will fix it. Thank you for pointing it out

for (Index failureIndex : failureIndices) {
IndexMetadata indexMetadata = finalMetadata.index(failureIndex);
assertThat(indexMetadata.isSystem(), is(true));
assertThat(indexMetadata.getCreationVersion(), is(IndexVersion.current()));
}
}

public void testMigrationRestartAfterFailure() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -262,21 +264,47 @@ public static class SystemDataStreamTestPlugin extends Plugin implements SystemI

@Override
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
return List.of(
new SystemDataStreamDescriptor(
SYSTEM_DATA_STREAM_NAME,
"a system data stream for testing",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
.template(Template.builder().lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
Collections.singletonList("test"),
"test",
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
)
try {
return List.of(
new SystemDataStreamDescriptor(
SYSTEM_DATA_STREAM_NAME,
"a system data stream for testing",
SystemDataStreamDescriptor.Type.EXTERNAL,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(SYSTEM_DATA_STREAM_NAME))
.template(
Template.builder()
.mappings(new CompressedXContent("""
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}"""))
.lifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO))
.dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)))
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
Map.of(),
Collections.singletonList("test"),
"test",
new ExecutorNames(
ThreadPool.Names.SYSTEM_CRITICAL_READ,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SYSTEM_WRITE
)
)
);
} catch (IOException e) {
fail(e.getMessage());
}
throw new IllegalStateException(
"Something went wrong, it should have either returned the descriptor or it should have thrown an assertion error"
);
}

Expand Down