Skip to content

Commit b882c07

Browse files
committed
Make persistent task framework project aware (MP-1955)
This PR is the follow-up work for MP-1945 and MP-1938 which laid the foundation of two different scoped persistent tasks. It updates the persistent task framework to be aware of the two task types so that it can handle both cluster scope tasks and per-project tasks. Once these changes are in place, we will make health-node to be the first cluster-scope persistent task. Relates: ES-10168
1 parent a23175d commit b882c07

File tree

21 files changed

+1140
-316
lines changed

21 files changed

+1140
-316
lines changed
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.persistent;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
16+
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
18+
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
20+
import org.elasticsearch.common.io.stream.StreamOutput;
21+
import org.elasticsearch.common.settings.SettingsModule;
22+
import org.elasticsearch.core.Nullable;
23+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
24+
import org.elasticsearch.plugins.PersistentTaskPlugin;
25+
import org.elasticsearch.plugins.Plugin;
26+
import org.elasticsearch.tasks.TaskInfo;
27+
import org.elasticsearch.test.ESIntegTestCase;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.xcontent.NamedXContentRegistry;
30+
import org.elasticsearch.xcontent.ObjectParser;
31+
import org.elasticsearch.xcontent.ParseField;
32+
import org.elasticsearch.xcontent.XContentBuilder;
33+
import org.elasticsearch.xcontent.XContentParser;
34+
35+
import java.io.IOException;
36+
import java.util.Collection;
37+
import java.util.List;
38+
39+
import static org.hamcrest.Matchers.containsInAnyOrder;
40+
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.hasSize;
42+
import static org.hamcrest.Matchers.not;
43+
44+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
45+
public class ClusterAndProjectPersistentTasksSmokeIT extends ESIntegTestCase {
46+
47+
@Override
48+
protected Collection<Class<? extends Plugin>> nodePlugins() {
49+
return List.of(TestPersistentTasksPlugin.class);
50+
}
51+
52+
public void testCoexistenceOfClusterAndProjectPersistentTasks() throws Exception {
53+
internalCluster().startNode();
54+
ensureGreen();
55+
final var persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
56+
final var clusterService = internalCluster().getInstance(ClusterService.class);
57+
58+
// Start a project persistent task
59+
final PersistentTask<TestEmptyProjectParams> projectTask;
60+
{
61+
final PlainActionFuture<PersistentTask<TestEmptyProjectParams>> future = new PlainActionFuture<>();
62+
persistentTasksService.sendStartRequest(
63+
randomUUID(),
64+
TestProjectPersistentTasksExecutor.NAME,
65+
TestEmptyProjectParams.INSTANCE,
66+
TEST_REQUEST_TIMEOUT,
67+
future
68+
);
69+
projectTask = safeGet(future);
70+
}
71+
72+
// Start a cluster persistent task
73+
final PersistentTask<TestEmptyClusterParams> clusterTask;
74+
{
75+
final PlainActionFuture<PersistentTask<TestEmptyClusterParams>> future = new PlainActionFuture<>();
76+
persistentTasksService.sendStartRequest(
77+
randomUUID(),
78+
TestClusterPersistentTasksExecutor.NAME,
79+
TestEmptyClusterParams.INSTANCE,
80+
TEST_REQUEST_TIMEOUT,
81+
future
82+
);
83+
clusterTask = safeGet(future);
84+
}
85+
86+
// Two tasks have different allocation IDs
87+
assertThat(projectTask.getAllocationId(), not(equalTo(clusterTask.getAllocationId())));
88+
// They are found in different section of the cluster state
89+
assertClusterStateHasTaskSize(clusterService, Metadata.DEFAULT_PROJECT_ID, 1);
90+
assertClusterStateHasTaskSize(clusterService, null, 1);
91+
92+
// List tasks work correctly for both of them
93+
final List<TaskInfo> tasks = safeGet(clusterAdmin().prepareListTasks().execute()).getTasks()
94+
.stream()
95+
.filter(taskInfo -> "persistent".equals(taskInfo.type()) && taskInfo.action().startsWith("test-"))
96+
.toList();
97+
assertThat(tasks.toString(), tasks, hasSize(2));
98+
assertThat(
99+
tasks.stream().map(TaskInfo::action).toList(),
100+
containsInAnyOrder(TestProjectPersistentTasksExecutor.NAME + "[c]", TestClusterPersistentTasksExecutor.NAME + "[c]")
101+
);
102+
assertThat(
103+
tasks.stream().map(taskinfo -> taskinfo.parentTaskId().getId()).toList(),
104+
containsInAnyOrder(projectTask.getAllocationId(), clusterTask.getAllocationId())
105+
);
106+
107+
// Start remove the tasks
108+
if (randomBoolean()) {
109+
// Remove project task first
110+
final PlainActionFuture<PersistentTask<?>> future1 = new PlainActionFuture<>();
111+
persistentTasksService.sendRemoveRequest(projectTask.getId(), TEST_REQUEST_TIMEOUT, future1);
112+
safeGet(future1);
113+
114+
assertBusy(() -> {
115+
final List<TaskInfo> remainingTasks = safeGet(clusterAdmin().prepareListTasks().execute()).getTasks()
116+
.stream()
117+
.filter(
118+
taskInfo -> "persistent".equals(taskInfo.type())
119+
&& taskInfo.action().startsWith("test-")
120+
&& taskInfo.cancelled() == false
121+
)
122+
.toList();
123+
assertThat(remainingTasks.toString(), remainingTasks, hasSize(1));
124+
assertThat(remainingTasks.getFirst().parentTaskId().getId(), equalTo(clusterTask.getAllocationId()));
125+
126+
});
127+
assertClusterStateHasTaskSize(clusterService, Metadata.DEFAULT_PROJECT_ID, 0);
128+
assertClusterStateHasTaskSize(clusterService, null, 1);
129+
130+
final PlainActionFuture<PersistentTask<?>> future2 = new PlainActionFuture<>();
131+
persistentTasksService.sendRemoveRequest(clusterTask.getId(), TEST_REQUEST_TIMEOUT, future2);
132+
safeGet(future2);
133+
} else {
134+
// Remove cluster task first
135+
final PlainActionFuture<PersistentTask<?>> future1 = new PlainActionFuture<>();
136+
persistentTasksService.sendRemoveRequest(clusterTask.getId(), TEST_REQUEST_TIMEOUT, future1);
137+
safeGet(future1);
138+
139+
assertBusy(() -> {
140+
final List<TaskInfo> remainingTasks = safeGet(clusterAdmin().prepareListTasks().execute()).getTasks()
141+
.stream()
142+
.filter(
143+
taskInfo -> "persistent".equals(taskInfo.type())
144+
&& taskInfo.action().startsWith("test-")
145+
&& taskInfo.cancelled() == false
146+
)
147+
.toList();
148+
assertThat(remainingTasks.toString(), remainingTasks, hasSize(1));
149+
assertThat(remainingTasks.getFirst().parentTaskId().getId(), equalTo(projectTask.getAllocationId()));
150+
});
151+
152+
assertClusterStateHasTaskSize(clusterService, Metadata.DEFAULT_PROJECT_ID, 1);
153+
assertClusterStateHasTaskSize(clusterService, null, 0);
154+
155+
final PlainActionFuture<PersistentTask<?>> future2 = new PlainActionFuture<>();
156+
persistentTasksService.sendRemoveRequest(projectTask.getId(), TEST_REQUEST_TIMEOUT, future2);
157+
safeGet(future2);
158+
}
159+
160+
assertClusterStateHasTaskSize(clusterService, Metadata.DEFAULT_PROJECT_ID, 0);
161+
assertClusterStateHasTaskSize(clusterService, null, 0);
162+
}
163+
164+
private void assertClusterStateHasTaskSize(ClusterService clusterService, @Nullable ProjectId projectId, int size) {
165+
assertThat(
166+
PersistentTasks.getTasks(clusterService.state(), projectId)
167+
.tasks()
168+
.stream()
169+
.filter(task -> task.getTaskName().startsWith("test-"))
170+
.toList(),
171+
hasSize(size)
172+
);
173+
}
174+
175+
public static class TestPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin {
176+
@Override
177+
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
178+
ClusterService clusterService,
179+
ThreadPool threadPool,
180+
Client client,
181+
SettingsModule settingsModule,
182+
IndexNameExpressionResolver expressionResolver
183+
) {
184+
return List.of(new TestProjectPersistentTasksExecutor(clusterService), new TestClusterPersistentTasksExecutor(clusterService));
185+
}
186+
187+
@Override
188+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
189+
return List.of(
190+
new NamedWriteableRegistry.Entry(
191+
PersistentTaskParams.class,
192+
TestProjectPersistentTasksExecutor.NAME,
193+
ignore -> TestEmptyProjectParams.INSTANCE
194+
),
195+
new NamedWriteableRegistry.Entry(
196+
PersistentTaskParams.class,
197+
TestClusterPersistentTasksExecutor.NAME,
198+
ignore -> TestEmptyClusterParams.INSTANCE
199+
)
200+
);
201+
}
202+
203+
@Override
204+
public List<NamedXContentRegistry.Entry> getNamedXContent() {
205+
return List.of(
206+
new NamedXContentRegistry.Entry(
207+
PersistentTaskParams.class,
208+
new ParseField(TestProjectPersistentTasksExecutor.NAME),
209+
TestEmptyProjectParams::fromXContent
210+
),
211+
new NamedXContentRegistry.Entry(
212+
PersistentTaskParams.class,
213+
new ParseField(TestClusterPersistentTasksExecutor.NAME),
214+
TestEmptyClusterParams::fromXContent
215+
)
216+
);
217+
}
218+
}
219+
220+
public static class TestEmptyProjectParams implements PersistentTaskParams {
221+
222+
public static final TestEmptyProjectParams INSTANCE = new TestEmptyProjectParams();
223+
public static final ObjectParser<TestEmptyProjectParams, Void> PARSER = new ObjectParser<>(
224+
TestProjectPersistentTasksExecutor.NAME,
225+
true,
226+
() -> INSTANCE
227+
);
228+
229+
@Override
230+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
231+
return builder.startObject().endObject();
232+
}
233+
234+
@Override
235+
public void writeTo(StreamOutput out) {}
236+
237+
@Override
238+
public String getWriteableName() {
239+
return TestProjectPersistentTasksExecutor.NAME;
240+
}
241+
242+
@Override
243+
public TransportVersion getMinimalSupportedVersion() {
244+
return TransportVersion.current();
245+
}
246+
247+
public static TestEmptyProjectParams fromXContent(XContentParser parser) throws IOException {
248+
return PARSER.parse(parser, null);
249+
}
250+
}
251+
252+
public static class TestProjectPersistentTasksExecutor extends PersistentTasksExecutor<TestEmptyProjectParams> {
253+
static final String NAME = "test-project-persistent-task";
254+
255+
protected TestProjectPersistentTasksExecutor(ClusterService clusterService) {
256+
super(NAME, clusterService.threadPool().generic());
257+
}
258+
259+
@Override
260+
protected void nodeOperation(AllocatedPersistentTask task, TestEmptyProjectParams params, PersistentTaskState state) {}
261+
}
262+
263+
public static class TestEmptyClusterParams implements PersistentTaskParams {
264+
265+
public static final TestEmptyClusterParams INSTANCE = new TestEmptyClusterParams();
266+
public static final ObjectParser<TestEmptyClusterParams, Void> PARSER = new ObjectParser<>(
267+
TestClusterPersistentTasksExecutor.NAME,
268+
true,
269+
() -> INSTANCE
270+
);
271+
272+
@Override
273+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
274+
return builder.startObject().endObject();
275+
}
276+
277+
@Override
278+
public void writeTo(StreamOutput out) {}
279+
280+
@Override
281+
public String getWriteableName() {
282+
return TestClusterPersistentTasksExecutor.NAME;
283+
}
284+
285+
@Override
286+
public TransportVersion getMinimalSupportedVersion() {
287+
return TransportVersion.current();
288+
}
289+
290+
public static TestEmptyClusterParams fromXContent(XContentParser parser) throws IOException {
291+
return PARSER.parse(parser, null);
292+
}
293+
}
294+
295+
public static class TestClusterPersistentTasksExecutor extends PersistentTasksExecutor<TestEmptyClusterParams> {
296+
static final String NAME = "test-cluster-persistent-task";
297+
298+
protected TestClusterPersistentTasksExecutor(ClusterService clusterService) {
299+
super(NAME, clusterService.threadPool().generic());
300+
}
301+
302+
@Override
303+
public Scope scope() {
304+
return Scope.CLUSTER;
305+
}
306+
307+
@Override
308+
protected void nodeOperation(AllocatedPersistentTask task, TestEmptyClusterParams params, PersistentTaskState state) {}
309+
}
310+
}

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public ClusterState execute(ClusterState currentState) {
7979
.filter(
8080
pendingClusterTask -> pendingClusterTask.getSource()
8181
.string()
82-
.matches("finish persistent task \\[.*] \\(failed\\)")
82+
.matches("finish project .* persistent task \\[.*] \\(failed\\)")
8383
)
8484
.count();
8585
assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L));

0 commit comments

Comments
 (0)