Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ stream or index.
==================================================

Source-only snapshots are only supported if the `_source` field is enabled and no source-filtering is applied.
When you restore a source-only snapshot:
As a result, indices adopting synthetic source cannot be restored. When you restore a source-only snapshot:

* The restored index is read-only and can only serve `match_all` search or scroll requests to enable reindexing.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.ClassRule;

Expand All @@ -40,6 +44,7 @@ public class LogsDataStreamRestIT extends ESRestTestCase {
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.build();

@Override
Expand Down Expand Up @@ -101,7 +106,7 @@ private static void waitForLogs(RestClient client) throws Exception {
}
}""";

private static final String STANDARD_TEMPLATE = """
private static final String LOGS_STANDARD_INDEX_MODE = """
{
"index_patterns": [ "logs-*-*" ],
"data_stream": {},
Expand Down Expand Up @@ -134,6 +139,39 @@ private static void waitForLogs(RestClient client) throws Exception {
}
}""";

private static final String STANDARD_TEMPLATE = """
{
"index_patterns": [ "standard-*-*" ],
"data_stream": {},
"priority": 201,
"template": {
"settings": {
"index": {
"mode": "standard"
}
},
"mappings": {
"properties": {
"@timestamp" : {
"type": "date"
},
"host.name": {
"type": "keyword"
},
"pid": {
"type": "long"
},
"method": {
"type": "keyword"
},
"ip_address": {
"type": "ip"
}
}
}
}
}""";

