Skip to content

Commit 9936f54

Browse files
committed
Fixes request context lost in Infinispan cache get/getasync
1 parent 4e66327 commit 9936f54

File tree

7 files changed

+259
-75
lines changed

7 files changed

+259
-75
lines changed

extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java

Lines changed: 140 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,21 @@
33
import java.util.Map;
44
import java.util.Objects;
55
import java.util.Optional;
6+
import java.util.concurrent.Callable;
67
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.CompletionStage;
79
import java.util.concurrent.ConcurrentHashMap;
810
import java.util.concurrent.Executor;
911
import java.util.concurrent.Flow;
1012
import java.util.concurrent.TimeUnit;
13+
import java.util.function.BiConsumer;
1114
import java.util.function.Function;
1215
import java.util.function.Predicate;
16+
import java.util.function.Supplier;
1317

1418
import org.infinispan.client.hotrod.RemoteCache;
1519
import org.infinispan.client.hotrod.impl.protocol.Codec27;
1620
import org.infinispan.commons.util.NullValue;
17-
import org.infinispan.commons.util.concurrent.CompletionStages;
1821
import org.reactivestreams.FlowAdapters;
1922

2023
import io.quarkus.arc.Arc;
@@ -81,78 +84,65 @@ private <T> T decodeNull(Object value) {
8184

8285
@Override
8386
public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {
84-
return Uni.createFrom()
85-
.completionStage(() -> CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> {
86-
if (ex1 != null) {
87-
return CompletableFuture.failedFuture(ex1);
88-
}
87+
Context context = Vertx.currentContext();
88+
Executor executor = duplicateContextExecutor(context);
8989

90+
return Uni.createFrom().completionStage(new Supplier<CompletionStage<V>>() {
91+
@Override
92+
public CompletionStage<V> get() {
93+
return remoteCache.getAsync(key);
94+
}
95+
})
96+
.emitOn(executor)
97+
.flatMap(v1 -> {
9098
if (v1 != null) {
91-
return CompletableFuture.completedFuture(decodeNull(v1));
99+
return Uni.createFrom()
100+
.completionStage(new Supplier<CompletionStage<V>>() {
101+
@Override
102+
public CompletionStage<V> get() {
103+
return CompletableFuture.completedFuture(InfinispanCacheImpl.this.decodeNull(v1));
104+
}
105+
})
106+
.emitOn(executor);
92107
}
93108

94109
CompletableFuture<V> resultAsync = new CompletableFuture<>();
95110
CompletableFuture<V> computedValue = computationResults.putIfAbsent(key, resultAsync);
111+
96112
if (computedValue != null) {
97-
return computedValue;
113+
return Uni.createFrom().completionStage(computedValue).emitOn(executor);
114+
}
115+
116+
if (context != null) {
117+
return Uni.createFrom().completionStage(new Supplier<CompletionStage<? extends V>>() {
118+
@Override
119+
public CompletionStage<? extends V> get() {
120+
return context.executeBlocking(new Callable<V>() {
121+
@Override
122+
public V call() throws Exception {
123+
return valueLoader.apply(key);
124+
}
125+
}).toCompletionStage()
126+
.thenComposeAsync(newValue -> {
127+
InfinispanCacheImpl.this.putIfAbsentInInfinispan(key, newValue, resultAsync,
128+
executor);
129+
return resultAsync;
130+
}, executor);
131+
}
132+
});
98133
}
134+
99135
V newValue = valueLoader.apply(key);
100-
remoteCache
101-
.putIfAbsentAsync(key, encodeNull(newValue), lifespan, TimeUnit.MILLISECONDS, maxIdle,
102-
TimeUnit.MILLISECONDS)
103-
.whenComplete((existing, ex2) -> {
104-
if (ex2 != null) {
105-
resultAsync.completeExceptionally((Throwable) ex2);
106-
} else if (existing == null) {
107-
resultAsync.complete(newValue);
108-
} else {
109-
resultAsync.complete(decodeNull(existing));
110-
}
111-
computationResults.remove(key);
112-
});
113-
return resultAsync;
114-
}));
136+
putIfAbsentInInfinispan(key, newValue, resultAsync, executor);
137+
return Uni.createFrom().completionStage(resultAsync).emitOn(executor);
138+
});
115139
}
116140

