Skip to content

Commit 29e22be

Browse files
committed
HTTPClient: Allow request data to be streamed easier
This updates HTTPRequestBuilder / ClientHTTPRequest to provide the body as stateful sections. The entire body can be specified if available still, but it can also be provided as a stream of ListenableFuture<ByteBuffer>'s Once the body stream is completed the ListenableFuture should provide a null or empty ByteBuffer. This is a natural transition for chunked encoding (which is now supported in HTTPClient), but can also work for a large streamed body.
1 parent c93c3d2 commit 29e22be

File tree

5 files changed

+183
-91
lines changed

5 files changed

+183
-91
lines changed

client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.concurrent.ExecutionException;
1616
import java.util.concurrent.RejectedExecutionException;
1717
import java.util.concurrent.TimeUnit;
18+
import java.util.function.Consumer;
1819

1920
import javax.net.ssl.SSLContext;
2021
import javax.net.ssl.SSLEngine;
@@ -44,6 +45,7 @@
4445
import org.threadly.litesockets.protocols.http.shared.HTTPParsingException;
4546
import org.threadly.litesockets.protocols.http.shared.HTTPRequestMethod;
4647
import org.threadly.litesockets.protocols.http.shared.HTTPResponseCode;
48+
import org.threadly.litesockets.protocols.http.shared.HTTPUtils;
4749
import org.threadly.litesockets.protocols.websocket.WSFrame;
4850
import org.threadly.litesockets.utils.IOUtils;
4951
import org.threadly.litesockets.utils.SSLUtils;
@@ -395,15 +397,50 @@ private void process(HTTPRequestWrapper hrw) {
395397
addBackTCPClient(hrw.chr.getHTTPAddress(), freshClient); // if client is cleaned up this will ignore
396398
return;
397399
}
398-
MergedByteBuffers writeBuffer;
399-
if (hrw.chr.getBodyBuffer() == null) {
400-
writeBuffer = hrw.chr.getHTTPRequest().getMergedByteBuffers();
400+
if (hrw.chr.hasBody()) {
401+
if (hrw.chr.getHTTPRequest().getHTTPHeaders().isChunked()) {
402+
hrw.client.write(hrw.chr.getHTTPRequest().getMergedByteBuffers());
403+
404+
hrw.chr.nextBodySection().resultCallback(new Consumer<ByteBuffer>() {
405+
@Override
406+
public void accept(ByteBuffer bb) {
407+
ListenableFuture<?> writeFuture = hrw.client.write(HTTPUtils.wrapInChunk(bb));
408+
409+
if (bb != null && bb.hasRemaining()) {
410+
writeFuture.resultCallback((ignored) ->
411+
hrw.chr.nextBodySection().resultCallback(this));
412+
}
413+
}
414+
});
415+
} else {
416+
hrw.chr.nextBodySection().resultCallback(new Consumer<ByteBuffer>() {
417+
private boolean firstSection = true;
418+
419+
@Override
420+
public void accept(ByteBuffer bb) {
421+
if (bb != null && bb.hasRemaining()) {
422+
MergedByteBuffers writeBuffer;
423+
if (firstSection) {
424+
firstSection = false;
425+
writeBuffer = new SimpleMergedByteBuffers(false,
426+
hrw.chr.getHTTPRequest().getMergedByteBuffers(),
427+
bb);
428+
} else {
429+
writeBuffer = new SimpleMergedByteBuffers(false, bb);
430+
}
431+
432+
hrw.client.write(writeBuffer)
433+
.resultCallback((ignored) -> hrw.chr.nextBodySection().resultCallback(this));
434+
} else if (firstSection) {
435+
firstSection = false;
436+
hrw.client.write(hrw.chr.getHTTPRequest().getMergedByteBuffers());
437+
}
438+
}
439+
});
440+
}
401441
} else {
402-
writeBuffer = new SimpleMergedByteBuffers(false,
403-
hrw.chr.getHTTPRequest().getMergedByteBuffers(),
404-
hrw.chr.getBodyBuffer().duplicate());
442+
hrw.client.write(hrw.chr.getHTTPRequest().getMergedByteBuffers());
405443
}
406-
hrw.client.write(writeBuffer);
407444
} catch (Throwable t) {
408445
//Have to catch all here or we dont keep processing if NoThreadSE is in use
409446
hrw.slf.setFailure(t);

client/src/main/java/org/threadly/litesockets/client/http/HTTPStreamClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ public ListenableFuture<HTTPResponse> writeRequest(HTTPRequest request) {
175175
public ListenableFuture<?> write(ByteBuffer bb) {
176176
if(currentHttpRequest == null) {
177177
throw new IllegalStateException("Must have a pending HTTPRequest before you can write!");
178-
}
179-
if(currentHttpRequest != null && currentHttpRequest.getHTTPHeaders().isChunked()) {
178+
} else if(currentHttpRequest.getHTTPHeaders().isChunked()) {
180179
return client.write(HTTPUtils.wrapInChunk(bb));
181180
} else {
182181
return client.write(bb);

client/src/test/java/org/threadly/litesockets/client/http/HTTPClientTests.java

Lines changed: 79 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
package org.threadly.litesockets.client.http;
22

33
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
45
import static org.junit.Assert.assertTrue;
56
import static org.junit.Assert.fail;
67

78
import java.io.IOException;
89
import java.net.URL;
10+
import java.nio.ByteBuffer;
911
import java.util.HashMap;
12+
import java.util.Iterator;
1013
import java.util.concurrent.CancellationException;
1114
import java.util.concurrent.ExecutionException;
1215
import java.util.concurrent.RejectedExecutionException;
1316
import java.util.concurrent.TimeUnit;
1417
import java.util.concurrent.TimeoutException;
15-
import java.util.concurrent.atomic.AtomicInteger;
1618

1719
import org.junit.After;
1820
import org.junit.Before;
1921
import org.junit.Test;
2022
import org.threadly.concurrent.PriorityScheduler;
2123
import org.threadly.concurrent.future.FutureCallback;
24+
import org.threadly.concurrent.future.FutureUtils;
2225
import org.threadly.concurrent.future.ListenableFuture;
26+
import org.threadly.concurrent.future.SettableListenableFuture;
2327
import org.threadly.litesockets.Client;
2428
import org.threadly.litesockets.Server.ClientAcceptor;
2529
import org.threadly.litesockets.SocketExecuter;
@@ -36,8 +40,12 @@
3640
import org.threadly.litesockets.protocols.http.shared.HTTPParsingException;
3741
import org.threadly.litesockets.utils.IOUtils;
3842
import org.threadly.litesockets.utils.PortUtils;
43+
import org.threadly.test.concurrent.AsyncVerifier;
3944
import org.threadly.test.concurrent.TestCondition;
45+
import org.threadly.test.concurrent.TestUtils;
46+
import org.threadly.util.ArrayIterator;
4047
import org.threadly.util.Clock;
48+
import org.threadly.util.debug.Profiler;
4149

4250
public class HTTPClientTests {
4351
static String CONTENT = "TEST123";
@@ -81,54 +89,56 @@ public void stop() {
8189
}
8290

8391
@Test
84-
public void manyRequestsConcurrent() throws IOException {
92+
public void manyRequestsConcurrent() throws IOException, InterruptedException, TimeoutException {
8593
final int number = 500;
8694
final int port = PortUtils.findTCPPort();
8795
fakeServer = new TestHTTPServer(port, RESPONSE_CL, CONTENT, false, false);
8896
final HTTPRequestBuilder hrb = new HTTPRequestBuilder().setPort(port);
8997
hrb.setHTTPAddress(new HTTPAddress("localhost", port, false), true);
9098
final HTTPClient httpClient = new HTTPClient();
91-
final AtomicInteger count = new AtomicInteger(0);
9299
httpClient.start();
93100

101+
AsyncVerifier av = new AsyncVerifier();
94102
PriorityScheduler CLIENT_PS = new PriorityScheduler(20);
95103
Runnable run = new Runnable() {
96104
@Override
97105
public void run() {
98106
ClientHTTPRequest chr = hrb.buildClientHTTPRequest();
99-
//final long start = Clock.accurateForwardProgressingMillis();
100-
final ListenableFuture<HTTPResponseData> lf = httpClient.requestAsync(chr);
107+
final long start = Clock.accurateForwardProgressingMillis();
108+
109+
final ListenableFuture<HTTPResponseData> lf = httpClient.requestAsync(chr);
101110
lf.callback(new FutureCallback<HTTPResponseData>() {
102111
@Override
103112
public void handleResult(HTTPResponseData result) {
104-
//System.out.println("DELAY:"+(Clock.accurateForwardProgressingMillis()-start));
105-
assertEquals("TEST123", result.getBodyAsString());
106-
count.incrementAndGet();
113+
System.out.println("DELAY:"+(Clock.accurateForwardProgressingMillis()-start));
114+
av.assertEquals(CONTENT, result.getBodyAsString());
115+
av.signalComplete();
107116
}
108117

109118
@Override
110119
public void handleFailure(Throwable t) {
111-
System.err.println("***********************ERR*******************");
112-
t.printStackTrace();
113-
System.err.println("***********************ERR*******************");
114-
fail();
120+
av.fail(t);
115121
}});
116122
}};
117123

118124
for(int i=0; i<number; i++) {
119125
CLIENT_PS.execute(run);
120126
}
121-
new TestCondition(){
122-
@Override
123-
public boolean get() {
124-
return count.get() == number;
125-
}
126-
}.blockTillTrue(10000);
127+
128+
Profiler p = new Profiler();
129+
TestUtils.sleep(2_000);
130+
p.start();
131+
TestUtils.sleep(2_000);
132+
p.stop();
133+
System.out.println(p.dump());
134+
135+
av.waitForTest(10_000, number);
136+
127137
httpClient.stop();
128138
}
129139

130140
@Test
131-
public void manyRequestsConcurrentJavaExecutor() throws IOException, InterruptedException {
141+
public void manyRequestsConcurrentJavaExecutor() throws IOException, InterruptedException, TimeoutException {
132142
final int number = 500;
133143
final int port = PortUtils.findTCPPort();
134144
final ThreadedSocketExecuter TSE = new ThreadedSocketExecuter(PS);
@@ -138,48 +148,43 @@ public void manyRequestsConcurrentJavaExecutor() throws IOException, Interrupted
138148
hrb.setHTTPAddress(new HTTPAddress("localhost", port, false), true);
139149
final HTTPClient httpClient = new HTTPClient(HTTPClient.DEFAULT_CONCURRENT, HTTPClient.MAX_HTTP_RESPONSE, TSE);
140150
httpClient.start();
141-
final AtomicInteger count = new AtomicInteger(0);
142151

143-
PriorityScheduler CLIENT_PS = new PriorityScheduler(200);
152+
AsyncVerifier av = new AsyncVerifier();
153+
PriorityScheduler CLIENT_PS = new PriorityScheduler(20);
144154
Runnable run = new Runnable() {
145155
@Override
146156
public void run() {
147157
ClientHTTPRequest chr = hrb.buildClientHTTPRequest();
148158
//final long start = Clock.accurateForwardProgressingMillis();
159+
149160
final ListenableFuture<HTTPResponseData> lf = httpClient.requestAsync(chr);
150161
lf.callback(new FutureCallback<HTTPResponseData>() {
151162
@Override
152163
public void handleResult(HTTPResponseData result) {
153164
//System.out.println("DELAY:"+(Clock.accurateForwardProgressingMillis()-start));
154-
assertEquals("TEST123", result.getBodyAsString());
155-
count.incrementAndGet();
165+
av.assertEquals(CONTENT, result.getBodyAsString());
166+
av.signalComplete();
156167
}
157168

158169
@Override
159170
public void handleFailure(Throwable t) {
160-
System.err.println("***********************ERR*******************");
161-
t.printStackTrace();
162-
System.err.println("***********************ERR*******************");
163-
fail();
171+
av.fail(t);
164172
}});
165173
}};
166174

167175
for(int i=0; i<number; i++) {
168176
CLIENT_PS.execute(run);
169177
}
170-
new TestCondition(){
171-
@Override
172-
public boolean get() {
173-
return count.get() == number;
174-
}
175-
}.blockTillTrue(10000);
178+
179+
av.waitForTest(10_000, number);
180+
176181
httpClient.stop();
177182
TSE.stop();
178183
}
179184

180185

181186
@Test
182-
public void manyRequestsConcurrentOnPool() throws IOException {
187+
public void manyRequestsConcurrentOnPool() throws IOException, InterruptedException, TimeoutException {
183188
final int number = 500;
184189
final int port = PortUtils.findTCPPort();
185190
fakeServer = new TestHTTPServer(port, RESPONSE_CL, CONTENT, false, false);
@@ -189,9 +194,9 @@ public void manyRequestsConcurrentOnPool() throws IOException {
189194
TSE.start();
190195
final HTTPClient httpClient = new HTTPClient(200, HTTPClient.MAX_HTTP_RESPONSE, TSE);
191196
httpClient.start();
192-
final AtomicInteger count = new AtomicInteger(0);
193197

194-
PriorityScheduler CLIENT_PS = new PriorityScheduler(200);
198+
AsyncVerifier av = new AsyncVerifier();
199+
PriorityScheduler CLIENT_PS = new PriorityScheduler(20);
195200
Runnable run = new Runnable() {
196201
@Override
197202
public void run() {
@@ -203,28 +208,22 @@ public void run() {
203208
@Override
204209
public void handleResult(HTTPResponseData result) {
205210
//System.out.println("DELAY:"+(Clock.accurateForwardProgressingMillis()-start));
206-
assertEquals("TEST123", result.getBodyAsString());
207-
count.incrementAndGet();
211+
av.assertEquals(CONTENT, result.getBodyAsString());
212+
av.signalComplete();
208213
}
209214

210215
@Override
211216
public void handleFailure(Throwable t) {
212-
System.err.println("***********************ERR*******************");
213-
t.printStackTrace();
214-
System.err.println("***********************ERR*******************");
215-
fail();
217+
av.fail(t);
216218
}});
217219
}};
218220

219221
for(int i=0; i<number; i++) {
220222
CLIENT_PS.execute(run);
221223
}
222-
new TestCondition(){
223-
@Override
224-
public boolean get() {
225-
return count.get() == number;
226-
}
227-
}.blockTillTrue(10000);
224+
225+
av.waitForTest(10_000, number);
226+
228227
httpClient.stop();
229228
TSE.stop();
230229
}
@@ -237,7 +236,7 @@ public void blockingRequest() throws IOException, HTTPParsingException {
237236
hrb.setHTTPAddress(new HTTPAddress("localhost", port, false), true);
238237
final HTTPClient httpClient = new HTTPClient();
239238
httpClient.start();
240-
assertEquals("TEST123", httpClient.request(hrb.buildClientHTTPRequest()).getBodyAsString());
239+
assertEquals(CONTENT, httpClient.request(hrb.buildClientHTTPRequest()).getBodyAsString());
241240
}
242241

243242
@Test
@@ -262,12 +261,40 @@ public void noContentLengthWithBody() throws IOException, HTTPParsingException {
262261
httpClient.start();
263262
HTTPResponseData hrs = httpClient.request(hrb.buildClientHTTPRequest());
264263
//System.out.println(hrs.getResponse());
265-
assertEquals("TEST123", hrs.getBodyAsString());
264+
assertEquals(CONTENT, hrs.getBodyAsString());
265+
}
266+
267+
//@Test
268+
public void streamedBodyRequest() throws IOException, HTTPParsingException, InterruptedException, ExecutionException {
269+
int port = PortUtils.findTCPPort();
270+
fakeServer = new TestHTTPServer(port, RESPONSE_CL, CONTENT, false, true);
271+
ByteBuffer write1 = ByteBuffer.allocate(100);
272+
ByteBuffer write2 = ByteBuffer.allocate(100);
273+
SettableListenableFuture<ByteBuffer> write1SLF = new SettableListenableFuture<>();
274+
SettableListenableFuture<ByteBuffer> write2SLF = new SettableListenableFuture<>();
275+
@SuppressWarnings({"unchecked", "rawtypes"})
276+
Iterator<ListenableFuture<ByteBuffer>> writeIt =
277+
ArrayIterator.makeIterator(new ListenableFuture[] {
278+
write1SLF, write2SLF, FutureUtils.immediateResultFuture(null) });
279+
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port))
280+
.setStreamedBody(write1.remaining() + write2.remaining(), writeIt::next);
281+
282+
final HTTPClient httpClient = new HTTPClient();
283+
httpClient.start();
284+
ListenableFuture<HTTPResponseData> lf = httpClient.requestAsync(hrb.buildClientHTTPRequest());
285+
Thread.sleep(100);
286+
assertFalse(lf.isDone());
287+
write1SLF.setResult(write1);
288+
Thread.sleep(100);
289+
assertFalse(lf.isDone());
290+
write2SLF.setResult(write2);
291+
292+
assertEquals(CONTENT, lf.get().getBodyAsString());
266293
}
267294

268295
@Test
269296
public void contentLengthOnHeadRequest() throws IOException, HTTPParsingException {
270-
int port = PortUtils.findTCPPort(); // TODO
297+
int port = PortUtils.findTCPPort();
271298
fakeServer = new TestHTTPServer(port, RESPONSE_CL, "", false, true);
272299
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("http://localhost:"+port));
273300
hrb.setRequestMethod("HEAD");
@@ -395,7 +422,7 @@ public void sslRequest() throws IOException, HTTPParsingException {
395422
final HTTPRequestBuilder hrb = new HTTPRequestBuilder(new URL("https://localhost:"+port));
396423
final HTTPClient httpClient = new HTTPClient();
397424
httpClient.start();
398-
assertEquals("TEST123", httpClient.request(hrb.buildClientHTTPRequest()).getBodyAsString());
425+
assertEquals(CONTENT, httpClient.request(hrb.buildClientHTTPRequest()).getBodyAsString());
399426
}
400427

401428
@Test

0 commit comments

Comments
 (0)