Skip to content

Commit 9ae0140

Browse files
authored
Improve handling of failure to create persistent task (#114386)
Today, if creating a persistent task fails with an exception, we submit a cluster state update to fail the task. However, until that update executes, we retry the failing task creation and the cluster state submission on every other cluster state update that changes the persistent tasks metadata. With this commit, we register a placeholder task on the executing node to block further attempts to create the task until the cluster state update is processed.
1 parent 84625c6 commit 9ae0140

File tree

3 files changed

+296
-23
lines changed

3 files changed

+296
-23
lines changed

docs/changelog/114386.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114386
2+
summary: Improve handling of failure to create persistent task
3+
area: Task Management
4+
type: bug
5+
issues: []
org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.persistent;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
16+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
17+
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.common.Priority;
19+
import org.elasticsearch.common.UUIDs;
20+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
21+
import org.elasticsearch.common.io.stream.StreamInput;
22+
import org.elasticsearch.common.io.stream.StreamOutput;
23+
import org.elasticsearch.common.settings.SettingsModule;
24+
import org.elasticsearch.plugins.PersistentTaskPlugin;
25+
import org.elasticsearch.plugins.Plugin;
26+
import org.elasticsearch.plugins.PluginsService;
27+
import org.elasticsearch.tasks.TaskId;
28+
import org.elasticsearch.test.ClusterServiceUtils;
29+
import org.elasticsearch.test.ESIntegTestCase;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
import org.elasticsearch.xcontent.NamedXContentRegistry;
32+
import org.elasticsearch.xcontent.ParseField;
33+
import org.elasticsearch.xcontent.XContentBuilder;
34+
35+
import java.io.IOException;
36+
import java.util.Collection;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.stream.StreamSupport;
41+
42+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
43+
44+
public class PersistentTaskCreationFailureIT extends ESIntegTestCase {
45+
@Override
46+
protected Collection<Class<? extends Plugin>> nodePlugins() {
47+
return List.of(FailingCreationPersistentTasksPlugin.class);
48+
}
49+
50+
private static boolean hasPersistentTask(ClusterState clusterState) {
51+
return findTasks(clusterState, FailingCreationPersistentTaskExecutor.TASK_NAME).isEmpty() == false;
52+
}
53+
54+
public void testPersistentTasksThatFailDuringCreationAreRemovedFromClusterState() {
55+
56+
final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
57+
final var plugins = StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
58+
.flatMap(ps -> ps.filterPlugins(FailingCreationPersistentTasksPlugin.class))
59+
.toList();
60+
plugins.forEach(plugin -> plugin.hasFailedToCreateTask.set(false));
61+
62+
final var taskCreatedListener = ClusterServiceUtils.addTemporaryStateListener(
63+
masterClusterService,
64+
PersistentTaskCreationFailureIT::hasPersistentTask
65+
);
66+
67+
taskCreatedListener.andThenAccept(v -> {
68+
// enqueue some higher-priority cluster state updates to check that they do not cause retries of the failing task creation step
69+
for (int i = 0; i < 5; i++) {
70+
masterClusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
71+
@Override
72+
public ClusterState execute(ClusterState currentState) {
73+
assertTrue(hasPersistentTask(currentState));
74+
75+
assertTrue(waitUntil(() -> {
76+
final var completePersistentTaskPendingTasksCount = masterClusterService.getMasterService()
77+
.pendingTasks()
78+
.stream()
79+
.filter(
80+
pendingClusterTask -> pendingClusterTask.getSource().string().equals("finish persistent task (failed)")
81+
)
82+
.count();
83+
assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L));
84+
return completePersistentTaskPendingTasksCount == 1L;
85+
}));
86+
87+
return currentState.copyAndUpdateMetadata(
88+
mdb -> mdb.putCustom(
89+
PersistentTasksCustomMetadata.TYPE,
90+
PersistentTasksCustomMetadata.builder(
91+
PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(currentState)
92+
)
93+
// create and remove a fake task just to force a change in lastAllocationId so that
94+
// PersistentTasksNodeService checks for changes and potentially retries
95+
.addTask("test", "test", null, PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT)
96+
.removeTask("test")
97+
.build()
98+
)
99+
);
100+
}
101+
102+
@Override
103+
public void onFailure(Exception e) {
104+
fail(e);
105+
}
106+
});
107+
}
108+
});
109+
110+
safeAwait(
111+
l -> internalCluster().getInstance(PersistentTasksService.class)
112+
.sendStartRequest(
113+
UUIDs.base64UUID(),
114+
FailingCreationPersistentTaskExecutor.TASK_NAME,
115+
new FailingCreationTaskParams(),
116+
null,
117+
l.map(ignored -> null)
118+
)
119+
);
120+
121+
safeAwait(
122+
taskCreatedListener.<Void>andThen(
123+
(l, v) -> ClusterServiceUtils.addTemporaryStateListener(
124+
masterClusterService,
125+
clusterState -> hasPersistentTask(clusterState) == false
126+
).addListener(l)
127+
)
128+
);
129+
130+
assertEquals(1L, plugins.stream().filter(plugin -> plugin.hasFailedToCreateTask.get()).count());
131+
}
132+
133+
public static class FailingCreationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin {
134+
135+
private final AtomicBoolean hasFailedToCreateTask = new AtomicBoolean();
136+
137+
@Override
138+
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
139+
ClusterService clusterService,
140+
ThreadPool threadPool,
141+
Client client,
142+
SettingsModule settingsModule,
143+
IndexNameExpressionResolver expressionResolver
144+
) {
145+
return List.of(new FailingCreationPersistentTaskExecutor(hasFailedToCreateTask));
146+
}
147+
148+
@Override
149+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
150+
return List.of(
151+
new NamedWriteableRegistry.Entry(
152+
PersistentTaskParams.class,
153+
FailingCreationPersistentTaskExecutor.TASK_NAME,
154+
FailingCreationTaskParams::new
155+
)
156+
);
157+
}
158+
159+
@Override
160+
public List<NamedXContentRegistry.Entry> getNamedXContent() {
161+
return List.of(
162+
new NamedXContentRegistry.Entry(
163+
PersistentTaskParams.class,
164+
new ParseField(FailingCreationPersistentTaskExecutor.TASK_NAME),
165+
p -> {
166+
p.skipChildren();
167+
return new FailingCreationTaskParams();
168+
}
169+
)
170+
);
171+
}
172+
}
173+
174+
public static class FailingCreationTaskParams implements PersistentTaskParams {
175+
public FailingCreationTaskParams() {}
176+
177+
public FailingCreationTaskParams(StreamInput in) {}
178+
179+
@Override
180+
public String getWriteableName() {
181+
return FailingCreationPersistentTaskExecutor.TASK_NAME;
182+
}
183+
184+
@Override
185+
public TransportVersion getMinimalSupportedVersion() {
186+
return TransportVersion.current();
187+
}
188+
189+
@Override
190+
public void writeTo(StreamOutput out) throws IOException {}
191+
192+
@Override
193+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
194+
builder.startObject();
195+
builder.endObject();
196+
return builder;
197+
}
198+
}
199+
200+
static class FailingCreationPersistentTaskExecutor extends PersistentTasksExecutor<FailingCreationTaskParams> {
201+
static final String TASK_NAME = "cluster:admin/persistent/test_creation_failure";
202+
203+
private final AtomicBoolean hasFailedToCreateTask;
204+
205+
FailingCreationPersistentTaskExecutor(AtomicBoolean hasFailedToCreateTask) {
206+
super(TASK_NAME, r -> fail("execution is unexpected"));
207+
this.hasFailedToCreateTask = hasFailedToCreateTask;
208+
}
209+
210+
@Override
211+
protected AllocatedPersistentTask createTask(
212+
long id,
213+
String type,
214+
String action,
215+
TaskId parentTaskId,
216+
PersistentTasksCustomMetadata.PersistentTask<FailingCreationTaskParams> taskInProgress,
217+
Map<String, String> headers
218+
) {
219+
assertTrue("already failed before", hasFailedToCreateTask.compareAndSet(false, true));
220+
throw new RuntimeException("simulated");
221+
}
222+
223+
@Override
224+
protected void nodeOperation(AllocatedPersistentTask task, FailingCreationTaskParams params, PersistentTaskState state) {
225+
fail("execution is unexpected");
226+
}
227+
}
228+
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.Strings;
1818
import org.elasticsearch.common.io.stream.StreamInput;
1919
import org.elasticsearch.common.io.stream.StreamOutput;
20+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2021
import org.elasticsearch.gateway.GatewayService;
2122
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
2223
import org.elasticsearch.tasks.Task;
@@ -32,6 +33,7 @@
3233
import java.util.Map;
3334
import java.util.Objects;
3435
import java.util.Set;
36+
import java.util.concurrent.Executor;
3537

