2020import org .elasticsearch .cluster .ClusterState ;
2121import org .elasticsearch .cluster .ClusterStateObserver ;
2222import org .elasticsearch .cluster .metadata .ProjectId ;
23+ import org .elasticsearch .cluster .project .ProjectResolver ;
2324import org .elasticsearch .cluster .service .ClusterService ;
2425import org .elasticsearch .core .Nullable ;
2526import org .elasticsearch .core .TimeValue ;
@@ -55,16 +56,61 @@ public PersistentTasksService(ClusterService clusterService, ThreadPool threadPo
5556 /**
5657 * Notifies the master node to create new persistent task and to assign it to a node. Accepts operation timeout as optional parameter
5758 */
59+ @ Deprecated (forRemoval = true ) // Use the explict cluster/project version instead
5860 public <Params extends PersistentTaskParams > void sendStartRequest (
5961 final String taskId ,
6062 final String taskName ,
6163 final Params taskParams ,
6264 final TimeValue timeout ,
6365 final ActionListener <PersistentTask <Params >> listener
66+ ) {
67+ sendStartRequest (null , taskId , taskName , taskParams , timeout , listener );
68+ }
69+
70+ /**
71+ * Notifies the master node to create new cluster persistent task and to assign it to a node.
72+ * Accepts operation timeout as optional parameter
73+ */
74+ public <Params extends PersistentTaskParams > void sendClusterStartRequest (
75+ final String taskId ,
76+ final String taskName ,
77+ final Params taskParams ,
78+ final TimeValue timeout ,
79+ final ActionListener <PersistentTask <Params >> listener
80+ ) {
81+ assertClusterScope ();
82+ assert PersistentTasksExecutorRegistry .isClusterScopedTask (taskName ) : taskName + " is not a cluster scoped task" ;
83+ sendStartRequest (null , taskId , taskName , taskParams , timeout , listener );
84+ }
85+
86+ /**
87+ * Notifies the master node to create new project level persistent task and to assign it to a node.
88+ * Accepts operation timeout as optional parameter
89+ */
90+ public <Params extends PersistentTaskParams > void sendProjectStartRequest (
91+ final ProjectId projectId ,
92+ final String taskId ,
93+ final String taskName ,
94+ final Params taskParams ,
95+ final TimeValue timeout ,
96+ final ActionListener <PersistentTask <Params >> listener
97+ ) {
98+ assert PersistentTasksExecutorRegistry .isClusterScopedTask (taskName ) == false : taskName + " is not a project scoped task" ;
99+ sendStartRequest (Objects .requireNonNull (projectId ), taskId , taskName , taskParams , timeout , listener );
100+ }
101+
102+ private <Params extends PersistentTaskParams > void sendStartRequest (
103+ @ Nullable final ProjectId projectId ,
104+ final String taskId ,
105+ final String taskName ,
106+ final Params taskParams ,
107+ final TimeValue timeout ,
108+ final ActionListener <PersistentTask <Params >> listener
64109 ) {
65110 @ SuppressWarnings ("unchecked" )
66111 final ActionListener <PersistentTask <?>> wrappedListener = listener .map (t -> (PersistentTask <Params >) t );
67112 execute (
113+ projectId ,
68114 new StartPersistentTaskAction .Request (Objects .requireNonNull (timeout ), taskId , taskName , taskParams ),
69115 StartPersistentTaskAction .INSTANCE ,
70116 wrappedListener
@@ -79,15 +125,77 @@ public <Params extends PersistentTaskParams> void sendStartRequest(
79125 * {@code null}, the persistent task is considered as successfully completed.
80126 * Accepts operation timeout as optional parameter
81127 */
128+ @ Deprecated (forRemoval = true ) // Use the explict cluster/project version instead
82129 public void sendCompletionRequest (
83130 final String taskId ,
84131 final long taskAllocationId ,
85132 final @ Nullable Exception taskFailure ,
86133 final @ Nullable String localAbortReason ,
87134 final @ Nullable TimeValue timeout ,
88135 final ActionListener <PersistentTask <?>> listener
136+ ) {
137+ sendCompletionRequest (null , taskId , taskAllocationId , taskFailure , localAbortReason , timeout , listener );
138+ }
139+
140+ /**
141+ * Notifies the master node about the completion of a persistent task.
142+ * <p>
143+ * At most one of {@code failure} and {@code localAbortReason} may be
144+ * provided. When both {@code failure} and {@code localAbortReason} are
145+ * {@code null}, the persistent task is considered as successfully completed.
146+ * Accepts operation timeout as optional parameter
147+ */
148+ public void sendClusterCompletionRequest (
149+ final String taskId ,
150+ final long taskAllocationId ,
151+ final @ Nullable Exception taskFailure ,
152+ final @ Nullable String localAbortReason ,
153+ final @ Nullable TimeValue timeout ,
154+ final ActionListener <PersistentTask <?>> listener
155+ ) {
156+ assertClusterScope ();
157+ sendCompletionRequest (null , taskId , taskAllocationId , taskFailure , localAbortReason , timeout , listener );
158+ }
159+
160+ /**
161+ * Notifies the master node about the completion of a project level persistent task.
162+ * <p>
163+ * At most one of {@code failure} and {@code localAbortReason} may be
164+ * provided. When both {@code failure} and {@code localAbortReason} are
165+ * {@code null}, the persistent task is considered as successfully completed.
166+ * Accepts operation timeout as optional parameter
167+ */
168+ public void sendProjectCompletionRequest (
169+ final ProjectId projectId ,
170+ final String taskId ,
171+ final long taskAllocationId ,
172+ final @ Nullable Exception taskFailure ,
173+ final @ Nullable String localAbortReason ,
174+ final @ Nullable TimeValue timeout ,
175+ final ActionListener <PersistentTask <?>> listener
176+ ) {
177+ sendCompletionRequest (
178+ Objects .requireNonNull (projectId ),
179+ taskId ,
180+ taskAllocationId ,
181+ taskFailure ,
182+ localAbortReason ,
183+ timeout ,
184+ listener
185+ );
186+ }
187+
188+ private void sendCompletionRequest (
189+ @ Nullable final ProjectId projectId ,
190+ final String taskId ,
191+ final long taskAllocationId ,
192+ final @ Nullable Exception taskFailure ,
193+ final @ Nullable String localAbortReason ,
194+ final @ Nullable TimeValue timeout ,
195+ final ActionListener <PersistentTask <?>> listener
89196 ) {
90197 execute (
198+ projectId ,
91199 new CompletionPersistentTaskAction .Request (
92200 Objects .requireNonNull (timeout ),
93201 taskId ,
@@ -122,14 +230,63 @@ void sendCancelRequest(final long taskId, final String reason, final ActionListe
122230 * {@link AllocatedPersistentTask#updatePersistentTaskState} instead.
123231 * Accepts operation timeout as optional parameter
124232 */
233+ @ Deprecated (forRemoval = true ) // Use the explict cluster/project version instead
125234 void sendUpdateStateRequest (
126235 final String taskId ,
127236 final long taskAllocationID ,
128237 final PersistentTaskState taskState ,
129238 final TimeValue timeout ,
130239 final ActionListener <PersistentTask <?>> listener
240+ ) {
241+ sendUpdateStateRequest (null , taskId , taskAllocationID , taskState , timeout , listener );
242+ }
243+
244+ /**
245+ * Notifies the master node that the state of a persistent task has changed.
246+ * <p>
247+ * Persistent task implementers shouldn't call this method directly and use
248+ * {@link AllocatedPersistentTask#updatePersistentTaskState} instead.
249+ * Accepts operation timeout as optional parameter
250+ */
251+ void sendClusterUpdateStateRequest (
252+ final String taskId ,
253+ final long taskAllocationID ,
254+ final PersistentTaskState taskState ,
255+ final TimeValue timeout ,
256+ final ActionListener <PersistentTask <?>> listener
257+ ) {
258+ assertClusterScope ();
259+ sendUpdateStateRequest (null , taskId , taskAllocationID , taskState , timeout , listener );
260+ }
261+
262+ /**
263+ * Notifies the master node that the state of a project level persistent task has changed.
264+ * <p>
265+ * Persistent task implementers shouldn't call this method directly and use
266+ * {@link AllocatedPersistentTask#updatePersistentTaskState} instead.
267+ * Accepts operation timeout as optional parameter
268+ */
269+ void sendProjectUpdateStateRequest (
270+ final ProjectId projectId ,
271+ final String taskId ,
272+ final long taskAllocationID ,
273+ final PersistentTaskState taskState ,
274+ final TimeValue timeout ,
275+ final ActionListener <PersistentTask <?>> listener
276+ ) {
277+ sendUpdateStateRequest (Objects .requireNonNull (projectId ), taskId , taskAllocationID , taskState , timeout , listener );
278+ }
279+
280+ private void sendUpdateStateRequest (
281+ @ Nullable final ProjectId projectId ,
282+ final String taskId ,
283+ final long taskAllocationID ,
284+ final PersistentTaskState taskState ,
285+ final TimeValue timeout ,
286+ final ActionListener <PersistentTask <?>> listener
131287 ) {
132288 execute (
289+ projectId ,
133290 new UpdatePersistentTaskStatusAction .Request (Objects .requireNonNull (timeout ), taskId , taskAllocationID , taskState ),
134291 UpdatePersistentTaskStatusAction .INSTANCE ,
135292 listener
@@ -140,7 +297,38 @@ void sendUpdateStateRequest(
140297 * Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter
141298 */
142299 public void sendRemoveRequest (final String taskId , final TimeValue timeout , final ActionListener <PersistentTask <?>> listener ) {
300+ sendRemoveRequest (null , taskId , timeout , listener );
301+ }
302+
303+ /**
304+ * Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter
305+ */
306+ public void sendClusterRemoveRequest (final String taskId , final TimeValue timeout , final ActionListener <PersistentTask <?>> listener ) {
307+ assertClusterScope ();
308+ sendRemoveRequest (null , taskId , timeout , listener );
309+ }
310+
311+ /**
312+ * Notifies the master node to remove a project level persistent task from the cluster state.
313+ * Accepts operation timeout as optional parameter
314+ */
315+ public void sendProjectRemoveRequest (
316+ final ProjectId projectId ,
317+ final String taskId ,
318+ final TimeValue timeout ,
319+ final ActionListener <PersistentTask <?>> listener
320+ ) {
321+ sendRemoveRequest (Objects .requireNonNull (projectId ), taskId , timeout , listener );
322+ }
323+
324+ private void sendRemoveRequest (
325+ @ Nullable final ProjectId projectId ,
326+ final String taskId ,
327+ final TimeValue timeout ,
328+ final ActionListener <PersistentTask <?>> listener
329+ ) {
143330 execute (
331+ projectId ,
144332 new RemovePersistentTaskAction .Request (Objects .requireNonNull (timeout ), taskId ),
145333 RemovePersistentTaskAction .INSTANCE ,
146334 listener
@@ -153,12 +341,13 @@ public void sendRemoveRequest(final String taskId, final TimeValue timeout, fina
153341 * The origin is set in the context and the listener is wrapped to ensure the proper context is restored
154342 */
155343 private <Req extends ActionRequest , Resp extends PersistentTaskResponse > void execute (
344+ @ Nullable final ProjectId projectId ,
156345 final Req request ,
157346 final ActionType <Resp > action ,
158347 final ActionListener <PersistentTask <?>> listener
159348 ) {
160349 try {
161- client .execute (action , request , listener .map (PersistentTaskResponse ::getTask ));
350+ getDefaultOrProjectClient ( projectId ) .execute (action , request , listener .map (PersistentTaskResponse ::getTask ));
162351 } catch (Exception e ) {
163352 listener .onFailure (e );
164353 }
@@ -283,4 +472,27 @@ default void onTimeout(TimeValue timeout) {
283472 onFailure (new IllegalStateException ("Timed out when waiting for persistent task after " + timeout ));
284473 }
285474 }
475+
476+ private Client getDefaultOrProjectClient (@ Nullable ProjectId projectId ) {
477+ if (projectId == null ) {
478+ return client ;
479+ }
480+
481+ final var currentProjectId = client .projectResolver ().getProjectId ();
482+ if (projectId .equals (currentProjectId )) {
483+ // The projectId will already be used in the client through thread context, so no need to set it again
484+ return client ;
485+ } else {
486+ // Execute will throw if the projectId conflicts with the one in the current context
487+ return client .projectClient (projectId );
488+ }
489+ }
490+
491+ /**
492+ * Asserts that the current request is called from cluster scope and not project scope.
493+ */
494+ private void assertClusterScope () {
495+ ProjectResolver projectResolver = client .projectResolver ();
496+ assert projectResolver .getProjectId () == null || ProjectId .DEFAULT .equals (projectResolver .getProjectId ());
497+ }
286498}
0 commit comments