Skip to content

Commit 2ed9121

Browse files
Refresh source index before reindexing data stream index (elastic#120752) (elastic#120808)
Add step to the ReindexDatastreamIndexAction which refreshes the source index after setting it to read-only but before calling reindex. Without doing a refresh it is possible for docs from the source index to be missing from the destination index. This happens because the docs arrived before the source index is set to read-only, but because the index hasn't refreshed, the reindex action cannot see these updates. (cherry picked from commit 484a950) # Conflicts: # muted-tests.yml
1 parent 50861c1 commit 2ed9121

File tree

4 files changed

+29
-22
lines changed

4 files changed

+29
-22
lines changed

docs/changelog/120752.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 120752
2+
summary: Refresh source index before reindexing data stream index
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 120314

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,6 @@ tests:
406406
- class: org.elasticsearch.search.basic.SearchWithRandomIOExceptionsIT
407407
method: testRandomDirectoryIOExceptions
408408
issue: https://github.com/elastic/elasticsearch/issues/118733
409-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
410-
method: testTsdbStartEndSet
411-
issue: https://github.com/elastic/elasticsearch/issues/120314
412409
- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
413410
method: test {p0=search.vectors/41_knn_search_bbq_hnsw/Vector rescoring has same scoring as exact search for kNN section}
414411
issue: https://github.com/elastic/elasticsearch/issues/120441

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public void testDestIndexDeletedIfExists() throws Exception {
8787
var destIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
8888
indicesAdmin().create(new CreateIndexRequest(destIndex)).actionGet();
8989
indexDocs(destIndex, 10);
90+
indicesAdmin().refresh(new RefreshRequest(destIndex)).actionGet();
9091
assertHitCount(prepareSearch(destIndex).setSize(0), 10);
9192

9293
// call reindex
@@ -195,19 +196,7 @@ public void testMappingsAddedToDestIndex() throws Exception {
195196
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());
196197

197198
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
198-
String mapping = """
199-
{
200-
"_doc":{
201-
"dynamic":"strict",
202-
"properties":{
203-
"foo1":{
204-
"type":"text"
205-
}
206-
}
207-
}
208-
}
209-
""";
210-
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mapping)).actionGet();
199+
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)).actionGet();
211200

212201
// call reindex
213202
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
@@ -336,6 +325,12 @@ public void testSettingsAndMappingsFromTemplate() throws IOException {
336325
var sourceIndex = "logs-" + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
337326
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
338327

328+
{
329+
var indexRequest = new IndexRequest(sourceIndex);
330+
indexRequest.source("{ \"foo1\": \"cheese\" }", XContentType.JSON);
331+
client().index(indexRequest).actionGet();
332+
}
333+
339334
// call reindex
340335
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
341336
.actionGet()
@@ -357,6 +352,9 @@ public void testSettingsAndMappingsFromTemplate() throws IOException {
357352
// sanity check specific value from dest mapping
358353
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
359354
}
355+
356+
// verify doc was successfully added
357+
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
360358
}
361359

362360
private static final String TSDB_MAPPING = """
@@ -450,12 +448,10 @@ public void testTsdbStartEndSet() throws Exception {
450448

451449
assertEquals(startTime, destStart);
452450
assertEquals(endTime, destEnd);
453-
}
454451

455-
// TODO more logsdb/tsdb specific tests
456-
// TODO more data stream specific tests (how are data streams indices are different from regular indices?)
457-
// TODO check other IndexMetadata fields that need to be fixed after the fact
458-
// TODO what happens if don't have necessary perms for a given index?
452+
// verify doc was successfully added
453+
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
454+
}
459455

460456
private static void cleanupMetadataBlocks(String index) {
461457
var settings = Settings.builder()
@@ -478,7 +474,6 @@ private static void indexDocs(String index, int numDocs) {
478474
}
479475
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
480476
assertThat(bulkResponse.getItems().length, equalTo(numDocs));
481-
indicesAdmin().refresh(new RefreshRequest(index)).actionGet();
482477
}
483478

484479
private static String formatInstant(Instant instant) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.support.HandledTransportAction;
2424
import org.elasticsearch.action.support.IndicesOptions;
2525
import org.elasticsearch.action.support.SubscribableListener;
26+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2627
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2728
import org.elasticsearch.client.internal.Client;
2829
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -140,6 +141,7 @@ protected void doExecute(
140141
}
141142

142143
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
144+
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
143145
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
144146
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
145147
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
@@ -175,6 +177,13 @@ public void onFailure(Exception e) {
175177
}, parentTaskId);
176178
}
177179

180+
private void refresh(String sourceIndexName, ActionListener<BroadcastResponse> listener, TaskId parentTaskId) {
181+
logger.debug("Refreshing source index [{}]", sourceIndexName);
182+
var refreshRequest = new RefreshRequest(sourceIndexName);
183+
refreshRequest.setParentTask(parentTaskId);
184+
client.execute(RefreshAction.INSTANCE, refreshRequest, listener);
185+
}
186+
178187
private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
179188
logger.debug("Attempting to delete index [{}]", destIndexName);
180189
var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS)

0 commit comments

Comments
 (0)