Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions jsonrpc/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2025 The Go MCP SDK Authors. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

// Package jsonrpc exposes part of a JSON-RPC v2 implementation
// for use by mcp transport authors.
package jsonrpc

import "github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2"

type (
// ID is a JSON-RPC request ID.
ID = jsonrpc2.ID
// Message is a JSON-RPC message.
Message = jsonrpc2.Message
// Request is a JSON-RPC request.
Request = jsonrpc2.Request
// Response is a JSON-RPC response.
Response = jsonrpc2.Response
)
3 changes: 2 additions & 1 deletion mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2"
"github.com/modelcontextprotocol/go-sdk/jsonrpc"
)

// A Client is an MCP client, which may be connected to an MCP server
Expand Down Expand Up @@ -301,7 +302,7 @@ func (cs *ClientSession) receivingMethodInfos() map[string]methodInfo {
return clientMethodInfos
}

func (cs *ClientSession) handle(ctx context.Context, req *JSONRPCRequest) (any, error) {
func (cs *ClientSession) handle(ctx context.Context, req *jsonrpc.Request) (any, error) {
return handleReceive(ctx, cs, req)
}

Expand Down
3 changes: 2 additions & 1 deletion mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2"
"github.com/modelcontextprotocol/go-sdk/internal/util"
"github.com/modelcontextprotocol/go-sdk/jsonrpc"
)