3638
import static java.util.Objects.requireNonNull;
3739
import static org.elasticsearch.core.Strings.format;
@@ -172,41 +174,65 @@ private <Params extends PersistentTaskParams> void startTask(PersistentTask<Para
172174
taskInProgress.getTaskName()
173175
);
174176

175-
TaskAwareRequest request = new TaskAwareRequest() {
176-
TaskId parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
177+
final var request = new PersistentTaskAwareRequest<>(taskInProgress, executor);
178+
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
179+
doStartTask(taskInProgress, executor, request);
180+
}
181+
}
177182

178-
@Override
179-
public void setParentTask(TaskId taskId) {
180-
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
181-
}
183+
/**
184+
* A {@link TaskAwareRequest} which creates the relevant task using a {@link PersistentTasksExecutor}.
185+
*/
186+
private static class PersistentTaskAwareRequest<Params extends PersistentTaskParams> implements TaskAwareRequest {
187+
private final PersistentTask<Params> taskInProgress;
188+
private final TaskId parentTaskId;
189+
private final PersistentTasksExecutor<Params> executor;
190+
191+
private PersistentTaskAwareRequest(PersistentTask<Params> taskInProgress, PersistentTasksExecutor<Params> executor) {
192+
this.taskInProgress = taskInProgress;
193+
this.parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
194+
this.executor = executor;
195+
}
182196

183-
@Override
184-
public void setRequestId(long requestId) {
185-
throw new UnsupportedOperationException("does not have a request ID");
186-
}
197+
@Override
198+
public void setParentTask(TaskId taskId) {
199+
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
200+
}
187201

188-
@Override
189-
public TaskId getParentTask() {
190-
return parentTaskId;
191-
}
202+
@Override
203+
public void setRequestId(long requestId) {
204+
throw new UnsupportedOperationException("does not have a request ID");
205+
}
192206

193-
@Override
194-
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
195-
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
196-
}
197-
};
207+
@Override
208+
public TaskId getParentTask() {
209+
return parentTaskId;
210+
}
198211

