Skip to content

Commit aec300d

Browse files
authored
Merge pull request #488 from mkouba/issue-486_1-7-x
ws: use InboundProcessingMode#CONCURRENT for generated endpoints
2 parents ebb94e0 + 50cd954 commit aec300d

File tree

2 files changed

+130
-1
lines changed

2 files changed

+130
-1
lines changed

transports/websocket/deployment/src/main/java/io/quarkiverse/mcp/server/websocket/deployment/WebSocketMcpServerProcessor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.quarkus.gizmo.Type;
3838
import io.quarkus.runtime.util.HashUtil;
3939
import io.quarkus.security.identity.CurrentIdentityAssociation;
40+
import io.quarkus.websockets.next.InboundProcessingMode;
4041
import io.quarkus.websockets.next.WebSocket;
4142
import io.vertx.core.Vertx;
4243

@@ -69,7 +70,10 @@ void generateEndpoints(McpWebSocketServersBuildTimeConfig config, BuildProducer<
6970
.build();
7071
// @WebSocket(path = "/foo/bar")
7172
endpointCreator.addAnnotation(
72-
AnnotationInstance.builder(WebSocket.class).add("path", e.getValue().websocket().endpointPath()).build());
73+
AnnotationInstance.builder(WebSocket.class)
74+
.add("path", e.getValue().websocket().endpointPath())
75+
.add("inboundProcessingMode", InboundProcessingMode.CONCURRENT)
76+
.build());
7377

7478
Class<?>[] params = new Class<?>[] {
7579
McpServersRuntimeConfig.class,
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package io.quarkiverse.mcp.server.websocket.test.cancel;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
import jakarta.inject.Inject;
11+
12+
import org.awaitility.Awaitility;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.RegisterExtension;
15+
16+
import io.quarkiverse.mcp.server.Cancellation;
17+
import io.quarkiverse.mcp.server.Cancellation.OperationCancellationException;
18+
import io.quarkiverse.mcp.server.Cancellation.Result;
19+
import io.quarkiverse.mcp.server.Tool;
20+
import io.quarkiverse.mcp.server.ToolArg;
21+
import io.quarkiverse.mcp.server.ToolManager;
22+
import io.quarkiverse.mcp.server.ToolResponse;
23+
import io.quarkiverse.mcp.server.test.McpAssured;
24+
import io.quarkiverse.mcp.server.test.McpAssured.McpWebSocketTestClient;
25+
import io.quarkiverse.mcp.server.websocket.test.McpServerTest;
26+
import io.quarkus.runtime.Startup;
27+
import io.quarkus.test.QuarkusUnitTest;
28+
import io.vertx.core.json.JsonObject;
29+
30+
public class CancellationTest extends McpServerTest {
31+
32+
@RegisterExtension
33+
static final QuarkusUnitTest config = defaultConfig()
34+
.withApplicationRoot(
35+
root -> root.addClasses(MyTools.class));
36+
37+
@Test
38+
public void testCancellation() throws InterruptedException {
39+
assertCancellation("alpha", MyTools.ALPHA_LATCH);
40+
assertCancellation("bravo", MyTools.BRAVO_LATCH);
41+
}
42+
43+
private void assertCancellation(String toolName, CountDownLatch latch) throws InterruptedException {
44+
McpWebSocketTestClient client = McpAssured.newConnectedWebSocketClient();
45+
MyTools.CANCELLED.set(false);
46+
47+
JsonObject request = client.newRequest("tools/call")
48+
.put("params", new JsonObject()
49+
.put("name", toolName));
50+
client.sendAndForget(request);
51+
52+
// Wait for the tool execution start
53+
assertTrue(latch.await(5, TimeUnit.SECONDS));
54+
55+
JsonObject notification = client.newMessage("notifications/cancelled").put("params",
56+
new JsonObject()
57+
.put("requestId", request.getValue("id"))
58+
.put("reason", "No reason at all"));
59+
60+
client.sendAndForget(notification);
61+
// This notification should be ignored
62+
client.sendAndForget(notification);
63+
64+
Awaitility.await().until(() -> MyTools.CANCELLED.get());
65+
// Only the response to the "initialize" request
66+
assertEquals(1, client.snapshot().responses().size());
67+
}
68+
69+
public static class MyTools {
70+
71+
static final CountDownLatch ALPHA_LATCH = new CountDownLatch(1);
72+
static final CountDownLatch BRAVO_LATCH = new CountDownLatch(1);
73+
74+
static final AtomicBoolean CANCELLED = new AtomicBoolean();
75+
76+
@Inject
77+
ToolManager manager;
78+
79+
@Startup
80+
void onStart() {
81+
manager.newTool("alpha")
82+
.setDescription("alpha description!")
83+
.setHandler(
84+
toolArgs -> {
85+
ALPHA_LATCH.countDown();
86+
int c = 0;
87+
while (c++ < 20) {
88+
Result r = toolArgs.cancellation().check();
89+
if (r.isRequested()
90+
&& r.reason().isPresent()
91+
&& r.reason().get().equals("No reason at all")) {
92+
CANCELLED.set(true);
93+
throw new OperationCancellationException();
94+
}
95+
try {
96+
TimeUnit.MILLISECONDS.sleep(500);
97+
} catch (InterruptedException e) {
98+
throw new RuntimeException();
99+
}
100+
}
101+
return ToolResponse.success("OK");
102+
})
103+
.register();
104+
}
105+
106+
@Tool
107+
String bravo(Cancellation cancellation, @ToolArg(defaultValue = "1") int price) throws InterruptedException {
108+
BRAVO_LATCH.countDown();
109+
int c = 0;
110+
while (c++ < 20) {
111+
Result r = cancellation.check();
112+
if (r.isRequested()
113+
&& r.reason().isPresent()
114+
&& r.reason().get().equals("No reason at all")) {
115+
CANCELLED.set(true);
116+
throw new OperationCancellationException();
117+
}
118+
TimeUnit.MILLISECONDS.sleep(500);
119+
}
120+
return "OK";
121+
}
122+
123+
}
124+
125+
}

0 commit comments

Comments
 (0)