const DefaultPageSize = 1000
Expand Down Expand Up @@ -610,7 +611,7 @@ func (ss *ServerSession) receivingMethodHandler() methodHandler {
func (ss *ServerSession) getConn() *jsonrpc2.Connection { return ss.conn }

// handle invokes the method described by the given JSON RPC request.
func (ss *ServerSession) handle(ctx context.Context, req *JSONRPCRequest) (any, error) {
func (ss *ServerSession) handle(ctx context.Context, req *jsonrpc.Request) (any, error) {
ss.mu.Lock()
initialized := ss.initialized
ss.mu.Unlock()
Expand Down
3 changes: 2 additions & 1 deletion mcp/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2"
"github.com/modelcontextprotocol/go-sdk/jsonrpc"
)

// latestProtocolVersion is the latest protocol version that this version of the SDK supports.
Expand Down Expand Up @@ -121,7 +122,7 @@ func defaultReceivingMethodHandler[S Session](ctx context.Context, session S, me
return info.handleMethod.(MethodHandler[S])(ctx, session, method, params)
}

func handleReceive[S Session](ctx context.Context, session S, req *JSONRPCRequest) (Result, error) {
func handleReceive[S Session](ctx context.Context, session S, req *jsonrpc.Request) (Result, error) {
info, ok := session.receivingMethodInfos()[req.Method]
if !ok {
return nil, jsonrpc2.ErrNotHandled
Expand Down
13 changes: 7 additions & 6 deletions mcp/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"

"github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2"
"github.com/modelcontextprotocol/go-sdk/jsonrpc"
)

// This file implements support for SSE (HTTP with server-sent events)
Expand Down Expand Up @@ -111,7 +112,7 @@ func NewSSEHandler(getServer func(request *http.Request) *Server) *SSEHandler {
// - Close terminates the hanging GET.
type SSEServerTransport struct {
endpoint string
incoming chan JSONRPCMessage // queue of incoming messages; never closed
incoming chan jsonrpc.Message // queue of incoming messages; never closed

// We must guard both pushes to the incoming queue and writes to the response
// writer, because incoming POST requests are arbitrarily concurrent and we
Expand All @@ -138,7 +139,7 @@ func NewSSEServerTransport(endpoint string, w http.ResponseWriter) *SSEServerTra
return &SSEServerTransport{
endpoint: endpoint,
w: w,
incoming: make(chan JSONRPCMessage, 100),
incoming: make(chan jsonrpc.Message, 100),
done: make(chan struct{}),
}
}
Expand Down Expand Up @@ -267,7 +268,7 @@ type sseServerConn struct {
func (s sseServerConn) SessionID() string { return "" }

// Read implements jsonrpc2.Reader.
func (s sseServerConn) Read(ctx context.Context) (JSONRPCMessage, error) {
func (s sseServerConn) Read(ctx context.Context) (jsonrpc.Message, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -279,7 +280,7 @@ func (s sseServerConn) Read(ctx context.Context) (JSONRPCMessage, error) {
}

// Write implements jsonrpc2.Writer.
func (s sseServerConn) Write(ctx context.Context, msg JSONRPCMessage) error {
func (s sseServerConn) Write(ctx context.Context, msg jsonrpc.Message) error {
if ctx.Err() != nil {
return ctx.Err()
}
Expand Down Expand Up @@ -532,7 +533,7 @@ func (c *sseClientConn) isDone() bool {
return c.closed
}

func (c *sseClientConn) Read(ctx context.Context) (JSONRPCMessage, error) {
func (c *sseClientConn) Read(ctx context.Context) (jsonrpc.Message, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -553,7 +554,7 @@ func (c *sseClientConn) Read(ctx context.Context) (JSONRPCMessage, error) {
}
}

func (c *sseClientConn) Write(ctx context.Context, msg JSONRPCMessage) error {
func (c *sseClientConn) Write(ctx context.Context, msg jsonrpc.Message) error {
data, err := jsonrpc2.EncodeMessage(msg)
if err != nil {
return err
Expand Down
37 changes: 19 additions & 18 deletions mcp/streamable.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync/atomic"

"github.com/modelcontextprotocol/go-sdk/internal/jsonrpc2"
"github.com/modelcontextprotocol/go-sdk/jsonrpc"
)

const (
Expand Down Expand Up @@ -157,12 +158,12 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
func NewStreamableServerTransport(sessionID string) *StreamableServerTransport {
return &StreamableServerTransport{
id: sessionID,
incoming: make(chan JSONRPCMessage, 10),
incoming: make(chan jsonrpc.Message, 10),
done: make(chan struct{}),
outgoingMessages: make(map[streamID][]*streamableMsg),
signals: make(map[streamID]chan struct{}),
requestStreams: make(map[JSONRPCID]streamID),
streamRequests: make(map[streamID]map[JSONRPCID]struct{}),
requestStreams: make(map[jsonrpc.ID]streamID),
streamRequests: make(map[streamID]map[jsonrpc.ID]struct{}),
}
}

Expand All @@ -176,7 +177,7 @@ type StreamableServerTransport struct {
nextStreamID atomic.Int64 // incrementing next stream ID

id string
incoming chan JSONRPCMessage // messages from the client to the server
incoming chan jsonrpc.Message // messages from the client to the server

mu sync.Mutex

Expand Down Expand Up @@ -226,7 +227,7 @@ type StreamableServerTransport struct {
// Lifecycle: requestStreams persists for the duration of the session.
//
// TODO(rfindley): clean up once requests are handled.
requestStreams map[JSONRPCID]streamID
requestStreams map[jsonrpc.ID]streamID

// streamRequests tracks the set of unanswered incoming RPCs for each logical
// stream.
Expand All @@ -237,7 +238,7 @@ type StreamableServerTransport struct {
// Lifecycle: streamRequests values persist as until the requests have been
// replied to by the server. Notably, NOT until they are sent to an HTTP
// response, as delivery is not guaranteed.
streamRequests map[streamID]map[JSONRPCID]struct{}
streamRequests map[streamID]map[jsonrpc.ID]struct{}
}

type streamID int64
Expand Down Expand Up @@ -271,7 +272,7 @@ func (s *StreamableServerTransport) Connect(context.Context) (Connection, error)
// 2. Expose a 'HandlerTransport' interface that allows transports to provide
// a handler middleware, so that we don't hard-code this behavior in
// ServerSession.handle.
// 3. Add a `func ForRequest(context.Context) JSONRPCID` accessor that lets
// 3. Add a `func ForRequest(context.Context) jsonrpc.ID` accessor that lets
// any transport access the incoming request ID.
//
// For now, by giving only the StreamableServerTransport access to the request
Expand Down Expand Up @@ -340,9 +341,9 @@ func (t *StreamableServerTransport) servePOST(w http.ResponseWriter, req *http.R
http.Error(w, fmt.Sprintf("malformed payload: %v", err), http.StatusBadRequest)
return
}
requests := make(map[JSONRPCID]struct{})
requests := make(map[jsonrpc.ID]struct{})
for _, msg := range incoming {
if req, ok := msg.(*JSONRPCRequest); ok && req.ID.IsValid() {
if req, ok := msg.(*jsonrpc.Request); ok && req.ID.IsValid() {
requests[req.ID] = struct{}{}
}
}
Expand All @@ -352,7 +353,7 @@ func (t *StreamableServerTransport) servePOST(w http.ResponseWriter, req *http.R
signal := make(chan struct{}, 1)
t.mu.Lock()
if len(requests) > 0 {
t.streamRequests[id] = make(map[JSONRPCID]struct{})
t.streamRequests[id] = make(map[jsonrpc.ID]struct{})
}
for reqID := range requests {
t.requestStreams[reqID] = id
Expand Down Expand Up @@ -484,7 +485,7 @@ func parseEventID(eventID string) (sid streamID, idx int, ok bool) {
}

// Read implements the [Connection] interface.
func (t *StreamableServerTransport) Read(ctx context.Context) (JSONRPCMessage, error) {
func (t *StreamableServerTransport) Read(ctx context.Context) (jsonrpc.Message, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -499,10 +500,10 @@ func (t *StreamableServerTransport) Read(ctx context.Context) (JSONRPCMessage, e
}

// Write implements the [Connection] interface.
func (t *StreamableServerTransport) Write(ctx context.Context, msg JSONRPCMessage) error {
func (t *StreamableServerTransport) Write(ctx context.Context, msg jsonrpc.Message) error {
// Find the incoming request that this write relates to, if any.
var forRequest, replyTo JSONRPCID
if resp, ok := msg.(*JSONRPCResponse); ok {
var forRequest, replyTo jsonrpc.ID
if resp, ok := msg.(*jsonrpc.Response); ok {
// If the message is a response, it relates to its request (of course).
forRequest = resp.ID
replyTo = resp.ID
Expand All @@ -511,7 +512,7 @@ func (t *StreamableServerTransport) Write(ctx context.Context, msg JSONRPCMessag
// ongoing request. This may not be the case if the request way made with
// an unrelated context.
if v := ctx.Value(idContextKey{}); v != nil {
forRequest = v.(JSONRPCID)
forRequest = v.(jsonrpc.ID)
}
}

Expand Down Expand Up @@ -661,7 +662,7 @@ func (c *streamableClientConn) SessionID() string {
}

// Read implements the [Connection] interface.
func (s *streamableClientConn) Read(ctx context.Context) (JSONRPCMessage, error) {
func (s *streamableClientConn) Read(ctx context.Context) (jsonrpc.Message, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -673,7 +674,7 @@ func (s *streamableClientConn) Read(ctx context.Context) (JSONRPCMessage, error)
}

// Write implements the [Connection] interface.
func (s *streamableClientConn) Write(ctx context.Context, msg JSONRPCMessage) error {
func (s *streamableClientConn) Write(ctx context.Context, msg jsonrpc.Message) error {
s.mu.Lock()
if s.err != nil {
s.mu.Unlock()
Expand Down Expand Up @@ -709,7 +710,7 @@ func (s *streamableClientConn) Write(ctx context.Context, msg JSONRPCMessage) er
return nil
}

func (s *streamableClientConn) postMessage(ctx context.Context, sessionID string, msg JSONRPCMessage) (string, error) {
func (s *streamableClientConn) postMessage(ctx context.Context, sessionID string, msg jsonrpc.Message) (string, error) {
data, err := jsonrpc2.EncodeMessage(msg)
if err != nil {
return "", err
Expand Down
Loading
Loading