117141
@Override
118142
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
119143
Context context = Vertx.currentContext();
120-
121-
return Uni.createFrom().completionStage(CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> {
122-
if (ex1 != null) {
123-
return CompletableFuture.failedFuture(ex1);
124-
}
125-
126-
if (v1 != null) {
127-
return CompletableFuture.completedFuture(decodeNull(v1));
128-
}
129-
130-
CompletableFuture<V> resultAsync = new CompletableFuture<>();
131-
CompletableFuture<V> computedValue = computationResults.putIfAbsent(key, resultAsync);
132-
if (computedValue != null) {
133-
return computedValue;
134-
}
135-
valueLoader.apply(key).convert().toCompletionStage()
136-
.whenComplete((newValue, ex2) -> {
137-
if (ex2 != null) {
138-
resultAsync.completeExceptionally(ex2);
139-
computationResults.remove(key);
140-
} else {
141-
remoteCache.putIfAbsentAsync(key, encodeNull(newValue), lifespan, TimeUnit.MILLISECONDS, maxIdle,
142-
TimeUnit.MILLISECONDS).whenComplete((existing, ex3) -> {
143-
if (ex3 != null) {
144-
resultAsync.completeExceptionally((Throwable) ex3);
145-
} else if (existing == null) {
146-
resultAsync.complete(newValue);
147-
} else {
148-
resultAsync.complete(decodeNull(existing));
149-
}
150-
computationResults.remove(key);
151-
});
152-
}
153-
});
154-
return resultAsync;
155-
})).emitOn(new Executor() {
144+
Executor executor = duplicateContextExecutor(context);
145+
return Uni.createFrom().completionStage(getFromInfinispanAsync(key, valueLoader, executor)).emitOn(new Executor() {
156146
// We need make sure we go back to the original context when the cache value is computed.
157147
// Otherwise, we would always emit on the context having computed the value, which could
158148
// break the duplicated context isolation.
@@ -194,17 +184,105 @@ public void handle(Void ignored) {
194184
}
195185
}
196186
}
197-
});
187+
}).emitOn(executor);
188+
}
189+
190+
private static Executor duplicateContextExecutor(Context context) {
191+
Executor executor = new Executor() {
192+
@Override
193+
public void execute(Runnable r) {
194+
if (context == null)
195+
r.run();
196+
else
197+
context.runOnContext(x -> r.run());
198+
}
199+
};
200+
return executor;
201+
}
202+
203+
private <K, V> CompletionStage<V> getFromInfinispanAsync(K key, Function<K, Uni<V>> valueLoader, Executor executor) {
204+
return remoteCache.getAsync(key)
205+
.exceptionallyAsync(ex -> ex, executor)
206+
.thenApplyAsync(new Function() {
207+
@Override
208+
public Object apply(Object v1) {
209+
if (v1 != null) {
210+
return CompletableFuture.completedFuture(InfinispanCacheImpl.this.decodeNull(v1));
211+
}
212+
213+
CompletableFuture<V> resultAsync = new CompletableFuture<>();
214+
CompletableFuture<V> computedValue = computationResults.putIfAbsent(key, resultAsync);
215+
216+
if (computedValue != null) {
217+
return computedValue;
218+
}
219+
220+
valueLoader.apply(key)
221+
.convert().toCompletionStage()
222+
.whenCompleteAsync(new BiConsumer<V, Throwable>() {
223+
@Override
224+
public void accept(V newValue, Throwable ex2) {
225+
if (ex2 != null) {
226+
resultAsync.completeExceptionally(ex2);
227+
computationResults.remove(key);
228+
} else {
229+
InfinispanCacheImpl.this.putIfAbsentInInfinispan(key, newValue, resultAsync,
230+
executor);
231+
}
232+
}
233+
}, executor);
234+
return resultAsync;
235+
}
236+
}, executor).thenComposeAsync(new Function() {
237+
@Override
238+
public Object apply(Object c) {
239+
return c;
240+
}
241+
}, executor);
242+
243+
}
244+
245+
private <K, V> void putIfAbsentInInfinispan(K key, V newValue, CompletableFuture<V> resultAsync, Executor executor) {
246+
remoteCache.putIfAbsentAsync(
247+
key,
248+
encodeNull(newValue),
249+
lifespan, TimeUnit.MILLISECONDS,
250+
maxIdle, TimeUnit.MILLISECONDS).whenCompleteAsync(new BiConsumer<Object, Throwable>() {
251+
@Override
252+
public void accept(Object existing, Throwable ex) {
253+
try {
254+
if (ex != null) {
255+
resultAsync.completeExceptionally(ex);
256+
} else if (existing == null) {
257+
resultAsync.complete(newValue);
258+
} else {
259+
resultAsync.complete(InfinispanCacheImpl.this.decodeNull(existing));
260+
}
261+
} finally {
262+
computationResults.remove(key);
263+
}
264+
}
265+
}, executor);
198266
}
199267

200268
@Override
201269
public Uni<Void> invalidate(Object key) {
202-
return Uni.createFrom().completionStage(() -> remoteCache.removeAsync(key));
270+
return Uni.createFrom().completionStage(new Supplier<CompletionStage<Void>>() {
271+
@Override
272+
public CompletionStage<Void> get() {
273+
return remoteCache.removeAsync(key);
274+
}
275+
});
203276
}
204277

205278
@Override
206279
public Uni<Void> invalidateAll() {
207-
return Uni.createFrom().completionStage(() -> remoteCache.clearAsync());
280+
return Uni.createFrom().completionStage(new Supplier<CompletionStage<Void>>() {
281+
@Override
282+
public CompletionStage<Void> get() {
283+
return remoteCache.clearAsync();
284+
}
285+
});
208286
}
209287

