Skip to content

Commit f1ad5a7

Browse files
committed
HttpClient: Allow queue limit and change request exceptions
This commit includes two changes: 1) Allows you to set a queue limit for requests It's still recommended the queue size to be at least as large as the number of concurrent requests due to the fact that all requests must pass through the queue. This limit however can aleviate conditions where we otherwise would need to timeout by being in queue so long. Instead we can now fail fast when the client is not making progress 2) This changes the exceptions which may be thrown from `request` to no longer only be `HTTPParsingException` This is to make the behavior uniform with asyncRequest. AsyncRequest may throw a CancellationException, a RejectedExecutionException, or other cases which are not only `HTTPParsingException`. Rather than changing the async request to always return this exception, I think it's better to let the specific exception types bubble up. Alternatively we could create `http` exceptions for each condition we want to report, however I don't believe it's worth the per-request overhead to map these exception types.
1 parent 80e4d8a commit f1ad5a7

File tree

3 files changed

+101
-29
lines changed

3 files changed

+101
-29
lines changed

client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
import java.nio.ByteBuffer;
77
import java.util.ArrayDeque;
88
import java.util.Iterator;
9+
import java.util.Queue;
10+
import java.util.concurrent.ArrayBlockingQueue;
911
import java.util.concurrent.CancellationException;
1012
import java.util.concurrent.ConcurrentHashMap;
1113
import java.util.concurrent.ConcurrentLinkedQueue;
1214
import java.util.concurrent.CopyOnWriteArraySet;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.RejectedExecutionException;
1317
import java.util.concurrent.TimeUnit;
1418

