Skip to content

Commit 83c4dda

Browse files
Adding node feature for persistent task
1 parent d9e286a commit 83c4dda

File tree

4 files changed

+80
-3
lines changed

4 files changed

+80
-3
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@ public class InferenceFeatures implements FeatureSpecification {
5656
public static final NodeFeature INFERENCE_ENDPOINT_CACHE = new NodeFeature("inference.endpoint.cache");
5757
public static final NodeFeature INFERENCE_CCM_CACHE = new NodeFeature("inference.ccm.cache");
5858
public static final NodeFeature SEARCH_USAGE_EXTENDED_DATA = new NodeFeature("search.usage.extended_data");
59+
public static final NodeFeature INFERENCE_AUTH_POLLER_PERSISTENT_TASK = new NodeFeature("inference.auth_poller.persistent_task");
5960

6061
@Override
6162
public Set<NodeFeature> getFeatures() {
62-
return Set.of(INFERENCE_ENDPOINT_CACHE, INFERENCE_CCM_CACHE);
63+
return Set.of(INFERENCE_ENDPOINT_CACHE, INFERENCE_CCM_CACHE, INFERENCE_AUTH_POLLER_PERSISTENT_TASK);
6364
}
6465

6566
@Override

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ private CCMRelatedComponents createCCMDependentComponents(
470470

471471
var authTaskExecutor = AuthorizationTaskExecutor.create(
472472
services.clusterService(),
473+
services.featureService(),
473474
new AuthorizationPoller.Parameters(
474475
serviceComponents,
475476
authorizationHandler,

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/authorization/AuthorizationTaskExecutor.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.core.FixForMultiProject;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.TimeValue;
29+
import org.elasticsearch.features.FeatureService;
2930
import org.elasticsearch.injection.guice.Inject;
3031
import org.elasticsearch.persistent.AllocatedPersistentTask;
3132
import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata;
@@ -40,6 +41,7 @@
4041
import org.elasticsearch.transport.TransportService;
4142
import org.elasticsearch.xcontent.NamedXContentRegistry;
4243
import org.elasticsearch.xcontent.ParseField;
44+
import org.elasticsearch.xpack.inference.InferenceFeatures;
4345
import org.elasticsearch.xpack.inference.common.BroadcastMessageAction;
4446

4547
import java.io.IOException;
@@ -68,14 +70,20 @@ public class AuthorizationTaskExecutor extends PersistentTasksExecutor<Authoriza
6870
private final AuthorizationPoller.Parameters pollerParameters;
6971
private final AtomicReference<AuthorizationPoller> currentTask = new AtomicReference<>();
7072
private final AtomicBoolean running = new AtomicBoolean(false);
73+
private final FeatureService featureService;
7174

72-
public static AuthorizationTaskExecutor create(ClusterService clusterService, AuthorizationPoller.Parameters parameters) {
75+
public static AuthorizationTaskExecutor create(
76+
ClusterService clusterService,
77+
FeatureService featureService,
78+
AuthorizationPoller.Parameters parameters
79+
) {
7380
Objects.requireNonNull(clusterService);
7481
Objects.requireNonNull(parameters);
7582

7683
return new AuthorizationTaskExecutor(
7784
clusterService,
7885
new PersistentTasksService(clusterService, parameters.serviceComponents().threadPool(), parameters.client()),
86+
featureService,
7987
parameters
8088
);
8189
}
@@ -84,10 +92,12 @@ public static AuthorizationTaskExecutor create(ClusterService clusterService, Au
8492
AuthorizationTaskExecutor(
8593
ClusterService clusterService,
8694
PersistentTasksService persistentTasksService,
95+
FeatureService featureService,
8796
AuthorizationPoller.Parameters pollerParameters
8897
) {
8998
super(TASK_NAME, pollerParameters.serviceComponents().threadPool().executor(UTILITY_THREAD_POOL_NAME));
9099
this.clusterService = Objects.requireNonNull(clusterService);
100+
this.featureService = Objects.requireNonNull(featureService);
91101
this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
92102
this.pollerParameters = Objects.requireNonNull(pollerParameters);
93103
}
@@ -130,7 +140,7 @@ private void startInternal(boolean createPersistentTask) {
130140
}
131141

132142
private void sendStartRequest(@Nullable ClusterState state) {
133-
if (running.get() == false || authorizationTaskExists(state)) {
143+
if (shouldSkipCreatingTask(state)) {
134144
return;
135145
}
136146

@@ -152,6 +162,22 @@ private void sendStartRequest(@Nullable ClusterState state) {
152162
);
153163
}
154164

165+
private boolean shouldSkipCreatingTask(@Nullable ClusterState state) {
166+
if (state == null) {
167+
return true;
168+
}
169+
170+
return clusterCanSupportFeature(state) == false || running.get() == false || authorizationTaskExists(state);
171+
}
172+
173+
private boolean clusterCanSupportFeature(@Nullable ClusterState state) {
174+
if (state == null) {
175+
return false;
176+
}
177+
178+
return state.clusterRecovered() && featureService.clusterHasFeature(state, InferenceFeatures.INFERENCE_AUTH_POLLER_PERSISTENT_TASK);
179+
}
180+
155181
private static boolean authorizationTaskExists(@Nullable ClusterState state) {
156182
if (state == null) {
157183
return false;

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elastic/authorization/AuthorizationTaskExecutorTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.node.DiscoveryNodes;
1818
import org.elasticsearch.cluster.service.ClusterService;
1919
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.features.FeatureService;
2021
import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata;
2122
import org.elasticsearch.persistent.PersistentTasksService;
2223
import org.elasticsearch.test.ESTestCase;
@@ -49,6 +50,7 @@ public class AuthorizationTaskExecutorTests extends ESTestCase {
4950
private ClusterService clusterService;
5051
private PersistentTasksService persistentTasksService;
5152
private String localNodeId;
53+
private FeatureService enabledFeatureServiceMock;
5254

5355
@Before
5456
public void setUp() throws Exception {
@@ -57,6 +59,8 @@ public void setUp() throws Exception {
5759
clusterService = createClusterService(threadPool);
5860
persistentTasksService = mock(PersistentTasksService.class);
5961
localNodeId = clusterService.localNode().getId();
62+
enabledFeatureServiceMock = mock(FeatureService.class);
63+
when(enabledFeatureServiceMock.clusterHasFeature(any(), any())).thenReturn(true);
6064
}
6165

6266
@After
@@ -72,6 +76,7 @@ public void testMultipleCallsToStart_OnlyRegistersOnce() {
7276
var executor = new AuthorizationTaskExecutor(
7377
mockClusterService,
7478
persistentTasksService,
79+
enabledFeatureServiceMock,
7580
new AuthorizationPoller.Parameters(
7681
createWithEmptySettings(threadPool),
7782
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -102,6 +107,7 @@ public void testStartLazy_OnlyRegistersOnce_NeverCallsPersistentTaskService() {
102107
var executor = new AuthorizationTaskExecutor(
103108
mockClusterService,
104109
persistentTasksService,
110+
enabledFeatureServiceMock,
105111
new AuthorizationPoller.Parameters(
106112
createWithEmptySettings(threadPool),
107113
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -138,6 +144,7 @@ public void testDoesNotRegisterListener_IfUrlIsEmpty() {
138144
var executor = new AuthorizationTaskExecutor(
139145
mockClusterService,
140146
persistentTasksService,
147+
enabledFeatureServiceMock,
141148
new AuthorizationPoller.Parameters(
142149
createWithEmptySettings(threadPool),
143150
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -168,6 +175,7 @@ public void testMultipleCallsToStart_AndStop() {
168175
var executor = new AuthorizationTaskExecutor(
169176
mockClusterService,
170177
persistentTasksService,
178+
enabledFeatureServiceMock,
171179
new AuthorizationPoller.Parameters(
172180
createWithEmptySettings(threadPool),
173181
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -214,6 +222,7 @@ public void testCallsSendClusterStartRequest_WhenStartIsCalled() {
214222
var executor = new AuthorizationTaskExecutor(
215223
mockClusterService,
216224
persistentTasksService,
225+
enabledFeatureServiceMock,
217226
new AuthorizationPoller.Parameters(
218227
createWithEmptySettings(threadPool),
219228
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -263,6 +272,7 @@ public void testDoesNotCallSendClusterStartRequest_WhenStartIsCalled_WhenItIsAlr
263272
var executor = new AuthorizationTaskExecutor(
264273
mockClusterService,
265274
persistentTasksService,
275+
enabledFeatureServiceMock,
266276
new AuthorizationPoller.Parameters(
267277
createWithEmptySettings(threadPool),
268278
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -292,6 +302,7 @@ public void testCreatesTask_WhenItDoesNotExistOnClusterStateChange() {
292302
var executor = new AuthorizationTaskExecutor(
293303
clusterService,
294304
persistentTasksService,
305+
enabledFeatureServiceMock,
295306
new AuthorizationPoller.Parameters(
296307
createWithEmptySettings(threadPool),
297308
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -340,10 +351,46 @@ private ClusterState initialState() {
340351
return ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).metadata(EMPTY_METADATA).build();
341352
}
342353

354+
public void testDoesNotCreateTask_WhenFeatureIsNotSupported() {
355+
var eisUrl = "abc";
356+
var disabledFeatureServiceMock = mock(FeatureService.class);
357+
when(disabledFeatureServiceMock.clusterHasFeature(any(), any())).thenReturn(false);
358+
359+
var executor = new AuthorizationTaskExecutor(
360+
clusterService,
361+
persistentTasksService,
362+
disabledFeatureServiceMock,
363+
new AuthorizationPoller.Parameters(
364+
createWithEmptySettings(threadPool),
365+
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
366+
mock(Sender.class),
367+
ElasticInferenceServiceSettingsTests.create(eisUrl, TimeValue.timeValueMillis(1), TimeValue.timeValueMillis(1), true),
368+
mock(ModelRegistry.class),
369+
mock(Client.class),
370+
createMockCCMFeature(false),
371+
createMockCCMService(false)
372+
)
373+
);
374+
executor.startAndImmediatelyCreateTask();
375+
376+
var listener1 = new PlainActionFuture<Void>();
377+
clusterService.getClusterApplierService().onNewClusterState("initialization", this::initialState, listener1);
378+
listener1.actionGet(TimeValue.THIRTY_SECONDS);
379+
// We should never call sendClusterStartRequest since the feature is not supported
380+
verify(persistentTasksService, never()).sendClusterStartRequest(
381+
eq(AuthorizationPoller.TASK_NAME),
382+
eq(AuthorizationPoller.TASK_NAME),
383+
eq(AuthorizationTaskParams.INSTANCE),
384+
any(),
385+
any()
386+
);
387+
}
388+
343389
public void testDoesNotRegisterClusterStateListener_DoesNotCreateTask_WhenTheEisUrlIsEmpty() {
344390
var executor = new AuthorizationTaskExecutor(
345391
clusterService,
346392
persistentTasksService,
393+
enabledFeatureServiceMock,
347394
new AuthorizationPoller.Parameters(
348395
createWithEmptySettings(threadPool),
349396
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -373,6 +420,7 @@ public void testDoesNotRegisterClusterStateListener_DoesNotCreateTask_WhenTheEis
373420
var executor = new AuthorizationTaskExecutor(
374421
clusterService,
375422
persistentTasksService,
423+
enabledFeatureServiceMock,
376424
new AuthorizationPoller.Parameters(
377425
createWithEmptySettings(threadPool),
378426
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -425,6 +473,7 @@ public void testDoesNotCreateTask_OnClusterStateChange_WhenItAlreadyExists() {
425473
var executor = new AuthorizationTaskExecutor(
426474
mockClusterService,
427475
persistentTasksService,
476+
enabledFeatureServiceMock,
428477
new AuthorizationPoller.Parameters(
429478
createWithEmptySettings(threadPool),
430479
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),

0 commit comments

Comments
 (0)