210288
@Override
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.quarkus.it.cache.infinispan;
2+
3+
import jakarta.inject.Inject;
4+
import jakarta.ws.rs.client.ClientRequestContext;
5+
import jakarta.ws.rs.ext.Provider;
6+
7+
@Provider
8+
public class ClientRequestFilter implements jakarta.ws.rs.client.ClientRequestFilter {
9+
10+
ClientRequestService requestService;
11+
12+
@Inject
13+
public ClientRequestFilter(ClientRequestService requestService) {
14+
this.requestService = requestService;
15+
}
16+
17+
@Override
18+
public void filter(ClientRequestContext requestContext) {
19+
if (requestService != null && requestService.data() != null) {
20+
requestContext.getHeaders().add("extra", requestService.data());
21+
}
22+
}
23+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.quarkus.it.cache.infinispan;
2+
3+
import jakarta.enterprise.context.RequestScoped;
4+
5+
@RequestScoped
6+
public class ClientRequestService {
7+
String data;
8+
9+
public String data() {
10+
return data;
11+
}
12+
13+
public ClientRequestService setData(String data) {
14+
this.data = data;
15+
return this;
16+
}
17+
}

integration-tests/infinispan-cache/src/main/java/io/quarkus/it/cache/infinispan/ExpensiveResource.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,71 @@
22

33
import java.util.concurrent.atomic.AtomicInteger;
44

5+
import jakarta.inject.Inject;
6+
import jakarta.ws.rs.DELETE;
57
import jakarta.ws.rs.GET;
6-
import jakarta.ws.rs.POST;
78
import jakarta.ws.rs.Path;
89
import jakarta.ws.rs.PathParam;
910
import jakarta.ws.rs.QueryParam;
11+
import jakarta.ws.rs.core.Response;
1012

1113
import org.infinispan.protostream.GeneratedSchema;
1214
import org.infinispan.protostream.annotations.Proto;
1315
import org.infinispan.protostream.annotations.ProtoSchema;
1416

17+
import io.quarkus.cache.CacheInvalidate;
1518
import io.quarkus.cache.CacheInvalidateAll;
1619
import io.quarkus.cache.CacheKey;
1720
import io.quarkus.cache.CacheResult;
21+
import io.smallrye.mutiny.Uni;
1822

1923
@Path("/expensive-resource")
2024
public class ExpensiveResource {
2125

2226
private final AtomicInteger invocations = new AtomicInteger(0);
2327

28+
@Inject
29+
ClientRequestService requestService;
30+
2431
@GET
2532
@Path("/{keyElement1}/{keyElement2}/{keyElement3}")
2633
@CacheResult(cacheName = "expensiveResourceCache")
2734
public ExpensiveResponse getExpensiveResponse(@PathParam("keyElement1") @CacheKey String keyElement1,
28-
@PathParam("keyElement2") @CacheKey String keyElement2, @PathParam("keyElement3") @CacheKey String keyElement3,
35+
@PathParam("keyElement2") @CacheKey String keyElement2,
36+
@PathParam("keyElement3") @CacheKey String keyElement3,
2937
@QueryParam("foo") String foo) {
3038
invocations.incrementAndGet();
39+
requestService.setData("getExpensiveResponse " + foo);
3140
return new ExpensiveResponse(keyElement1 + " " + keyElement2 + " " + keyElement3 + " too!");
3241
}
3342

34-
@POST
43+
@GET
44+
@Path("/async/{keyElement1}/{keyElement2}/{keyElement3}")
45+
@CacheResult(cacheName = "expensiveResourceCache")
46+
public Uni<ExpensiveResponse> getExpensiveResponseAsync(@PathParam("keyElement1") @CacheKey String keyElement1,
47+
@PathParam("keyElement2") @CacheKey String keyElement2,
48+
@PathParam("keyElement3") @CacheKey String keyElement3,
49+
@QueryParam("foo") String foo) {
50+
invocations.incrementAndGet();
51+
requestService.setData("getExpensiveResponseAsync " + foo);
52+
return Uni.createFrom()
53+
.item(new ExpensiveResponse(keyElement1 + " " + keyElement2 + " " + keyElement3 + " async too!"));
54+
}
55+
56+
@DELETE
57+
@Path("/{keyElement1}/{keyElement2}/{keyElement3}")
58+
@CacheInvalidate(cacheName = "expensiveResourceCache")
59+
public Response resetExpensiveResponse(@PathParam("keyElement1") @CacheKey String keyElement1,
60+
@PathParam("keyElement2") @CacheKey String keyElement2, @PathParam("keyElement3") @CacheKey String keyElement3,
61+
@QueryParam("foo") String foo) {
62+
requestService.setData("invalidate");
63+
return Response.ok().build();
64+
}
65+
66+
@DELETE
3567
@CacheInvalidateAll(cacheName = "expensiveResourceCache")
3668
public void invalidateAll() {
37-
69+
requestService.setData("invalidateAll");
3870
}
3971

4072
@GET

0 commit comments

Comments
 (0)