Skip to content

Commit 31c5d53

Browse files
committed
Optimizing ad-hoc third-party cancel methods invocation
1 parent 3d0a676 commit 31c5d53

File tree

4 files changed

+286
-71
lines changed

4 files changed

+286
-71
lines changed

src/main/java/net/tascalate/concurrent/SharedFunctions.java

Lines changed: 4 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18-
import java.lang.reflect.Method;
1918
import java.util.NoSuchElementException;
20-
import java.util.Optional;
21-
import java.util.concurrent.CancellationException;
2219
import java.util.concurrent.CompletableFuture;
2320
import java.util.concurrent.CompletionException;
2421
import java.util.concurrent.CompletionStage;
@@ -27,7 +24,8 @@
2724
import java.util.function.BiFunction;
2825
import java.util.function.Function;
2926
import java.util.function.Supplier;
30-
import java.util.stream.Stream;
27+
28+
import net.tascalate.concurrent.core.CancelMethodsCache;
3129

3230
class SharedFunctions {
3331

@@ -77,17 +75,8 @@ static boolean cancelPromise(CompletionStage<?> promise, boolean mayInterruptIfR
7775
Future<?> future = (Future<?>) promise;
7876
return future.cancel(mayInterruptIfRunning);
7977
} else {
80-
Stream<Function<Class<?>, ExceptionalCancellation>> options = Stream.of(
81-
SharedFunctions::cancelInterruptibleMethodOf,
82-
SharedFunctions::cancelMethodOf,
83-
SharedFunctions::completeExceptionallyMethodOf
84-
);
85-
86-
return options.map(f -> tryCancellation(f, promise, mayInterruptIfRunning))
87-
.filter(Optional::isPresent)
88-
.map(Optional::get)
89-
.findFirst()
90-
.orElse(Boolean.FALSE);
78+
return CancelMethodsCache.cancellationOf(promise.getClass())
79+
.apply(promise, mayInterruptIfRunning);
9180
}
9281
}
9382

