2323import org .threadly .concurrent .ReschedulingOperation ;
2424import org .threadly .concurrent .SingleThreadScheduler ;
2525import org .threadly .concurrent .SubmitterScheduler ;
26+ import org .threadly .concurrent .future .FutureCallback ;
2627import org .threadly .concurrent .future .FutureUtils ;
2728import org .threadly .concurrent .future .ListenableFuture ;
2829import org .threadly .concurrent .future .SettableListenableFuture ;
@@ -401,23 +402,28 @@ private void process(HTTPRequestWrapper hrw) {
401402 if (hrw .chr .getHTTPRequest ().getHTTPHeaders ().isChunked ()) {
402403 hrw .client .write (hrw .chr .getHTTPRequest ().getMergedByteBuffers ());
403404
404- hrw .chr .nextBodySection ().resultCallback (new Consumer <ByteBuffer >() {
405+ hrw .chr .nextBodySection ().callback (new FutureCallback <ByteBuffer >() {
405406 @ Override
406- public void accept (ByteBuffer bb ) {
407+ public void handleResult (ByteBuffer bb ) {
407408 ListenableFuture <?> writeFuture = hrw .client .write (HTTPUtils .wrapInChunk (bb ));
408409
409410 if (bb != null && bb .hasRemaining ()) {
410- writeFuture . resultCallback (( ignored ) ->
411- hrw . chr . nextBodySection (). resultCallback (this ));
411+ ListenableFuture < ByteBuffer > nextWrite = hrw . chr . nextBodySection ();
412+ writeFuture . resultCallback (( ignored ) -> nextWrite . callback (this ));
412413 }
413414 }
415+
416+ @ Override
417+ public void handleFailure (Throwable t ) {
418+ hrw .slf .handleFailure (t );
419+ }
414420 });
415421 } else {
416- hrw .chr .nextBodySection ().resultCallback (new Consumer <ByteBuffer >() {
422+ hrw .chr .nextBodySection ().callback (new FutureCallback <ByteBuffer >() {
417423 private boolean firstSection = true ;
418424
419425 @ Override
420- public void accept (ByteBuffer bb ) {
426+ public void handleResult (ByteBuffer bb ) {
421427 if (bb != null && bb .hasRemaining ()) {
422428 MergedByteBuffers writeBuffer ;
423429 if (firstSection ) {
@@ -429,13 +435,19 @@ public void accept(ByteBuffer bb) {
429435 writeBuffer = new SimpleMergedByteBuffers (false , bb );
430436 }
431437
438+ ListenableFuture <ByteBuffer > nextWrite = hrw .chr .nextBodySection ();
432439 hrw .client .write (writeBuffer )
433- .resultCallback ((ignored ) -> hrw . chr . nextBodySection (). resultCallback (this ));
440+ .resultCallback ((ignored ) -> nextWrite . callback (this ));
434441 } else if (firstSection ) {
435442 firstSection = false ;
436443 hrw .client .write (hrw .chr .getHTTPRequest ().getMergedByteBuffers ());
437444 }
438445 }
446+
447+ @ Override
448+ public void handleFailure (Throwable t ) {
449+ hrw .slf .handleFailure (t );
450+ }
439451 });
440452 }
441453 } else {
0 commit comments