199-
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
200-
doStartTask(taskInProgress, executor, request);
212+
@Override
213+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
214+
return executor.createTask(id, type, action, parentTaskId, taskInProgress, headers);
215+
}
216+
}
217+
218+
/**
219+
* A no-op {@link PersistentTasksExecutor} to create a placeholder task if creating the real task fails for some reason.
220+
*/
221+
private static class PersistentTaskStartupFailureExecutor<Params extends PersistentTaskParams> extends PersistentTasksExecutor<Params> {
222+
PersistentTaskStartupFailureExecutor(String taskName, Executor executor) {
223+
super(taskName, executor);
201224
}
225+
226+
@Override
227+
protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {}
202228
}
203229

204230
private <Params extends PersistentTaskParams> void doStartTask(
205231
PersistentTask<Params> taskInProgress,
206232
PersistentTasksExecutor<Params> executor,
207233
TaskAwareRequest request
208234
) {
209-
AllocatedPersistentTask task;
235+
final AllocatedPersistentTask task;
210236
try {
211237
task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", request);
212238
} catch (Exception e) {
@@ -220,7 +246,21 @@ private <Params extends PersistentTaskParams> void doStartTask(
220246
+ "], removing from persistent tasks",
221247
e
222248
);
223-
notifyMasterOfFailedTask(taskInProgress, e);
249+
250+
// create a no-op placeholder task so that we don't keep trying to start this task while we wait for the cluster state update
251+
// which handles the failure
252+
final var placeholderTask = (AllocatedPersistentTask) taskManager.register(
253+
"persistent",
254+
taskInProgress.getTaskName() + "[c]",
255+
new PersistentTaskAwareRequest<>(
256+
taskInProgress,
257+
new PersistentTaskStartupFailureExecutor<>(executor.getTaskName(), EsExecutors.DIRECT_EXECUTOR_SERVICE)
258+
)
259+
);
260+
placeholderTask.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
261+
taskManager.unregister(placeholderTask);
262+
runningTasks.put(taskInProgress.getAllocationId(), placeholderTask);
263+
placeholderTask.markAsFailed(e);
224264
return;
225265
}
226266

0 commit comments

Comments
 (0)