Skip to content

Commit cdd6345

Browse files
committed
Standardize CompletableFutureWrapper.toCompletableFuture(enlistOrigin) behavior
1 parent bed18c9 commit cdd6345

File tree

6 files changed

+126
-9
lines changed

6 files changed

+126
-9
lines changed

src/main/java/net/tascalate/concurrent/CompletableFutureWrapper.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ boolean complete(T value, Throwable ex) {
5454
return null == ex ? success(value) : failure(ex);
5555
}
5656

57+
@Override
58+
public CompletableFuture<T> toCompletableFuture() {
59+
// Return concrete subclass that neither completes nor cancels this wrapper
60+
return (CompletableFuture<T>)delegate.thenApply(Function.identity());
61+
}
62+
63+
// Report self-origin
64+
@Override
65+
public CompletionStage<T> τ() {
66+
return this;
67+
}
68+
5769
@Override
5870
protected <U> Promise<U> wrap(CompletionStage<U> original) {
5971
return new CompletableFutureWrapper<>((CompletableFuture<U>)original);

src/main/java/net/tascalate/concurrent/CompletablePromise.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,17 @@ public CompletionStage<T> minimalCompletionStage() {
112112
// Should pass this, not delegate - while delegate has invalid thenCompose impl.
113113
return new MinimalCompletionStage<>(this);
114114
}
115+
116+
@Override
117+
public CompletableFuture<T> toCompletableFuture() {
118+
// Returns the delegate itself, so this CompletablePromise may be completed / cancelled
119+
return delegate;
120+
}
121+
122+
@Override
123+
public CompletionStage<T> τ() {
124+
return delegate;
125+
}
115126

116127
@Override
117128
protected <U> CompletablePromise<U> wrap(CompletionStage<U> original) {

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.function.Predicate;
3737
import java.util.function.Supplier;
3838

39+
import net.tascalate.concurrent.core.Delegator;
3940
import net.tascalate.concurrent.decorators.AbstractPromiseDecorator;
4041

4142
/**
@@ -74,7 +75,7 @@
7475
* @param <T>
7576
* a type of the successfully resolved promise value
7677
*/
77-
public class ConfigurableDependentPromise<T> implements DependentPromise<T> {
78+
public class ConfigurableDependentPromise<T> implements DependentPromise<T>, Delegator<T> {
7879
protected final Promise<T> delegate;
7980
protected final CompletionStage<?>[] cancellableOrigins;
8081
protected final Set<PromiseOrigin> defaultEnlistOptions;
@@ -830,21 +831,61 @@ protected Promise<T> cancellablePromiseOf(Promise<T> original) {
830831
public CompletableFuture<T> toCompletableFuture() {
831832
return toCompletableFuture(defaultEnlistOrigin());
832833
}
833-
834+
834835
@Override
835836
public CompletableFuture<T> toCompletableFuture(boolean enlistOrigin) {
836-
if (!enlistOrigin) {
837-
return delegate.toCompletableFuture();
838-
} else {
839-
CompletableFuture<T> result = new CompletableFuture<T>() {
837+
CompletionStage<T> tau = tauOf(delegate);
838+
CompletableFuture<T> rootDelegate = tau instanceof CompletableFuture ? (CompletableFuture<T>)tau : null;
839+
CompletableFuture<T> result;
840+
if (enlistOrigin) {
841+
if (rootDelegate != null) {
842+
// Use toCompletableFuture only if there is a delegate
843+
CompletableFuture<T> defaultResult = delegate.toCompletableFuture();
844+
if (rootDelegate == defaultResult) {
845+
return defaultResult;
846+
}
847+
}
848+
result = new CompletableFuture<T>() {
840849
@Override
841850
public boolean cancel(boolean mayInterruptIfRunning) {
842851
ConfigurableDependentPromise.this.cancel(mayInterruptIfRunning);
843852
return super.cancel(mayInterruptIfRunning);
844853
}
845854
};
846-
whenComplete((r, e) -> iif(null == e ? result.complete(r) : result.completeExceptionally(e)));
847-
return result;
855+
} else {
856+
CompletableFuture<T> defaultResult = delegate.toCompletableFuture();
857+
if (rootDelegate != defaultResult) {
858+
// Assume if there is no rootDelegate or rootDelegate != rootDelegate.toCompletableFuture
859+
// then toCompletableFuture returns stage that doesn't cancel original one
860+
return defaultResult;
861+
}
862+
result = new CompletableFuture<>();
863+
}
864+
whenComplete((r, e) -> iif(null == e ? result.complete(r) : result.completeExceptionally(e)));
865+
return result;
866+
}
867+
868+
@Override
869+
public CompletionStage<T> τ() {
870+
return tauOf(delegate);
871+
}
872+
873+
@Override
874+
public String toString() {
875+
return String.format(
876+
"%s@%d[delegate=%s, enlistOptions=%s, cancellable=%s]",
877+
getClass().getSimpleName(), System.identityHashCode(this),
878+
delegate, this.defaultEnlistOptions, null == cancellableOrigins ? "{}" : Arrays.asList(cancellableOrigins)
879+
);
880+
}
881+
882+
private static <T> CompletionStage<T> tauOf(Promise<T> promise) {
883+
if (promise instanceof Delegator) {
884+
@SuppressWarnings("unchecked")
885+
Delegator<T> delegator = (Delegator<T>)promise;
886+
return delegator.τ();
887+
} else {
888+
return promise;
848889
}
849890
}
850891

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright 2015-2020 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package net.tascalate.concurrent.core;
17+
18+
import java.util.concurrent.CompletionStage;
19+
20+
public interface Delegator<T> {
21+
CompletionStage<T> τ();
22+
}

src/main/java/net/tascalate/concurrent/decorators/AbstractPromiseLikeDecorator.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@
2323
import java.util.function.Function;
2424

2525
import net.tascalate.concurrent.Promise;
26+
import net.tascalate.concurrent.core.Delegator;
2627

2728
public abstract class AbstractPromiseLikeDecorator<T, D extends CompletionStage<T>>
2829
extends AbstractCompletionStageDecorator<T, D>
29-
implements CompletionStage<T> {
30+
implements CompletionStage<T>, Delegator<T> {
3031

3132
protected AbstractPromiseLikeDecorator(D delegate) {
3233
super(delegate);
@@ -270,4 +271,29 @@ public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U>
270271
public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
271272
return cast(super.handleAsync(fn, executor));
272273
}
274+
275+
// Tau-operator -- the origin of origins
276+
public CompletionStage<T> τ() {
277+
CompletionStage<T> p = delegate;
278+
if (p instanceof Delegator) {
279+
return delegator(p).τ();
280+
} else {
281+
// Default path -- unroll
282+
while (p instanceof AbstractCompletionStageDecorator) {
283+
@SuppressWarnings("unchecked")
284+
AbstractCompletionStageDecorator<T, ? extends CompletionStage<T>> ap =
285+
(AbstractCompletionStageDecorator<T, ? extends CompletionStage<T>>)p;
286+
p = ap.delegate;
287+
if (p instanceof Delegator) {
288+
return delegator(p).τ();
289+
}
290+
}
291+
return p;
292+
}
293+
}
294+
295+
@SuppressWarnings("unchecked")
296+
private static <T> Delegator<T> delegator(CompletionStage<T> delegate) {
297+
return (Delegator<T>)delegate;
298+
}
273299
}

src/test/java/net/tascalate/concurrent/J8Examples.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import net.tascalate.concurrent.core.CompletionStageAPI;
3434
import net.tascalate.concurrent.decorators.AbstractCompletionStageDecorator;
35+
import net.tascalate.concurrent.decorators.CustomizablePromiseDecorator;
3536
import net.tascalate.concurrent.decorators.ExtendedPromiseDecorator;
3637
import net.tascalate.concurrent.locks.AsyncSemaphoreLock;
3738

@@ -106,6 +107,10 @@ public static void main(final String[] argv) throws InterruptedException, Execut
106107

107108
final TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(6, tf);
108109

110+
System.out.println(new CustomizablePromiseDecorator<>(CompletableTask.completed("", executorService).defaultAsyncOn(executorService), null).τ());
111+
System.out.println(new CustomizablePromiseDecorator<>(Promises.success(11).defaultAsyncOn(executorService), null).τ());
112+
System.out.println(new CustomizablePromiseDecorator<>(new CompletablePromise<Long>().defaultAsyncOn(executorService), null).τ());
113+
109114
Promise<?> t =
110115
Promises.success("11")
111116
//CompletableTask

0 commit comments

Comments
 (0)