Skip to content

Commit d3b0a56

Browse files
committed
HTTPClient: Change how response bodies are handled
This is an API breaking change to change how response bodies are handled. Instead of having the HTTPClient handle the body, the body handling is instead put into the request (so each request may handle the body differently if needed). This breaks the API in that HTTPClient is no longer monitoring the maximum body accepted, but rather the default behavior in HTTPRequestBuilder is to buffer and monitor the body limit that was previously done in HTTPClient.
1 parent b6bea4d commit d3b0a56

File tree

4 files changed

+157
-44
lines changed

4 files changed

+157
-44
lines changed

client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,10 @@
6060
* is kept in memory and are not handled as streams. See {@link HTTPStreamClient} for use with large HTTP data sets.</p>
6161
*/
6262
public class HTTPClient extends AbstractService {
63-
public static final int DEFAULT_CONCURRENT = 2;
63+
public static final int DEFAULT_CONCURRENT = 8;
6464
public static final int DEFAULT_TIMEOUT = 15000;
6565
public static final int DEFAULT_MAX_IDLE = 45000;
66-
public static final int MAX_HTTP_RESPONSE = 1048576; //1MB
6766

68-
private final int maxResponseSize;
6967
private final SubmitterScheduler ssi;
7068
private final SocketExecuter sei;
7169
private final Queue<HTTPRequestWrapper> queue;
@@ -90,18 +88,17 @@ public class HTTPClient extends AbstractService {
9088
*
9189
*/
9290
public HTTPClient() {
93-
this(DEFAULT_CONCURRENT, MAX_HTTP_RESPONSE);
91+
this(DEFAULT_CONCURRENT, -1);
9492
}
9593

9694
/**
9795
* <p>This constructor will let you set the max Concurrent Requests and max Response Size but will still
9896
* create its own {@link SingleThreadScheduler} to use as a threadpool.</p>
99-
*
97+
*
10098
* @param maxConcurrent maximum number of requests to run simultaneously.
101-
* @param maxResponseSize the maximum responseSize clients are allowed to send.
10299
*/
103-
public HTTPClient(int maxConcurrent, int maxResponseSize) {
104-
this(maxConcurrent, maxResponseSize, -1);
100+
public HTTPClient(int maxConcurrent) {
101+
this(maxConcurrent, -1);
105102
}
106103

107104
/**
@@ -114,12 +111,10 @@ public HTTPClient(int maxConcurrent, int maxResponseSize) {
114111
* the {@code maxConcurrent} value.</p>
115112
*
116113
* @param maxConcurrent maximum number of requests to run simultaneously.
117-
* @param maxResponseSize the maximum responseSize clients are allowed to send.
118114
* @param maxQueueSize Maximum queue size, {@code <= 0} to leave unbounded. Recommended to be >= {@code maxConcurrent}
119115
*/
120-
public HTTPClient(int maxConcurrent, int maxResponseSize, int maxQueueSize) {
116+
public HTTPClient(int maxConcurrent, int maxQueueSize) {
121117
this.maxConcurrent = maxConcurrent;
122-
this.maxResponseSize = maxResponseSize;
123118
sts = new SingleThreadScheduler();
124119
this.ssi = sts;
125120
ntse = new NoThreadSocketExecuter();
@@ -137,11 +132,10 @@ public HTTPClient(int maxConcurrent, int maxResponseSize, int maxQueueSize) {
137132
* as well as your own {@link SocketExecuter} as the thread pool to use.</p>
138133
*
139134
* @param maxConcurrent maximum number of requests to run simultaneously.
140-
* @param maxResponseSize the maximum responseSize clients are allowed to send.
141135
* @param sei the SocketExecuter to use with these HTTPClients.
142136
*/
143-
public HTTPClient(int maxConcurrent, int maxResponseSize, SocketExecuter sei) {
144-
this(maxConcurrent, maxResponseSize, sei, -1);
137+
public HTTPClient(int maxConcurrent, SocketExecuter sei) {
138+
this(maxConcurrent, -1, sei);
145139
}
146140

147141
/**
@@ -153,14 +147,12 @@ public HTTPClient(int maxConcurrent, int maxResponseSize, SocketExecuter sei) {
153147
* recommend to either provide a {@code 0} to leave the queue unbounded, or to set to at least
154148
* the {@code maxConcurrent} value.</p>
155149
*
156-
* @param maxConcurrent maximum number of requests to run simultaneously.
157-
* @param maxResponseSize the maximum responseSize clients are allowed to send.
150+
* @param maxConcurrent maximum number of requests to run simultaneously.
158151
* @param sei the SocketExecuter to use with these HTTPClients.
159152
* @param maxQueueSize Maximum queue size, {@code <= 0} to be unbounded. Recommended to be {@code >= maxConcurrent}
160153
*/
161-
public HTTPClient(int maxConcurrent, int maxResponseSize, SocketExecuter sei, int maxQueueSize) {
154+
public HTTPClient(int maxConcurrent, int maxQueueSize, SocketExecuter sei) {
162155
this.maxConcurrent = maxConcurrent;
163-
this.maxResponseSize = maxResponseSize;
164156
this.ssi = sei.getThreadScheduler();
165157
this.sei = sei;
166158
if (maxQueueSize < 1 || maxQueueSize == Integer.MAX_VALUE) {
@@ -187,7 +179,7 @@ public int getRequestQueueSize() {
187179
*
188180
* @return number of request currently in progress.
189181
*/
190-
public int getInProgressSize() {
182+
public int getInProgressCount() {
191183
return this.inProcess.size();
192184
}
193185

@@ -196,7 +188,7 @@ public int getInProgressSize() {
196188
*
197189
* @return number of open connections.
198190
*/
199-
public int getOpenConnections() {
191+
public int getOpenConnectionCount() {
200192
return tcpClients.size();
201193
}
202194

@@ -634,7 +626,6 @@ private class HTTPRequestWrapper implements HTTPResponseCallback {
634626
private final ClientHTTPRequest chr;
635627
private RequestState currentState = RequestState.Queued;
636628
private HTTPResponse response;
637-
private ReuseableMergedByteBuffers responseMBB = new ReuseableMergedByteBuffers();
638629
private TCPClient client;
639630
private long lastRead = Clock.lastKnownForwardProgressingMillis();
640631

@@ -685,10 +676,11 @@ public void headersFinished(HTTPResponse hr) {
685676

686677
@Override
687678
public void bodyData(ByteBuffer bb) {
688-
responseMBB.add(bb);
689-
if(responseMBB.remaining() > maxResponseSize) {
679+
try {
680+
chr.getBodyConsumer().accept(bb);
681+
} catch (Exception e) {
690682
TCPClient client = this.client; // will be `null` after error
691-
slf.setFailure(new HTTPParsingException("Response Body to large!"));
683+
slf.setFailure(e);
692684
client.close();
693685
}
694686
}
@@ -697,7 +689,7 @@ public void bodyData(ByteBuffer bb) {
697689
public void finished() {
698690
currentState = RequestState.Finished;
699691
slf.setResult(new HTTPResponseData(HTTPClient.this, chr.getHTTPRequest(), response,
700-
responseMBB.duplicateAndClean()));
692+
chr.getBodyConsumer().finishBody()));
701693
hrp.removeHTTPResponseCallback(this);
702694
TCPClient client = this.client;
703695
this.client = null;
@@ -766,7 +758,7 @@ public HTTPResponseData(HTTPClient client, HTTPRequest origRequest, HTTPResponse
766758
MergedByteBuffers bb) {
767759
this.client = client;
768760
this.hr = hr;
769-
this.body = bb;
761+
this.body = bb == null ? new SimpleMergedByteBuffers(false) : bb;
770762
this.origRequest = origRequest;
771763
}
772764

client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.concurrent.RejectedExecutionException;
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.concurrent.TimeoutException;
18+
import java.util.concurrent.atomic.AtomicInteger;
1819

1920
import org.junit.After;
2021
import org.junit.Before;
@@ -29,8 +30,10 @@
2930
import org.threadly.litesockets.SocketExecuter;
3031
import org.threadly.litesockets.TCPServer;
3132
import org.threadly.litesockets.ThreadedSocketExecuter;
33+
import org.threadly.litesockets.buffers.MergedByteBuffers;
3234
import org.threadly.litesockets.client.http.HTTPClient.HTTPResponseData;
3335
import org.threadly.litesockets.protocols.http.request.ClientHTTPRequest;
36+
import org.threadly.litesockets.protocols.http.request.ClientHTTPRequest.BodyConsumer;
3437
import org.threadly.litesockets.protocols.http.request.HTTPRequestBuilder;
3538
import org.threadly.litesockets.protocols.http.response.HTTPResponse;
3639
import org.threadly.litesockets.protocols.http.response.HTTPResponseBuilder;
@@ -58,7 +61,7 @@ public class HTTPClientTests {
5861
static HTTPResponse RESPONSE_HUGE_NOCL;
5962
static {
6063
StringBuilder sb = new StringBuilder();
61-
while(sb.length() < (HTTPClient.MAX_HTTP_RESPONSE*2)) {
64+
while(sb.length() < (HTTPRequestBuilder.MAX_HTTP_BUFFERED_RESPONSE*2)) {
6265
sb.append(CONTENT);
6366
}
6467
LARGE_CONTENT = sb.toString();
@@ -146,7 +149,7 @@ public void manyRequestsConcurrentJavaExecutor() throws IOException, Interrupted
146149
fakeServer = new TestHTTPServer(port, RESPONSE_CL, CONTENT, false, false);
147150
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
148151
hrb.setHTTPAddress(new HTTPAddress("localhost", port, false), true);
149-
final HTTPClient httpClient = new HTTPClient(HTTPClient.DEFAULT_CONCURRENT, HTTPClient.MAX_HTTP_RESPONSE, TSE);
152+
final HTTPClient httpClient = new HTTPClient(HTTPClient.DEFAULT_CONCURRENT, TSE);
150153
httpClient.start();
151154

152155
AsyncVerifier av = new AsyncVerifier();
@@ -192,7 +195,7 @@ public void manyRequestsConcurrentOnPool() throws IOException, InterruptedExcept
192195
hrb.setHTTPAddress(new HTTPAddress("localhost", port, false), true);
193196
final ThreadedSocketExecuter TSE = new ThreadedSocketExecuter(PS);
194197
TSE.start();
195-
final HTTPClient httpClient = new HTTPClient(200, HTTPClient.MAX_HTTP_RESPONSE, TSE);
198+
final HTTPClient httpClient = new HTTPClient(200, TSE);
196199
httpClient.start();
197200

198201
AsyncVerifier av = new AsyncVerifier();
@@ -292,6 +295,32 @@ public void streamedBodyRequest() throws IOException, HTTPParsingException, Inte
292295
assertEquals(CONTENT, lf.get().getBodyAsString());
293296
}
294297

298+
@Test
299+
public void streamedBodyResponse() throws IOException, HTTPParsingException {
300+
int port = PortUtils.findTCPPort();
301+
fakeServer = new TestHTTPServer(port, RESPONSE_HUGE, LARGE_CONTENT, false, false);
302+
AtomicInteger readContentSize = new AtomicInteger(0);
303+
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port))
304+
.setBodyConsumer(new BodyConsumer() {
305+
@Override
306+
public void accept(ByteBuffer bb) throws HTTPParsingException {
307+
readContentSize.addAndGet(bb.remaining());
308+
}
309+
310+
@Override
311+
public MergedByteBuffers finishBody() {
312+
return null;
313+
}
314+
});
315+
316+
final HTTPClient httpClient = new HTTPClient();
317+
httpClient.start();
318+
MergedByteBuffers emptyResponseBody = httpClient.request(hrb.buildClientHTTPRequest()).getBody();
319+
320+
assertEquals(0, emptyResponseBody.remaining());
321+
assertEquals(LARGE_CONTENT.length(), readContentSize.get());
322+
}
323+
295324
@Test
296325
public void contentLengthOnHeadRequest() throws IOException, HTTPParsingException {
297326
int port = PortUtils.findTCPPort();
@@ -342,7 +371,7 @@ public void timeoutRequest() throws IOException, HTTPParsingException {
342371
assertEquals("Request timed out at point: SendingRequest", e.getMessage());
343372
// below conditions may be slightly async due to future getting a result before listeners are invoked
344373
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
345-
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
374+
new TestCondition(() -> httpClient.getInProgressCount() == 0).blockTillTrue(1_000);
346375
}
347376
}
348377

@@ -368,7 +397,7 @@ protected void processQueue() {
368397
assertEquals("Request timed out at point: Queued", e.getMessage());
369398
// below conditions may be slightly async due to future getting a result before listeners are invoked
370399
new TestCondition(() -> httpClient.getRequestQueueSize() == 0).blockTillTrue(1_000);
371-
new TestCondition(() -> httpClient.getInProgressSize() == 0).blockTillTrue(1_000);
400+
new TestCondition(() -> httpClient.getInProgressCount() == 0).blockTillTrue(1_000);
372401
}
373402
}
374403

@@ -379,7 +408,7 @@ public void queueLimitTest() throws IOException, HTTPParsingException {
379408
server.start();
380409
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
381410
hrb.setBody(IOUtils.EMPTY_BYTEBUFFER);
382-
final HTTPClient httpClient = new HTTPClient(1, 1048576, 1) {
411+
final HTTPClient httpClient = new HTTPClient(1, 1) {
383412
@Override
384413
protected void processQueue() {
385414
// queue is never processed so we know it's queued
@@ -426,7 +455,7 @@ public void sslRequest() throws IOException, HTTPParsingException {
426455
}
427456

428457
@Test
429-
public void toLargeRequest() throws IOException, HTTPParsingException {
458+
public void tooLargeRequest() throws IOException, HTTPParsingException {
430459
int port = PortUtils.findTCPPort();
431460
fakeServer = new TestHTTPServer(port, RESPONSE_HUGE, LARGE_CONTENT, false, false);
432461
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
@@ -441,7 +470,7 @@ public void toLargeRequest() throws IOException, HTTPParsingException {
441470
}
442471

443472
@Test
444-
public void toLargeRequestNoContentLength() throws IOException, HTTPParsingException {
473+
public void tooLargeRequestNoContentLength() throws IOException, HTTPParsingException {
445474
int port = PortUtils.findTCPPort();
446475
fakeServer = new TestHTTPServer(port, RESPONSE_NO_CL, LARGE_CONTENT, false, true);
447476
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));

protocol/src/main/java/org/threadly/litesockets/protocols/http/request/ClientHTTPRequest.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
import org.threadly.concurrent.future.FutureUtils;
77
import org.threadly.concurrent.future.ListenableFuture;
8+
import org.threadly.litesockets.buffers.MergedByteBuffers;
89
import org.threadly.litesockets.protocols.http.shared.HTTPAddress;
10+
import org.threadly.litesockets.protocols.http.shared.HTTPParsingException;
911

1012
// TODO - do we want to move this into the `client`? I see the `HTTPRequestBuilder` references it,
1113
// but we could create an extending class `ClientHTTPRequestBuilder` which can build this
@@ -20,13 +22,16 @@ public class ClientHTTPRequest {
2022
private final HTTPRequest request;
2123
private final HTTPAddress ha;
2224
private final Supplier<ListenableFuture<ByteBuffer>> bodyProvider;
25+
private final BodyConsumer bodyConsumer;
2326
private final int timeoutMS;
2427

2528
protected ClientHTTPRequest(HTTPRequest request, HTTPAddress ha, int timeoutMS,
26-
Supplier<ListenableFuture<ByteBuffer>> bodyProvider) {
29+
Supplier<ListenableFuture<ByteBuffer>> bodyProvider,
30+
BodyConsumer bodyConsumer) {
2731
this.request = request;
2832
this.ha = ha;
2933
this.bodyProvider = bodyProvider == null ? EMPTY_BODY_SUPPLIER : bodyProvider;
34+
this.bodyConsumer = bodyConsumer;
3035
this.timeoutMS = timeoutMS;
3136
}
3237

@@ -43,6 +48,10 @@ public HTTPAddress getHTTPAddress() {
4348
return ha;
4449
}
4550

51+
public BodyConsumer getBodyConsumer() {
52+
return bodyConsumer;
53+
}
54+
4655
/**
4756
* Returns if there is a body associated to this request.
4857
*
@@ -64,14 +73,6 @@ public ListenableFuture<ByteBuffer> nextBodySection() {
6473
public int getTimeoutMS() {
6574
return this.timeoutMS;
6675
}
67-
68-
// TODO - is this useful? I am not seeing any cases of it being used
69-
public HTTPRequestBuilder makeBuilder() {
70-
HTTPRequestBuilder hrb = request.makeBuilder();
71-
hrb.setHTTPAddress(ha, false);
72-
73-
return hrb;
74-
}
7576

7677
@Override
7778
public int hashCode() {
@@ -112,4 +113,26 @@ public boolean equals(Object obj) {
112113
}
113114
return true;
114115
}
116+
117+
/**
118+
* Consumer to accept the response body as it is read from the network.
119+
*/
120+
public interface BodyConsumer {
121+
/**
122+
* Invoked to provide newly read body contents.
123+
*
124+
* @param bb Buffer containing the body data
125+
* @throws HTTPParsingException May be thrown if there is any errors in the body contents
126+
*/
127+
public void accept(ByteBuffer bb) throws HTTPParsingException;
128+
129+
/**
130+
* Invoked once the body has been fully consumed. This should finish and cleanup anything
131+
* necessary, providing a {@link MergedByteBuffers} to be represented in the final
132+
* {@link HTTPResponseData}.
133+
*
134+
* @return Buffer containing the final form of the body (or empty if body wont be read from {@link HTTPResponseData})
135+
*/
136+
public MergedByteBuffers finishBody();
137+
}
115138
}

0 commit comments

Comments
 (0)