@@ -112,66 +101,10 @@ static <T> Supplier<T> supply(T value) {
112101

113102
static void iif(boolean v) {}
114103
static <T> void voided(T v) {}
115-
116-
private static Optional<Boolean> tryCancellation(Function<Class<?>, ExceptionalCancellation> option,
117-
CompletionStage<?> promise,
118-
boolean mayInterruptIfRunning) {
119-
120-
121-
return Optional.ofNullable( option.apply(promise.getClass()) )
122-
.map(SharedFunctions::uncheckedReflectionException) // TODO: False for reflective operation or runtime exception?
123-
.map(bf -> bf.apply(promise, mayInterruptIfRunning ? Boolean.TRUE : Boolean.FALSE))
124-
;
125-
}
126-
127-
private static ExceptionalCancellation completeExceptionallyMethodOf(Class<?> clazz) {
128-
try {
129-
Method m = clazz.getMethod("completeExceptionally", Throwable.class);
130-
return (p, b) -> (Boolean)m.invoke(p, new CancellationException());
131-
} catch (ReflectiveOperationException | SecurityException ex) {
132-
return null;
133-
}
134-
}
135-
136-
private static ExceptionalCancellation cancelInterruptibleMethodOf(Class<?> clazz) {
137-
try {
138-
Method m = clazz.getMethod("cancel", boolean.class);
139-
return (p, b) -> (Boolean)m.invoke(p, b);
140-
} catch (ReflectiveOperationException | SecurityException ex) {
141-
return null;
142-
}
143-
}
144-
145-
private static ExceptionalCancellation cancelMethodOf(Class<?> clazz) {
146-
try {
147-
Method m = clazz.getMethod("cancel");
148-
return (p, b) -> (Boolean)m.invoke(p);
149-
} catch (ReflectiveOperationException | SecurityException ex) {
150-
return null;
151-
}
152-
}
153-
154-
private static <T, U> Cancellation uncheckedReflectionException(ExceptionalCancellation original ) {
155-
return (a, b) -> {
156-
try {
157-
return original.apply(a, b);
158-
} catch (ReflectiveOperationException ex) {
159-
throw new RuntimeException(ex);
160-
}
161-
};
162-
}
163104

164105
static final Function<Object, Throwable> NO_SUCH_ELEMENT = t -> new NoSuchElementException("Result rejected by filter: " + t);
165106

166107
private static final BiFunction<Object, Object, Object> SELECT_FIRST = (u, v) -> u;
167108
private static final BiFunction<Object, Object, Object> SELECT_SECOND = (u, v) -> v;
168109
private static final Function<Object, Object> NULLIFY = v -> null;
169-
170-
@FunctionalInterface
171-
private static interface Cancellation extends BiFunction<CompletionStage<?>, Boolean, Boolean> { }
172-
173-
@FunctionalInterface
174-
public interface ExceptionalCancellation {
175-
Boolean apply(CompletionStage<?> p, Boolean b) throws ReflectiveOperationException;
176-
}
177110
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Copyright 2015-2020 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package net.tascalate.concurrent.core;
17+
18+
import java.lang.ref.Reference;
19+
import java.lang.ref.ReferenceQueue;
20+
import java.lang.ref.WeakReference;
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.function.Function;
24+
25+
26+
public final class Cache<K, V> {
27+
private final Map<Reference<K>, V> entries = new ConcurrentHashMap<>();
28+
private final ReferenceQueue<K> queue = new ReferenceQueue<K>();
29+
30+
public V get(K key, Function<? super K, ? extends V> valueFactory) {
31+
expungeStaleEntries();
32+
return entries.computeIfAbsent(new KeyReference<>(key), __ -> valueFactory.apply(key));
33+
}
34+
35+
private void expungeStaleEntries() {
36+
Reference<? extends K> ref;
37+
while ( (ref = queue.poll()) != null ) {
38+
@SuppressWarnings("unchecked")
39+
Reference<K> keyRef = (Reference<K>) ref;
40+
// keyRef now is equal only to itself while referent is cleared already
41+
// however, it should be found in entries by reference
42+
entries.remove(keyRef);
43+
}
44+
}
45+
46+
static final class KeyReference<K> extends WeakReference<K> {
47+
private final int referentHashCode;
48+
49+
KeyReference(K key) {
50+
this(key, null);
51+
}
52+
53+
KeyReference(K key, ReferenceQueue<K> queue) {
54+
super(key, queue);
55+
referentHashCode = key == null ? 0 : key.hashCode();
56+
}
57+
58+
@Override
59+
public int hashCode() {
60+
return referentHashCode;
61+
}
62+
63+
@Override
64+
public boolean equals(Object other) {
65+
if (this == other)
66+
return true;
67+
if (!(other instanceof KeyReference))
68+
return false;
69+
Object r1 = this.get();
70+
Object r2 = ((KeyReference<?>) other).get();
71+
return null == r1 ? null == r2 : r1.equals(r2);
72+
}
73+
}
74+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/**
2+
* Copyright 2015-2020 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package net.tascalate.concurrent.core;
17+
18+
import java.lang.invoke.MethodHandle;
19+
import java.lang.invoke.MethodHandles;
20+
import java.lang.invoke.MethodType;
21+
import java.lang.reflect.Method;
22+
import java.lang.reflect.Modifier;
23+
import java.util.HashSet;
24+
import java.util.Objects;
25+
import java.util.Optional;
26+
import java.util.Set;
27+
import java.util.concurrent.CancellationException;
28+
import java.util.concurrent.CompletionStage;
29+
import java.util.function.Function;
30+
import java.util.stream.Stream;
31+
32+
public final class CancelMethodsCache {
33+
34+
@FunctionalInterface
35+
public static interface Cancellation {
36+
boolean apply(CompletionStage<?> p, boolean b);
37+
}
38+
39+
private CancelMethodsCache() {}
40+
41+
public static Cancellation cancellationOf(Class<?> stageClass) {
42+
return CANCEL_METHODS.get(stageClass, LOOKUP_CANCEL_METHOD);
43+
}
44+
45+
private static final Cache<Class<?>, Cancellation> CANCEL_METHODS = new Cache<>();
46+
private static final Cancellation NO_CANCELATION = (p, b) -> false;
47+
private static final Function<Class<?>, Cancellation> LOOKUP_CANCEL_METHOD = c -> {
48+
Stream<Function<Class<?>, ExceptionalCancellation>> options = Stream.of(
49+
CancelMethodsCache::cancelInterruptibleMethodOf,
50+
CancelMethodsCache::cancelMethodOf,
51+
CancelMethodsCache::completeExceptionallyMethodOf
52+
);
53+
return options.map(option -> Optional.ofNullable( option.apply(c) ))
54+
.filter(Optional::isPresent)
55+
.map(Optional::get)
56+
.map(ExceptionalCancellation::unchecked)
57+
.findFirst()
58+
.orElse(NO_CANCELATION);
59+
};
60+
61+
private static ExceptionalCancellation cancelInterruptibleMethodOf(Class<?> clazz) {
62+
try {
63+
Method m = clazz.getMethod("cancel", boolean.class);
64+
Method mf = firstUnreflectableMethod(m);
65+
if (null != mf) {
66+
try {
67+
MethodHandle mh = MethodHandles.publicLookup()
68+
.unreflect(mf)
69+
.asType(MethodType.methodType(boolean.class, CompletionStage.class, boolean.class));
70+
return (p, b) -> (boolean)mh.invokeExact(p, b);
71+
} catch (IllegalAccessException ex) {
72+
// will try reflective
73+
}
74+
}
75+
return (p, b) -> (Boolean)m.invoke(p, b);
76+
} catch (ReflectiveOperationException | SecurityException ex) {
77+
return null;
78+
}
79+
}
80+
81+
private static ExceptionalCancellation cancelMethodOf(Class<?> clazz) {
82+
try {
83+
Method m = clazz.getMethod("cancel");
84+
Method mf = firstUnreflectableMethod(m);
85+
if (null != mf) {
86+
try {
87+
MethodHandle mh = MethodHandles.publicLookup()
88+
.unreflect(mf)
89+
.asType(MethodType.methodType(boolean.class, CompletionStage.class));
90+
return (p, b) -> (boolean)mh.invokeExact(p);
91+
} catch (IllegalAccessException ex) {
92+
// will try reflective
93+
}
94+
}
95+
return (p, b) -> (Boolean)m.invoke(p);
96+
} catch (ReflectiveOperationException | SecurityException ex) {
97+
return null;
98+
}
99+
}
100+
101+
private static ExceptionalCancellation completeExceptionallyMethodOf(Class<?> clazz) {
102+
try {
103+
Method m = clazz.getMethod("completeExceptionally", Throwable.class);
104+
Method mf = firstUnreflectableMethod(m);
105+
if (null != mf) {
106+
try {
107+
MethodHandle mh = MethodHandles.publicLookup()
108+
.unreflect(mf)
109+
.asType(MethodType.methodType(boolean.class, CompletionStage.class, CancellationException.class));
110+
return (p, b) -> (boolean)mh.invokeExact(p, new CancellationException());
111+
} catch (IllegalAccessException ex) {
112+
}
113+
}
114+
return (p, b) -> (Boolean)m.invoke(p, new CancellationException());
115+
} catch (ReflectiveOperationException | SecurityException ex) {
116+
return null;
117+
}
118+
}
119+
120+
private static Method firstUnreflectableMethod(Method m) {
121+
return firstUnreflectableMethod(m.getDeclaringClass(), m, new HashSet<>());
122+
}
123+
124+
private static Method firstUnreflectableMethod(Class<?> clazz, Method m, Set<Class<?>> visited) {
125+
if (visited.contains(clazz)) {
126+
return null;
127+
}
128+
visited.add(clazz);
129+
if ((clazz.getModifiers() & Modifier.PUBLIC) != 0) {
130+
try {
131+
Method parent = clazz.getDeclaredMethod(m.getName(), m.getParameterTypes());
132+
if ((parent.getModifiers() & Modifier.PUBLIC) != 0) {
133+
return parent;
134+
} else {
135+
return null;
136+
}
137+
} catch (NoSuchMethodException ex) {
138+
// Ok, will check parents
139+
}
140+
}
141+
return
142+
Stream.concat(Stream.of(clazz.getSuperclass()), Stream.of(clazz.getInterfaces()))
143+
.filter(Objects::nonNull)
144+
.map(superClazz -> firstUnreflectableMethod(superClazz, m, visited))
145+
.filter(Objects::nonNull)
146+
.findFirst()
147+
.orElse(null)
148+
;
149+
}
150+
151+
@FunctionalInterface
152+
static interface ExceptionalCancellation {
153+
boolean apply(CompletionStage<?> p, boolean b) throws Throwable;
154+
default Cancellation unchecked() {
155+
return (a, b) -> {
156+
try {
157+
return this.apply(a, b);
158+
} catch (Error | RuntimeException ex) {
159+
throw ex;
160+
} catch (Throwable ex) {
161+
throw new RuntimeException(ex);
162+
}
163+
};
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)