Skip to content

Commit cc08228

Browse files
authored
SOLR-17211: HttpJdkSolrClient Support Async requests (#2374)
1 parent 5c399dd commit cc08228

File tree

12 files changed

+459
-47
lines changed

12 files changed

+459
-47
lines changed

solr/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ Improvements
126126

127127
* SOLR-17164: Add 2 arg variant of vectorSimilarity() function (Sanjay Dutt, hossman)
128128

129+
* SOLR-17211: New SolrJ JDK client supports Async (James Dyer)
130+
129131
Optimizations
130132
---------------------
131133
* SOLR-17144: Close searcherExecutor thread per core after 1 minute (Pierre Salagnac, Christine Poerschke)

solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ Requests are sent in the form of {solr-javadocs}/solrj/org/apache/solr/client/so
9898
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/HttpSolrClient.html[`HttpSolrClient`] - geared towards query-centric workloads, though also a good general-purpose client.
9999
Communicates directly with a single Solr node.
100100
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/Http2SolrClient.html[`Http2SolrClient`] - async, non-blocking and general-purpose client that leverage HTTP/2 using the Jetty Http library.
101-
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.html[`HttpJdkSolrClient`] - General-purpose client using the JDK's built-in Http Client. Supports both Http/2 and Http/1.1. Targeted for those users wishing to minimize application dependencies.
101+
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.html[`HttpJdkSolrClient`] - General-purpose client using the JDK's built-in Http Client. Supports both Http/2 and Http/1.1. Supports async. Targeted for those users wishing to minimize application dependencies.
102102
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrClient.html[`LBHttpSolrClient`] - balances request load across a list of Solr nodes.
103103
Adjusts the list of "in-service" nodes based on node health.
104104
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.html[`LBHttp2SolrClient`] - just like `LBHttpSolrClient` but using `Http2SolrClient` instead, with the Jetty Http library.

solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ public void send(OutStream outStream, SolrRequest<?> req, String collection) thr
422422

423423
private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};
424424

