Skip to content

Commit 81508e8

Browse files
committed
Fix: Downsample returns appropriate error when target index gets deleted unexpectedly. (#138228)
(cherry picked from commit 14f5892) # Conflicts: # x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java
1 parent c1a46d5 commit 81508e8

File tree

3 files changed

+128
-23
lines changed

3 files changed

+128
-23
lines changed

docs/changelog/138228.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138228
2+
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
3+
\ unexpectedly."
4+
area: Downsampling
5+
type: bug
6+
issues: []

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
133133

134134
/**
135135
* This is the cluster state task executor for cluster state update actions.
136+
* Visible for testing
136137
*/
137-
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
138-
new SimpleBatchedExecutor<>() {
139-
@Override
140-
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
141-
throws Exception {
142-
return Tuple.tuple(task.execute(clusterState), null);
143-
}
138+
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
139+
@Override
140+
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
141+
return Tuple.tuple(task.execute(clusterState), null);
142+
}
144143

145-
@Override
146-
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
147-
task.listener.onResponse(AcknowledgedResponse.TRUE);
148-
}
149-
};
144+
@Override
145+
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
146+
task.listener.onResponse(AcknowledgedResponse.TRUE);
147+
}
148+
};
150149

151150
@Inject
152151
public TransportDownsampleAction(
@@ -1110,7 +1109,6 @@ public void onResponse(final AcknowledgedResponse response) {
11101109

11111110
@Override
11121111
public void onFailure(Exception e) {
1113-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11141112
listener.onFailure(e);
11151113
}
11161114

@@ -1165,6 +1163,11 @@ public ClusterState execute(ClusterState currentState) {
11651163
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
11661164
final ProjectMetadata project = currentState.metadata().getProject(projectId);
11671165
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
1166+
if (downsampleIndex == null) {
1167+
throw new IllegalStateException(
1168+
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
1169+
);
1170+
}
11681171
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
11691172
return currentState;
11701173
}
@@ -1186,7 +1189,6 @@ public ClusterState execute(ClusterState currentState) {
11861189

11871190
@Override
11881191
public void onFailure(Exception e) {
1189-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11901192
actionListener.onFailure(e);
11911193
}
11921194

@@ -1260,8 +1262,8 @@ public void onResponse(final AcknowledgedResponse response) {
12601262

12611263
@Override
12621264
public void onFailure(Exception e) {
1263-
recordSuccessMetrics(startTime);
1264-
logger.debug("Downsampling measured successfully", e);
1265+
recordFailureMetrics(startTime);
1266+
logger.debug("Downsampling failure measured successfully", e);
12651267
this.actionListener.onFailure(e);
12661268
}
12671269

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java

Lines changed: 104 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.cluster.project.ProjectResolver;
3030
import org.elasticsearch.cluster.routing.allocation.DataTier;
3131
import org.elasticsearch.cluster.service.ClusterService;
32+
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
3233
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3334
import org.elasticsearch.common.compress.CompressedXContent;
3435
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -56,6 +57,7 @@
5657
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
5758
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
5859
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
60+
import org.hamcrest.Matchers;
5961
import org.junit.After;
6062
import org.junit.Before;
6163
import org.mockito.Answers;
@@ -188,11 +190,14 @@ public void setUp() throws Exception {
188190
};
189191
doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any());
190192
doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any());
193+
191194
doAnswer(invocation -> {
192195
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
193-
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
196+
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
194197
return null;
195-
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
198+
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
199+
200+
// Mocks for mapping retrieval & merging
196201
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
197202
MappedFieldType timestampFieldMock = mock(MappedFieldType.class);
198203
when(timestampFieldMock.meta()).thenReturn(Map.of());
@@ -236,11 +241,6 @@ private void downsample(String mapping) throws IOException {
236241

237242
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
238243

239-
doAnswer(invocation -> {
240-
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
241-
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
242-
return null;
243-
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
244244
Answer<Void> mockPersistentTask = invocation -> {
245245
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
246246
PersistentTasksCustomMetadata.PersistentTask<?> task = mock(PersistentTasksCustomMetadata.PersistentTask.class);
@@ -260,6 +260,7 @@ private void downsample(String mapping) throws IOException {
260260
listener.onResponse(AcknowledgedResponse.TRUE);
261261
return null;
262262
}).when(indicesAdminClient).updateSettings(any(), any());
263+
assertSuccessfulUpdateDownsampleStatus(clusterState);
263264

264265
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
265266
action.masterOperation(
@@ -298,6 +299,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() {
298299
.build();
299300

300301
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
302+
assertSuccessfulUpdateDownsampleStatus(clusterState);
301303

302304
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
303305
action.masterOperation(
@@ -359,6 +361,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
359361
)
360362
.build()
361363
);
364+
assertSuccessfulUpdateDownsampleStatus(clusterService.state());
362365

363366
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
364367
action.masterOperation(
@@ -377,6 +380,83 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
377380
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
378381
}
379382

383+
public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException {
384+
String mapping = switch (randomIntBetween(0, 2)) {
385+
case 0 -> NO_METADATA_MAPPING;
386+
case 1 -> OTHER_METADATA_MAPPING;
387+
default -> FORCE_MERGE_ENABLED_MAPPING;
388+
};
389+
mockGetMapping(mapping);
390+
mockMergedMapping(mapping);
391+
var projectMetadata = ProjectMetadata.builder(projectId)
392+
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
393+
.build();
394+
395+
var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
396+
.putProjectMetadata(projectMetadata)
397+
.blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
398+
.build();
399+
400+
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
401+
402+
Answer<Void> mockPersistentTask = invocation -> {
403+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
404+
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
405+
when(task1.getId()).thenReturn(randomAlphaOfLength(10));
406+
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
407+
DownsampleShardIndexerStatus.COMPLETED,
408+
null
409+
);
410+
when(task1.getState()).thenReturn(runningTaskState);
411+
listener.onResponse(task1);
412+
return null;
413+
};
414+
doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
415+
doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any());
416+
doAnswer(invocation -> {
417+
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
418+
listener.onResponse(AcknowledgedResponse.TRUE);
419+
return null;
420+
}).when(indicesAdminClient).updateSettings(any(), any());
421+
422+
doAnswer(invocation -> {
423+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
424+
ClusterStateTaskExecutorUtils.executeHandlingResults(
425+
clusterState,
426+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
427+
List.of(updateTask),
428+
task1 -> {},
429+
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
430+
);
431+
return null;
432+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
433+
IllegalStateException error = safeAwaitFailure(
434+
IllegalStateException.class,
435+
AcknowledgedResponse.class,
436+
listener -> action.masterOperation(
437+
task,
438+
new DownsampleAction.Request(
439+
ESTestCase.TEST_REQUEST_TIMEOUT,
440+
sourceIndex,
441+
targetIndex,
442+
TimeValue.ONE_HOUR,
443+
new DownsampleConfig(new DateHistogramInterval("5m"))
444+
),
445+
clusterState,
446+
listener
447+
)
448+
);
449+
assertThat(
450+
error.getMessage(),
451+
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
452+
);
453+
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
454+
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
455+
verify(indicesAdminClient).refresh(any(), any());
456+
verify(indicesAdminClient, never()).flush(any(), any());
457+
verify(indicesAdminClient, never()).forceMerge(any(), any());
458+
}
459+
380460
private void mockGetMapping(String mapping) {
381461
doAnswer(invocation -> {
382462
@SuppressWarnings("unchecked")
@@ -532,4 +612,21 @@ public void testGetSupportedMetrics() {
532612
assertThat(supported.defaultMetric(), is("max"));
533613
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
534614
}
615+
616+
private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
617+
var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId))
618+
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
619+
.build();
620+
621+
var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build();
622+
doAnswer(invocation -> {
623+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
624+
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
625+
updatedClusterState,
626+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
627+
List.of(updateTask)
628+
);
629+
return null;
630+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
631+
}
535632
}

0 commit comments

Comments
 (0)