diff --git a/docs/reference/snapshot-restore/repository-source-only.asciidoc b/docs/reference/snapshot-restore/repository-source-only.asciidoc index 07ddedd197931..04e53c42aff9d 100644 --- a/docs/reference/snapshot-restore/repository-source-only.asciidoc +++ b/docs/reference/snapshot-restore/repository-source-only.asciidoc @@ -18,7 +18,7 @@ stream or index. ================================================== Source-only snapshots are only supported if the `_source` field is enabled and no source-filtering is applied. -When you restore a source-only snapshot: +As a result, indices adopting synthetic source cannot be restored. When you restore a source-only snapshot: * The restored index is read-only and can only serve `match_all` search or scroll requests to enable reindexing. diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java index f62fa83b4e111..f95815d1daff9 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java @@ -9,16 +9,23 @@ package org.elasticsearch.datastreams; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.cluster.ElasticsearchCluster; 
import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.ClassRule; @@ -41,6 +48,7 @@ public class LogsDataStreamRestIT extends ESRestTestCase { public static ElasticsearchCluster cluster = ElasticsearchCluster.local() .distribution(DistributionType.DEFAULT) .setting("xpack.security.enabled", "false") + .setting("xpack.license.self_generated.type", "trial") .build(); @Override @@ -102,7 +110,7 @@ private static void waitForLogs(RestClient client) throws Exception { } }"""; - private static final String STANDARD_TEMPLATE = """ + private static final String LOGS_STANDARD_INDEX_MODE = """ { "index_patterns": [ "logs-*-*" ], "data_stream": {}, @@ -135,6 +143,39 @@ private static void waitForLogs(RestClient client) throws Exception { } }"""; + private static final String STANDARD_TEMPLATE = """ + { + "index_patterns": [ "standard-*-*" ], + "data_stream": {}, + "priority": 201, + "template": { + "settings": { + "index": { + "mode": "standard" + } + }, + "mappings": { + "properties": { + "@timestamp" : { + "type": "date" + }, + "host.name": { + "type": "keyword" + }, + "pid": { + "type": "long" + }, + "method": { + "type": "keyword" + }, + "ip_address": { + "type": "ip" + } + } + } + } + }"""; + private static final String TIME_SERIES_TEMPLATE = """ { "index_patterns": [ "logs-*-*" ], @@ -203,7 +244,7 @@ public void testLogsIndexing() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("logsdb", 0); + assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME); rolloverDataStream(client, DATA_STREAM_NAME); indexDocument( client, @@ -218,7 +259,7 @@ public void testLogsIndexing() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("logsdb", 1); + assertDataStreamBackingIndexMode("logsdb", 1, DATA_STREAM_NAME); } public void 
testLogsStandardIndexModeSwitch() throws IOException { @@ -237,9 +278,9 @@ public void testLogsStandardIndexModeSwitch() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("logsdb", 0); + assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME); - putTemplate(client, "custom-template", STANDARD_TEMPLATE); + putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE); rolloverDataStream(client, DATA_STREAM_NAME); indexDocument( client, @@ -254,7 +295,7 @@ public void testLogsStandardIndexModeSwitch() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("standard", 1); + assertDataStreamBackingIndexMode("standard", 1, DATA_STREAM_NAME); putTemplate(client, "custom-template", LOGS_TEMPLATE); rolloverDataStream(client, DATA_STREAM_NAME); @@ -271,7 +312,7 @@ public void testLogsStandardIndexModeSwitch() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("logsdb", 2); + assertDataStreamBackingIndexMode("logsdb", 2, DATA_STREAM_NAME); } public void testLogsTimeSeriesIndexModeSwitch() throws IOException { @@ -290,7 +331,7 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("logsdb", 0); + assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME); putTemplate(client, "custom-template", TIME_SERIES_TEMPLATE); rolloverDataStream(client, DATA_STREAM_NAME); @@ -307,7 +348,7 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("time_series", 1); + assertDataStreamBackingIndexMode("time_series", 1, DATA_STREAM_NAME); putTemplate(client, "custom-template", LOGS_TEMPLATE); rolloverDataStream(client, DATA_STREAM_NAME); @@ -324,11 +365,193 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException { 
randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("logsdb", 2); + assertDataStreamBackingIndexMode("logsdb", 2, DATA_STREAM_NAME); + } + + public void testLogsDBToStandardReindex() throws IOException { + // LogsDB data stream + putTemplate(client, "logs-template", LOGS_TEMPLATE); + createDataStream(client, "logs-apache-kafka"); + + // Standard data stream + putTemplate(client, "standard-template", STANDARD_TEMPLATE); + createDataStream(client, "standard-apache-kafka"); + + // Index some documents in the LogsDB index + for (int i = 0; i < 10; i++) { + indexDocument( + client, + "logs-apache-kafka", + document( + Instant.now().plusSeconds(10), + randomAlphaOfLength(10), + randomNonNegativeLong(), + randomFrom("PUT", "POST", "GET"), + randomAlphaOfLength(64), + randomIp(randomBoolean()), + randomLongBetween(1_000_000L, 2_000_000L) + ) + ); + } + assertDataStreamBackingIndexMode("logsdb", 0, "logs-apache-kafka"); + assertDocCount(client, "logs-apache-kafka", 10); + + // Reindex a LogsDB data stream into a standard data stream + final Request reindexRequest = new Request("POST", "/_reindex?refresh=true"); + reindexRequest.setJsonEntity(""" + { + "source": { + "index": "logs-apache-kafka" + }, + "dest": { + "index": "standard-apache-kafka", + "op_type": "create" + } + } + """); + assertOK(client.performRequest(reindexRequest)); + assertDataStreamBackingIndexMode("standard", 0, "standard-apache-kafka"); + assertDocCount(client, "standard-apache-kafka", 10); + } + + public void testStandardToLogsDBReindex() throws IOException { + // LogsDB data stream + putTemplate(client, "logs-template", LOGS_TEMPLATE); + createDataStream(client, "logs-apache-kafka"); + + // Standard data stream + putTemplate(client, "standard-template", STANDARD_TEMPLATE); + createDataStream(client, "standard-apache-kafka"); + + // Index some documents in a standard index + for (int i = 0; i < 10; i++) { + indexDocument( + client, + "standard-apache-kafka", + document( + 
Instant.now().plusSeconds(10), + randomAlphaOfLength(10), + randomNonNegativeLong(), + randomFrom("PUT", "POST", "GET"), + randomAlphaOfLength(64), + randomIp(randomBoolean()), + randomLongBetween(1_000_000L, 2_000_000L) + ) + ); + } + assertDataStreamBackingIndexMode("standard", 0, "standard-apache-kafka"); + assertDocCount(client, "standard-apache-kafka", 10); + + // Reindex a standard data stream into a LogsDB data stream + final Request reindexRequest = new Request("POST", "/_reindex?refresh=true"); + reindexRequest.setJsonEntity(""" + { + "source": { + "index": "standard-apache-kafka" + }, + "dest": { + "index": "logs-apache-kafka", + "op_type": "create" + } + } + """); + assertOK(client.performRequest(reindexRequest)); + assertDataStreamBackingIndexMode("logsdb", 0, "logs-apache-kafka"); + assertDocCount(client, "logs-apache-kafka", 10); + } + + public void testLogsDBSnapshotCreateRestoreMount() throws IOException { + final String repository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + registerRepository(repository, FsRepository.TYPE, Settings.builder().put("location", randomAlphaOfLength(6))); + + final String index = randomAlphaOfLength(12).toLowerCase(Locale.ROOT); + createIndex(client, index, Settings.builder().put("index.mode", IndexMode.LOGSDB.getName()).build()); + + for (int i = 0; i < 10; i++) { + indexDocument( + client, + index, + document( + Instant.now().plusSeconds(10), + randomAlphaOfLength(10), + randomNonNegativeLong(), + randomFrom("PUT", "POST", "GET"), + randomAlphaOfLength(64), + randomIp(randomBoolean()), + randomLongBetween(1_000_000L, 2_000_000L) + ) + ); + } + + final String snapshot = randomAlphaOfLength(8).toLowerCase(Locale.ROOT); + deleteSnapshot(repository, snapshot, true); + createSnapshot(client, repository, snapshot, true, index); + wipeDataStreams(); + wipeAllIndices(); + restoreSnapshot(client, repository, snapshot, true, index); + + final String restoreIndex = randomAlphaOfLength(7).toLowerCase(Locale.ROOT); + 
final Request mountRequest = new Request("POST", "/_snapshot/" + repository + '/' + snapshot + "/_mount"); + mountRequest.addParameter("wait_for_completion", "true"); + mountRequest.setJsonEntity("{\"index\": \"" + index + "\",\"renamed_index\": \"" + restoreIndex + "\"}"); + + assertOK(client.performRequest(mountRequest)); + assertDocCount(client, restoreIndex, 10); + assertThat(getSettings(client, restoreIndex).get("index.mode"), Matchers.equalTo(IndexMode.LOGSDB.getName())); + } + + // NOTE: this test will fail on snapshot creation after fixing + // https://github.com/elastic/elasticsearch/issues/112735 + public void testLogsDBSourceOnlySnapshotCreation() throws IOException { + final String repository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + registerRepository(repository, FsRepository.TYPE, Settings.builder().put("location", randomAlphaOfLength(6))); + // A source-only repository delegates storage to another repository + final String sourceOnlyRepository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + registerRepository( + sourceOnlyRepository, + "source", + Settings.builder().put("delegate_type", FsRepository.TYPE).put("location", repository) + ); + + final String index = randomAlphaOfLength(12).toLowerCase(Locale.ROOT); + createIndex(client, index, Settings.builder().put("index.mode", IndexMode.LOGSDB.getName()).build()); + + for (int i = 0; i < 10; i++) { + indexDocument( + client, + index, + document( + Instant.now().plusSeconds(10), + randomAlphaOfLength(10), + randomNonNegativeLong(), + randomFrom("PUT", "POST", "GET"), + randomAlphaOfLength(64), + randomIp(randomBoolean()), + randomLongBetween(1_000_000L, 2_000_000L) + ) + ); + } + + final String snapshot = randomAlphaOfLength(8).toLowerCase(Locale.ROOT); + deleteSnapshot(sourceOnlyRepository, snapshot, true); + createSnapshot(client, sourceOnlyRepository, snapshot, true, index); + wipeDataStreams(); + wipeAllIndices(); + // Can't snapshot _source only on an index that has incomplete 
source ie. has _source disabled or filters the source
+        final ResponseException responseException = expectThrows(
+            ResponseException.class,
+            () -> restoreSnapshot(client, sourceOnlyRepository, snapshot, true, index)
+        );
+        assertThat(responseException.getMessage(), Matchers.containsString("wasn't fully snapshotted"));
+    }
+
+    // Convenience overload: registers a non-verified repository from a Settings.Builder.
+    private static void registerRepository(final String repository, final String type, final Settings.Builder settings) throws IOException {
+        registerRepository(repository, type, false, settings.build());
     }
 
-    private void assertDataStreamBackingIndexMode(final String indexMode, int backingIndex) throws IOException {
-        assertThat(getSettings(client, getWriteBackingIndex(client, DATA_STREAM_NAME, backingIndex)).get("index.mode"), is(indexMode));
+    private void assertDataStreamBackingIndexMode(final String indexMode, int backingIndex, final String dataStreamName)
+        throws IOException {
+        assertThat(getSettings(client, getWriteBackingIndex(client, dataStreamName, backingIndex)).get("index.mode"), is(indexMode));
     }
 
     private String document(
@@ -364,8 +587,8 @@ private static void putTemplate(final RestClient client, final String templateNa
         assertOK(client.performRequest(request));
     }
 
-    private static void indexDocument(final RestClient client, String dataStreamName, String doc) throws IOException {
-        final Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
+    private static void indexDocument(final RestClient client, String indexOrDataStream, String doc) throws IOException {
+        final Request request = new Request("POST", "/" + indexOrDataStream + "/_doc?refresh=true");
         request.setJsonEntity(doc);
         final Response response = client.performRequest(request);
         assertOK(response);
@@ -393,4 +616,46 @@ private static Map<String, Object> getSettings(final RestClient client, final St
         final Request request = new Request("GET", "/" + indexName + "/_settings?flat_settings");
         return ((Map<String, Map<String, Object>>)
entityAsMap(client.performRequest(request)).get(indexName)).get("settings");
     }
+
+    // Creates a snapshot of the given indices in the given repository, optionally waiting for completion.
+    private static void createSnapshot(
+        RestClient restClient,
+        String repository,
+        String snapshot,
+        boolean waitForCompletion,
+        final String... indices
+    ) throws IOException {
+        final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot);
+        request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));
+        // NOTE: the body must be a JSON object and the index names must be quoted;
+        // Elasticsearch accepts a comma-separated list inside a single string.
+        request.setJsonEntity("""
+            { "indices": "$indices" }
+            """.replace("$indices", String.join(",", indices)));
+
+        final Response response = restClient.performRequest(request);
+        assertThat(
+            "Failed to create snapshot [" + snapshot + "] in repository [" + repository + "]: " + response,
+            response.getStatusLine().getStatusCode(),
+            equalTo(RestStatus.OK.getStatus())
+        );
+    }
+
+    // Restores the given indices from a snapshot in the given repository, optionally waiting for completion.
+    private static void restoreSnapshot(
+        final RestClient client,
+        final String repository,
+        String snapshot,
+        boolean waitForCompletion,
+        final String... indices
+    ) throws IOException {
+        final Request request = new Request(HttpPost.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot + "/_restore");
+        request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));
+        // Same JSON-object shape as createSnapshot; a bare "indices": ... key is not valid JSON.
+        request.setJsonEntity("""
+            { "indices": "$indices" }
+            """.replace("$indices", String.join(",", indices)));
+
+        final Response response = client.performRequest(request);
+        assertThat(
+            "Failed to restore snapshot [" + snapshot + "] from repository [" + repository + "]: " + response,
+            response.getStatusLine().getStatusCode(),
+            equalTo(RestStatus.OK.getStatus())
+        );
+    }
 }