Skip to content

Commit d5a05ec

Browse files
Simplifying http client to better support authorization tests (elastic#138630)
1 parent f1641e3 commit d5a05ec

File tree

2 files changed

+25
-27
lines changed
  • x-pack/plugin/inference/src

2 files changed

+25
-27
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.io.IOException;
2929
import java.util.Objects;
3030
import java.util.concurrent.CancellationException;
31-
import java.util.concurrent.atomic.AtomicReference;
3231

3332
import static org.elasticsearch.core.Strings.format;
3433
import static org.elasticsearch.xpack.inference.InferencePlugin.INFERENCE_RESPONSE_THREAD_POOL_NAME;
@@ -39,14 +38,7 @@
3938
public class HttpClient implements Closeable {
4039
private static final Logger logger = LogManager.getLogger(HttpClient.class);
4140

42-
enum Status {
43-
CREATED,
44-
STARTED,
45-
STOPPED
46-
}
47-
4841
private final CloseableHttpAsyncClient client;
49-
private final AtomicReference<Status> status = new AtomicReference<>(Status.CREATED);
5042
private final ThreadPool threadPool;
5143
private final HttpSettings settings;
5244
private final ThrottlerManager throttlerManager;
@@ -127,15 +119,10 @@ private static CloseableHttpAsyncClient createAsyncClient(
127119
}
128120

129121
public void start() {
130-
if (status.compareAndSet(Status.CREATED, Status.STARTED)) {
131-
client.start();
132-
}
122+
client.start();
133123
}
134124

135125
public void send(HttpRequest request, HttpClientContext context, ActionListener<HttpResult> listener) throws IOException {
136-
// The caller must call start() first before attempting to send a request
137-
assert status.get() == Status.STARTED : "call start() before attempting to send a request";
138-
139126
SocketAccess.doPrivileged(() -> client.execute(request.httpRequestBase(), context, new FutureCallback<>() {
140127
@Override
141128
public void completed(HttpResponse response) {
@@ -145,7 +132,7 @@ public void completed(HttpResponse response) {
145132
@Override
146133
public void failed(Exception ex) {
147134
throttlerManager.warn(logger, format("Request from inference entity id [%s] failed", request.inferenceEntityId()), ex);
148-
failUsingResponseThread(ex, listener);
135+
failUsingResponseThread(getException(ex), listener);
149136
}
150137

151138
@Override
@@ -179,10 +166,22 @@ private void failUsingResponseThread(Exception exception, ActionListener<?> list
179166
threadPool.executor(INFERENCE_RESPONSE_THREAD_POOL_NAME).execute(() -> listener.onFailure(exception));
180167
}
181168

182-
public void stream(HttpRequest request, HttpContext context, ActionListener<StreamingHttpResult> listener) throws IOException {
183-
// The caller must call start() first before attempting to send a request
184-
assert status.get() == Status.STARTED : "call start() before attempting to send a request";
169+
private static Exception getException(Exception e) {
170+
if (e instanceof CancellationException cancellationException) {
171+
return createNotRunningException(cancellationException);
172+
}
185173

174+
return e;
175+
}
176+
177+
private static IllegalStateException createNotRunningException(Exception exception) {
178+
// If the http client isn't running, it is either not started yet, in which case we have a bug somewhere because
179+
// it should always be started as part of the inference plugin startup, or it is stopped meaning the node is shutting down.
180+
// If we're shutting down, the user should retry the request, and hopefully it'll hit a node that isn't shutting down.
181+
return new IllegalStateException("Http client is not running, please retry the request", exception);
182+
}
183+
184+
public void stream(HttpRequest request, HttpContext context, ActionListener<StreamingHttpResult> listener) throws IOException {
186185
var streamingProcessor = new StreamingHttpResultPublisher(threadPool, settings, listener);
187186

188187
SocketAccess.doPrivileged(() -> client.execute(request.requestProducer(), streamingProcessor, context, new FutureCallback<>() {
@@ -193,7 +192,7 @@ public void completed(Void response) {
193192

194193
@Override
195194
public void failed(Exception ex) {
196-
threadPool.executor(INFERENCE_RESPONSE_THREAD_POOL_NAME).execute(() -> streamingProcessor.failed(ex));
195+
threadPool.executor(INFERENCE_RESPONSE_THREAD_POOL_NAME).execute(() -> streamingProcessor.failed(getException(ex)));
197196
}
198197

199198
@Override
@@ -212,7 +211,6 @@ public void cancelled() {
212211

213212
@Override
214213
public void close() throws IOException {
215-
status.set(Status.STOPPED);
216214
client.close();
217215
}
218216
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/HttpClientTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.http.nio.reactor.IOReactorException;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.action.support.PlainActionFuture;
25+
import org.elasticsearch.action.support.TestPlainActionFuture;
2526
import org.elasticsearch.common.Strings;
2627
import org.elasticsearch.common.settings.Settings;
2728
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -33,7 +34,6 @@
3334
import org.elasticsearch.threadpool.ThreadPool;
3435
import org.elasticsearch.xcontent.XContentType;
3536
import org.elasticsearch.xpack.inference.external.request.HttpRequest;
36-
import org.elasticsearch.xpack.inference.external.request.HttpRequestTests;
3737
import org.junit.After;
3838
import org.junit.Before;
3939

@@ -47,6 +47,7 @@
4747
import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors;
4848
import static org.elasticsearch.xpack.inference.Utils.mockClusterService;
4949
import static org.elasticsearch.xpack.inference.logging.ThrottlerManagerTests.mockThrottlerManager;
50+
import static org.hamcrest.Matchers.containsString;
5051
import static org.hamcrest.Matchers.equalTo;
5152
import static org.hamcrest.Matchers.hasSize;
5253
import static org.hamcrest.Matchers.is;
@@ -102,13 +103,12 @@ public void testSend_MockServerReceivesRequest() throws Exception {
102103

103104
public void testSend_ThrowsErrorIfCalledBeforeStart() throws Exception {
104105
try (var httpClient = HttpClient.create(emptyHttpSettings(), threadPool, createConnectionManager(), mockThrottlerManager())) {
105-
PlainActionFuture<HttpResult> listener = new PlainActionFuture<>();
106-
var thrownException = expectThrows(
107-
AssertionError.class,
108-
() -> httpClient.send(HttpRequestTests.createMock("inferenceEntityId"), HttpClientContext.create(), listener)
109-
);
106+
var listener = new TestPlainActionFuture<HttpResult>();
107+
var httpPost = createHttpPost(webServer.getPort(), "key", "value");
108+
httpClient.send(httpPost, HttpClientContext.create(), listener);
109+
var thrownException = expectThrows(IllegalStateException.class, () -> listener.actionGet(TimeValue.THIRTY_SECONDS));
110110

111-
assertThat(thrownException.getMessage(), is("call start() before attempting to send a request"));
111+
assertThat(thrownException.getMessage(), containsString("Http client is not running, please retry the request"));
112112
}
113113
}
114114

0 commit comments

Comments
 (0)