Skip to content

Commit 484a950

Browse files
Refresh source index before reindexing data stream index (#120752)
Add step to the ReindexDatastreamIndexAction which refreshes the source index after setting it to read-only but before calling reindex. Without doing a refresh it is possible for docs from the source index to be missing from the destination index. This happens because the docs arrived before the source index is set to read-only, but because the index hasn't refreshed, the reindex action cannot see these updates.
1 parent 39603ec commit 484a950

File tree

4 files changed

+29
-22
lines changed

4 files changed

+29
-22
lines changed

docs/changelog/120752.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 120752
2+
summary: Refresh source index before reindexing data stream index
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 120314

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,6 @@ tests:
205205
- class: org.elasticsearch.oldrepos.OldRepositoryAccessIT
206206
method: testOldSourceOnlyRepoAccess
207207
issue: https://github.com/elastic/elasticsearch/issues/120080
208-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
209-
method: testTsdbStartEndSet
210-
issue: https://github.com/elastic/elasticsearch/issues/120314
211208
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
212209
method: test {p0=snapshot/10_basic/Failed to snapshot indices with synthetic source}
213210
issue: https://github.com/elastic/elasticsearch/issues/120332

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public void testDestIndexDeletedIfExists() throws Exception {
8787
var destIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
8888
indicesAdmin().create(new CreateIndexRequest(destIndex)).actionGet();
8989
indexDocs(destIndex, 10);
90+
indicesAdmin().refresh(new RefreshRequest(destIndex)).actionGet();
9091
assertHitCount(prepareSearch(destIndex).setSize(0), 10);
9192

9293
// call reindex
@@ -195,19 +196,7 @@ public void testMappingsAddedToDestIndex() throws Exception {
195196
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());
196197

197198
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
198-
String mapping = """
199-
{
200-
"_doc":{
201-
"dynamic":"strict",
202-
"properties":{
203-
"foo1":{
204-
"type":"text"
205-
}
206-
}
207-
}
208-
}
209-
""";
210-
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mapping)).actionGet();
199+
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)).actionGet();
211200

212201
// call reindex
213202
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
@@ -337,6 +326,12 @@ public void testSettingsAndMappingsFromTemplate() throws IOException {
337326
var sourceIndex = "logs-" + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
338327
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet();
339328

329+
{
330+
var indexRequest = new IndexRequest(sourceIndex);
331+
indexRequest.source("{ \"foo1\": \"cheese\" }", XContentType.JSON);
332+
client().index(indexRequest).actionGet();
333+
}
334+
340335
// call reindex
341336
var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
342337
.actionGet()
@@ -359,6 +354,9 @@ public void testSettingsAndMappingsFromTemplate() throws IOException {
359354
// sanity check specific value from dest mapping
360355
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
361356
}
357+
358+
// verify doc was successfully added
359+
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
362360
}
363361

364362
private static final String TSDB_MAPPING = """
@@ -455,12 +453,10 @@ public void testTsdbStartEndSet() throws Exception {
455453

456454
assertEquals(startTime, destStart);
457455
assertEquals(endTime, destEnd);
458-
}
459456

460-
// TODO more logsdb/tsdb specific tests
461-
// TODO more data stream specific tests (how are data streams indices are different from regular indices?)
462-
// TODO check other IndexMetadata fields that need to be fixed after the fact
463-
// TODO what happens if don't have necessary perms for a given index?
457+
// verify doc was successfully added
458+
assertHitCount(prepareSearch(destIndex).setSize(0), 1);
459+
}
464460

465461
private static void cleanupMetadataBlocks(String index) {
466462
var settings = Settings.builder()
@@ -483,7 +479,6 @@ private static void indexDocs(String index, int numDocs) {
483479
}
484480
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
485481
assertThat(bulkResponse.getItems().length, equalTo(numDocs));
486-
indicesAdmin().refresh(new RefreshRequest(index)).actionGet();
487482
}
488483

489484
private static String formatInstant(Instant instant) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.support.HandledTransportAction;
2424
import org.elasticsearch.action.support.IndicesOptions;
2525
import org.elasticsearch.action.support.SubscribableListener;
26+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2627
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2728
import org.elasticsearch.client.internal.Client;
2829
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -140,6 +141,7 @@ protected void doExecute(
140141
}
141142

142143
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
144+
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
143145
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
144146
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
145147
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
@@ -175,6 +177,13 @@ public void onFailure(Exception e) {
175177
}, parentTaskId);
176178
}
177179

180+
private void refresh(String sourceIndexName, ActionListener<BroadcastResponse> listener, TaskId parentTaskId) {
181+
logger.debug("Refreshing source index [{}]", sourceIndexName);
182+
var refreshRequest = new RefreshRequest(sourceIndexName);
183+
refreshRequest.setParentTask(parentTaskId);
184+
client.execute(RefreshAction.INSTANCE, refreshRequest, listener);
185+
}
186+
178187
private void deleteDestIfExists(String destIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
179188
logger.debug("Attempting to delete index [{}]", destIndexName);
180189
var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS)

0 commit comments

Comments
 (0)