11package org .threadly .litesockets .protocols .http .request ;
22
3+ import java .io .IOException ;
34import java .io .InputStream ;
45import java .net .URL ;
56import java .nio .ByteBuffer ;
1112import java .util .Map ;
1213import java .util .Map .Entry ;
1314import java .util .TreeMap ;
15+ import java .util .concurrent .Callable ;
1416import java .util .concurrent .TimeUnit ;
1517import java .util .function .Supplier ;
1618
@@ -121,7 +123,6 @@ public HTTPRequestBuilder setHTTPVersion(final String version) {
121123 return this ;
122124 }
123125
124-
125126 /**
126127 * This sets the request path for the {@link HTTPRequestBuilder}. If a query is on this path it will replace the current query
127128 * in this builder.
@@ -178,7 +179,6 @@ public HTTPRequestBuilder removeQuery(final String key) {
178179 return this ;
179180 }
180181
181-
182182 /**
183183 * Sets the {@link HTTPAddress} for this builder. This will add a Host header into the headers of this builder
184184 * when this object it built. This is also used with the {@link #buildHTTPAddress()} method.
@@ -256,6 +256,12 @@ public HTTPRequestBuilder setPort(final int port) {
256256 return this ;
257257 }
258258
259+ /**
260+ * Set a single part body to send in the request.
261+ *
262+ * @param bb The buffer to be provided or {@code null} to unset the body
263+ * @return the current {@link HTTPRequestBuilder} object.
264+ */
259265 public HTTPRequestBuilder setBody (final ByteBuffer bb ) {
260266 if (bb != null && bb .hasRemaining ()) {
261267 @ SuppressWarnings ({"unchecked" , "rawtypes" })
@@ -272,11 +278,56 @@ public HTTPRequestBuilder setBody(final ByteBuffer bb) {
272278 return this ;
273279 }
274280
281+ /**
282+ * Set a single part body to send in the request.
283+ *
284+ * @param str The body contents represented as a string
285+ * @return the current {@link HTTPRequestBuilder} object.
286+ */
287+ public HTTPRequestBuilder setBody (final String str ) {
288+ return setBody (ByteBuffer .wrap (str .getBytes ()));
289+ }
290+
291+ /**
292+ * Set a single part body to send in the request.
293+ *
294+ * @param str The body contents represented as a string
295+ * @return the current {@link HTTPRequestBuilder} object.
296+ */
297+ public HTTPRequestBuilder setBody (final String str , Charset cs ) {
298+ return setBody (ByteBuffer .wrap (str .getBytes (cs )));
299+ }
300+
301+ /**
302+ * Set a body to be consumed from an {@link InputStream}.
303+ *
304+ * @param executor Executor to do blocking read from InputStream on
305+ * @param bodySize The total size to be consumed from the InputStream
306+ * @param bodyStream The stream to consume from
307+ * @param bufferSize The size per-read from the stream, up to twice of this may be allocated at a time
308+ * @return the current {@link HTTPRequestBuilder} object.
309+ */
275310 public HTTPRequestBuilder setStreamedBody (final SubmitterExecutor executor , final int bodySize ,
276- final InputStream bodyStream ) {
277- return setStreamedBody (bodySize , bodyProducer (executor , bodyStream ));
311+ final InputStream bodyStream ,
312+ final int bufferSize ) {
313+ return setStreamedBody (bodySize , bodyProducer (executor , bodyStream , bufferSize ));
278314 }
279315
316+ /**
317+ * Set a body from a supplier of {@link ListenableFuture}'s. Each future should provide the next
318+ * part of the body. Once a future returns a {@code null} or otherwise empty {@link ByteBuffer},
319+ * it is assumed the body is complete and will not be invoked for more content.
320+ * <p>
321+ * The Supplier will NOT be invoked concurrently, however the returned buffer of the last invoke
322+ * CAN'T be reused. The next write buffer will be requested before the last one has finished
323+ * sending in order to facilitate smooth performance when reading content to send has a delay.
324+ * There will never be more than 2 unsent writes requested, so buffer reuse can happen for every
325+ * other request.
326+ *
327+ * @param bodySize The total size to be consumed from the InputStream
328+ * @param bodySupplier The supplier of writes, till {@code null} ends the body stream
329+ * @return the current {@link HTTPRequestBuilder} object.
330+ */
280331 public HTTPRequestBuilder setStreamedBody (final int bodySize ,
281332 final Supplier <ListenableFuture <ByteBuffer >> bodySupplier ) {
282333 this .bodySupplier = bodySupplier ;
@@ -285,37 +336,80 @@ public HTTPRequestBuilder setStreamedBody(final int bodySize,
285336 return this ;
286337 }
287338
339+ /**
340+ * Set a chunked body to be consumed from an {@link InputStream}. Each read will be turned into
341+ * an HTTP chunk.
342+ *
343+ * @param executor Executor to do blocking read from InputStream on
344+ * @param bodyStream The stream to consume from
345+ * @param bufferSize The size per-read from the stream, up to twice of this may be allocated at a time
346+ * @return the current {@link HTTPRequestBuilder} object.
347+ */
288348 public HTTPRequestBuilder setChunkedBody (final SubmitterExecutor executor ,
289- final InputStream bodyStream ) {
290- return setChunkedBody (bodyProducer (executor , bodyStream ));
291- }
292-
293- private Supplier <ListenableFuture <ByteBuffer >> bodyProducer (final SubmitterExecutor executor ,
294- final InputStream bodyStream ) {
295- return () -> executor .submit (() -> {
296- byte [] buffer = new byte [8192 ];
297- int c = bodyStream .read (buffer );
298- if (c > 0 ) {
299- return ByteBuffer .wrap (buffer , 0 , c );
300- } else {
301- return null ;
302- }
303- });
349+ final InputStream bodyStream ,
350+ final int bufferSize ) {
351+ return setChunkedBody (bodyProducer (executor , bodyStream , bufferSize ));
304352 }
305353
354+ /**
355+ * Set a chunked body from a supplier of {@link ListenableFuture}'s. Each future should provide
356+ * the next chunk for the body. Once a future returns a {@code null} or otherwise empty
357+ * {@link ByteBuffer}, it is assumed the body is complete and will not be invoked for more
358+ * content.
359+ * <p>
360+ * The Supplier will NOT be invoked concurrently, however the returned buffer of the last invoke
361+ * CAN'T be reused. The next write buffer will be requested before the last one has finished
362+ * sending in order to facilitate smooth performance when reading content to send has a delay.
363+ * There will never be more than 2 unsent writes requested, so buffer reuse can happen for every
364+ * other request.
365+ *
366+ * @param bodySupplier The supplier of writes, till {@code null} ends the body stream
367+ * @return the current {@link HTTPRequestBuilder} object.
368+ */
306369 public HTTPRequestBuilder setChunkedBody (final Supplier <ListenableFuture <ByteBuffer >> bodySupplier ) {
307370 this .bodySupplier = bodySupplier ;
308371 this .removeHeader (HTTPConstants .HTTP_KEY_CONTENT_LENGTH );
309372 this .setHeader (HTTPConstants .HTTP_KEY_TRANSFER_ENCODING , "chunked" );
310373 return this ;
311374 }
312375
313- public HTTPRequestBuilder setBody (final String str ) {
314- return setBody (ByteBuffer .wrap (str .getBytes ()));
315- }
316-
317- public HTTPRequestBuilder setBody (final String str , Charset cs ) {
318- return setBody (ByteBuffer .wrap (str .getBytes (cs )));
376+ private Supplier <ListenableFuture <ByteBuffer >> bodyProducer (final SubmitterExecutor executor ,
377+ final InputStream bodyStream ,
378+ final int bufferSize ) {
379+ Callable <ByteBuffer > streamReader = new Callable <ByteBuffer >() {
380+ private boolean use0 = true ;
381+ private ByteBuffer buffer0 = ByteBuffer .allocate (bufferSize );
382+ private ByteBuffer buffer1 = null ; // lazily set
383+
384+ @ Override
385+ public ByteBuffer call () throws Exception {
386+ if (use0 ) {
387+ use0 = false ;
388+
389+ return read (buffer0 );
390+ } else {
391+ use0 = true ;
392+
393+ if (buffer1 == null ) {
394+ buffer1 = ByteBuffer .allocate (bufferSize );
395+ }
396+
397+ return read (buffer1 );
398+ }
399+ }
400+
401+ private ByteBuffer read (ByteBuffer buffer ) throws IOException {
402+ int c = bodyStream .read (buffer .array ());
403+ if (c > 0 ) {
404+ buffer .position (0 );
405+ buffer .limit (c );
406+ return buffer ;
407+ } else {
408+ return null ;
409+ }
410+ }
411+ };
412+ return () -> executor .submit (streamReader );
319413 }
320414
321415 public HTTPRequestBuilder setTimeout (long size , TimeUnit unit ) {
0 commit comments