Skip to content

Commit b8bb59a

Browse files
Add internal cluster setting to aid testing
1 parent 94612b5 commit b8bb59a

File tree

10 files changed

+109
-57
lines changed

10 files changed

+109
-57
lines changed

x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/BaseMockEISAuthServerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public class BaseMockEISAuthServerTest extends ESRestTestCase {
3939
.setting("xpack.security.enabled", "true")
4040
// Adding both settings unless one feature flag is disabled in a particular environment
4141
.setting("xpack.inference.elastic.url", mockEISServer::getUrl)
42+
// If we don't disable this there's a very small chance that the authorization code could attempt to make two
43+
// calls which would result in a test failure because the webserver is only expecting a single request
44+
// So to ensure we avoid that altogether, this flag indicates that we'll only perform a single authorization request
45+
.setting("xpack.inference.elastic.periodic_authorization_enabled", "false")
4246
// This plugin is located in the inference/qa/test-service-plugin package, look for TestInferenceServicePlugin
4347
.plugin("inference-service-test")
4448
.user("x_pack_rest_user", "x-pack-test-password")

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,10 @@ public Collection<?> createComponents(PluginServices services) {
277277
ElasticInferenceServiceSettings inferenceServiceSettings = new ElasticInferenceServiceSettings(settings);
278278
String elasticInferenceUrl = inferenceServiceSettings.getElasticInferenceServiceUrl();
279279

280-
var elasticInferenceServiceComponentsInstance = ElasticInferenceServiceComponents.withDefaults(elasticInferenceUrl);
280+
var elasticInferenceServiceComponentsInstance = ElasticInferenceServiceComponents.withDefaults(
281+
elasticInferenceUrl,
282+
inferenceServiceSettings.isPeriodicAuthorizationEnabled()
283+
);
281284
elasticInferenceServiceComponents.set(elasticInferenceServiceComponentsInstance);
282285

283286
var authorizationHandler = new ElasticInferenceServiceAuthorizationRequestHandler(

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,7 @@ public ElasticInferenceService(
119119
IMPLEMENTED_TASK_TYPES,
120120
this,
121121
getSender(),
122-
this.elasticInferenceServiceComponents.authRequestInterval(),
123-
this.elasticInferenceServiceComponents.maxAuthRequestJitter()
122+
elasticInferenceServiceComponents
124123
);
125124
}
126125

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceServiceComponents.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,30 @@
2020
public record ElasticInferenceServiceComponents(
2121
@Nullable String elasticInferenceServiceUrl,
2222
TimeValue authRequestInterval,
23-
TimeValue maxAuthRequestJitter
23+
TimeValue maxAuthRequestJitter,
24+
boolean periodicAuthorizationEnabled
2425
) {
2526

2627
private static final TimeValue DEFAULT_AUTH_REQUEST_INTERVAL = TimeValue.timeValueMinutes(10);
2728
private static final TimeValue DEFAULT_AUTH_REQUEST_JITTER = TimeValue.timeValueMinutes(5);
2829

29-
public static final ElasticInferenceServiceComponents EMPTY_INSTANCE = new ElasticInferenceServiceComponents(
30-
null,
31-
DEFAULT_AUTH_REQUEST_INTERVAL,
32-
DEFAULT_AUTH_REQUEST_JITTER
33-
);
30+
public static final ElasticInferenceServiceComponents EMPTY_INSTANCE = ElasticInferenceServiceComponents.withDefaults(null);
3431

3532
public static ElasticInferenceServiceComponents withDefaults(String elasticInferenceServiceUrl) {
3633
return new ElasticInferenceServiceComponents(
3734
elasticInferenceServiceUrl,
3835
DEFAULT_AUTH_REQUEST_INTERVAL,
39-
DEFAULT_AUTH_REQUEST_JITTER
36+
DEFAULT_AUTH_REQUEST_JITTER,
37+
true
38+
);
39+
}
40+
41+
public static ElasticInferenceServiceComponents withDefaults(String elasticInferenceServiceUrl, boolean periodicAuthorizationEnabled) {
42+
return new ElasticInferenceServiceComponents(
43+
elasticInferenceServiceUrl,
44+
DEFAULT_AUTH_REQUEST_INTERVAL,
45+
DEFAULT_AUTH_REQUEST_JITTER,
46+
periodicAuthorizationEnabled
4047
);
4148
}
4249

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceServiceSettings.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ public class ElasticInferenceServiceSettings {
3535
private final String eisGatewayUrl;
3636

3737
private final String elasticInferenceServiceUrl;
38+
private final boolean periodicAuthorizationEnabled;
3839

3940
public ElasticInferenceServiceSettings(Settings settings) {
4041
eisGatewayUrl = EIS_GATEWAY_URL.get(settings);
4142
elasticInferenceServiceUrl = ELASTIC_INFERENCE_SERVICE_URL.get(settings);
43+
periodicAuthorizationEnabled = PERIODIC_AUTHORIZATION_ENABLED.get(settings);
4244
}
4345

4446
public static final SSLConfigurationSettings ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_SETTINGS = SSLConfigurationSettings.withPrefix(
@@ -52,18 +54,31 @@ public ElasticInferenceServiceSettings(Settings settings) {
5254
Setting.Property.NodeScope
5355
);
5456

57+
/**
58+
* This setting is for testing only. It controls whether authorization is repeated periodically after bootup. If set to true, an
59+
* authorization request will be made repeatedly on an interval.
60+
*/
61+
public static final Setting<Boolean> PERIODIC_AUTHORIZATION_ENABLED = Setting.boolSetting(
62+
"xpack.inference.elastic.periodic_authorization_enabled",
63+
true,
64+
Setting.Property.NodeScope
65+
);
66+
5567
public static List<Setting<?>> getSettingsDefinitions() {
5668
ArrayList<Setting<?>> settings = new ArrayList<>();
5769
settings.add(EIS_GATEWAY_URL);
5870
settings.add(ELASTIC_INFERENCE_SERVICE_URL);
5971
settings.add(ELASTIC_INFERENCE_SERVICE_SSL_ENABLED);
6072
settings.addAll(ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_SETTINGS.getEnabledSettings());
61-
73+
settings.add(PERIODIC_AUTHORIZATION_ENABLED);
6274
return settings;
6375
}
6476

6577
public String getElasticInferenceServiceUrl() {
6678
return Strings.isEmpty(elasticInferenceServiceUrl) ? eisGatewayUrl : elasticInferenceServiceUrl;
6779
}
6880

81+
public boolean isPeriodicAuthorizationEnabled() {
82+
return periodicAuthorizationEnabled;
83+
}
6984
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/authorization/ElasticInferenceServiceAuthorizationHandler.java

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.common.Randomness;
14+
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.core.TimeValue;
1516
import org.elasticsearch.inference.InferenceService;
1617
import org.elasticsearch.inference.InferenceServiceConfiguration;
@@ -22,6 +23,7 @@
2223
import org.elasticsearch.xpack.inference.services.ServiceComponents;
2324
import org.elasticsearch.xpack.inference.services.elastic.DefaultModelConfig;
2425
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceService;
26+
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceComponents;
2527

2628
import java.io.Closeable;
2729
import java.io.IOException;
@@ -35,6 +37,7 @@
3537
import java.util.TreeSet;
3638
import java.util.concurrent.CountDownLatch;
3739
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicBoolean;
3841
import java.util.concurrent.atomic.AtomicReference;
3942

4043
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;
@@ -60,12 +63,14 @@ static AuthorizedContent empty() {
6063
private final TimeValue requestInterval;
6164
private final TimeValue maxJitter;
6265
private final Map<String, DefaultModelConfig> defaultModelsConfigs;
63-
private final CountDownLatch authorizationCompletedLatch = new CountDownLatch(1);
66+
private final CountDownLatch firstAuthorizationCompletedLatch = new CountDownLatch(1);
6467
private final EnumSet<TaskType> implementedTaskTypes;
6568
private final InferenceService inferenceService;
6669
private final Sender sender;
6770
private final Runnable callback;
6871
private final AtomicReference<Scheduler.ScheduledCancellable> lastAuthTask = new AtomicReference<>(null);
72+
private final AtomicBoolean shutdown = new AtomicBoolean(false);
73+
private final boolean periodAuthorizationEnabled;
6974

7075
public ElasticInferenceServiceAuthorizationHandler(
7176
ServiceComponents serviceComponents,
@@ -75,8 +80,7 @@ public ElasticInferenceServiceAuthorizationHandler(
7580
EnumSet<TaskType> implementedTaskTypes,
7681
InferenceService inferenceService,
7782
Sender sender,
78-
TimeValue requestInterval,
79-
TimeValue maxJitter
83+
ElasticInferenceServiceComponents elasticInferenceServiceComponents
8084
) {
8185
this(
8286
serviceComponents,
@@ -86,8 +90,7 @@ public ElasticInferenceServiceAuthorizationHandler(
8690
implementedTaskTypes,
8791
Objects.requireNonNull(inferenceService),
8892
sender,
89-
requestInterval,
90-
maxJitter,
93+
elasticInferenceServiceComponents,
9194
null
9295
);
9396
}
@@ -101,8 +104,7 @@ public ElasticInferenceServiceAuthorizationHandler(
101104
EnumSet<TaskType> implementedTaskTypes,
102105
InferenceService inferenceService,
103106
Sender sender,
104-
TimeValue requestInterval,
105-
TimeValue maxJitter,
107+
ElasticInferenceServiceComponents elasticInferenceServiceComponents,
106108
// this is a hack to facilitate testing
107109
Runnable callback
108110
) {
@@ -114,8 +116,12 @@ public ElasticInferenceServiceAuthorizationHandler(
114116
// allow the service to be null for testing
115117
this.inferenceService = inferenceService;
116118
this.sender = Objects.requireNonNull(sender);
117-
this.requestInterval = Objects.requireNonNull(requestInterval);
118-
this.maxJitter = Objects.requireNonNull(maxJitter);
119+
120+
Objects.requireNonNull(elasticInferenceServiceComponents);
121+
requestInterval = elasticInferenceServiceComponents.authRequestInterval();
122+
maxJitter = elasticInferenceServiceComponents.maxAuthRequestJitter();
123+
periodAuthorizationEnabled = elasticInferenceServiceComponents.periodicAuthorizationEnabled();
124+
119125
configuration = new AtomicReference<>(
120126
new ElasticInferenceService.Configuration(authorizedContent.get().taskTypesAndModels.getAuthorizedTaskTypes())
121127
);
@@ -134,7 +140,7 @@ public void init() {
134140
*/
135141
public void waitForAuthorizationToComplete(TimeValue waitTime) {
136142
try {
137-
if (authorizationCompletedLatch.await(waitTime.getSeconds(), TimeUnit.SECONDS) == false) {
143+
if (firstAuthorizationCompletedLatch.await(waitTime.getSeconds(), TimeUnit.SECONDS) == false) {
138144
throw new IllegalStateException("The wait time has expired for authorization to complete.");
139145
}
140146
} catch (InterruptedException e) {
@@ -172,28 +178,50 @@ public synchronized InferenceServiceConfiguration getConfiguration() {
172178

173179
@Override
174180
public void close() throws IOException {
181+
shutdown.set(true);
175182
if (lastAuthTask.get() != null) {
176183
lastAuthTask.get().cancel();
177184
}
178185
}
179186

180187
private void scheduleAuthorizationRequest() {
181188
try {
189+
if (periodAuthorizationEnabled == false) {
190+
return;
191+
}
192+
193+
// this call has to be on the individual thread otherwise we get an exception
182194
var random = Randomness.get();
183195
var jitter = (long) (maxJitter.millis() * random.nextDouble());
184196
var waitTime = TimeValue.timeValueMillis(requestInterval.millis() + jitter);
185-
lastAuthTask.set(serviceComponents.threadPool()
186-
.schedule(
187-
this::scheduleAndSendAuthorizationRequest,
188-
waitTime,
189-
serviceComponents.threadPool().executor(UTILITY_THREAD_POOL_NAME)
190-
));
197+
198+
logger.debug(
199+
() -> Strings.format(
200+
"Scheduling the next authorization call with request interval: %s ms, jitter: %d ms",
201+
requestInterval.millis(),
202+
jitter
203+
)
204+
);
205+
logger.debug(() -> Strings.format("Next authorization call in %d minutes", waitTime.getMinutes()));
206+
207+
lastAuthTask.set(
208+
serviceComponents.threadPool()
209+
.schedule(
210+
this::scheduleAndSendAuthorizationRequest,
211+
waitTime,
212+
serviceComponents.threadPool().executor(UTILITY_THREAD_POOL_NAME)
213+
)
214+
);
191215
} catch (Exception e) {
192216
logger.warn("Failed scheduling authorization request", e);
193217
}
194218
}
195219

196220
private void scheduleAndSendAuthorizationRequest() {
221+
if (shutdown.get()) {
222+
return;
223+
}
224+
197225
scheduleAuthorizationRequest();
198226
sendAuthorizationRequest();
199227
}
@@ -205,21 +233,22 @@ private void sendAuthorizationRequest() {
205233
if (callback != null) {
206234
callback.run();
207235
}
208-
authorizationCompletedLatch.countDown();
236+
firstAuthorizationCompletedLatch.countDown();
209237
}, e -> {
210238
// we don't need to do anything if there was a failure, everything is disabled by default
211-
authorizationCompletedLatch.countDown();
239+
firstAuthorizationCompletedLatch.countDown();
212240
});
213241

214242
authorizationHandler.getAuthorization(listener, sender);
215243
} catch (Exception e) {
216244
logger.warn("Failure while sending the request to retrieve authorization", e);
217245
// we don't need to do anything if there was a failure, everything is disabled by default
218-
authorizationCompletedLatch.countDown();
246+
firstAuthorizationCompletedLatch.countDown();
219247
}
220248
}
221249

222250
private synchronized void setAuthorizedContent(ElasticInferenceServiceAuthorizationModel auth) {
251+
logger.debug("Received authorization response");
223252
var authorizedTaskTypesAndModels = authorizedContent.get().taskTypesAndModels.merge(auth)
224253
.newLimitedToTaskTypes(EnumSet.copyOf(implementedTaskTypes));
225254

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/Utils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.xpack.inference.mock.TestDenseInferenceServiceExtension;
3535
import org.elasticsearch.xpack.inference.mock.TestSparseInferenceServiceExtension;
3636
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
37+
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings;
3738
import org.hamcrest.Matchers;
3839

3940
import java.io.IOException;
@@ -78,7 +79,8 @@ public static ClusterService mockClusterService(Settings settings) {
7879
ThrottlerManager.getSettingsDefinitions(),
7980
RetrySettings.getSettingsDefinitions(),
8081
Truncator.getSettingsDefinitions(),
81-
RequestExecutorServiceSettings.getSettingsDefinitions()
82+
RequestExecutorServiceSettings.getSettingsDefinitions(),
83+
ElasticInferenceServiceSettings.getSettingsDefinitions()
8284
).flatMap(Collection::stream).collect(Collectors.toSet());
8385

8486
var cSettings = new ClusterSettings(settings, registeredSettings);

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceServiceSparseEmbeddingsModelTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public static ElasticInferenceServiceSparseEmbeddingsModel createModel(String ur
2626
new ElasticInferenceServiceSparseEmbeddingsServiceSettings(modelId, maxInputTokens, null),
2727
EmptyTaskSettings.INSTANCE,
2828
EmptySecretSettings.INSTANCE,
29-
ElasticInferenceServiceComponents.withDefaults(url)
29+
ElasticInferenceServiceComponents.withDefaults(url, false)
3030
);
3131
}
3232
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,7 @@ private void testUnifiedStreamError(int responseCode, String responseJson, Strin
10651065
new ElasticInferenceServiceCompletionServiceSettings("model_id", new RateLimitSettings(100)),
10661066
EmptyTaskSettings.INSTANCE,
10671067
EmptySecretSettings.INSTANCE,
1068-
ElasticInferenceServiceComponents.withDefaults(eisGatewayUrl)
1068+
ElasticInferenceServiceComponents.withDefaults(eisGatewayUrl, false)
10691069
);
10701070
PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
10711071
service.unifiedCompletionInfer(
@@ -1151,7 +1151,7 @@ private ElasticInferenceService createService(
11511151
return new ElasticInferenceService(
11521152
senderFactory,
11531153
createWithEmptySettings(threadPool),
1154-
ElasticInferenceServiceComponents.withDefaults(gatewayUrl),
1154+
ElasticInferenceServiceComponents.withDefaults(gatewayUrl, false),
11551155
mockModelRegistry(),
11561156
mockAuthHandler
11571157
);
@@ -1161,7 +1161,7 @@ private ElasticInferenceService createServiceWithAuthHandler(HttpRequestSender.F
11611161
return new ElasticInferenceService(
11621162
senderFactory,
11631163
createWithEmptySettings(threadPool),
1164-
ElasticInferenceServiceComponents.withDefaults(eisGatewayUrl),
1164+
ElasticInferenceServiceComponents.withDefaults(eisGatewayUrl, false),
11651165
mockModelRegistry(),
11661166
new ElasticInferenceServiceAuthorizationRequestHandler(eisGatewayUrl, threadPool)
11671167
);
@@ -1171,7 +1171,7 @@ public static ElasticInferenceService createServiceWithAuthHandler(HttpRequestSe
11711171
return new ElasticInferenceService(
11721172
senderFactory,
11731173
createWithEmptySettings(threadPool),
1174-
ElasticInferenceServiceComponents.withDefaults(eisGatewayUrl),
1174+
ElasticInferenceServiceComponents.withDefaults(eisGatewayUrl, false),
11751175
mockModelRegistry(threadPool),
11761176
new ElasticInferenceServiceAuthorizationRequestHandler(eisGatewayUrl, threadPool)
11771177
);

0 commit comments

Comments
 (0)