425+
@Override
425426
public Cancellable asyncRequest(
426427
SolrRequest<?> solrRequest,
427428
String collection,
@@ -470,7 +471,7 @@ public void onFailure(Response response, Throwable failure) {
470471
}
471472
}
472473
};
473-
474+
asyncListener.onStart();
474475
req = makeRequestAndSend(solrRequest, url, listener, true);
475476
} catch (SolrServerException | IOException e) {
476477
asyncListener.onFailure(e);

solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java

Lines changed: 116 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Map;
3939
import java.util.Objects;
4040
import java.util.concurrent.BlockingQueue;
41+
import java.util.concurrent.CompletableFuture;
4142
import java.util.concurrent.ExecutorService;
4243
import java.util.concurrent.Future;
4344
import java.util.concurrent.LinkedBlockingQueue;
@@ -51,6 +52,8 @@
5152
import org.apache.solr.client.solrj.SolrServerException;
5253
import org.apache.solr.client.solrj.request.QueryRequest;
5354
import org.apache.solr.client.solrj.request.RequestWriter;
55+
import org.apache.solr.client.solrj.util.AsyncListener;
56+
import org.apache.solr.client.solrj.util.Cancellable;
5457
import org.apache.solr.client.solrj.util.ClientUtils;
5558
import org.apache.solr.common.SolrException;
5659
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -80,7 +83,7 @@ public class HttpJdkSolrClient extends HttpSolrClientBase {
8083

8184
private boolean forceHttp11;
8285

83-
private boolean shutdownExecutor;
86+
private final boolean shutdownExecutor;
8487

8588
protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder builder) {
8689
super(serverBaseUrl, builder);
@@ -133,80 +136,133 @@ protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder buil
133136
assert ObjectReleaseTracker.track(this);
134137
}
135138

139+
@Override
140+
public Cancellable asyncRequest(
141+
SolrRequest<?> solrRequest,
142+
String collection,
143+
AsyncListener<NamedList<Object>> asyncListener) {
144+
try {
145+
PreparedRequest pReq = prepareRequest(solrRequest, collection);
146+
asyncListener.onStart();
147+
CompletableFuture<NamedList<Object>> response =
148+
httpClient
149+
.sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream())
150+
.thenApply(
151+
httpResponse -> {
152+
try {
153+
return processErrorsAndResponse(
154+
solrRequest, pReq.parserToUse, httpResponse, pReq.url);
155+
} catch (SolrServerException e) {
156+
throw new RuntimeException(e);
157+
}
158+
})
159+
.whenComplete(
160+
(nl, t) -> {
161+
if (t != null) {
162+
asyncListener.onFailure(t);
163+
} else {
164+
asyncListener.onSuccess(nl);
165+
}
166+
});
167+
return new HttpSolrClientCancellable(response);
168+
} catch (Exception e) {
169+
asyncListener.onFailure(e);
170+
return () -> {};
171+
}
172+
}
173+
136174
@Override
137175
public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
138176
throws SolrServerException, IOException {
177+
PreparedRequest pReq = prepareRequest(solrRequest, collection);
178+
HttpResponse<InputStream> response = null;
179+
try {
180+
response = httpClient.send(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
181+
return processErrorsAndResponse(solrRequest, pReq.parserToUse, response, pReq.url);
182+
} catch (InterruptedException e) {
183+
Thread.currentThread().interrupt();
184+
throw new RuntimeException(e);
185+
} catch (HttpTimeoutException e) {
186+
throw new SolrServerException(
187+
"Timeout occurred while waiting response from server at: " + pReq.url, e);
188+
} catch (SolrException se) {
189+
throw se;
190+
} catch (RuntimeException re) {
191+
throw new SolrServerException(re);
192+
} finally {
193+
if (pReq.contentWritingFuture != null) {
194+
pReq.contentWritingFuture.cancel(true);
195+
}
196+
197+
// See
198+
// https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscribers.html#ofInputStream()
199+
if (!wantStream(pReq.parserToUse)) {
200+
try {
201+
response.body().close();
202+
} catch (Exception e1) {
203+
// ignore
204+
}
205+
}
206+
}
207+
}
208+
209+
private PreparedRequest prepareRequest(SolrRequest<?> solrRequest, String collection)
210+
throws SolrServerException, IOException {
139211
checkClosed();
140212
if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
141213
collection = defaultCollection;
142214
}
143215
String url = getRequestPath(solrRequest, collection);
144216
ResponseParser parserToUse = responseParser(solrRequest);
145217
ModifiableSolrParams queryParams = initalizeSolrParams(solrRequest, parserToUse);
146-
HttpResponse<InputStream> resp = null;
218+
var reqb = HttpRequest.newBuilder();
219+
PreparedRequest pReq = null;
147220
try {
148-
var reqb = HttpRequest.newBuilder();
149221
switch (solrRequest.getMethod()) {
150222
case GET:
151223
{
152-
resp = doGet(url, reqb, solrRequest, queryParams);
224+
pReq = prepareGet(url, reqb, solrRequest, queryParams);
153225
break;
154226
}
155227
case POST:
156228
case PUT:
157229
{
158-
resp = doPutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
230+
pReq = preparePutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
159231
break;
160232
}
161233
default:
162234
{
163235
throw new IllegalStateException("Unsupported method: " + solrRequest.getMethod());
164236
}
165237
}
166-
return processErrorsAndResponse(solrRequest, parserToUse, resp, url);
167-
} catch (InterruptedException e) {
168-
Thread.currentThread().interrupt();
169-
throw new RuntimeException(e);
170-
} catch (HttpTimeoutException e) {
171-
throw new SolrServerException(
172-
"Timeout occurred while waiting response from server at: " + url, e);
173-
} catch (SolrException se) {
174-
throw se;
175238
} catch (URISyntaxException | RuntimeException re) {
176239
throw new SolrServerException(re);
177-
} finally {
178-
// See
179-
// https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscribers.html#ofInputStream()
180-
if (!wantStream(parserToUse)) {
181-
try {
182-
resp.body().close();
183-
} catch (Exception e1) {
184-
// ignore
185-
}
186-
}
187240
}
241+
pReq.parserToUse = parserToUse;
242+
pReq.url = url;
243+
return pReq;
188244
}
189245

