1010import org .apache .logging .log4j .LogManager ;
1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .ResourceAlreadyExistsException ;
13- import org .elasticsearch .ResourceNotFoundException ;
1413import org .elasticsearch .action .ActionListener ;
1514import org .elasticsearch .action .ActionType ;
1615import org .elasticsearch .action .support .ActionFilters ;
1716import org .elasticsearch .cluster .ClusterChangedEvent ;
1817import org .elasticsearch .cluster .ClusterState ;
1918import org .elasticsearch .cluster .ClusterStateListener ;
19+ import org .elasticsearch .cluster .metadata .ProjectId ;
2020import org .elasticsearch .cluster .service .ClusterService ;
2121import org .elasticsearch .common .Strings ;
2222import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
3535import org .elasticsearch .persistent .PersistentTasksCustomMetadata ;
3636import org .elasticsearch .persistent .PersistentTasksExecutor ;
3737import org .elasticsearch .persistent .PersistentTasksService ;
38- import org .elasticsearch .plugins .Plugin ;
3938import org .elasticsearch .tasks .TaskId ;
4039import org .elasticsearch .transport .RemoteTransportException ;
4140import org .elasticsearch .transport .TransportService ;
4241import org .elasticsearch .xcontent .NamedXContentRegistry ;
4342import org .elasticsearch .xcontent .ParseField ;
4443import org .elasticsearch .xpack .inference .InferenceFeatures ;
4544import org .elasticsearch .xpack .inference .common .BroadcastMessageAction ;
45+ import org .elasticsearch .xpack .inference .services .elastic .ccm .CCMEnablementService ;
46+ import org .elasticsearch .xpack .inference .services .elastic .ccm .CCMFeature ;
4647
4748import java .io .IOException ;
4849import java .util .List ;
@@ -71,10 +72,14 @@ public class AuthorizationTaskExecutor extends PersistentTasksExecutor<Authoriza
7172 private final AtomicReference <AuthorizationPoller > currentTask = new AtomicReference <>();
7273 private final AtomicBoolean running = new AtomicBoolean (false );
7374 private final FeatureService featureService ;
75+ private final CCMEnablementService ccmEnablementService ;
76+ private final CCMFeature ccmFeature ;
7477
7578 public static AuthorizationTaskExecutor create (
7679 ClusterService clusterService ,
7780 FeatureService featureService ,
81+ CCMEnablementService ccmEnablementService ,
82+ CCMFeature ccmFeature ,
7883 AuthorizationPoller .Parameters parameters
7984 ) {
8085 Objects .requireNonNull (clusterService );
@@ -84,6 +89,8 @@ public static AuthorizationTaskExecutor create(
8489 clusterService ,
8590 new PersistentTasksService (clusterService , parameters .serviceComponents ().threadPool (), parameters .client ()),
8691 featureService ,
92+ ccmEnablementService ,
93+ ccmFeature ,
8794 parameters
8895 );
8996 }
@@ -93,12 +100,16 @@ public static AuthorizationTaskExecutor create(
93100 ClusterService clusterService ,
94101 PersistentTasksService persistentTasksService ,
95102 FeatureService featureService ,
103+ CCMEnablementService ccmEnablementService ,
104+ CCMFeature ccmFeature ,
96105 AuthorizationPoller .Parameters pollerParameters
97106 ) {
98107 super (TASK_NAME , pollerParameters .serviceComponents ().threadPool ().executor (UTILITY_THREAD_POOL_NAME ));
99108 this .clusterService = Objects .requireNonNull (clusterService );
100109 this .featureService = Objects .requireNonNull (featureService );
101110 this .persistentTasksService = Objects .requireNonNull (persistentTasksService );
111+ this .ccmEnablementService = Objects .requireNonNull (ccmEnablementService );
112+ this .ccmFeature = Objects .requireNonNull (ccmFeature );
102113 this .pollerParameters = Objects .requireNonNull (pollerParameters );
103114 }
104115
@@ -109,20 +120,10 @@ public static AuthorizationTaskExecutor create(
109120 * get an error indicating that it isn't aware of whether the task is a cluster scoped task.
110121 */
111122 public synchronized void startAndLazyCreateTask () {
112- startInternal (false );
123+ startInternal ();
113124 }
114125
115- /**
116- * Starts the authorization task executor and creates the persistent task if it doesn't already exist. This should only be called from
117- * a context where the cluster state is already initialized. Don't call this from the plugin
118- * {@link org.elasticsearch.xpack.inference.InferencePlugin#createComponents(Plugin.PluginServices)}. Use
119- * {@link #startAndLazyCreateTask()} instead.
120- */
121- public synchronized void startAndImmediatelyCreateTask () {
122- startInternal (true );
123- }
124-
125- private void startInternal (boolean createPersistentTask ) {
126+ private void startInternal () {
126127 var eisUrl = pollerParameters .elasticInferenceServiceSettings ().getElasticInferenceServiceUrl ();
127128
128129 logger .info ("Authorization task executor EIS URL: [{}]" , eisUrl );
@@ -131,14 +132,14 @@ private void startInternal(boolean createPersistentTask) {
131132 if (Strings .isNullOrEmpty (eisUrl ) == false && running .compareAndSet (false , true )) {
132133 logger .info ("Starting authorization task executor" );
133134
134- if (createPersistentTask ) {
135- sendStartRequest (clusterService .state ());
136- }
137-
138135 clusterService .addListener (this );
139136 }
140137 }
141138
139+ private void sendStartRequestWithCurrentClusterState () {
140+ sendStartRequest (clusterService .state ());
141+ }
142+
142143 private void sendStartRequest (@ Nullable ClusterState state ) {
143144 if (shouldSkipCreatingTask (state )) {
144145 return ;
@@ -161,12 +162,21 @@ private void sendStartRequest(@Nullable ClusterState state) {
161162 );
162163 }
163164
164- private boolean shouldSkipCreatingTask (@ Nullable ClusterState state ) {
165+ // Default for testing
166+ // TODO test this
167+ boolean shouldSkipCreatingTask (@ Nullable ClusterState state ) {
165168 if (state == null ) {
166169 return true ;
167170 }
168171
169- return clusterCanSupportFeature (state ) == false || running .get () == false || authorizationTaskExists (state );
172+ return clusterCanSupportFeature (state ) == false
173+ || running .get () == false
174+ || authorizationTaskExists (state )
175+ || ccmSupportedButNotYetConfigured ();
176+ }
177+
178+ private boolean ccmSupportedButNotYetConfigured () {
179+ return ccmFeature .isCcmSupportedEnvironment () && ccmEnablementService .isEnabled (ProjectId .DEFAULT ) == false ;
170180 }
171181
172182 private boolean clusterCanSupportFeature (@ Nullable ClusterState state ) {
@@ -185,31 +195,6 @@ private static boolean authorizationTaskExists(@Nullable ClusterState state) {
185195 return ClusterPersistentTasksCustomMetadata .getTaskWithId (state , TASK_NAME ) != null ;
186196 }
187197
188- public synchronized void stop () {
189- if (running .compareAndSet (true , false )) {
190- logger .info ("Shutting down authorization task executor" );
191- clusterService .removeListener (this );
192-
193- sendStopRequest ();
194- }
195- }
196-
197- private void sendStopRequest () {
198- persistentTasksService .sendClusterRemoveRequest (
199- TASK_NAME ,
200- TimeValue .THIRTY_SECONDS ,
201- ActionListener .wrap (
202- persistentTask -> logger .info ("Stopped authorization poller task, id {}" , persistentTask .getId ()),
203- exception -> {
204- var thrownException = exception instanceof RemoteTransportException ? exception .getCause () : exception ;
205- if (thrownException instanceof ResourceNotFoundException == false ) {
206- logger .error ("Failed to stop authorization poller task" , exception );
207- }
208- }
209- )
210- );
211- }
212-
213198 /**
214199 * This method should only be used for testing purposes to get the current running task.
215200 */
@@ -270,11 +255,6 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
270255 );
271256 }
272257
273- /**
274- * This action is used to broadcast to all the nodes that the authorization task executor should start or stop.
275- * This is specifically useful for CCM, since whether to do the polling depends on the CCM
276- * configuration to exist first.
277- */
278258 public static class Action extends BroadcastMessageAction <Message > {
279259 public static final String NAME = "cluster:internal/xpack/inference/update_authorization_task" ;
280260 public static final ActionType <Response > INSTANCE = new ActionType <>(NAME );
@@ -295,9 +275,7 @@ public Action(
295275 @ Override
296276 protected void receiveMessage (Message message ) {
297277 if (message .enable ()) {
298- authorizationTaskExecutor .startAndImmediatelyCreateTask ();
299- } else {
300- authorizationTaskExecutor .stop ();
278+ authorizationTaskExecutor .sendStartRequestWithCurrentClusterState ();
301279 }
302280 }
303281 }
0 commit comments