Skip to content

Commit 3907030

Browse files
authored
[8.19] Fix: Downsample returns appropriate error when target index gets deleted unexpectedly. (#138228) (#138344)
1 parent 334fff7 commit 3907030

File tree

3 files changed

+123
-24
lines changed

3 files changed

+123
-24
lines changed

docs/changelog/138228.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138228
2+
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
3+
\ unexpectedly."
4+
area: Downsampling
5+
type: bug
6+
issues: []

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -130,20 +130,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
130130

131131
/**
132132
* This is the cluster state task executor for cluster state update actions.
133+
* Visible for testing
133134
*/
134-
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
135-
new SimpleBatchedExecutor<>() {
136-
@Override
137-
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
138-
throws Exception {
139-
return Tuple.tuple(task.execute(clusterState), null);
140-
}
135+
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
136+
@Override
137+
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
138+
return Tuple.tuple(task.execute(clusterState), null);
139+
}
141140

142-
@Override
143-
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
144-
task.listener.onResponse(AcknowledgedResponse.TRUE);
145-
}
146-
};
141+
@Override
142+
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
143+
task.listener.onResponse(AcknowledgedResponse.TRUE);
144+
}
145+
};
147146

148147
@Inject
149148
public TransportDownsampleAction(
@@ -1086,7 +1085,6 @@ public void onResponse(final AcknowledgedResponse response) {
10861085

10871086
@Override
10881087
public void onFailure(Exception e) {
1089-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
10901088
listener.onFailure(e);
10911089
}
10921090

@@ -1137,7 +1135,12 @@ public void onResponse(final BroadcastResponse response) {
11371135
public ClusterState execute(ClusterState currentState) {
11381136
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
11391137
final Metadata metadata = currentState.metadata();
1140-
final IndexMetadata downsampleIndex = metadata.index(metadata.index(downsampleIndexName).getIndex());
1138+
final IndexMetadata downsampleIndex = metadata.index(downsampleIndexName);
1139+
if (downsampleIndex == null) {
1140+
throw new IllegalStateException(
1141+
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
1142+
);
1143+
}
11411144
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
11421145
return currentState;
11431146
}
@@ -1159,7 +1162,6 @@ public ClusterState execute(ClusterState currentState) {
11591162

11601163
@Override
11611164
public void onFailure(Exception e) {
1162-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11631165
actionListener.onFailure(e);
11641166
}
11651167

@@ -1233,8 +1235,8 @@ public void onResponse(final AcknowledgedResponse response) {
12331235

12341236
@Override
12351237
public void onFailure(Exception e) {
1236-
recordSuccessMetrics(startTime);
1237-
logger.debug("Downsampling measured successfully", e);
1238+
recordFailureMetrics(startTime);
1239+
logger.debug("Downsampling failure measured successfully", e);
12381240
this.actionListener.onFailure(e);
12391241
}
12401242

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java

Lines changed: 98 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.node.DiscoveryNode;
2828
import org.elasticsearch.cluster.routing.allocation.DataTier;
2929
import org.elasticsearch.cluster.service.ClusterService;
30+
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
3031
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3132
import org.elasticsearch.common.compress.CompressedXContent;
3233
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -54,6 +55,7 @@
5455
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
5556
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
5657
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
58+
import org.hamcrest.Matchers;
5759
import org.junit.After;
5860
import org.junit.Before;
5961
import org.mockito.Answers;
@@ -169,11 +171,13 @@ public void setUp() throws Exception {
169171
replicaShards = randomIntBetween(0, 3);
170172
task = new Task(1, "type", "action", "description", null, null);
171173

174+
// Initialise mocks for thread pool and cluster service
172175
var threadContext = new ThreadContext(Settings.EMPTY);
173176
when(threadPool.getThreadContext()).thenReturn(threadContext);
174177
when(clusterService.localNode()).thenReturn(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "node_name"));
175178
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
176179

180+
// Mock refresh & force merge requests
177181
Answer<Void> mockBroadcastResponse = invocation -> {
178182
@SuppressWarnings("unchecked")
179183
var listener = (ActionListener<BroadcastResponse>) invocation.getArgument(1, ActionListener.class);
@@ -184,9 +188,9 @@ public void setUp() throws Exception {
184188
doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any());
185189
doAnswer(invocation -> {
186190
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
187-
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
191+
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
188192
return null;
189-
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
193+
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
190194
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
191195
MappedFieldType timestampFieldMock = mock(MappedFieldType.class);
192196
when(timestampFieldMock.meta()).thenReturn(Map.of());
@@ -224,11 +228,6 @@ private void downsample(String mapping) throws IOException {
224228
.blocks(ClusterBlocks.builder().addIndexBlock(sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
225229
.build();
226230

227-
doAnswer(invocation -> {
228-
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
229-
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
230-
return null;
231-
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
232231
Function<Integer, Answer<Void>> mockPersistentTask = i -> invocation -> {
233232
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(i);
234233
PersistentTasksCustomMetadata.PersistentTask<?> task = mock(PersistentTasksCustomMetadata.PersistentTask.class);
@@ -248,6 +247,7 @@ private void downsample(String mapping) throws IOException {
248247
listener.onResponse(AcknowledgedResponse.TRUE);
249248
return null;
250249
}).when(indicesAdminClient).updateSettings(any(), any());
250+
assertSuccessfulUpdateDownsampleStatus(clusterState);
251251

252252
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
253253
action.masterOperation(
@@ -283,6 +283,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() {
283283
)
284284
.blocks(ClusterBlocks.builder().addIndexBlock(sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
285285
.build();
286+
assertSuccessfulUpdateDownsampleStatus(clusterState);
286287

287288
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
288289
action.masterOperation(
@@ -338,6 +339,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
338339
)
339340
.build()
340341
);
342+
assertSuccessfulUpdateDownsampleStatus(clusterService.state());
341343

342344
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
343345
action.masterOperation(
@@ -356,6 +358,78 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
356358
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
357359
}
358360

361+
public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException {
362+
String mapping = switch (randomIntBetween(0, 2)) {
363+
case 0 -> NO_METADATA_MAPPING;
364+
case 1 -> OTHER_METADATA_MAPPING;
365+
default -> FORCE_MERGE_ENABLED_MAPPING;
366+
};
367+
mockGetMapping(mapping);
368+
mockMergedMapping(mapping);
369+
370+
var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
371+
.metadata(Metadata.builder().put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)))
372+
.blocks(ClusterBlocks.builder().addIndexBlock(sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
373+
.build();
374+
375+
Function<Integer, Answer<Void>> mockPersistentTask = i -> invocation -> {
376+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(i);
377+
PersistentTasksCustomMetadata.PersistentTask<?> task = mock(PersistentTasksCustomMetadata.PersistentTask.class);
378+
when(task.getId()).thenReturn(randomAlphaOfLength(10));
379+
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
380+
DownsampleShardIndexerStatus.COMPLETED,
381+
null
382+
);
383+
when(task.getState()).thenReturn(runningTaskState);
384+
listener.onResponse(task);
385+
return null;
386+
};
387+
doAnswer(mockPersistentTask.apply(4)).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
388+
doAnswer(mockPersistentTask.apply(3)).when(persistentTaskService).waitForPersistentTaskCondition(anyString(), any(), any(), any());
389+
doAnswer(invocation -> {
390+
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
391+
listener.onResponse(AcknowledgedResponse.TRUE);
392+
return null;
393+
}).when(indicesAdminClient).updateSettings(any(), any());
394+
395+
doAnswer(invocation -> {
396+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
397+
ClusterStateTaskExecutorUtils.executeHandlingResults(
398+
clusterState,
399+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
400+
List.of(updateTask),
401+
task1 -> {},
402+
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
403+
);
404+
return null;
405+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
406+
IllegalStateException error = safeAwaitFailure(
407+
IllegalStateException.class,
408+
AcknowledgedResponse.class,
409+
listener -> action.masterOperation(
410+
task,
411+
new DownsampleAction.Request(
412+
ESTestCase.TEST_REQUEST_TIMEOUT,
413+
sourceIndex,
414+
targetIndex,
415+
TimeValue.timeValueHours(1),
416+
new DownsampleConfig(new DateHistogramInterval("5m"))
417+
),
418+
clusterState,
419+
listener
420+
)
421+
);
422+
assertThat(
423+
error.getMessage(),
424+
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
425+
);
426+
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
427+
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
428+
verify(indicesAdminClient).refresh(any(), any());
429+
verify(indicesAdminClient, never()).flush(any(), any());
430+
verify(indicesAdminClient, never()).forceMerge(any(), any());
431+
}
432+
359433
private void mockGetMapping(String mapping) {
360434
doAnswer(invocation -> {
361435
@SuppressWarnings("unchecked")
@@ -511,4 +585,21 @@ public void testGetSupportedMetrics() {
511585
assertThat(supported.defaultMetric(), is("max"));
512586
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
513587
}
588+
589+
private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
590+
var metadata = Metadata.builder(clusterState.metadata())
591+
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
592+
.build();
593+
594+
var updatedClusterState = ClusterState.builder(clusterState).metadata(metadata).build();
595+
doAnswer(invocation -> {
596+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
597+
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
598+
updatedClusterState,
599+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
600+
List.of(updateTask)
601+
);
602+
return null;
603+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
604+
}
514605
}

0 commit comments

Comments
 (0)