private static final String TIME_SERIES_TEMPLATE = """
{
"index_patterns": [ "logs-*-*" ],
Expand Down Expand Up @@ -202,7 +240,7 @@ public void testLogsIndexing() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 0);
assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
client,
Expand All @@ -217,7 +255,7 @@ public void testLogsIndexing() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 1);
assertDataStreamBackingIndexMode("logsdb", 1, DATA_STREAM_NAME);
}

public void testLogsStandardIndexModeSwitch() throws IOException {
Expand All @@ -236,9 +274,9 @@ public void testLogsStandardIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 0);
assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME);

putTemplate(client, "custom-template", STANDARD_TEMPLATE);
putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
client,
Expand All @@ -253,7 +291,7 @@ public void testLogsStandardIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("standard", 1);
assertDataStreamBackingIndexMode("standard", 1, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_TEMPLATE);
rolloverDataStream(client, DATA_STREAM_NAME);
Expand All @@ -270,7 +308,7 @@ public void testLogsStandardIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 2);
assertDataStreamBackingIndexMode("logsdb", 2, DATA_STREAM_NAME);
}

public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
Expand All @@ -289,7 +327,7 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 0);
assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME);

putTemplate(client, "custom-template", TIME_SERIES_TEMPLATE);
rolloverDataStream(client, DATA_STREAM_NAME);
Expand All @@ -306,7 +344,7 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("time_series", 1);
assertDataStreamBackingIndexMode("time_series", 1, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_TEMPLATE);
rolloverDataStream(client, DATA_STREAM_NAME);
Expand All @@ -323,11 +361,199 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 2);
assertDataStreamBackingIndexMode("logsdb", 2, DATA_STREAM_NAME);
}

public void testStandardToLogsDBReindex() throws IOException {
// LogsDB data stream
putTemplate(client, "logs-template", LOGS_TEMPLATE);
createDataStream(client, "logs-apache-kafka");

// Standard data stream
putTemplate(client, "standard-template", STANDARD_TEMPLATE);
createDataStream(client, "standard-apache-kafka");

// Index some documents in LogsDB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the test is testStandardToLogsDBReindex but the test is doing the opposite - it reindexes form logsdb to standard.

for (int i = 0; i < 10; i++) {
indexDocument(
client,
"logs-apache-kafka",
document(
Instant.now().plusSeconds(10),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
}
assertDataStreamBackingIndexMode("logsdb", 0, "logs-apache-kafka");
assertThat(entityAsMap(client.performRequest(new Request("POST", "/logs-apache-kafka/_count"))).get("count"), Matchers.equalTo(10));

// Reindex LogsDB data stream into a standard data stream
final Request reindexRequest = new Request("POST", "/_reindex?refresh=true");
reindexRequest.setJsonEntity("""
{
"source": {
"index": "logs-apache-kafka"
},
"dest": {
"index": "standard-apache-kafka",
"op_type": "create"
}
}
""");
assertOK(client.performRequest(reindexRequest));
assertDataStreamBackingIndexMode("standard", 0, "standard-apache-kafka");
assertThat(
entityAsMap(client.performRequest(new Request("POST", "/standard-apache-kafka/_count"))).get("count"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check that documents look correct?

Copy link
Contributor Author

@salvatore-campagna salvatore-campagna Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you exactly mean when saying "look correct"? I think it is not up to thins kind of test including that they match.

Copy link
Contributor

@lkts lkts Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is not an ideal place to test reindex operation itself but AFAIK there are currently no tests for this. Do we have an item for that? I was thinking that such a simple document should be identical even with synthetic _source and therefore we could get at least an initial test for that behavior.

Matchers.equalTo(10)
);
}

public void testLogsDBToStandardReindex() throws IOException {
// LogsDB data stream
putTemplate(client, "logs-template", LOGS_TEMPLATE);
createDataStream(client, "logs-apache-kafka");

// Standard data stream
putTemplate(client, "standard-template", STANDARD_TEMPLATE);
createDataStream(client, "standard-apache-kafka");

// Index some documents in LogsDB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in standard

for (int i = 0; i < 10; i++) {
indexDocument(
client,
"standard-apache-kafka",
document(
Instant.now().plusSeconds(10),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
}
assertDataStreamBackingIndexMode("standard", 0, "standard-apache-kafka");
assertThat(
entityAsMap(client.performRequest(new Request("POST", "/standard-apache-kafka/_count"))).get("count"),
Matchers.equalTo(10)
);

// Reindex LogsDB data stream into a standard data stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

standard into logsdb

final Request reindexRequest = new Request("POST", "/_reindex?refresh=true");
reindexRequest.setJsonEntity("""
{
"source": {
"index": "standard-apache-kafka"
},
"dest": {
"index": "logs-apache-kafka",
"op_type": "create"
}
}
""");
assertOK(client.performRequest(reindexRequest));
assertDataStreamBackingIndexMode("logsdb", 0, "logs-apache-kafka");
assertThat(entityAsMap(client.performRequest(new Request("POST", "/logs-apache-kafka/_count"))).get("count"), Matchers.equalTo(10));
}

public void testLogsDBSnapshotCreateRestoreMount() throws IOException {
final String repository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
registerRepository(repository, FsRepository.TYPE, Settings.builder().put("location", randomAlphaOfLength(6)));

final String index = randomAlphaOfLength(12).toLowerCase(Locale.ROOT);
createIndex(client, index, Settings.builder().put("index.mode", IndexMode.LOGSDB.getName()).build());

for (int i = 0; i < 10; i++) {
indexDocument(
client,
index,
document(
Instant.now().plusSeconds(10),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
}

final String snapshot = randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
deleteSnapshot(repository, snapshot, true);
createSnapshot(repository, snapshot, true);
deleteIndex(index);
restoreSnapshot(repository, snapshot, true);

final String restoreIndex = randomAlphaOfLength(7).toLowerCase(Locale.ROOT);
final Request mountRequest = new Request("POST", "/_snapshot/" + repository + '/' + snapshot + "/_mount");
mountRequest.addParameter("wait_for_completion", "true");
mountRequest.setJsonEntity("{\"index\": \"" + index + "\",\"renamed_index\": \"" + restoreIndex + "\"}");
assertOK(client.performRequest(mountRequest));

assertThat(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same - can we check documents?

entityAsMap(client.performRequest(new Request("POST", "/" + restoreIndex + "/_count"))).get("count"),
Matchers.equalTo(10)
);

assertThat(getSettings(client, restoreIndex).get("index.mode"), Matchers.equalTo(IndexMode.LOGSDB.getName()));
}

public void testLogsDBSourceOnlySnapshotCreation() throws IOException {
final String repository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
registerRepository(repository, FsRepository.TYPE, Settings.builder().put("location", randomAlphaOfLength(6)));
// A source-only repository delegates storage to another repository
final String sourceOnlyRepository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
registerRepository(
sourceOnlyRepository,
"source",
Settings.builder().put("delegate_type", FsRepository.TYPE).put("location", repository)
);

final String index = randomAlphaOfLength(12).toLowerCase(Locale.ROOT);
createIndex(client, index, Settings.builder().put("index.mode", IndexMode.LOGSDB.getName()).build());

for (int i = 0; i < 10; i++) {
indexDocument(
client,
index,
document(
Instant.now().plusSeconds(10),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
}

final String snapshot = randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
deleteSnapshot(sourceOnlyRepository, snapshot, true);
createSnapshot(sourceOnlyRepository, snapshot, true);
deleteIndex(index);
// Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source
final ResponseException responseException = expectThrows(
ResponseException.class,
() -> restoreSnapshot(sourceOnlyRepository, snapshot, true)
);
assertThat(responseException.getMessage(), Matchers.containsString("wasn't fully snapshotted"));
}

private static void registerRepository(final String repository, final String type, final Settings.Builder settings) throws IOException {
registerRepository(repository, type, false, settings.build());
}

private void assertDataStreamBackingIndexMode(final String indexMode, int backingIndex) throws IOException {
assertThat(getSettings(client, getWriteBackingIndex(client, DATA_STREAM_NAME, backingIndex)).get("index.mode"), is(indexMode));
private void assertDataStreamBackingIndexMode(final String indexMode, int backingIndex, final String dataStreamName)
throws IOException {
assertThat(getSettings(client, getWriteBackingIndex(client, dataStreamName, backingIndex)).get("index.mode"), is(indexMode));
}

private String document(
Expand Down Expand Up @@ -363,8 +589,8 @@ private static void putTemplate(final RestClient client, final String templateNa
assertOK(client.performRequest(request));
}

private static void indexDocument(final RestClient client, String dataStreamName, String doc) throws IOException {
final Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
private static void indexDocument(final RestClient client, String indexOrtDataStream, String doc) throws IOException {
final Request request = new Request("POST", "/" + indexOrtDataStream + "/_doc?refresh=true");
request.setJsonEntity(doc);
final Response response = client.performRequest(request);
assertOK(response);
Expand Down