2020import org .elasticsearch .cluster .ClusterState ;
2121import org .elasticsearch .cluster .ClusterStateObserver ;
2222import org .elasticsearch .cluster .metadata .ProjectId ;
23+ import org .elasticsearch .cluster .project .ProjectResolver ;
2324import org .elasticsearch .cluster .service .ClusterService ;
2425import org .elasticsearch .core .Nullable ;
2526import org .elasticsearch .core .TimeValue ;
@@ -55,6 +56,7 @@ public PersistentTasksService(ClusterService clusterService, ThreadPool threadPo
5556 /**
5657 * Notifies the master node to create new persistent task and to assign it to a node. Accepts operation timeout as optional parameter
5758 */
59+ @ Deprecated (forRemoval = true ) // Use the explict cluster/project version instead
5860 public <Params extends PersistentTaskParams > void sendStartRequest (
5961 final String taskId ,
6062 final String taskName ,
@@ -76,6 +78,7 @@ public <Params extends PersistentTaskParams> void sendClusterStartRequest(
7678 final TimeValue timeout ,
7779 final ActionListener <PersistentTask <Params >> listener
7880 ) {
81+ assertClusterScope ();
7982 assert PersistentTasksExecutorRegistry .isClusterScopedTask (taskName ) : taskName + " is not a cluster scoped task" ;
8083 sendStartRequest (null , taskId , taskName , taskParams , timeout , listener );
8184 }
@@ -122,6 +125,7 @@ private <Params extends PersistentTaskParams> void sendStartRequest(
122125 * {@code null}, the persistent task is considered as successfully completed.
123126 * Accepts operation timeout as optional parameter
124127 */
128+ @ Deprecated (forRemoval = true ) // Use the explict cluster/project version instead
125129 public void sendCompletionRequest (
126130 final String taskId ,
127131 final long taskAllocationId ,
@@ -149,6 +153,7 @@ public void sendClusterCompletionRequest(
149153 final @ Nullable TimeValue timeout ,
150154 final ActionListener <PersistentTask <?>> listener
151155 ) {
156+ assertClusterScope ();
152157 sendCompletionRequest (null , taskId , taskAllocationId , taskFailure , localAbortReason , timeout , listener );
153158 }
154159
@@ -207,40 +212,12 @@ private void sendCompletionRequest(
207212 * Cancels a locally running task using the Task Manager API. Accepts operation timeout as optional parameter
208213 */
209214 void sendCancelRequest (final long taskId , final String reason , final ActionListener <ListTasksResponse > listener ) {
210- sendCancelRequest (null , taskId , reason , listener );
211- }
212-
213- /**
214- * Cancels a locally running task using the Task Manager API. Accepts operation timeout as optional parameter
215- */
216- void sendClusterCancelRequest (final long taskId , final String reason , final ActionListener <ListTasksResponse > listener ) {
217- sendCancelRequest (null , taskId , reason , listener );
218- }
219-
220- /**
221- * Cancels a locally running project task using the Task Manager API. Accepts operation timeout as optional parameter
222- */
223- void sendProjectCancelRequest (
224- final ProjectId projectId ,
225- final long taskId ,
226- final String reason ,
227- final ActionListener <ListTasksResponse > listener
228- ) {
229- sendCancelRequest (Objects .requireNonNull (projectId ), taskId , reason , listener );
230- }
231-
232- private void sendCancelRequest (
233- @ Nullable final ProjectId projectId ,
234- final long taskId ,
235- final String reason ,
236- final ActionListener <ListTasksResponse > listener
237- ) {
238215 CancelTasksRequest request = new CancelTasksRequest ();
239216 request .setTargetTaskId (new TaskId (clusterService .localNode ().getId (), taskId ));
240217 request .setReason (reason );
241218 // TODO set timeout?
242219 try {
243- getDefaultOrProjectClient ( projectId ) .admin ().cluster ().cancelTasks (request , listener );
220+ client .admin ().cluster ().cancelTasks (request , listener );
244221 } catch (Exception e ) {
245222 listener .onFailure (e );
246223 }
@@ -253,6 +230,7 @@ private void sendCancelRequest(
253230 * {@link AllocatedPersistentTask#updatePersistentTaskState} instead.
254231 * Accepts operation timeout as optional parameter
255232 */
233+ @ Deprecated (forRemoval = true ) // Use the explict cluster/project version instead
256234 void sendUpdateStateRequest (
257235 final String taskId ,
258236 final long taskAllocationID ,
@@ -277,6 +255,7 @@ void sendClusterUpdateStateRequest(
277255 final TimeValue timeout ,
278256 final ActionListener <PersistentTask <?>> listener
279257 ) {
258+ assertClusterScope ();
280259 sendUpdateStateRequest (null , taskId , taskAllocationID , taskState , timeout , listener );
281260 }
282261
@@ -325,6 +304,7 @@ public void sendRemoveRequest(final String taskId, final TimeValue timeout, fina
325304 * Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter
326305 */
327306 public void sendClusterRemoveRequest (final String taskId , final TimeValue timeout , final ActionListener <PersistentTask <?>> listener ) {
307+ assertClusterScope ();
328308 sendRemoveRequest (null , taskId , timeout , listener );
329309 }
330310
@@ -496,4 +476,12 @@ default void onTimeout(TimeValue timeout) {
496476 private Client getDefaultOrProjectClient (@ Nullable ProjectId projectId ) {
497477 return projectId == null ? client : client .projectClient (projectId );
498478 }
479+
480+ /**
481+ * Asserts that the current request is called from cluster scope and not project scope.
482+ */
483+ private void assertClusterScope () {
484+ ProjectResolver projectResolver = client .projectResolver ();
485+ assert projectResolver .getProjectId () == null || ProjectId .DEFAULT .equals (projectResolver .getProjectId ());
486+ }
499487}
0 commit comments