Skip to content

Commit 9045040

Browse files
Adding node feature for persistent task
1 parent 807c2e0 commit 9045040

File tree

7 files changed

+214
-5
lines changed

7 files changed

+214
-5
lines changed

x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceBaseRestTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ protected void assertNonEmptyInferenceResults(Map<String, Object> resultMap, int
525525
}
526526
}
527527

528-
static void assertStatusOkOrCreated(Response response) throws IOException {
528+
public static void assertStatusOkOrCreated(Response response) throws IOException {
529529
int statusCode = response.getStatusLine().getStatusCode();
530530
// Once EntityUtils.toString(entity) is called the entity cannot be reused.
531531
// Avoid that call with check here.

x-pack/plugin/inference/qa/rolling-upgrade/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ apply plugin: 'elasticsearch.bc-upgrade-test'
1515
dependencies {
1616
javaRestTestImplementation(testArtifact(project(xpackModule('core'))))
1717
javaRestTestImplementation project(path: xpackModule('inference'))
18+
1819
javaRestTestImplementation(testArtifact(project(":qa:rolling-upgrade"), "javaRestTest"))
20+
21+
// allow these upgrade tests to import some helpers from the javaRestTest classes in x-pack:plugin:inference:qa:inference-service-tests
22+
javaRestTestImplementation(testArtifact(project(":x-pack:plugin:inference:qa:inference-service-tests"), "javaRestTest"))
1923
}
2024

2125
// Inference API added in 8.11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.application;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.Name;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
17+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
18+
import org.elasticsearch.upgrades.ParameterizedRollingUpgradeTestCase;
19+
import org.elasticsearch.xpack.inference.services.elastic.authorization.AuthorizationPoller;
20+
import org.junit.ClassRule;
21+
22+
import java.io.IOException;
23+
import java.util.Map;
24+
25+
import static org.elasticsearch.xpack.inference.InferenceBaseRestTest.assertStatusOkOrCreated;
26+
import static org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings.ELASTIC_INFERENCE_SERVICE_URL;
27+
import static org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings.PERIODIC_AUTHORIZATION_ENABLED;
28+
import static org.hamcrest.Matchers.is;
29+
30+
public class AuthorizationTaskExecutorUpgradeIT extends ParameterizedRollingUpgradeTestCase {
31+
32+
private static final Logger logger = LogManager.getLogger(AuthorizationTaskExecutorUpgradeIT.class);
33+
private static final String BEFORE_AUTHORIZATION_TASK_FEATURE = "gte_v9.1.0";
34+
// The bug where the authorization task is registered before the upgrade is complete was introduced in 9.3.0
35+
// This is the currently latest version before that
36+
private static final String MAX_CLUSTER_VERSION_BEFORE_BUG_INTRODUCED = "gte_v9.2.2";
37+
38+
@ClassRule
39+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
40+
.distribution(DistributionType.DEFAULT)
41+
.version(getOldClusterVersion(), isOldClusterDetachedVersion())
42+
.nodes(NODE_NUM)
43+
.setting("xpack.security.enabled", "false")
44+
.setting("xpack.license.self_generated.type", "trial")
45+
.setting(PERIODIC_AUTHORIZATION_ENABLED.getKey(), "false")
46+
// We need a url set for the authorization task to be created, but we don't actually care if we get a valid response
47+
// just that the task will be created upon upgrade
48+
.setting(ELASTIC_INFERENCE_SERVICE_URL.getKey(), "http://localhost:12345")
49+
.build();
50+
51+
private static final String GET_METHOD = "GET";
52+
53+
public AuthorizationTaskExecutorUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
54+
super(upgradedNodes);
55+
}
56+
57+
@Override
58+
protected ElasticsearchCluster getUpgradeCluster() {
59+
return cluster;
60+
}
61+
62+
public void testUpgradeAuthorizationTaskExecutor() throws Exception {
63+
assumeTrue(
64+
"Only test a version prior to v9.3.0 but not on v9.3.0 so we start in a"
65+
+ " version before the authorization polling task existed",
66+
oldClusterHasFeature(BEFORE_AUTHORIZATION_TASK_FEATURE)
67+
&& oldClusterHasFeature(MAX_CLUSTER_VERSION_BEFORE_BUG_INTRODUCED) == false
68+
);
69+
70+
final var isBeforeAnyNodesUpgraded = isOldCluster();
71+
final var isDuringNodeUpgrades = isMixedCluster();
72+
73+
if (isBeforeAnyNodesUpgraded || isDuringNodeUpgrades) {
74+
logger.info(
75+
Strings.format(
76+
"Is before any nodes upgraded: %s, is during node upgrades: %s",
77+
isBeforeAnyNodesUpgraded,
78+
isDuringNodeUpgrades
79+
)
80+
);
81+
logger.info(Strings.format("Original cluster version: %s", getOldClusterVersion()));
82+
// If cluster version does not have the authorization polling task feature:
83+
// The task must not exist on a non-upgraded or mixed cluster.
84+
assertFalse(doesAuthPollingTaskExist());
85+
}
86+
87+
if (isUpgradedCluster()) {
88+
logger.info("Cluster is fully upgraded scenario");
89+
// once fully upgraded, the authorization polling task should be created
90+
assertBusy(() -> assertTrue(doesAuthPollingTaskExist()));
91+
}
92+
}
93+
94+
@SuppressWarnings("unchecked")
95+
private static boolean doesAuthPollingTaskExist() throws IOException {
96+
var request = new Request(GET_METHOD, Strings.format("_tasks?pretty&actions=%s*", AuthorizationPoller.TASK_NAME));
97+
var response = adminClient().performRequest(request);
98+
assertStatusOkOrCreated(response);
99+
100+
/*
101+
The task response will look like this
102+
{
103+
"nodes": {
104+
"jFlV8lS0SKip7Tp6Iz9Eew": {
105+
...
106+
"tasks": {
107+
"jFlV8lS0SKip7Tp6Iz9Eew:336": {
108+
"node": "jFlV8lS0SKip7Tp6Iz9Eew",
109+
"id": 336,
110+
"type": "persistent",
111+
"action": "eis-authorization-poller[c]",
112+
...
113+
}
114+
}
115+
}
116+
}
117+
}
118+
*/
119+
var responseAsMap = entityAsMap(response);
120+
var nodes = (Map<String, Object>) responseAsMap.get("nodes");
121+
122+
if (nodes == null || nodes.isEmpty()) {
123+
return false;
124+
}
125+
126+
// There should only ever be a single authorization task in the cluster
127+
assertThat(nodes.size(), is(1));
128+
129+
var node = (Map<String, Object>) nodes.values().iterator().next();
130+
var tasks = (Map<String, Object>) node.get("tasks");
131+
132+
if (tasks == null || tasks.isEmpty()) {
133+
return false;
134+
}
135+
136+
for (var taskObj : tasks.values()) {
137+
var task = (Map<String, Object>) taskObj;
138+
var action = (String) task.get("action");
139+
if (action != null && action.startsWith(AuthorizationPoller.TASK_NAME)) {
140+
return true;
141+
}
142+
}
143+
144+
return false;
145+
}
146+
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@ public class InferenceFeatures implements FeatureSpecification {
5656
public static final NodeFeature INFERENCE_ENDPOINT_CACHE = new NodeFeature("inference.endpoint.cache");
5757
public static final NodeFeature INFERENCE_CCM_CACHE = new NodeFeature("inference.ccm.cache");
5858
public static final NodeFeature SEARCH_USAGE_EXTENDED_DATA = new NodeFeature("search.usage.extended_data");
59+
public static final NodeFeature INFERENCE_AUTH_POLLER_PERSISTENT_TASK = new NodeFeature("inference.auth_poller.persistent_task");
5960

6061
@Override
6162
public Set<NodeFeature> getFeatures() {
62-
return Set.of(INFERENCE_ENDPOINT_CACHE, INFERENCE_CCM_CACHE);
63+
return Set.of(INFERENCE_ENDPOINT_CACHE, INFERENCE_CCM_CACHE, INFERENCE_AUTH_POLLER_PERSISTENT_TASK);
6364
}
6465

6566
@Override

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ public Collection<?> createComponents(PluginServices services) {
365365

366366
var authTaskExecutor = AuthorizationTaskExecutor.create(
367367
services.clusterService(),
368+
services.featureService(),
368369
new AuthorizationPoller.Parameters(
369370
serviceComponents.get(),
370371
authorizationHandler,

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/authorization/AuthorizationTaskExecutor.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
import org.elasticsearch.ResourceAlreadyExistsException;
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.cluster.ClusterChangedEvent;
15+
import org.elasticsearch.cluster.ClusterState;
1516
import org.elasticsearch.cluster.ClusterStateListener;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.Strings;
1819
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1920
import org.elasticsearch.core.FixForMultiProject;
2021
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.features.FeatureService;
2123
import org.elasticsearch.persistent.AllocatedPersistentTask;
2224
import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata;
2325
import org.elasticsearch.persistent.PersistentTaskParams;
@@ -29,6 +31,7 @@
2931
import org.elasticsearch.transport.RemoteTransportException;
3032
import org.elasticsearch.xcontent.NamedXContentRegistry;
3133
import org.elasticsearch.xcontent.ParseField;
34+
import org.elasticsearch.xpack.inference.InferenceFeatures;
3235

3336
import java.util.List;
3437
import java.util.Map;
@@ -46,14 +49,20 @@ public class AuthorizationTaskExecutor extends PersistentTasksExecutor<Authoriza
4649
private final PersistentTasksService persistentTasksService;
4750
private final AuthorizationPoller.Parameters pollerParameters;
4851
private final AtomicReference<AuthorizationPoller> currentTask = new AtomicReference<>();
52+
private final FeatureService featureService;
4953

50-
public static AuthorizationTaskExecutor create(ClusterService clusterService, AuthorizationPoller.Parameters parameters) {
54+
public static AuthorizationTaskExecutor create(
55+
ClusterService clusterService,
56+
FeatureService featureService,
57+
AuthorizationPoller.Parameters parameters
58+
) {
5159
Objects.requireNonNull(clusterService);
5260
Objects.requireNonNull(parameters);
5361

5462
var executor = new AuthorizationTaskExecutor(
5563
clusterService,
5664
new PersistentTasksService(clusterService, parameters.serviceComponents().threadPool(), parameters.client()),
65+
featureService,
5766
parameters
5867
);
5968
executor.init();
@@ -64,10 +73,12 @@ public static AuthorizationTaskExecutor create(ClusterService clusterService, Au
6473
AuthorizationTaskExecutor(
6574
ClusterService clusterService,
6675
PersistentTasksService persistentTasksService,
76+
FeatureService featureService,
6777
AuthorizationPoller.Parameters pollerParameters
6878
) {
6979
super(TASK_NAME, pollerParameters.serviceComponents().threadPool().executor(UTILITY_THREAD_POOL_NAME));
7080
this.clusterService = Objects.requireNonNull(clusterService);
81+
this.featureService = Objects.requireNonNull(featureService);
7182
this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
7283
this.pollerParameters = Objects.requireNonNull(pollerParameters);
7384
}
@@ -120,14 +131,14 @@ protected AuthorizationPoller createTask(
120131

121132
@Override
122133
public void clusterChanged(ClusterChangedEvent event) {
123-
if (authorizationTaskExists(event)) {
134+
if (clusterCanSupportFeature(event.state()) == false || authorizationTaskExists(event)) {
124135
return;
125136
}
126137

127138
persistentTasksService.sendClusterStartRequest(
128139
TASK_NAME,
129140
TASK_NAME,
130-
new AuthorizationTaskParams(),
141+
AuthorizationTaskParams.INSTANCE,
131142
TimeValue.THIRTY_SECONDS,
132143
ActionListener.wrap(persistentTask -> logger.debug("Created authorization poller task"), exception -> {
133144
var thrownException = exception instanceof RemoteTransportException ? exception.getCause() : exception;
@@ -138,6 +149,10 @@ public void clusterChanged(ClusterChangedEvent event) {
138149
);
139150
}
140151

152+
private boolean clusterCanSupportFeature(ClusterState state) {
153+
return state.clusterRecovered() && featureService.clusterHasFeature(state, InferenceFeatures.INFERENCE_AUTH_POLLER_PERSISTENT_TASK);
154+
}
155+
141156
private static boolean authorizationTaskExists(ClusterChangedEvent event) {
142157
return ClusterPersistentTasksCustomMetadata.getTaskWithId(event.state(), TASK_NAME) != null;
143158
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elastic/authorization/AuthorizationTaskExecutorTests.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.node.DiscoveryNodes;
1818
import org.elasticsearch.cluster.service.ClusterService;
1919
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.features.FeatureService;
2021
import org.elasticsearch.persistent.ClusterPersistentTasksCustomMetadata;
2122
import org.elasticsearch.persistent.PersistentTasksService;
2223
import org.elasticsearch.test.ESTestCase;
@@ -39,13 +40,15 @@
3940
import static org.mockito.Mockito.never;
4041
import static org.mockito.Mockito.times;
4142
import static org.mockito.Mockito.verify;
43+
import static org.mockito.Mockito.when;
4244

4345
public class AuthorizationTaskExecutorTests extends ESTestCase {
4446

4547
private ThreadPool threadPool;
4648
private ClusterService clusterService;
4749
private PersistentTasksService persistentTasksService;
4850
private String localNodeId;
51+
private FeatureService enabledFeatureServiceMock;
4952

5053
@Before
5154
public void setUp() throws Exception {
@@ -54,6 +57,8 @@ public void setUp() throws Exception {
5457
clusterService = createClusterService(threadPool);
5558
persistentTasksService = mock(PersistentTasksService.class);
5659
localNodeId = clusterService.localNode().getId();
60+
enabledFeatureServiceMock = mock(FeatureService.class);
61+
when(enabledFeatureServiceMock.clusterHasFeature(any(), any())).thenReturn(true);
5762
}
5863

5964
@After
@@ -69,6 +74,7 @@ public void testCreatesTask_WhenItDoesNotExistOnClusterStateChange() {
6974
var executor = new AuthorizationTaskExecutor(
7075
clusterService,
7176
persistentTasksService,
77+
enabledFeatureServiceMock,
7278
new AuthorizationPoller.Parameters(
7379
createWithEmptySettings(threadPool),
7480
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -114,10 +120,44 @@ private ClusterState initialState() {
114120
return ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).metadata(EMPTY_METADATA).build();
115121
}
116122

123+
public void testDoesNotCreateTask_WhenFeatureIsNotSupported() {
124+
var eisUrl = "abc";
125+
var disabledFeatureServiceMock = mock(FeatureService.class);
126+
when(disabledFeatureServiceMock.clusterHasFeature(any(), any())).thenReturn(false);
127+
128+
var executor = new AuthorizationTaskExecutor(
129+
clusterService,
130+
persistentTasksService,
131+
disabledFeatureServiceMock,
132+
new AuthorizationPoller.Parameters(
133+
createWithEmptySettings(threadPool),
134+
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
135+
mock(Sender.class),
136+
ElasticInferenceServiceSettingsTests.create(eisUrl, TimeValue.timeValueMillis(1), TimeValue.timeValueMillis(1), true),
137+
mock(ModelRegistry.class),
138+
mock(Client.class)
139+
)
140+
);
141+
executor.init();
142+
143+
var listener1 = new PlainActionFuture<Void>();
144+
clusterService.getClusterApplierService().onNewClusterState("initialization", this::initialState, listener1);
145+
listener1.actionGet(TimeValue.THIRTY_SECONDS);
146+
// We should never call sendClusterStartRequest since the feature is not supported
147+
verify(persistentTasksService, never()).sendClusterStartRequest(
148+
eq(AuthorizationPoller.TASK_NAME),
149+
eq(AuthorizationPoller.TASK_NAME),
150+
eq(AuthorizationTaskParams.INSTANCE),
151+
any(),
152+
any()
153+
);
154+
}
155+
117156
public void testDoesNotRegisterClusterStateListener_DoesNotCreateTask_WhenTheEisUrlIsEmpty() {
118157
var executor = new AuthorizationTaskExecutor(
119158
clusterService,
120159
persistentTasksService,
160+
enabledFeatureServiceMock,
121161
new AuthorizationPoller.Parameters(
122162
createWithEmptySettings(threadPool),
123163
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -145,6 +185,7 @@ public void testDoesNotRegisterClusterStateListener_DoesNotCreateTask_WhenTheEis
145185
var executor = new AuthorizationTaskExecutor(
146186
clusterService,
147187
persistentTasksService,
188+
enabledFeatureServiceMock,
148189
new AuthorizationPoller.Parameters(
149190
createWithEmptySettings(threadPool),
150191
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),
@@ -195,6 +236,7 @@ public void testDoesNotCreateTask_OnClusterStateChange_WhenItAlreadyExists() {
195236
var executor = new AuthorizationTaskExecutor(
196237
clusterService,
197238
persistentTasksService,
239+
enabledFeatureServiceMock,
198240
new AuthorizationPoller.Parameters(
199241
createWithEmptySettings(threadPool),
200242
mock(ElasticInferenceServiceAuthorizationRequestHandler.class),

0 commit comments

Comments
 (0)