1519
import javax.net.ssl.SSLContext;
@@ -18,6 +22,7 @@
1822
import org.threadly.concurrent.ReschedulingOperation;
1923
import org.threadly.concurrent.SingleThreadScheduler;
2024
import org.threadly.concurrent.SubmitterScheduler;
25+
import org.threadly.concurrent.future.FutureUtils;
2126
import org.threadly.concurrent.future.ListenableFuture;
2227
import org.threadly.concurrent.future.SettableListenableFuture;
2328
import org.threadly.litesockets.Client;
@@ -60,7 +65,7 @@ public class HTTPClient extends AbstractService {
6065
private final int maxResponseSize;
6166
private final SubmitterScheduler ssi;
6267
private final SocketExecuter sei;
63-
private final ConcurrentLinkedQueue<HTTPRequestWrapper> queue = new ConcurrentLinkedQueue<>();
68+
private final Queue<HTTPRequestWrapper> queue;
6469
private final ConcurrentHashMap<TCPClient, HTTPRequestWrapper> inProcess = new ConcurrentHashMap<>();
6570
private final ConcurrentHashMap<HTTPAddress, ArrayDeque<Pair<Long,TCPClient>>> sockets = new ConcurrentHashMap<>();
6671
private final CopyOnWriteArraySet<TCPClient> tcpClients = new CopyOnWriteArraySet<>();
@@ -93,12 +98,34 @@ public HTTPClient() {
9398
* @param maxResponseSize the maximum responseSize clients are allowed to send.
9499
*/
95100
public HTTPClient(int maxConcurrent, int maxResponseSize) {
101+
this(maxConcurrent, maxResponseSize, -1);
102+
}
103+
104+
/**
105+
* <p>This constructor will let you set the max Concurrent Requests and max Response Size but will still
106+
* create its own {@link SingleThreadScheduler} to use as a threadpool.</p>
107+
*
108+
* <p>The maximum queue length parameter can help improved conditions where you want to fail fast
109+
* if the downstream service is fully consumed. Since all requests will start in the queue we
110+
* recommend to either provide a {@code 0} to leave the queue unbounded, or to set to at least
111+
* the {@code maxConcurrent} value.</p>
112+
*
113+
* @param maxConcurrent maximum number of requests to run simultaneously.
114+
* @param maxResponseSize the maximum responseSize clients are allowed to send.
115+
* @param maxQueueSize Maximum queue size, {@code <= 0} to leave unbounded. Recommended to be >= {@code maxConcurrent}
116+
*/
117+
public HTTPClient(int maxConcurrent, int maxResponseSize, int maxQueueSize) {
96118
this.maxConcurrent = maxConcurrent;
97119
this.maxResponseSize = maxResponseSize;
98120
sts = new SingleThreadScheduler();
99121
this.ssi = sts;
100122
ntse = new NoThreadSocketExecuter();
101123
sei = ntse;
124+
if (maxQueueSize < 1 || maxQueueSize == Integer.MAX_VALUE) {
125+
queue = new ConcurrentLinkedQueue<>();
126+
} else {
127+
queue = new ArrayBlockingQueue<>(maxQueueSize);
128+
}
102129
runSocketTask = new RunSocket(ssi);
103130
}
104131

@@ -111,10 +138,33 @@ public HTTPClient(int maxConcurrent, int maxResponseSize) {
111138
* @param sei the SocketExecuter to use with these HTTPClients.
112139
*/
113140
public HTTPClient(int maxConcurrent, int maxResponseSize, SocketExecuter sei) {
141+
this(maxConcurrent, maxResponseSize, sei, -1);
142+
}
143+
144+
/**
145+
* <p>This constructor will let you set the max Concurrent Requests and max Response Size
146+
* as well as your own {@link SocketExecuter} as the thread pool to use.</p>
147+
*
148+
* <p>The maximum queue length parameter can help improved conditions where you want to fail fast
149+
* if the downstream service is fully consumed. Since all requests will start in the queue we
150+
* recommend to either provide a {@code 0} to leave the queue unbounded, or to set to at least
151+
* the {@code maxConcurrent} value.</p>
152+
*
153+
* @param maxConcurrent maximum number of requests to run simultaneously.
154+
* @param maxResponseSize the maximum responseSize clients are allowed to send.
155+
* @param sei the SocketExecuter to use with these HTTPClients.
156+
* @param maxQueueSize Maximum queue size, {@code <= 0} to be unbounded. Recommended to be {@code >= maxConcurrent}
157+
*/
158+
public HTTPClient(int maxConcurrent, int maxResponseSize, SocketExecuter sei, int maxQueueSize) {
114159
this.maxConcurrent = maxConcurrent;
115160
this.maxResponseSize = maxResponseSize;
116161
this.ssi = sei.getThreadScheduler();
117162
this.sei = sei;
163+
if (maxQueueSize < 1 || maxQueueSize == Integer.MAX_VALUE) {
164+
queue = new ConcurrentLinkedQueue<>();
165+
} else {
166+
queue = new ArrayBlockingQueue<>(maxQueueSize);
167+
}
118168
runSocketTask = new RunSocket(ssi);
119169
}
120170

@@ -232,19 +282,7 @@ public HTTPResponseData request(final URL url) throws HTTPParsingException {
232282
* @throws HTTPParsingException is thrown if the server sends back protocol or a response that is larger then allowed.
233283
*/
234284
public HTTPResponseData request(final URL url, final HTTPRequestMethod rm, final ByteBuffer bb) throws HTTPParsingException {
235-
HTTPResponseData hr = null;
236-
try {
237-
hr = requestAsync(url, rm, bb).get();
238-
} catch (InterruptedException e) {
239-
Thread.currentThread().interrupt();
240-
} catch (Exception e) {
241-
if(e.getCause() instanceof HTTPParsingException) {
242-
throw (HTTPParsingException)e.getCause();
243-
} else {
244-
throw new HTTPParsingException(e);
245-
}
246-
}
247-
return hr;
285+
return extractAsyncResponse(requestAsync(url, rm, bb));
248286
}
249287

250288
/**
@@ -255,21 +293,27 @@ public HTTPResponseData request(final URL url, final HTTPRequestMethod rm, final
255293
* @throws HTTPParsingException is thrown if the server sends back protocol or a response that is larger then allowed.
256294
*/
257295
public HTTPResponseData request(final ClientHTTPRequest request) throws HTTPParsingException {
258-
HTTPResponseData hr = null;
296+
return extractAsyncResponse(requestAsync(request));
297+
}
298+
299+
protected HTTPResponseData extractAsyncResponse(ListenableFuture<HTTPResponseData> lf) throws HTTPParsingException {
259300
try {
260-
hr = requestAsync(request).get();
301+
return lf.get();
261302
} catch (InterruptedException e) {
262303
Thread.currentThread().interrupt();
263-
} catch (Exception e) {
264-
if(e.getCause() instanceof HTTPParsingException) {
304+
lf.cancel(true);
305+
throw new RuntimeException("Request interrupted", e);
306+
} catch (CancellationException e) {
307+
throw e;
308+
} catch (ExecutionException e) {
309+
if (e.getCause() instanceof HTTPParsingException) {
265310
throw (HTTPParsingException)e.getCause();
266-
} else if(e instanceof CancellationException) {
267-
throw new HTTPParsingException("HTTP Timeout!", e);
311+
} else if (e.getCause() instanceof RejectedExecutionException) {
312+
throw (RejectedExecutionException)e.getCause();
268313
} else {
269314
throw new HTTPParsingException(e);
270315
}
271316
}
272-
return hr;
273317
}
274318

275319
/**
@@ -312,7 +356,9 @@ public ListenableFuture<HTTPResponseData> requestAsync(final URL url, final HTTP
312356
*/
313357
public ListenableFuture<HTTPResponseData> requestAsync(final ClientHTTPRequest request) {
314358
HTTPRequestWrapper hrw = new HTTPRequestWrapper(request);
315-
queue.add(hrw);
359+
if (! queue.offer(hrw)) {
360+
return FutureUtils.immediateFailureFuture(new RejectedExecutionException("Request queue full"));
361+
}
316362
if(ntse != null) {
317363
ntse.wakeup();
318364
runSocketTask.signalToRun();

client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import java.io.IOException;
88
import java.net.URL;
99
import java.util.HashMap;
10+
import java.util.concurrent.CancellationException;
1011
import java.util.concurrent.ExecutionException;
12+
import java.util.concurrent.RejectedExecutionException;
1113
import java.util.concurrent.TimeUnit;
1214
import java.util.concurrent.TimeoutException;
1315
import java.util.concurrent.atomic.AtomicInteger;
@@ -309,8 +311,8 @@ public void timeoutRequest() throws IOException, HTTPParsingException {
309311
try{
310312
httpClient.request(hrb.buildClientHTTPRequest());
311313
fail();
312-
} catch(HTTPParsingException e) {
313-
assertEquals("HTTP Timeout!", e.getMessage());
314+
} catch(CancellationException e) {
315+
assertEquals("Request timed out at point: SendingRequest", e.getMessage());
314316
// below conditions may be slightly async due to future getting a result before listeners are invoked
315317
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
316318
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
@@ -335,13 +337,37 @@ protected void processQueue() {
335337
try{
336338
httpClient.request(hrb.buildClientHTTPRequest());
337339
fail();
338-
} catch(HTTPParsingException e) {
339-
assertEquals("HTTP Timeout!", e.getMessage());
340+
} catch(CancellationException e) {
341+
assertEquals("Request timed out at point: Queued", e.getMessage());
340342
// below conditions may be slightly async due to future getting a result before listeners are invoked
341343
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
342344
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
343345
}
344346
}
347+
348+
@Test
349+
public void queueLimitTest() throws IOException, HTTPParsingException {
350+
int port = PortUtils.findTCPPort();
351+
TCPServer server = SEI.createTCPServer("localhost", port);
352+
server.start();
353+
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
354+
hrb.setBody(IOUtils.EMPTY_BYTEBUFFER);
355+
final HTTPClient httpClient = new HTTPClient(1, 1048576, 1) {
356+
@Override
357+
protected void processQueue() {
358+
// queue is never processed so we know it's queued
359+
}
360+
};
361+
httpClient.start();
362+
httpClient.requestAsync(hrb.buildClientHTTPRequest()); // first request should queue fine
363+
try{
364+
httpClient.request(hrb.buildClientHTTPRequest());
365+
fail();
366+
} catch(RejectedExecutionException e) {
367+
assertEquals(1, httpClient.getRequestQueueSize());
368+
assertEquals("Request queue full", e.getMessage());
369+
}
370+
}
345371

346372
@Test
347373
public void expireRequest() throws IOException, HTTPParsingException {
@@ -356,8 +382,8 @@ public void expireRequest() throws IOException, HTTPParsingException {
356382
try {
357383
httpClient.request(hrb.buildClientHTTPRequest());
358384
fail();
359-
} catch(HTTPParsingException hp) {
360-
385+
} catch(CancellationException e) {
386+
assertEquals("Request timed out at point: ReadingResponseBody", e.getMessage());
361387
}
362388
assertTrue((Clock.accurateForwardProgressingMillis() - start) > 30);
363389
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group = org.threadly
2-
version = 0.23
2+
version = 0.24-SNAPSHOT
33
threadlyVersion = 5.37
44
litesocketsVersion = 4.10
55
org.gradle.parallel=false

0 commit comments

Comments
 (0)