[ISSUE #5208] develop mcp protocol#5203
Conversation
There was a problem hiding this comment.
Welcome to the Apache EventMesh community!!
This is your first PR in our project. We're very excited to have you onboard contributing. Your contributions are greatly appreciated!
Please make sure that the changes are covered by tests.
We will be here shortly.
Let us know if you need any help!
Want to get closer to the community?
| WeChat Assistant | WeChat Public Account | Slack |
|---|---|---|
![]() |
![]() |
Join Slack Chat |
Mailing Lists:
| Name | Description | Subscribe | Unsubscribe | Archive |
|---|---|---|---|---|
| Users | User support and questions mailing list | Subscribe | Unsubscribe | Mail Archives |
| Development | Development related discussions | Subscribe | Unsubscribe | Mail Archives |
| Commits | All commits to repositories | Subscribe | Unsubscribe | Mail Archives |
| Issues | Issues or PRs comments and reviews | Subscribe | Unsubscribe | Mail Archives |
|
@jerryyummy please create a issue and give your proposal first, let's have some discussion |
settings.gradle
Outdated
| include 'eventmesh-protocol-plugin:eventmesh-protocol-http' | ||
| include 'eventmesh-protocol-plugin:eventmesh-protocol-grpc' | ||
| include 'eventmesh-protocol-plugin:eventmesh-protocol-grpcmessage' | ||
| include 'eventmesh-protocol-plugin:eventmesh-protocol-mcp' |
There was a problem hiding this comment.
why you add this module, this module didn't used in your pr
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| @Slf4j | ||
| public abstract class AbstractMCPServer extends AbstractRemotingServer{ |
There was a problem hiding this comment.
Are there any differences between the implementation of AbstractMCPServer\EventMeshMCPServer\EventMeshMcpBootstrap and HTTP server except for protocol parsing and distribution? Can HTTP server be reused?
There was a problem hiding this comment.
actually no difference, so i may reuse http server instead
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| this.sinkHandler.stop(); |
There was a problem hiding this comment.
stop the ThreadPoolExecutor first
|
|
||
| @Override | ||
| public void onException(ConnectRecord record) { | ||
|
|
There was a problem hiding this comment.
Suggestion:
exception handling or error logging need
|
|
||
| @Override | ||
| public void commit(ConnectRecord record) { | ||
|
|
| Thread.currentThread().interrupt(); | ||
| } | ||
| this.sinkHandler.stop(); | ||
| log.info("All tasks completed, start shut down mcp sink connector"); |
There was a problem hiding this comment.
Question:
typo? "start shut down" or "finish"?
| @EqualsAndHashCode(callSuper = true) | ||
| public class McpSourceConfig extends SourceConfig { | ||
|
|
||
| public SourceConnectorConfig connectorConfig; |
There was a problem hiding this comment.
Suggestion:
can merge SourceConnectorConfig with McpSourceConfig ?
| log.warn("ConnectRecord data is null, ignore."); | ||
| continue; | ||
| } | ||
| queue.put(sinkRecord); |
There was a problem hiding this comment.
Suggestion:
Use the executor's built-in queue instead of a custom queue. Prefer executor.submit() over queue.put().


Fixes #5208
Motivation
Explain the content here.
Explain why you want to make the changes and what problem you're trying to solve.
The motivation of this PR is to improve the MCP protocol integration in EventMesh.
Previously, the implementation relied on SSE-based streaming and custom queue management, which led to:
Inefficient multi-thread handling in SinkConnector, with difficulties in graceful shutdown.
Limitations of SSE for long-lived connections and client discovery.
Inconsistent position/offset management, causing unstable message delivery and retries.
Redundant code in MCP server implementation, since Netty-based server duplicated existing HTTP server capabilities.
This PR aims to address those issues by refactoring the architecture to:
Adopt Streamable HTTP instead of SSE, making it easier for clients (e.g., Cursor) to discover and connect.
Replace self-managed queue with standard ExecutorService, improving throughput and supporting graceful exit.
Enhance position management to ensure at-least-once semantics and reliable offset commit.
Reuse existing HTTP server for MCP server, avoiding duplicate logic and improving maintainability.
Modifications
Describe the modifications you've done.
Implemented ProtocolFactory and McpStandardProtocol to unify MCP protocol handling.
Added McpSourceConnector, McpRequest, McpResponse, and MultiMcpRequestContext to manage request/response lifecycle.
Introduced McpSinkConnector and McpSinkHandler abstraction, with CommonMcpSinkHandler and AbstractMcpSinkHandler supporting Round-Robin and Broadcast strategies.
Refactored threading model: replaced custom queue with ExecutorService in McpSinkConnector for task submission and graceful shutdown.
Added retry mechanism with McpAttemptEvent and DLQ support for failed records.
Defined export data structures: McpConnectRecord, McpExportRecord, McpExportRecordPage, and McpExportMetadata.
Migrated MCP server from a Netty-only server to reuse Vert.x HTTP server, simplifying request handling and routing.
Prepared support for Streamable HTTP transport, aligning with the new MCP specification.
Documentation