Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -54,15 +56,12 @@
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
Expand Down Expand Up @@ -94,15 +93,23 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {

public void testRolloverLifecycleAndForceMergeAuthorized() throws Exception {
String dataStreamName = randomDataStreamName();
// empty lifecycle contains the default rollover
prepareDataStreamAndIndex(dataStreamName, DataStreamLifecycle.Template.DATA_DEFAULT);
// with failure store and empty lifecycle contains the default rollover
prepareDataStreamAndIndex(dataStreamName, null);

List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String backingIndex = backingIndices.get(0);
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));

// initialise the failure store
indexFailedDoc(dataStreamName);
List<String> failureIndices = waitForDataStreamIndices(dataStreamName, 2, true);
String firstFailureIndex = failureIndices.get(0);
assertThat(firstFailureIndex, dataStreamIndexEqualTo(dataStreamName, 3, true));
String secondFailureIndexGen = failureIndices.get(1);
assertThat(secondFailureIndexGen, dataStreamIndexEqualTo(dataStreamName, 4, true));

assertNoAuthzErrors();
// Index another doc to force another rollover and trigger an attempted force-merge. The force-merge may be a noop under
// the hood but for authz purposes this doesn't matter, it only matters that the force-merge API was called
Expand All @@ -114,7 +121,7 @@ public void testRolloverLifecycleAndForceMergeAuthorized() throws Exception {

public void testRolloverAndRetentionAuthorized() throws Exception {
String dataStreamName = randomDataStreamName();
prepareDataStreamAndIndex(dataStreamName, DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).buildTemplate());
prepareDataStreamAndIndex(dataStreamName, TimeValue.ZERO);

assertBusy(() -> {
assertNoAuthzErrors();
Expand All @@ -125,21 +132,25 @@ public void testRolloverAndRetentionAuthorized() throws Exception {
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

// test failure store too, we index the failure later to have predictable generation suffixes
indexFailedDoc(dataStreamName);
assertBusy(() -> {
assertNoAuthzErrors();
List<String> failureIndices = getDataStreamBackingIndexNames(dataStreamName, true);
assertThat(failureIndices.size(), equalTo(1));
// we expect the data stream to have only one failure index, with generation 4
// as generation 3 would've been deleted by the data stream lifecycle given the lifecycle configuration
String writeIndex = failureIndices.get(0);
assertThat(writeIndex, dataStreamIndexEqualTo(dataStreamName, 4, true));
});
}

public void testUnauthorized() throws Exception {
// this is an example index pattern for a system index that the data stream lifecycle does not have access for. Data stream
// lifecycle will therefore fail at runtime with an authz exception
prepareDataStreamAndIndex(SECURITY_MAIN_ALIAS, DataStreamLifecycle.Template.DATA_DEFAULT);

assertBusy(() -> {
Map<String, String> indicesAndErrors = collectErrorsFromStoreAsMap();
assertThat(indicesAndErrors, is(not(anEmptyMap())));
assertThat(
indicesAndErrors.values(),
hasItem(allOf(containsString("security_exception"), containsString("unauthorized for user [_data_stream_lifecycle]")))
);
});
prepareDataStreamAndIndex(SECURITY_MAIN_ALIAS, null);
indexFailedDoc(SECURITY_MAIN_ALIAS);
}

public void testRolloverAndRetentionWithSystemDataStreamAuthorized() throws Exception {
Expand Down Expand Up @@ -178,9 +189,22 @@ private Map<String, String> collectErrorsFromStoreAsMap() {
return indicesAndErrors;
}

private void prepareDataStreamAndIndex(String dataStreamName, DataStreamLifecycle.Template lifecycle) throws IOException,
InterruptedException, ExecutionException {
putComposableIndexTemplate("id1", null, List.of(dataStreamName + "*"), null, null, lifecycle);
private void prepareDataStreamAndIndex(String dataStreamName, TimeValue retention) throws IOException, InterruptedException,
ExecutionException {
var dataLifecycle = retention == null
? DataStreamLifecycle.Template.DATA_DEFAULT
: new DataStreamLifecycle.Template(true, retention, null);
putComposableIndexTemplate("id1", """
{
"properties": {
"@timestamp" : {
"type": "date"
},
"count": {
"type": "long"
}
}
}""", List.of(dataStreamName + "*"), null, null, dataLifecycle);
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
Expand Down Expand Up @@ -219,7 +243,7 @@ private static void putComposableIndexTemplate(
List<String> patterns,
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable DataStreamLifecycle.Template lifecycle
@Nullable DataStreamLifecycle.Template dataLifecycle
) throws IOException {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
Expand All @@ -229,7 +253,8 @@ private static void putComposableIndexTemplate(
Template.builder()
.settings(settings)
.mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings))
.lifecycle(lifecycle)
.lifecycle(dataLifecycle)
.dataStreamOptions(new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)))
)
.metadata(metadata)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
Expand All @@ -256,6 +281,27 @@ private static void indexDoc(String dataStream) {
indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet();
}

private static void indexFailedDoc(String dataStream) {
BulkRequest bulkRequest = new BulkRequest();
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
bulkRequest.add(
new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
.source(
String.format(Locale.ROOT, "{\"%s\":\"%s\",\"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value),
XContentType.JSON
)
);
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems().length, equalTo(1));
String backingIndexPrefix = DataStream.FAILURE_STORE_PREFIX + dataStream;
for (BulkItemResponse itemResponse : bulkResponse) {
assertThat(itemResponse.getFailureMessage(), nullValue());
assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix));
}
indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet();
}

public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {

static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results";
Expand Down