Skip to content

Commit cf6272d

Browse files
authored
[9.1] Fix: Downsample returns appropriate error when target index gets deleted unexpectedly. (elastic#138228) (elastic#138304)
1 parent 40f474f commit cf6272d

File tree

3 files changed

+126
-23
lines changed

3 files changed

+126
-23
lines changed

docs/changelog/138228.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138228
2+
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
3+
\ unexpectedly."
4+
area: Downsampling
5+
type: bug
6+
issues: []

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
133133

134134
/**
135135
* This is the cluster state task executor for cluster state update actions.
136+
* Visible for testing
136137
*/
137-
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
138-
new SimpleBatchedExecutor<>() {
139-
@Override
140-
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
141-
throws Exception {
142-
return Tuple.tuple(task.execute(clusterState), null);
143-
}
138+
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
139+
@Override
140+
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
141+
return Tuple.tuple(task.execute(clusterState), null);
142+
}
144143

145-
@Override
146-
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
147-
task.listener.onResponse(AcknowledgedResponse.TRUE);
148-
}
149-
};
144+
@Override
145+
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
146+
task.listener.onResponse(AcknowledgedResponse.TRUE);
147+
}
148+
};
150149

151150
@Inject
152151
public TransportDownsampleAction(
@@ -1110,7 +1109,6 @@ public void onResponse(final AcknowledgedResponse response) {
11101109

11111110
@Override
11121111
public void onFailure(Exception e) {
1113-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11141112
listener.onFailure(e);
11151113
}
11161114

@@ -1165,6 +1163,11 @@ public ClusterState execute(ClusterState currentState) {
11651163
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
11661164
final ProjectMetadata project = currentState.metadata().getProject(projectId);
11671165
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
1166+
if (downsampleIndex == null) {
1167+
throw new IllegalStateException(
1168+
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
1169+
);
1170+
}
11681171
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
11691172
return currentState;
11701173
}
@@ -1186,7 +1189,6 @@ public ClusterState execute(ClusterState currentState) {
11861189

11871190
@Override
11881191
public void onFailure(Exception e) {
1189-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11901192
actionListener.onFailure(e);
11911193
}
11921194

@@ -1260,8 +1262,8 @@ public void onResponse(final AcknowledgedResponse response) {
12601262

12611263
@Override
12621264
public void onFailure(Exception e) {
1263-
recordSuccessMetrics(startTime);
1264-
logger.debug("Downsampling measured successfully", e);
1265+
recordFailureMetrics(startTime);
1266+
logger.debug("Downsampling failure measured successfully", e);
12651267
this.actionListener.onFailure(e);
12661268
}
12671269

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.cluster.project.ProjectResolver;
3030
import org.elasticsearch.cluster.routing.allocation.DataTier;
3131
import org.elasticsearch.cluster.service.ClusterService;
32+
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
3233
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3334
import org.elasticsearch.common.compress.CompressedXContent;
3435
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -56,6 +57,7 @@
5657
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
5758
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
5859
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
60+
import org.hamcrest.Matchers;
5961
import org.junit.After;
6062
import org.junit.Before;
6163
import org.mockito.Answers;
@@ -188,11 +190,12 @@ public void setUp() throws Exception {
188190
};
189191
doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any());
190192
doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any());
193+
191194
doAnswer(invocation -> {
192195
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
193-
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
196+
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
194197
return null;
195-
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
198+
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
196199
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
197200
MappedFieldType timestampFieldMock = mock(MappedFieldType.class);
198201
when(timestampFieldMock.meta()).thenReturn(Map.of());
@@ -236,11 +239,6 @@ private void downsample(String mapping) throws IOException {
236239

237240
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
238241

239-
doAnswer(invocation -> {
240-
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
241-
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
242-
return null;
243-
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
244242
Answer<Void> mockPersistentTask = invocation -> {
245243
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
246244
PersistentTasksCustomMetadata.PersistentTask<?> task = mock(PersistentTasksCustomMetadata.PersistentTask.class);
@@ -260,6 +258,7 @@ private void downsample(String mapping) throws IOException {
260258
listener.onResponse(AcknowledgedResponse.TRUE);
261259
return null;
262260
}).when(indicesAdminClient).updateSettings(any(), any());
261+
assertSuccessfulUpdateDownsampleStatus(clusterState);
263262

264263
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
265264
action.masterOperation(
@@ -298,6 +297,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() {
298297
.build();
299298

300299
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
300+
assertSuccessfulUpdateDownsampleStatus(clusterState);
301301

302302
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
303303
action.masterOperation(
@@ -359,6 +359,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
359359
)
360360
.build()
361361
);
362+
assertSuccessfulUpdateDownsampleStatus(clusterService.state());
362363

363364
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
364365
action.masterOperation(
@@ -377,6 +378,83 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
377378
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
378379
}
379380

381+
public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException {
382+
String mapping = switch (randomIntBetween(0, 2)) {
383+
case 0 -> NO_METADATA_MAPPING;
384+
case 1 -> OTHER_METADATA_MAPPING;
385+
default -> FORCE_MERGE_ENABLED_MAPPING;
386+
};
387+
mockGetMapping(mapping);
388+
mockMergedMapping(mapping);
389+
var projectMetadata = ProjectMetadata.builder(projectId)
390+
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
391+
.build();
392+
393+
var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
394+
.putProjectMetadata(projectMetadata)
395+
.blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
396+
.build();
397+
398+
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
399+
400+
Answer<Void> mockPersistentTask = invocation -> {
401+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
402+
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
403+
when(task1.getId()).thenReturn(randomAlphaOfLength(10));
404+
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
405+
DownsampleShardIndexerStatus.COMPLETED,
406+
null
407+
);
408+
when(task1.getState()).thenReturn(runningTaskState);
409+
listener.onResponse(task1);
410+
return null;
411+
};
412+
doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
413+
doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any());
414+
doAnswer(invocation -> {
415+
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
416+
listener.onResponse(AcknowledgedResponse.TRUE);
417+
return null;
418+
}).when(indicesAdminClient).updateSettings(any(), any());
419+
420+
doAnswer(invocation -> {
421+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
422+
ClusterStateTaskExecutorUtils.executeHandlingResults(
423+
clusterState,
424+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
425+
List.of(updateTask),
426+
task1 -> {},
427+
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
428+
);
429+
return null;
430+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
431+
IllegalStateException error = safeAwaitFailure(
432+
IllegalStateException.class,
433+
AcknowledgedResponse.class,
434+
listener -> action.masterOperation(
435+
task,
436+
new DownsampleAction.Request(
437+
ESTestCase.TEST_REQUEST_TIMEOUT,
438+
sourceIndex,
439+
targetIndex,
440+
TimeValue.ONE_HOUR,
441+
new DownsampleConfig(new DateHistogramInterval("5m"))
442+
),
443+
clusterState,
444+
listener
445+
)
446+
);
447+
assertThat(
448+
error.getMessage(),
449+
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
450+
);
451+
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
452+
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
453+
verify(indicesAdminClient).refresh(any(), any());
454+
verify(indicesAdminClient, never()).flush(any(), any());
455+
verify(indicesAdminClient, never()).forceMerge(any(), any());
456+
}
457+
380458
private void mockGetMapping(String mapping) {
381459
doAnswer(invocation -> {
382460
@SuppressWarnings("unchecked")
@@ -532,4 +610,21 @@ public void testGetSupportedMetrics() {
532610
assertThat(supported.defaultMetric(), is("max"));
533611
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
534612
}
613+
614+
private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
615+
var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId))
616+
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
617+
.build();
618+
619+
var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build();
620+
doAnswer(invocation -> {
621+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
622+
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
623+
updatedClusterState,
624+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
625+
List.of(updateTask)
626+
);
627+
return null;
628+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
629+
}
535630
}

0 commit comments

Comments
 (0)