Skip to content

Commit 14f5892

Browse files
authored
Fix: Downsample returns appropriate error when target index gets deleted unexpectedly. (elastic#138228)
1 parent d4e6302 commit 14f5892

File tree

3 files changed

+119
-26
lines changed

3 files changed

+119
-26
lines changed

docs/changelog/138228.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138228
2+
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
3+
\ unexpectedly."
4+
area: Downsampling
5+
type: bug
6+
issues: []

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
133133

134134
/**
135135
* This is the cluster state task executor for cluster state update actions.
136+
* Visible for testing
136137
*/
137-
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
138-
new SimpleBatchedExecutor<>() {
139-
@Override
140-
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
141-
throws Exception {
142-
return Tuple.tuple(task.execute(clusterState), null);
143-
}
138+
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
139+
@Override
140+
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
141+
return Tuple.tuple(task.execute(clusterState), null);
142+
}
144143

145-
@Override
146-
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
147-
task.listener.onResponse(AcknowledgedResponse.TRUE);
148-
}
149-
};
144+
@Override
145+
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
146+
task.listener.onResponse(AcknowledgedResponse.TRUE);
147+
}
148+
};
150149

151150
@Inject
152151
public TransportDownsampleAction(
@@ -1106,7 +1105,6 @@ public void onResponse(final AcknowledgedResponse response) {
11061105

11071106
@Override
11081107
public void onFailure(Exception e) {
1109-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11101108
listener.onFailure(e);
11111109
}
11121110

@@ -1161,6 +1159,11 @@ public ClusterState execute(ClusterState currentState) {
11611159
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
11621160
final ProjectMetadata project = currentState.metadata().getProject(projectId);
11631161
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
1162+
if (downsampleIndex == null) {
1163+
throw new IllegalStateException(
1164+
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
1165+
);
1166+
}
11641167
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
11651168
return currentState;
11661169
}
@@ -1182,7 +1185,6 @@ public ClusterState execute(ClusterState currentState) {
11821185

11831186
@Override
11841187
public void onFailure(Exception e) {
1185-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11861188
actionListener.onFailure(e);
11871189
}
11881190

@@ -1255,8 +1257,8 @@ public void onResponse(final AcknowledgedResponse response) {
12551257

12561258
@Override
12571259
public void onFailure(Exception e) {
1258-
recordSuccessMetrics(startTime);
1259-
logger.debug("Downsampling measured successfully", e);
1260+
recordFailureMetrics(startTime);
1261+
logger.debug("Downsampling failure measured successfully", e);
12601262
this.actionListener.onFailure(e);
12611263
}
12621264

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.cluster.project.ProjectResolver;
3030
import org.elasticsearch.cluster.routing.allocation.DataTier;
3131
import org.elasticsearch.cluster.service.ClusterService;
32+
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
3233
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3334
import org.elasticsearch.common.compress.CompressedXContent;
3435
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -56,14 +57,14 @@
5657
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
5758
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
5859
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
60+
import org.hamcrest.Matchers;
5961
import org.junit.After;
6062
import org.junit.Before;
6163
import org.mockito.Answers;
6264
import org.mockito.Mock;
6365
import org.mockito.MockitoAnnotations;
6466
import org.mockito.stubbing.Answer;
6567

66-
import java.io.IOException;
6768
import java.util.List;
6869
import java.util.Map;
6970
import java.util.Set;
@@ -173,12 +174,11 @@ public void setUp() throws Exception {
173174
doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any());
174175
doAnswer(mockBroadcastResponse).when(indicesAdminClient).flush(any(), any());
175176

176-
// Mocks for updating downsampling metadata
177177
doAnswer(invocation -> {
178178
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
179-
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
179+
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
180180
return null;
181-
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
181+
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
182182

183183
// Mocks for mapping retrieval & merging
184184
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
@@ -219,11 +219,6 @@ public void testDownsampling() {
219219

220220
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
221221

222-
doAnswer(invocation -> {
223-
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
224-
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
225-
return null;
226-
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
227222
Answer<Void> mockPersistentTask = invocation -> {
228223
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
229224
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
@@ -243,6 +238,7 @@ public void testDownsampling() {
243238
listener.onResponse(AcknowledgedResponse.TRUE);
244239
return null;
245240
}).when(indicesAdminClient).updateSettings(any(), any());
241+
assertSuccessfulUpdateDownsampleStatus(clusterState);
246242

247243
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
248244
action.masterOperation(
@@ -273,6 +269,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() {
273269
.build();
274270

275271
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
272+
assertSuccessfulUpdateDownsampleStatus(clusterState);
276273

277274
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
278275
action.masterOperation(
@@ -291,7 +288,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() {
291288
verifyIndexFinalisation();
292289
}
293290

294-
public void testDownsamplingWithShortCircuitDuringCreation() throws IOException {
291+
public void testDownsamplingWithShortCircuitDuringCreation() {
295292
var projectMetadata = ProjectMetadata.builder(projectId)
296293
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
297294
.build();
@@ -315,6 +312,7 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException
315312
)
316313
.build()
317314
);
315+
assertSuccessfulUpdateDownsampleStatus(clusterService.state());
318316

319317
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
320318
action.masterOperation(
@@ -333,6 +331,76 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException
333331
verifyIndexFinalisation();
334332
}
335333

334+
public void testDownsamplingWhenTargetIndexGetsDeleted() {
335+
var projectMetadata = ProjectMetadata.builder(projectId)
336+
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
337+
.build();
338+
339+
var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
340+
.putProjectMetadata(projectMetadata)
341+
.blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
342+
.build();
343+
344+
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
345+
346+
Answer<Void> mockPersistentTask = invocation -> {
347+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
348+
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
349+
when(task1.getId()).thenReturn(randomAlphaOfLength(10));
350+
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
351+
DownsampleShardIndexerStatus.COMPLETED,
352+
null
353+
);
354+
when(task1.getState()).thenReturn(runningTaskState);
355+
listener.onResponse(task1);
356+
return null;
357+
};
358+
doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
359+
doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any());
360+
doAnswer(invocation -> {
361+
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
362+
listener.onResponse(AcknowledgedResponse.TRUE);
363+
return null;
364+
}).when(indicesAdminClient).updateSettings(any(), any());
365+
366+
doAnswer(invocation -> {
367+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
368+
ClusterStateTaskExecutorUtils.executeHandlingResults(
369+
clusterState,
370+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
371+
List.of(updateTask),
372+
task1 -> {},
373+
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
374+
);
375+
return null;
376+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
377+
IllegalStateException error = safeAwaitFailure(
378+
IllegalStateException.class,
379+
AcknowledgedResponse.class,
380+
listener -> action.masterOperation(
381+
task,
382+
new DownsampleAction.Request(
383+
ESTestCase.TEST_REQUEST_TIMEOUT,
384+
sourceIndex,
385+
targetIndex,
386+
TimeValue.ONE_HOUR,
387+
new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod())
388+
),
389+
clusterState,
390+
listener
391+
)
392+
);
393+
assertThat(
394+
error.getMessage(),
395+
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
396+
);
397+
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
398+
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
399+
verify(indicesAdminClient).refresh(any(), any());
400+
verify(indicesAdminClient, never()).flush(any(), any());
401+
verify(indicesAdminClient, never()).forceMerge(any(), any());
402+
}
403+
336404
private void verifyIndexFinalisation() {
337405
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
338406
verify(indicesAdminClient).refresh(any(), any());
@@ -476,4 +544,21 @@ public void testGetSupportedMetrics() {
476544
assertThat(supported.defaultMetric(), is("max"));
477545
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
478546
}
547+
548+
private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
549+
var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId))
550+
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
551+
.build();
552+
553+
var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build();
554+
doAnswer(invocation -> {
555+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
556+
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
557+
updatedClusterState,
558+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
559+
List.of(updateTask)
560+
);
561+
return null;
562+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
563+
}
479564
}

0 commit comments

Comments
 (0)