diff --git a/README.md b/README.md index bc9fef7..bc0f0a7 100644 --- a/README.md +++ b/README.md @@ -6,15 +6,17 @@ The MCP (Model Context Protocol) Server Plugin for Jenkins implements the server - **MCP Server Implementation**: Implements the server-side of the Model Context Protocol. - **Jenkins Integration**: Exposes Jenkins functionalities as MCP tools and resources. -- **Real-time Communication**: Uses Server-Sent Events (SSE) for efficient, real-time communication with clients. +- **Multiple Transport Options**: Supports both Server-Sent Events (SSE) and Streamable transport for efficient, real-time communication with clients. +- **Backward Compatibility**: Maintains support for legacy SSE transport while providing modern streamable transport. - **Extensible Architecture**: Allows easy extension of MCP capabilities through the `McpServerExtension` interface. ## Key Components -1. **Endpoint**: The main entry point for MCP communication, handling SSE connections and message routing. -2. **DefaultMcpServer**: Implements `McpServerExtension`, providing default tools for interacting with Jenkins jobs and builds. -3. **McpToolWrapper**: Wraps Java methods as MCP tools, handling parameter parsing and result formatting. -4. **McpServerExtension**: Interface for extending MCP server capabilities. +1. **Endpoint**: The main entry point for legacy SSE-based MCP communication, handling SSE connections and message routing. +2. **StreamableEndpoint**: The modern entry point for streamable MCP communication, providing enhanced transport capabilities with message replay and improved session management. +3. **DefaultMcpServer**: Implements `McpServerExtension`, providing default tools for interacting with Jenkins jobs and builds. +4. **McpToolWrapper**: Wraps Java methods as MCP tools, handling parameter parsing and result formatting. +5. **McpServerExtension**: Interface for extending MCP server capabilities. ## MCP SDK Version @@ -34,17 +36,43 @@ The MCP Server plugin automatically sets up necessary endpoints and tools upon i ### Connecting to the MCP Server -MCP clients can connect to the server using: +MCP clients can connect to the server using either transport option: +#### Streamable Transport (Recommended) +- Streamable Endpoint: `/mcp-server/streamable` +- Supports message replay and enhanced session management +- Better error handling and connection recovery + +#### Legacy SSE Transport - SSE Endpoint: `/mcp-server/sse` - Message Endpoint: `/mcp-server/message` +- Maintained for backward compatibility ### Authentication and Credentials The MCP Server Plugin requires the same credentials as the Jenkins instance it's running on. To authenticate your MCP queries: 1. **Jenkins API Token**: Generate an API token from your Jenkins user account. -2. **Basic Authentication**: Use the API token in the HTTP Basic Authentication header. Below is an example of VS code settings.xml +2. **Basic Authentication**: Use the API token in the HTTP Basic Authentication header. Below are examples for both transport types: + +#### Streamable Transport Configuration (Recommended) +```json +{ + "mcp": { + "servers": { + "jenkins": { + "type": "http", + "url": "https://jenkins-host/mcp-server/streamable", + "headers": { + "Authorization": "Basic " + } + } + } + } +} +``` + +#### Legacy SSE Transport Configuration ```json { "mcp": { @@ -60,6 +88,20 @@ The MCP Server Plugin requires the same credentials as the Jenkins instance it's } } ``` + +### Transport Comparison + +| Feature | Streamable Transport | Legacy SSE Transport | +|---------|---------------------|---------------------| +| **Message Replay** | ✅ Supports replay from Last-Event-ID | ❌ No replay support | +| **Session Management** | ✅ Enhanced with proper lifecycle handling | ⚠️ Basic session handling | +| **Error Recovery** | ✅ Robust error handling and recovery | ⚠️ Limited error recovery | +| **Connection Stability** | ✅ Better connection management | ⚠️ Basic connection handling | +| **Performance** | ✅ Optimized for modern clients | ⚠️ Legacy performance | +| **Backward Compatibility** | ❌ Requires modern MCP clients | ✅ Works with older clients | + +**Recommendation**: Use Streamable Transport for new integrations and modern MCP clients. Legacy SSE Transport is maintained for backward compatibility. + Example of using the token: ### Available Tools @@ -98,7 +140,7 @@ The plugin provides the following built-in tools for interacting with Jenkins: Each tool accepts specific parameters to customize its behavior. For detailed usage instructions and parameter descriptions, refer to the API documentation or use the MCP introspection capabilities. -To use these tools, connect to the MCP server endpoint and make tool calls using your MCP client implementation. +To use these tools, connect to either the streamable endpoint (`/mcp-server/mcp`) or the legacy SSE endpoint (`/mcp-server/sse`) and make tool calls using your MCP client implementation. ### Extending MCP Capabilities To add new MCP tools or functionalities: diff --git a/pom.xml b/pom.xml index 61c12aa..3fd06d7 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ io.modelcontextprotocol.sdk mcp-bom - 0.10.0 + 0.11.0 pom import diff --git a/src/main/java/io/jenkins/plugins/mcp/server/Endpoint.java b/src/main/java/io/jenkins/plugins/mcp/server/Endpoint.java index dbb9055..5a2acb0 100644 --- a/src/main/java/io/jenkins/plugins/mcp/server/Endpoint.java +++ b/src/main/java/io/jenkins/plugins/mcp/server/Endpoint.java @@ -31,6 +31,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import hudson.Extension; +import hudson.ExtensionList; import hudson.model.RootAction; import hudson.model.User; import hudson.security.csrf.CrumbExclusion; @@ -107,9 +108,11 @@ public class Endpoint extends CrumbExclusion implements RootAction, McpServerTra */ private McpServerSession.Factory sessionFactory; - public Endpoint() throws ServletException { + private StreamableEndpoint streamableEndpoint; + public Endpoint() throws ServletException { init(); + streamableEndpoint = ExtensionList.lookupSingleton(StreamableEndpoint.class); } public static String getRequestedResourcePath(HttpServletRequest httpServletRequest) { @@ -132,6 +135,10 @@ public boolean process(HttpServletRequest request, HttpServletResponse response, response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); return true; } + if (requestedResource.startsWith("/" + StreamableEndpoint.MCP_SERVER_STREAMABLE)) { + streamableEndpoint.process(request, response, chain); + return true; + } return false; } @@ -177,6 +184,11 @@ protected void init() throws ServletException { public void doFilter( ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { + if (streamableEndpoint.isStreamableRequest(servletRequest, servletResponse)) { + streamableEndpoint.process( + (HttpServletRequest) servletRequest, (HttpServletResponse) servletResponse, filterChain); + return; + } if (isSSERequest(servletRequest, servletResponse)) { handleSSE((HttpServletRequest) servletRequest, (HttpServletResponse) servletResponse); } else { @@ -357,7 +369,7 @@ protected void handleMessage(HttpServletRequest request, HttpServletResponse res && jsonrpcRequest.params() instanceof Map params) { Map arguments = (Map) params.get("arguments"); if (arguments != null) { - arguments.put("userId", sessionObject.userId); + arguments.put(McpToolWrapper.USER_ID_KEY, sessionObject.userId); } } } diff --git a/src/main/java/io/jenkins/plugins/mcp/server/StreamableEndpoint.java b/src/main/java/io/jenkins/plugins/mcp/server/StreamableEndpoint.java new file mode 100644 index 0000000..d873755 --- /dev/null +++ b/src/main/java/io/jenkins/plugins/mcp/server/StreamableEndpoint.java @@ -0,0 +1,689 @@ +/* + * + * The MIT License + * + * Copyright (c) 2025, Gong Yi. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +package io.jenkins.plugins.mcp.server; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import hudson.Extension; +import hudson.model.User; +import hudson.security.csrf.CrumbExclusion; +import hudson.util.PluginServletFilter; +import io.jenkins.plugins.mcp.server.annotation.Tool; +import io.jenkins.plugins.mcp.server.tool.McpToolWrapper; +import io.modelcontextprotocol.server.McpServerFeatures; +import io.modelcontextprotocol.spec.HttpHeaders; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpStreamableServerSession; +import io.modelcontextprotocol.spec.McpStreamableServerTransport; +import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider; +import jakarta.servlet.AsyncContext; +import jakarta.servlet.AsyncEvent; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.Filter; +import jakarta.servlet.FilterChain; +import jakarta.servlet.ServletException; +import jakarta.servlet.ServletRequest; +import jakarta.servlet.ServletResponse; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.StringUtils; +import org.kohsuke.accmod.Restricted; +import org.kohsuke.accmod.restrictions.NoExternalUse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * Streamable HTTP transport endpoint for MCP server. + * This endpoint implements the new streamable transport protocol at /mcp. + */ +@Restricted(NoExternalUse.class) +@Extension +public class StreamableEndpoint extends CrumbExclusion implements McpStreamableServerTransportProvider { + public static final String UTF_8 = "UTF-8"; + public static final String APPLICATION_JSON = "application/json"; + public static final String FAILED_TO_SEND_ERROR_RESPONSE = "Failed to send error response: {}"; + public static final String MCP_SERVER = "mcp-server"; + public static final String STREAMABLE_ENDPOINT = "/streamable"; + public static final String MCP_SERVER_STREAMABLE = MCP_SERVER + STREAMABLE_ENDPOINT; + + /** + * Event type for regular messages + */ + public static final String MESSAGE_EVENT_TYPE = "message"; + + private static final Logger logger = LoggerFactory.getLogger(StreamableEndpoint.class); + + /** + * JSON object mapper for serialization/deserialization + */ + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Map of active client sessions, keyed by session ID + */ + private final Map sessions = new ConcurrentHashMap<>(); + + /** + * Flag indicating if the transport is in the process of shutting down + */ + private final AtomicBoolean isClosing = new AtomicBoolean(false); + + /** + * Session factory for creating new sessions + */ + private McpStreamableServerSession.Factory sessionFactory; + + public StreamableEndpoint() throws ServletException { + init(); + } + + protected void init() throws ServletException { + McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() + .tools(true) + .prompts(true) + .resources(true, true) + .build(); + var extensions = McpServerExtension.all(); + + var tools = extensions.stream() + .map(McpServerExtension::getSyncTools) + .flatMap(List::stream) + .toList(); + var prompts = extensions.stream() + .map(McpServerExtension::getSyncPrompts) + .flatMap(List::stream) + .toList(); + var resources = extensions.stream() + .map(McpServerExtension::getSyncResources) + .flatMap(List::stream) + .toList(); + + var annotationTools = extensions.stream() + .flatMap(extension -> Arrays.stream(extension.getClass().getMethods()) + .filter(method -> method.isAnnotationPresent(Tool.class)) + .map(method -> new McpToolWrapper(objectMapper, extension, method).asSyncToolSpecification())) + .toList(); + + List allTools = new ArrayList<>(); + allTools.addAll(tools); + allTools.addAll(annotationTools); + io.modelcontextprotocol.server.McpServer.sync(this) + .capabilities(serverCapabilities) + .tools(allTools) + .prompts(prompts) + .resources(resources) + .build(); + + try { + PluginServletFilter.addFilter(new Filter() { + @Override + public void doFilter( + ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) + throws IOException, ServletException { + if (isStreamableRequest(servletRequest, servletResponse)) { + // Handle streamable requests through the process method + if (servletRequest instanceof HttpServletRequest request + && servletResponse instanceof HttpServletResponse response) { + if (!process(request, response, filterChain)) { + filterChain.doFilter(servletRequest, servletResponse); + } + } else { + filterChain.doFilter(servletRequest, servletResponse); + } + } else { + filterChain.doFilter(servletRequest, servletResponse); + } + } + + @Override + public void destroy() { + closeGracefully().block(); + } + }); + } catch (ServletException e) { + logger.error("Failed to register servlet filter", e); + throw e; + } + } + + protected String getRequestedResourcePath(HttpServletRequest request) { + String requestURI = request.getRequestURI(); + String contextPath = request.getContextPath(); + if (contextPath != null && !contextPath.isEmpty()) { + requestURI = requestURI.substring(contextPath.length()); + } + return requestURI; + } + + @Override + public String protocolVersion() { + // FIXME this should be done in sdk itself as returning "2024-11-05" + // looks wrong for streamable + // see https://github.com/modelcontextprotocol/java-sdk/pull/441 + return "2025-03-26"; + } + + private static final List ACCEPTED_METHODS = List.of("POST", "GET"); + + @Override + public boolean process(HttpServletRequest request, HttpServletResponse response, FilterChain chain) + throws IOException, ServletException { + + String method = StringUtils.upperCase(request.getMethod(), Locale.ENGLISH); + + if (!ACCEPTED_METHODS.contains(method)) { + response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "Method not allowed: " + method); + return true; + } + + String requestedResource = getRequestedResourcePath(request); + if (requestedResource.startsWith("/" + MCP_SERVER_STREAMABLE) && method.equalsIgnoreCase("POST")) { + handleStreamableMessage(request, response); + return true; + } + if (requestedResource.startsWith("/" + MCP_SERVER_STREAMABLE) && method.equalsIgnoreCase("GET")) { + handleStreamableConnect(request, response); + return true; + } + return false; + } + + @Override + public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) { + this.sessionFactory = sessionFactory; + } + + @Override + public Mono notifyClients(String method, Object params) { + if (sessions.isEmpty()) { + logger.debug("No active sessions to broadcast message to"); + return Mono.empty(); + } + + logger.debug("Attempting to broadcast message to {} active sessions", sessions.size()); + + return Mono.fromRunnable(() -> { + sessions.values().parallelStream().forEach(sessionObject -> { + try { + sessionObject.session.sendNotification(method, params).block(); + } catch (Exception e) { + logger.error( + "Failed to send message to sessionObject {}: {}", + sessionObject.session.getId(), + e.getMessage()); + } + }); + }); + } + + boolean isStreamableRequest(ServletRequest servletRequest, ServletResponse servletResponse) { + if (servletRequest instanceof HttpServletRequest request && servletResponse instanceof HttpServletResponse) { + String requestedResource = getRequestedResourcePath(request); + return requestedResource.startsWith("/" + MCP_SERVER_STREAMABLE); + } + return false; + } + + @Override + public Mono closeGracefully() { + return Mono.fromRunnable(() -> { + isClosing.set(true); + logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()); + + sessions.values().parallelStream().forEach(sessionObject -> { + try { + sessionObject.session.closeGracefully().block(); + } catch (Exception e) { + logger.error("Failed to close sessionObject {}: {}", sessionObject.session.getId(), e.getMessage()); + } + }); + + sessions.clear(); + logger.debug("Graceful shutdown completed"); + }); + } + + /** + * Handles streamable SSE connection requests for message replay. + */ + protected void handleStreamableConnect(HttpServletRequest request, HttpServletResponse response) + throws IOException { + if (isClosing.get()) { + response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Server is shutting down"); + return; + } + + List badRequestErrors = new ArrayList<>(); + + String accept = request.getHeader("Accept"); + if (accept == null || !accept.contains("text/event-stream")) { + badRequestErrors.add("text/event-stream required in Accept header"); + } + + String sessionId = request.getHeader(HttpHeaders.MCP_SESSION_ID); + if (sessionId == null || sessionId.isBlank()) { + badRequestErrors.add("Session ID required in mcp-sessionObject-id header"); + } + + if (!badRequestErrors.isEmpty()) { + String combinedMessage = String.join("; ", badRequestErrors); + responseError(response, HttpServletResponse.SC_BAD_REQUEST, new McpError(combinedMessage)); + return; + } + + SessionObject sessionObject = sessions.get(sessionId); + if (sessionObject == null) { + response.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + + logger.debug("Handling streamable GET request for sessionObject: {}", sessionId); + + try { + response.setContentType("text/event-stream"); + response.setCharacterEncoding(UTF_8); + response.setHeader("Cache-Control", "no-cache"); + response.setHeader("Connection", "keep-alive"); + response.setHeader("Access-Control-Allow-Origin", "*"); + + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + + HttpServletStreamableMcpSessionTransport sessionTransport = + new HttpServletStreamableMcpSessionTransport(sessionId, asyncContext, response.getWriter()); + + // Check if this is a replay request + String lastEventId = request.getHeader("Last-Event-ID"); + if (lastEventId != null) { + logger.debug("Handling replay request for sessionObject {} from event ID: {}", sessionId, lastEventId); + try { + sessionObject.session.replay(lastEventId).toIterable().forEach(message -> { + try { + sessionTransport.sendMessage(message).block(); + } catch (Exception e) { + logger.error("Failed to replay message: {}", e.getMessage()); + asyncContext.complete(); + } + }); + } catch (Exception e) { + logger.error("Failed to replay messages: {}", e.getMessage()); + asyncContext.complete(); + } + } else { + // Establish new listening stream + logger.debug("Establishing new listening stream for sessionObject: {}", sessionId); + var listeningStream = sessionObject.session.listeningStream(sessionTransport); + + // Set up cleanup when connection closes + asyncContext.addListener(new AsyncListener() { + @Override + public void onComplete(AsyncEvent event) { + logger.debug("SSE connection completed for sessionObject: {}", sessionId); + listeningStream.close(); + } + + @Override + public void onTimeout(AsyncEvent event) { + logger.debug("SSE connection timed out for sessionObject: {}", sessionId); + listeningStream.close(); + } + + @Override + public void onError(AsyncEvent event) { + logger.debug("SSE connection error for sessionObject: {}", sessionId); + listeningStream.close(); + } + + @Override + public void onStartAsync(AsyncEvent event) { + // No action needed + } + }); + } + + logger.info("Streamable MCP connection established for sessionObject {}", sessionId); + + } catch (Exception e) { + logger.error("Failed to handle streamable GET request for sessionObject {}: {}", sessionId, e.getMessage()); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } + + /** + * Handles streamable message processing. + */ + protected void handleStreamableMessage(HttpServletRequest request, HttpServletResponse response) + throws IOException { + if (isClosing.get()) { + response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Server is shutting down"); + return; + } + + List badRequestErrors = new ArrayList<>(); + + String accept = request.getHeader("Accept"); + if (accept == null || !accept.contains("text/event-stream")) { + badRequestErrors.add("text/event-stream required in Accept header"); + } + if (accept == null || !accept.contains(APPLICATION_JSON)) { + badRequestErrors.add("application/json required in Accept header"); + } + + try { + BufferedReader reader = request.getReader(); + StringBuilder body = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + body.append(line); + } + + if (body.isEmpty() && request.getMethod().equals("POST")) { + response.sendError(HttpServletResponse.SC_BAD_REQUEST); + } + + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body.toString()); + + // Handle initialization request + if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest + && jsonrpcRequest.method().equals(McpSchema.METHOD_INITIALIZE)) { + handleInitializeRequest(jsonrpcRequest, request, response, badRequestErrors); + return; + } + + // Handle other messages with existing session + String sessionId = request.getHeader(HttpHeaders.MCP_SESSION_ID); + if (sessionId == null || sessionId.isBlank()) { + badRequestErrors.add("Session ID required in mcp-session-id header"); + } + + if (!badRequestErrors.isEmpty()) { + String combinedMessage = String.join("; ", badRequestErrors); + responseError(response, HttpServletResponse.SC_BAD_REQUEST, new McpError(combinedMessage)); + return; + } + + SessionObject session = sessions.get(sessionId); + if (session == null) { + responseError( + response, HttpServletResponse.SC_NOT_FOUND, new McpError("Session not found: " + sessionId)); + return; + } + + // Handle different message types + if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) { + session.session.accept(jsonrpcResponse).block(); + response.setStatus(HttpServletResponse.SC_ACCEPTED); + } else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) { + session.session.accept(jsonrpcNotification).block(); + response.setStatus(HttpServletResponse.SC_ACCEPTED); + } else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) { + // For streaming responses, we need to return SSE + handleStreamingRequest(jsonrpcRequest, sessionId, request, response); + } + + } catch (Exception e) { + logger.error("Error processing streamable MCP message", e); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + + /** Handles initialization requests for new sessions. */ + private void handleInitializeRequest( + McpSchema.JSONRPCRequest jsonrpcRequest, + HttpServletRequest request, + HttpServletResponse response, + List badRequestErrors) + throws IOException { + + if (!badRequestErrors.isEmpty()) { + String combinedMessage = String.join("; ", badRequestErrors); + responseError(response, HttpServletResponse.SC_BAD_REQUEST, new McpError(combinedMessage)); + return; + } + + McpSchema.InitializeRequest initializeRequest = + objectMapper.convertValue(jsonrpcRequest.params(), new TypeReference<>() {}); + + McpStreamableServerSession.McpStreamableServerSessionInit init = sessionFactory.startSession(initializeRequest); + User currentUser = User.current(); + sessions.put( + init.session().getId(), + new SessionObject(init.session(), currentUser != null ? currentUser.getId() : null)); + + try { + McpSchema.InitializeResult initResult = init.initResult().block(); + + response.setContentType(APPLICATION_JSON); + response.setCharacterEncoding(UTF_8); + response.setHeader(HttpHeaders.MCP_SESSION_ID, init.session().getId()); + response.setStatus(HttpServletResponse.SC_OK); + + String jsonResponse = objectMapper.writeValueAsString( + new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initResult, null)); + + PrintWriter writer = response.getWriter(); + writer.write(jsonResponse); + writer.flush(); + } catch (Exception e) { + logger.error("Failed to initialize session: {}", e.getMessage()); + responseError( + response, + HttpServletResponse.SC_INTERNAL_SERVER_ERROR, + new McpError("Failed to initialize session: " + e.getMessage())); + } + } + + /** Handles streaming requests that require SSE responses. */ + private void handleStreamingRequest( + McpSchema.JSONRPCRequest jsonrpcRequest, + String sessionId, + HttpServletRequest request, + HttpServletResponse response) + throws IOException { + + response.setContentType("text/event-stream"); + response.setCharacterEncoding(UTF_8); + response.setHeader("Cache-Control", "no-cache"); + response.setHeader("Connection", "keep-alive"); + response.setHeader("Access-Control-Allow-Origin", "*"); + + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + + SessionObject sessionObject = sessions.get(sessionId); + + McpStreamableServerSession session = sessionObject.session; + if (session == null) { + logger.error("Session not found for streaming request: {}", sessionId); + asyncContext.complete(); + return; + } + + HttpServletStreamableMcpSessionTransport sessionTransport = + new HttpServletStreamableMcpSessionTransport(sessionId, asyncContext, response.getWriter()); + + // do we have a current user? + // specs are not clear with this (but convenient if you use a client to send request with different user + // to avoid multiple connection + // Maybe make this configurable? + User currentUser = User.current(); + String userId = currentUser != null ? currentUser.getId() : null; + // if not use the one from the session + if (userId == null) { + userId = sessionObject.userId; + } + if (userId != null) { + Map params = (Map) jsonrpcRequest.params(); + if (params != null) { + Map arguments = (Map) params.get("arguments"); + if (arguments != null) { + arguments.put(McpToolWrapper.USER_ID_KEY, userId); + } + } + } + + try { + session.responseStream(jsonrpcRequest, sessionTransport) + .doOnError(error -> { + logger.error("Error processing streaming request: {}", error.getMessage()); + sessionTransport.close(); + }) + .doFinally(signalType -> { + logger.debug( + "Streaming request processing completed for session: {} with signal: {}", + sessionId, + signalType); + }) + .subscribe(); + + } catch (Exception e) { + logger.error("Failed to process streaming request for session {}: {}", sessionId, e.getMessage()); + sessionTransport.close(); + } + + logger.debug("Handling streaming request for session: {}", sessionId); + } + + /** Sends an error response. */ + private void responseError(HttpServletResponse response, int statusCode, McpError error) { + try { + response.setContentType(APPLICATION_JSON); + response.setCharacterEncoding(UTF_8); + response.setStatus(statusCode); + + String errorJson = objectMapper.writeValueAsString(error); + response.getWriter().write(errorJson); + response.getWriter().flush(); + } catch (IOException e) { + logger.error("Failed to send error response: {}", e.getMessage()); + } + } + + /** + * Sends an SSE event to a client with a specific ID. + */ + private void sendEvent(PrintWriter writer, String eventType, String data, String id) throws IOException { + if (id != null) { + writer.write("id: " + id + "\n"); + } + writer.write("event: " + eventType + "\n"); + writer.write("data: " + data + "\n\n"); + writer.flush(); + + if (writer.checkError()) { + throw new IOException("Client disconnected"); + } + } + + /** + * Inner class that implements the streamable session transport for HTTP servlet. + */ + private class HttpServletStreamableMcpSessionTransport implements McpStreamableServerTransport { + private final String sessionId; + private final AsyncContext asyncContext; + private final PrintWriter writer; + private volatile boolean closed = false; + + public HttpServletStreamableMcpSessionTransport( + String sessionId, AsyncContext asyncContext, PrintWriter writer) { + this.sessionId = sessionId; + this.asyncContext = asyncContext; + this.writer = writer; + } + + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + return sendMessage(message, null); + } + + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId) { + return Mono.fromRunnable(() -> { + if (closed) { + logger.debug("Session {} was closed during message send attempt", sessionId); + return; + } + + try { + String finalMessageId = messageId; + if (finalMessageId == null) { + if (message instanceof McpSchema.JSONRPCRequest request) { + finalMessageId = request.id() != null ? request.id().toString() : null; + } else if (message instanceof McpSchema.JSONRPCResponse response) { + finalMessageId = + response.id() != null ? response.id().toString() : null; + } + } + + String jsonText = objectMapper.writeValueAsString(message); + StreamableEndpoint.this.sendEvent( + writer, MESSAGE_EVENT_TYPE, jsonText, finalMessageId != null ? finalMessageId : sessionId); + logger.debug("Message sent to session {} with ID {}", sessionId, finalMessageId); + } catch (Exception e) { + logger.error("Failed to send message to session {}: {}", sessionId, e.getMessage()); + close(); + } + }); + } + + @Override + public Mono closeGracefully() { + return Mono.fromRunnable(this::close); + } + + @Override + public T unmarshalFrom(Object data, TypeReference typeRef) { + return objectMapper.convertValue(data, typeRef); + } + + @Override + public void close() { + if (!closed) { + closed = true; + try { + asyncContext.complete(); + } catch (Exception e) { + logger.debug("Error completing async context for session {}: {}", sessionId, e.getMessage()); + } + } + } + } + + record SessionObject(McpStreamableServerSession session, String userId) {} +} diff --git a/src/main/java/io/jenkins/plugins/mcp/server/tool/McpToolWrapper.java b/src/main/java/io/jenkins/plugins/mcp/server/tool/McpToolWrapper.java index 1344194..5eb0449 100644 --- a/src/main/java/io/jenkins/plugins/mcp/server/tool/McpToolWrapper.java +++ b/src/main/java/io/jenkins/plugins/mcp/server/tool/McpToolWrapper.java @@ -67,6 +67,8 @@ public class McpToolWrapper { private static final boolean PROPERTY_REQUIRED_BY_DEFAULT = true; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String USER_ID_KEY = McpToolWrapper.class.getName() + "-params-userId"; + static { OBJECT_MAPPER.registerModule(new JenkinsExportedBeanModule()); } @@ -245,7 +247,7 @@ McpSchema.CallToolResult toMcpResult(Object result) { McpSchema.CallToolResult call(McpSyncServerExchange exchange, Map args) { var oldUser = User.current(); try { - String userId = (String) args.get("userId"); + String userId = (String) args.get(USER_ID_KEY); var user = User.get(userId, false, Map.of()); if (user != null) { ACL.as(user); diff --git a/src/test/java/io/jenkins/plugins/mcp/server/StreamableEndpointTest.java b/src/test/java/io/jenkins/plugins/mcp/server/StreamableEndpointTest.java new file mode 100644 index 0000000..595f5ff --- /dev/null +++ b/src/test/java/io/jenkins/plugins/mcp/server/StreamableEndpointTest.java @@ -0,0 +1,295 @@ +/* + * + * The MIT License + * + * Copyright (c) 2025, Gong Yi. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +package io.jenkins.plugins.mcp.server; + +import static io.jenkins.plugins.mcp.server.StreamableEndpoint.MCP_SERVER_STREAMABLE; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.client.McpClient; +import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; +import io.modelcontextprotocol.spec.McpSchema; +import jakarta.servlet.http.HttpServletResponse; +import java.net.URL; +import java.time.Duration; +import java.util.Map; +import org.htmlunit.HttpMethod; +import org.htmlunit.WebRequest; +import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; +import org.jenkinsci.plugins.workflow.job.WorkflowJob; +import org.junit.jupiter.api.Test; +import org.jvnet.hudson.test.JenkinsRule; +import org.jvnet.hudson.test.junit.jupiter.WithJenkins; + +@WithJenkins +public class StreamableEndpointTest { + + @Test + void testStreamableToolCallSimpleJson(JenkinsRule jenkins) throws Exception { + var url = jenkins.getURL(); + var baseUrl = url.toString(); + + var transport = HttpClientStreamableHttpTransport.builder(baseUrl) + .endpoint(MCP_SERVER_STREAMABLE) + .build(); + + try (var client = McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(500)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .build()) { + client.initialize(); + client.getServerCapabilities(); + var tools = client.listTools(); + assertThat(tools.tools()) + .extracting(McpSchema.Tool::name) + .containsOnly( + "sayHello", + "testInt", + "testWithError", + "getBuildLog", + "triggerBuild", + "updateBuild", + "getJobs", + "getBuild", + "getJob", + "getJobScm", + "getBuildScm", + "getBuildChangeSets"); + McpSchema.CallToolRequest request = new McpSchema.CallToolRequest("sayHello", Map.of("name", "streamable")); + + var response = client.callTool(request); + assertThat(response.isError()).isFalse(); + assertThat(response.content()).hasSize(1); + assertThat(response.content().get(0).type()).isEqualTo("text"); + assertThat(response.content()).first().isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.type()).isEqualTo("text"); + + ObjectMapper objectMapper = new ObjectMapper(); + try { + var contentMap = objectMapper.readValue(textContent.text(), Map.class); + assertThat(contentMap).extractingByKey("message").isEqualTo("Hello, streamable!"); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + } + } + + @Test + void testStreamableToolCallIntResult(JenkinsRule jenkins) throws Exception { + var url = jenkins.getURL(); + var baseUrl = url.toString(); + + var transport = HttpClientStreamableHttpTransport.builder(baseUrl) + .endpoint(MCP_SERVER_STREAMABLE) + .build(); + + try (var client = McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(500)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .build()) { + client.initialize(); + McpSchema.CallToolRequest request = new McpSchema.CallToolRequest("testInt", Map.of()); + + var response = client.callTool(request); + assertThat(response.isError()).isFalse(); + assertThat(response.content()).hasSize(1); + assertThat(response.content().get(0).type()).isEqualTo("text"); + assertThat(response.content()).first().isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.type()).isEqualTo("text"); + + ObjectMapper objectMapper = new ObjectMapper(); + try { + var result = objectMapper.readValue(textContent.text(), Integer.class); + assertThat(result).isEqualTo(10); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + } + } + + @Test + void testStreamableToolCallWithException(JenkinsRule jenkins) throws Exception { + var url = jenkins.getURL(); + var baseUrl = url.toString(); + + var transport = HttpClientStreamableHttpTransport.builder(baseUrl) + .endpoint(MCP_SERVER_STREAMABLE) + .build(); + + try (var client = McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(500)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .build()) { + client.initialize(); + McpSchema.CallToolRequest request = new McpSchema.CallToolRequest("testWithError", Map.of()); + + var response = client.callTool(request); + assertThat(response.isError()).isTrue(); + assertThat(response.content()).hasSize(1); + assertThat(response.content().get(0).type()).isEqualTo("text"); + assertThat(response.content()).first().isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.type()).isEqualTo("text"); + assertThat(textContent.text()).contains("Error occurred during execution"); + }); + } + } + + @Test + void testStreamableEndpointUrlSupportPostOnly(JenkinsRule jenkins) throws Exception { + var url = jenkins.getURL(); + var baseUrl = url.toString(); + var streamableUrl = baseUrl + MCP_SERVER_STREAMABLE; + try (JenkinsRule.WebClient webClient = jenkins.createWebClient()) { + + // Test that GET without proper headers returns error + var getRequest = new WebRequest(new URL(streamableUrl), HttpMethod.GET); + var getResponse = webClient.loadWebResponse(getRequest); + assertThat(getResponse.getStatusCode()).isEqualTo(HttpServletResponse.SC_BAD_REQUEST); + + // Test that PUT is not allowed + var putRequest = new WebRequest(new URL(streamableUrl), HttpMethod.PUT); + var putResponse = webClient.loadWebResponse(putRequest); + assertThat(putResponse.getStatusCode()).isEqualTo(HttpServletResponse.SC_METHOD_NOT_ALLOWED); + + // Test that DELETE is not allowed + var deleteRequest = new WebRequest(new URL(streamableUrl), HttpMethod.DELETE); + var deleteResponse = webClient.loadWebResponse(deleteRequest); + assertThat(deleteResponse.getStatusCode()).isEqualTo(HttpServletResponse.SC_METHOD_NOT_ALLOWED); + } + } + + @Test + void testStreamableEndpointRequiresProperHeaders(JenkinsRule jenkins) throws Exception { + var url = jenkins.getURL(); + var baseUrl = url.toString(); + var streamableUrl = baseUrl + MCP_SERVER_STREAMABLE; + try (JenkinsRule.WebClient webClient = jenkins.createWebClient()) { + + // Test POST without proper Accept headers + var postRequest = new WebRequest(new URL(streamableUrl), HttpMethod.POST); + postRequest.setAdditionalHeader("Content-Type", "application/json"); + var postResponse = webClient.loadWebResponse(postRequest); + assertThat(postResponse.getStatusCode()).isEqualTo(HttpServletResponse.SC_BAD_REQUEST); + + // Test GET without proper Accept headers + var getRequest = new WebRequest(new URL(streamableUrl), HttpMethod.GET); + var getResponse = webClient.loadWebResponse(getRequest); + assertThat(getResponse.getStatusCode()).isEqualTo(HttpServletResponse.SC_BAD_REQUEST); + } + } + + @Test + void testStreamableJenkinsToolCalls(JenkinsRule jenkins) throws Exception { + + WorkflowJob project = jenkins.createProject(WorkflowJob.class, "demo-job-1"); + project.setDefinition(new CpsFlowDefinition("", true)); + + project = jenkins.createProject(WorkflowJob.class, "demo-job-2"); + project.setDefinition(new CpsFlowDefinition("", true)); + + var url = jenkins.getURL(); + var baseUrl = url.toString(); + + var transport = HttpClientStreamableHttpTransport.builder(baseUrl) + .endpoint(MCP_SERVER_STREAMABLE) + .build(); + + try (var client = McpClient.sync(transport) + .initializationTimeout(Duration.ofSeconds(500)) + .requestTimeout(Duration.ofSeconds(500)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .build()) { + client.initialize(); + + // Test getJobs tool call + McpSchema.CallToolRequest getJobsRequest = new McpSchema.CallToolRequest("getJobs", Map.of("limit", 5)); + var getJobsResponse = client.callTool(getJobsRequest); + assertThat(getJobsResponse.isError()).isFalse(); + assertThat(getJobsResponse.content()).hasSize(2); + assertThat(getJobsResponse.content().get(0).type()).isEqualTo("text"); + + // Test that the response contains valid JSON (even if empty list) + assertThat(getJobsResponse.content()) + .first() + .isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.text()).startsWith("{").endsWith("}"); + }); + } + } + + @Test + void testStreamableSessionManagement(JenkinsRule jenkins) throws Exception { + var url = jenkins.getURL(); + var baseUrl = url.toString(); + + var transport1 = HttpClientStreamableHttpTransport.builder(baseUrl) + .endpoint(MCP_SERVER_STREAMABLE) + .build(); + + var transport2 = HttpClientStreamableHttpTransport.builder(baseUrl) + .endpoint(MCP_SERVER_STREAMABLE) + .build(); + + // Test that multiple clients can connect simultaneously + try (var client1 = McpClient.sync(transport1) + .requestTimeout(Duration.ofSeconds(500)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .build(); + var client2 = McpClient.sync(transport2) + .requestTimeout(Duration.ofSeconds(500)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .build()) { + + client1.initialize(); + client2.initialize(); + + // Both clients should be able to make tool calls independently + McpSchema.CallToolRequest request1 = new McpSchema.CallToolRequest("sayHello", Map.of("name", "client1")); + McpSchema.CallToolRequest request2 = new McpSchema.CallToolRequest("sayHello", Map.of("name", "client2")); + + var response1 = client1.callTool(request1); + var response2 = client2.callTool(request2); + + assertThat(response1.isError()).isFalse(); + assertThat(response2.isError()).isFalse(); + + // Verify responses are different and correct + assertThat(response1.content()).first().isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.text()).contains("client1"); + }); + + assertThat(response2.content()).first().isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.text()).contains("client2"); + }); + } + } +} diff --git a/src/test/java/io/jenkins/plugins/mcp/server/extensions/DefaultMcpServerTest.java b/src/test/java/io/jenkins/plugins/mcp/server/extensions/DefaultMcpServerTest.java index f33d91c..ea0650d 100644 --- a/src/test/java/io/jenkins/plugins/mcp/server/extensions/DefaultMcpServerTest.java +++ b/src/test/java/io/jenkins/plugins/mcp/server/extensions/DefaultMcpServerTest.java @@ -39,8 +39,10 @@ import hudson.model.Result; import hudson.model.StringParameterDefinition; import hudson.security.FullControlOnceLoggedInAuthorizationStrategy; +import io.jenkins.plugins.mcp.server.StreamableEndpoint; import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; +import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; import io.modelcontextprotocol.spec.McpSchema; import java.time.Duration; import java.util.Base64; @@ -259,7 +261,7 @@ void testMcpToolCallGetJobNotExist(JenkinsRule jenkins) throws Exception { } @Test - void testMcpToolCallGetJobs(JenkinsRule jenkins) throws Exception { + void testMcpToolCallGetJobsSse(JenkinsRule jenkins) throws Exception { enableSecurity(jenkins); for (int i = 0; i < 2; i++) { jenkins.createProject(WorkflowJob.class, "demo-job" + i); @@ -337,6 +339,85 @@ void testMcpToolCallGetJobs(JenkinsRule jenkins) throws Exception { } } + @Test + void testMcpToolCallGetJobsStreamable(JenkinsRule jenkins) throws Exception { + enableSecurity(jenkins); + for (int i = 0; i < 2; i++) { + jenkins.createProject(WorkflowJob.class, "demo-job" + i); + } + + var folder = jenkins.createFolder("test"); + + for (int i = 0; i < 20; i++) { + folder.createProject(WorkflowJob.class, "sub-demo-job" + i); + } + + var url = jenkins.getURL(); + var baseUrl = url.toString(); + + String username = "admin"; + String password = "admin"; + String authString = username + ":" + password; + String encodedAuth = Base64.getEncoder().encodeToString(authString.getBytes()); + + var transport = HttpClientStreamableHttpTransport.builder(baseUrl) + .endpoint(StreamableEndpoint.MCP_SERVER_STREAMABLE) + .customizeRequest(request -> { + request.setHeader("Authorization", "Basic " + encodedAuth); + }) + .build(); + + try (var client = McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(500)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .build()) { + client.initialize(); + { + McpSchema.CallToolRequest request = + new McpSchema.CallToolRequest("getJobs", Map.of("parentFullName", "test", "limit", 10)); + + var response = client.callTool(request); + assertThat(response.isError()).isFalse(); + assertThat(response.content()).hasSize(10); + assertThat(response.content().get(0).type()).isEqualTo("text"); + assertThat(response.content()) + .first() + .isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.type()).isEqualTo("text"); + + ObjectMapper objectMapper = new ObjectMapper(); + try { + var contetMap = objectMapper.readValue(textContent.text(), Map.class); + assertThat(contetMap).containsEntry("name", "sub-demo-job0"); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + } + { + McpSchema.CallToolRequest request = new McpSchema.CallToolRequest("getJobs", Map.of()); + + var response = client.callTool(request); + assertThat(response.isError()).isFalse(); + assertThat(response.content()).hasSize(3); + assertThat(response.content().get(0).type()).isEqualTo("text"); + assertThat(response.content()) + .first() + .isInstanceOfSatisfying(McpSchema.TextContent.class, textContent -> { + assertThat(textContent.type()).isEqualTo("text"); + + ObjectMapper objectMapper = new ObjectMapper(); + try { + var contetMap = objectMapper.readValue(textContent.text(), Map.class); + assertThat(contetMap).containsEntry("name", "demo-job0"); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + } + } + } + private void enableSecurity(JenkinsRule jenkins) throws Exception { JenkinsRule.DummySecurityRealm securityRealm = jenkins.createDummySecurityRealm(); jenkins.jenkins.setSecurityRealm(securityRealm);