Skip to content

Commit e54d3b1

Browse files
authored
Merge pull request #35019 from cescoffier/fix-wrong-scheduler-on-blocking-method-for-grpc
2 parents 3fb7c7b + 7b159c8 commit e54d3b1

File tree

3 files changed

+87
-22
lines changed

3 files changed

+87
-22
lines changed

extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ void processGeneratedBeans(CombinedIndexBuildItem index, BuildProducer<Annotatio
175175
Set<String> blockingMethods = gatherBlockingOrVirtualMethodNames(userDefinedBean, index.getIndex(), false);
176176
Set<String> virtualMethods = gatherBlockingOrVirtualMethodNames(userDefinedBean, index.getIndex(), true);
177177
generatedBeans.put(generatedBean.name(), blockingMethods);
178-
virtuals.put(generatedBean.name(), virtualMethods);
178+
if (!virtualMethods.isEmpty()) {
179+
virtuals.put(generatedBean.name(), virtualMethods);
180+
}
179181
}
180182
}
181183

@@ -451,7 +453,7 @@ static Set<String> gatherBlockingOrVirtualMethodNames(ClassInfo service, IndexVi
451453
implBaseMethod.parameterTypes().toArray(new Type[0]));
452454
if (virtual && blocking == BlockingMode.VIRTUAL_THREAD) {
453455
result.add(methodName);
454-
} else if (blocking.blocking) {
456+
} else if (!virtual && blocking.blocking) {
455457
result.add(methodName);
456458
}
457459
}

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java