190-
private HttpResponse<InputStream> doGet(
246+
private PreparedRequest prepareGet(
191247
String url,
192248
HttpRequest.Builder reqb,
193249
SolrRequest<?> solrRequest,
194250
ModifiableSolrParams queryParams)
195-
throws IOException, InterruptedException, URISyntaxException {
251+
throws IOException, URISyntaxException {
196252
validateGetRequest(solrRequest);
197253
reqb.GET();
198254
decorateRequest(reqb, solrRequest);
199255
reqb.uri(new URI(url + "?" + queryParams));
200-
return httpClient.send(reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
256+
return new PreparedRequest(reqb, null);
201257
}
202258

203-
private HttpResponse<InputStream> doPutOrPost(
259+
private PreparedRequest preparePutOrPost(
204260
String url,
205261
SolrRequest.METHOD method,
206262
HttpRequest.Builder reqb,
207263
SolrRequest<?> solrRequest,
208264
ModifiableSolrParams queryParams)
209-
throws IOException, InterruptedException, URISyntaxException {
265+
throws IOException, URISyntaxException {
210266

211267
final RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest);
212268

@@ -274,15 +330,21 @@ private HttpResponse<InputStream> doPutOrPost(
274330
URI uriWithQueryParams = new URI(url + "?" + queryParams);
275331
reqb.uri(uriWithQueryParams);
276332

277-
HttpResponse<InputStream> response;
278-
try {
279-
response = httpClient.send(reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
280-
} finally {
281-
if (contentWritingFuture != null) {
282-
contentWritingFuture.cancel(true);
283-
}
333+
return new PreparedRequest(reqb, contentWritingFuture);
334+
}
335+
336+
private static class PreparedRequest {
337+
Future<?> contentWritingFuture;
338+
HttpRequest.Builder reqb;
339+
340+
ResponseParser parserToUse;
341+
342+
String url;
343+
344+
PreparedRequest(HttpRequest.Builder reqb, Future<?> contentWritingFuture) {
345+
this.reqb = reqb;
346+
this.contentWritingFuture = contentWritingFuture;
284347
}
285-
return response;
286348
}
287349

288350
/**
@@ -469,6 +531,23 @@ protected String allProcessorSupportedContentTypesCommaDelimited(
469531
.collect(Collectors.joining(", "));
470532
}
471533

534+
protected static class HttpSolrClientCancellable implements Cancellable {
535+
private final CompletableFuture<NamedList<Object>> response;
536+
537+
protected HttpSolrClientCancellable(CompletableFuture<NamedList<Object>> response) {
538+
this.response = response;
539+
}
540+
541+
@Override
542+
public void cancel() {
543+
response.cancel(true);
544+
}
545+
546+
protected CompletableFuture<NamedList<Object>> getResponse() {
547+
return response;
548+
}
549+
}
550+
472551
public static class Builder
473552
extends HttpSolrClientBuilderBase<HttpJdkSolrClient.Builder, HttpJdkSolrClient> {
474553

solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.solr.client.solrj.SolrServerException;
4040
import org.apache.solr.client.solrj.request.RequestWriter;
4141
import org.apache.solr.client.solrj.request.V2Request;
42+
import org.apache.solr.client.solrj.util.AsyncListener;
43+
import org.apache.solr.client.solrj.util.Cancellable;
4244
import org.apache.solr.common.SolrException;
4345
import org.apache.solr.common.params.CommonParams;
4446
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -368,6 +370,20 @@ protected void setParser(ResponseParser parser) {
368370

369371
protected abstract void updateDefaultMimeTypeForParser();
370372

373+
/**
374+
* Execute an asynchronous request to a Solr collection
375+
*
376+
* @param solrRequest the request to perform
377+
* @param collection if null the default collection is used
378+
* @param asyncListener callers should provide an implementation to handle events: start, success,
379+
* exception
380+
* @return Cancellable allowing the caller to attempt cancellation
381+
*/
382+
public abstract Cancellable asyncRequest(
383+
SolrRequest<?> solrRequest,
384+
String collection,
385+
AsyncListener<NamedList<Object>> asyncListener);
386+
371387
public boolean isV2ApiRequest(final SolrRequest<?> request) {
372388
return request instanceof V2Request || request.getPath().contains("/____v2");
373389
}

solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@
1717

1818
package org.apache.solr.client.solrj.util;
1919

20-
/** Listener for async requests */
20+
/**
21+
* Listener for async requests
22+
*
23+
* @param <T> The result type returned by the {@code onSuccess} method
24+
*/
2125
public interface AsyncListener<T> {
2226
/** Callback method invoked before processing the request */
2327
default void onStart() {}
2428

29+
/** Callback method invoked when the request completes successfully */
2530
void onSuccess(T t);
2631

32+
/** Callback method invoked when the request completes in failure */
2733
void onFailure(Throwable throwable);
2834
}

solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717

1818
package org.apache.solr.client.solrj.util;
1919

20+
/**
21+
* The return type for solrJ asynchronous requests, providing a mechanism whereby callers may
22+
* request cancellation.
23+
*/
2024
public interface Cancellable {
25+
26+
/**
27+
* Request to cancel the asynchronous request. This may be a no-op in some situations, for
28+
* instance, if the request failed or otherwise is complete.
29+
*/
2130
void cancel();
2231
}

0 commit comments

Comments
 (0)