Skip to content

Commit 0b41047

Browse files
committed
CompositeFuture should support custom future implementations (#5401)
See #5399 CompositeFuture expected every future to extend FutureBase. But sometimes users may come with their own implementation. Signed-off-by: Thomas Segismont <[email protected]>
1 parent b8d0699 commit 0b41047

File tree

2 files changed

+112
-8
lines changed

2 files changed

+112
-8
lines changed

src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
package io.vertx.core.impl.future;
1313

1414
import io.vertx.core.AsyncResult;
15-
import io.vertx.core.Future;
1615
import io.vertx.core.CompositeFuture;
16+
import io.vertx.core.Future;
1717
import io.vertx.core.Handler;
1818

1919
/**
@@ -64,8 +64,12 @@ private CompositeFutureImpl(int op, boolean initializing, Future<?>... results)
6464

6565
private void init() {
6666
for (Future<?> result : results) {
67-
FutureInternal internal = (FutureInternal<?>) result;
68-
internal.addListener(this);
67+
if (result instanceof FutureInternal) {
68+
FutureInternal internal = (FutureInternal<?>) result;
69+
internal.addListener(this);
70+
} else {
71+
result.onComplete(this::onSuccess, this::onFailure);
72+
}
6973
}
7074
Object o;
7175
synchronized (this) {
@@ -201,8 +205,10 @@ public int size() {
201205

202206
private void complete(Object result) {
203207
for (Future<?> r : results) {
204-
FutureInternal internal = (FutureInternal<?>) r;
205-
internal.removeListener(this);
208+
if (r instanceof FutureInternal) {
209+
FutureInternal internal = (FutureInternal<?>) r;
210+
internal.removeListener(this);
211+
}
206212
}
207213
if (result == this) {
208214
tryComplete(this);

src/test/java/io/vertx/core/CompositeFutureTest.java

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@
1111

1212
package io.vertx.core;
1313

14-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
15-
1614
import io.vertx.core.impl.future.FutureImpl;
1715
import io.vertx.core.impl.future.Listener;
1816
import io.vertx.test.core.Repeat;
19-
2017
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
2118
import org.junit.Test;
2219

@@ -33,6 +30,8 @@
3330
import java.util.stream.Collectors;
3431
import java.util.stream.IntStream;
3532

33+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
34+
3635
/**
3736
* @author <a href="mailto:[email protected]">Julien Viet</a>
3837
*/
@@ -574,4 +573,103 @@ public void testAnyRemovesListeners2() {
574573
Future.any(f, Future.succeededFuture());
575574
assertEquals(Collections.emptySet(), f.listeners);
576575
}
576+
577+
@Test
578+
public void testCustomFuture() {
579+
Promise<Void> p1 = Promise.promise();
580+
Promise<Void> p2 = Promise.promise();
581+
Promise<Void> p3 = Promise.promise();
582+
583+
CompositeFuture cf = Future.all(p1.future(), new MyFuture(p2), p3.future());
584+
585+
p1.complete(null);
586+
p2.complete(null);
587+
p3.complete(null);
588+
589+
assertTrue(cf.isComplete());
590+
}
591+
592+
private static class MyFuture implements Future<Void> {
593+
594+
private final Future<Void> delegate;
595+
596+
private MyFuture(Promise<Void> promise) {
597+
delegate = promise.future();
598+
}
599+
600+
@Override
601+
public boolean isComplete() {
602+
return delegate.isComplete();
603+
}
604+
605+
@Override
606+
public Future<Void> onComplete(Handler<AsyncResult<Void>> handler) {
607+
return delegate.onComplete(handler);
608+
}
609+
610+
@Override
611+
public Void result() {
612+
return delegate.result();
613+
}
614+
615+
@Override
616+
public Throwable cause() {
617+
return delegate.cause();
618+
}
619+
620+
@Override
621+
public boolean succeeded() {
622+
return delegate.succeeded();
623+
}
624+
625+
@Override
626+
public boolean failed() {
627+
return delegate.failed();
628+
}
629+
630+
@Override
631+
public <U> Future<U> compose(Function<Void, Future<U>> successMapper, Function<Throwable, Future<U>> failureMapper) {
632+
return delegate.compose(successMapper, failureMapper);
633+
}
634+
635+
@Override
636+
public <U> Future<U> transform(Function<AsyncResult<Void>, Future<U>> mapper) {
637+
return delegate.transform(mapper);
638+
}
639+
640+
@Override
641+
public <U> Future<Void> eventually(Function<Void, Future<U>> function) {
642+
return delegate.eventually(function);
643+
}
644+
645+
@Override
646+
public <U> Future<U> map(Function<Void, U> mapper) {
647+
return delegate.map(mapper);
648+
}
649+
650+
@Override
651+
public <V> Future<V> map(V value) {
652+
return delegate.map(value);
653+
}
654+
655+
@Override
656+
public Future<Void> otherwise(Function<Throwable, Void> mapper) {
657+
return delegate.otherwise(mapper);
658+
}
659+
660+
@Override
661+
public Future<Void> otherwise(Void value) {
662+
return delegate.otherwise(value);
663+
}
664+
665+
@Override
666+
public Future<Void> expecting(Expectation<? super Void> expectation) {
667+
return delegate.expecting(expectation);
668+
}
669+
670+
@Override
671+
public Future<Void> timeout(long delay, TimeUnit unit) {
672+
return delegate.timeout(delay, unit);
673+
}
674+
}
577675
}

0 commit comments

Comments
 (0)