2020import org .reactivestreams .Subscriber ;
2121import org .reactivestreams .Subscription ;
2222
23- import java .util .concurrent .CountDownLatch ;
23+ import java .util .NoSuchElementException ;
24+ import java .util .concurrent .CompletableFuture ;
2425import java .util .concurrent .ScheduledExecutorService ;
2526import java .util .concurrent .TimeUnit ;
2627import java .util .concurrent .TimeoutException ;
2728import java .util .concurrent .atomic .AtomicBoolean ;
28- import java .util .concurrent .atomic .AtomicReference ;
2929
3030@ FunctionalInterface
3131public interface ReactiveSocketFactory <T , R extends ReactiveSocket > {
@@ -38,9 +38,7 @@ public interface ReactiveSocketFactory<T, R extends ReactiveSocket> {
3838 * @return blocks on create the socket
3939 */
4040 default R callAndWait (T t ) {
41- AtomicReference <R > reference = new AtomicReference <>();
42- AtomicReference <Throwable > error = new AtomicReference <>();
43- CountDownLatch latch = new CountDownLatch (1 );
41+ CompletableFuture <R > future = new CompletableFuture <>();
4442
4543 call (t )
4644 .subscribe (new Subscriber <R >() {
@@ -51,26 +49,21 @@ public void onSubscribe(Subscription s) {
5149
5250 @ Override
5351 public void onNext (R reactiveSocket ) {
54- reference . set (reactiveSocket );
52+ future . complete (reactiveSocket );
5553 }
5654
5755 @ Override
5856 public void onError (Throwable t ) {
59- error .set (t );
60- latch .countDown ();
57+ future .completeExceptionally (t );
6158 }
6259
6360 @ Override
6461 public void onComplete () {
65- latch . countDown ( );
62+ future . completeExceptionally ( new NoSuchElementException ( "Sequence contains no elements" ) );
6663 }
6764 });
6865
69- if (error .get () != null ) {
70- throw new RuntimeException (error .get ());
71- } else {
72- return reference .get ();
73- }
66+ return future .join ();
7467 }
7568
7669 /**
0 commit comments