Lines changed: 83 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Queue;
88
import java.util.Set;
99
import java.util.concurrent.ConcurrentHashMap;
10+
import java.util.concurrent.ConcurrentLinkedQueue;
1011
import java.util.concurrent.Executor;
1112
import java.util.function.Consumer;
1213
import java.util.function.Function;
@@ -47,9 +48,12 @@ public BlockingServerInterceptor(Vertx vertx, List<String> blockingMethods, List
4748
this.blockingMethods = new HashSet<>();
4849
this.virtualMethods = new HashSet<>();
4950
this.devMode = devMode;
50-
for (String method : blockingMethods) {
51-
this.blockingMethods.add(method.toLowerCase());
51+
if (blockingMethods != null) {
52+
for (String method : blockingMethods) {
53+
this.blockingMethods.add(method.toLowerCase());
54+
}
5255
}
56+
5357
if (virtualMethods != null) {
5458
for (String method : virtualMethods) {
5559
this.virtualMethods.add(method.toLowerCase());
@@ -92,7 +96,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
9296
// it is initialized by io.quarkus.grpc.runtime.supports.context.GrpcRequestContextGrpcInterceptor
9397
// that should always be called before this interceptor
9498
ContextState state = requestContext.getState();
95-
ReplayListener<ReqT> replay = new ReplayListener<>(state, true);
99+
VirtualReplayListener<ReqT> replay = new VirtualReplayListener<>(state);
96100
virtualThreadExecutor.execute(() -> {
97101
ServerCall.Listener<ReqT> listener;
98102
try {
@@ -110,7 +114,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
110114
// it is initialized by io.quarkus.grpc.runtime.supports.context.GrpcRequestContextGrpcInterceptor
111115
// that should always be called before this interceptor
112116
ContextState state = requestContext.getState();
113-
ReplayListener<ReqT> replay = new ReplayListener<>(state, false);
117+
ReplayListener<ReqT> replay = new ReplayListener<>(state);
114118
vertx.executeBlocking(f -> {
115119
ServerCall.Listener<ReqT> listener;
116120
try {
@@ -138,16 +142,14 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
138142
*/
139143
private class ReplayListener<ReqT> extends ServerCall.Listener<ReqT> {
140144
private final InjectableContext.ContextState requestContextState;
141-
private final boolean virtual;
142145

143146
// exclusive to event loop context
144147
private ServerCall.Listener<ReqT> delegate;
145148
private final Queue<Consumer<ServerCall.Listener<ReqT>>> incomingEvents = new LinkedList<>();
146149
private boolean isConsumingFromIncomingEvents = false;
147150

148-
private ReplayListener(InjectableContext.ContextState requestContextState, boolean virtual) {
151+
private ReplayListener(InjectableContext.ContextState requestContextState) {
149152
this.requestContextState = requestContextState;
150-
this.virtual = virtual;
151153
}
152154

153155
/**
@@ -161,22 +163,14 @@ void setDelegate(ServerCall.Listener<ReqT> delegate) {
161163
if (!this.isConsumingFromIncomingEvents) {
162164
Consumer<ServerCall.Listener<ReqT>> consumer = incomingEvents.poll();
163165
if (consumer != null) {
164-
if (virtual) {
165-
executeVirtualWithRequestContext(consumer);
166-
} else {
167-
executeBlockingWithRequestContext(consumer);
168-
}
166+
executeBlockingWithRequestContext(consumer);
169167
}
170168
}
171169
}
172170

173171
private void scheduleOrEnqueue(Consumer<ServerCall.Listener<ReqT>> consumer) {
174172
if (this.delegate != null && !this.isConsumingFromIncomingEvents) {
175-
if (virtual) {
176-
executeVirtualWithRequestContext(consumer);
177-
} else {
178-
executeBlockingWithRequestContext(consumer);
179-
}
173+
executeBlockingWithRequestContext(consumer);
180174
} else {
181175
incomingEvents.add(consumer);
182176
}
@@ -198,7 +192,7 @@ private void executeBlockingWithRequestContext(Consumer<ServerCall.Listener<ReqT
198192
blockingHandler);
199193
}
200194
this.isConsumingFromIncomingEvents = true;
201-
vertx.executeBlocking(blockingHandler, false, p -> {
195+
vertx.executeBlocking(blockingHandler, true, p -> {
202196
Consumer<ServerCall.Listener<ReqT>> next = incomingEvents.poll();
203197
if (next != null) {
204198
executeBlockingWithRequestContext(next);
@@ -208,6 +202,77 @@ private void executeBlockingWithRequestContext(Consumer<ServerCall.Listener<ReqT
208202
});
209203
}
210204

205+
@Override
206+
public void onMessage(ReqT message) {
207+
scheduleOrEnqueue(t -> t.onMessage(message));
208+
}
209+
210+
@Override
211+
public void onHalfClose() {
212+
scheduleOrEnqueue(ServerCall.Listener::onHalfClose);
213+
}
214+
215+
@Override
216+
public void onCancel() {
217+
scheduleOrEnqueue(ServerCall.Listener::onCancel);
218+
}
219+
220+
@Override
221+
public void onComplete() {
222+
scheduleOrEnqueue(ServerCall.Listener::onComplete);
223+
}
224+
225+
@Override
226+
public void onReady() {
227+
scheduleOrEnqueue(ServerCall.Listener::onReady);
228+
}
229+
}
230+
231+
/**
232+
* Stores the incoming events until the listener is injected.
233+
* When injected, replay the events.
234+
* <p>
235+
* Note that event must be executed in order, explaining why incomingEvents
236+
* are executed sequentially
237+
* <p>
238+
* This replay listener is only used for virtual threads.
239+
*/
240+
private class VirtualReplayListener<ReqT> extends ServerCall.Listener<ReqT> {
241+
private final InjectableContext.ContextState requestContextState;
242+
243+
// exclusive to event loop context
244+
private ServerCall.Listener<ReqT> delegate;
245+
private final Queue<Consumer<ServerCall.Listener<ReqT>>> incomingEvents = new ConcurrentLinkedQueue<>();
246+
private volatile boolean isConsumingFromIncomingEvents = false;
247+
248+
private VirtualReplayListener(InjectableContext.ContextState requestContextState) {
249+
this.requestContextState = requestContextState;
250+
}
251+
252+
/**
253+
* Must be called from within the event loop context
254+
* If there are deferred events will start executing them in the shared worker context
255+
*
256+
* @param delegate the original
257+
*/
258+
void setDelegate(ServerCall.Listener<ReqT> delegate) {
259+
this.delegate = delegate;
260+
if (!this.isConsumingFromIncomingEvents) {
261+
Consumer<ServerCall.Listener<ReqT>> consumer = incomingEvents.poll();
262+
if (consumer != null) {
263+
executeVirtualWithRequestContext(consumer);
264+
}
265+
}
266+
}
267+
268+
private void scheduleOrEnqueue(Consumer<ServerCall.Listener<ReqT>> consumer) {
269+
if (this.delegate != null && !this.isConsumingFromIncomingEvents) {
270+
executeVirtualWithRequestContext(consumer);
271+
} else {
272+
incomingEvents.add(consumer);
273+
}
274+
}
275+
211276
private void executeVirtualWithRequestContext(Consumer<ServerCall.Listener<ReqT>> consumer) {
212277
final Context grpcContext = Context.current();
213278
Handler<Promise<Object>> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate,

integration-tests/grpc-hibernate/src/test/java/com/example/grpc/hibernate/BlockingRawTestBase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.concurrent.CopyOnWriteArrayList;
1010

1111
import org.junit.jupiter.api.BeforeEach;
12-
import org.junit.jupiter.api.Disabled;
1312
import org.junit.jupiter.api.Test;
1413
import org.junit.jupiter.api.Timeout;
1514

@@ -39,7 +38,6 @@ void clear() {
3938
}
4039

4140
@Test
42-
@Disabled("Flaky. See for instance https://github.com/quarkusio/quarkus/pull/27590#issuecomment-1231802402")
4341
@Timeout(TIMEOUT)
4442
void shouldAdd() {
4543
List<String> expected = new ArrayList<>();

0 commit comments

Comments
 (0)