Skip to content

Commit e5d7ba0

Browse files
committed
internal/mcp: use JSONRPC aliases consistently
Use JSONRPCMessage, JSONRPCRequest, and JSONRPCResponse aliases throughout the internal/mcp package, so that no jsonrpc2 names are exposed in the public API. Change-Id: Ibce794907538706448c694eafccf599a5892ab17 Reviewed-on: https://go-review.googlesource.com/c/tools/+/682796 LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent 7db3d3f commit e5d7ba0

File tree

8 files changed

+55
-57
lines changed

8 files changed

+55
-57
lines changed

internal/mcp/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (cs *ClientSession) receivingMethodInfos() map[string]methodInfo {
251251
return clientMethodInfos
252252
}
253253

254-
func (cs *ClientSession) handle(ctx context.Context, req *jsonrpc2.Request) (any, error) {
254+
func (cs *ClientSession) handle(ctx context.Context, req *JSONRPCRequest) (any, error) {
255255
return handleReceive(ctx, cs, req)
256256
}
257257

internal/mcp/conformance_test.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ var update = flag.Bool("update", false, "if set, update conformance test data")
4646
// with -update to have the test runner update the expected output, which may
4747
// be client or server depending on the perspective of the test.
4848
type conformanceTest struct {
49-
name string // test name
50-
path string // path to test file
51-
archive *txtar.Archive // raw archive, for updating
52-
tools, prompts, resources []string // named features to include
53-
client []jsonrpc2.Message // client messages
54-
server []jsonrpc2.Message // server messages
49+
name string // test name
50+
path string // path to test file
51+
archive *txtar.Archive // raw archive, for updating
52+
tools, prompts, resources []string // named features to include
53+
client []JSONRPCMessage // client messages
54+
server []JSONRPCMessage // server messages
5555
}
5656

5757
// TODO(rfindley): add client conformance tests.
@@ -117,24 +117,24 @@ func runServerTest(t *testing.T, test *conformanceTest) {
117117
t.Fatal(err)
118118
}
119119

120-
writeMsg := func(msg jsonrpc2.Message) {
120+
writeMsg := func(msg JSONRPCMessage) {
121121
if err := cStream.Write(ctx, msg); err != nil {
122122
t.Fatalf("Write failed: %v", err)
123123
}
124124
}
125125

126126
var (
127-
serverMessages []jsonrpc2.Message
128-
outRequests []*jsonrpc2.Request
129-
outResponses []*jsonrpc2.Response
127+
serverMessages []JSONRPCMessage
128+
outRequests []*JSONRPCRequest
129+
outResponses []*JSONRPCResponse
130130
)
131131

132132
// Separate client requests and responses; we use them differently.
133133
for _, msg := range test.client {
134134
switch msg := msg.(type) {
135-
case *jsonrpc2.Request:
135+
case *JSONRPCRequest:
136136
outRequests = append(outRequests, msg)
137-
case *jsonrpc2.Response:
137+
case *JSONRPCResponse:
138138
outResponses = append(outResponses, msg)
139139
default:
140140
t.Fatalf("bad message type %T", msg)
@@ -143,7 +143,7 @@ func runServerTest(t *testing.T, test *conformanceTest) {
143143

144144
// nextResponse handles incoming requests and notifications, and returns the
145145
// next incoming response.
146-
nextResponse := func() (*jsonrpc2.Response, error, bool) {
146+
nextResponse := func() (*JSONRPCResponse, error, bool) {
147147
for {
148148
msg, err := cStream.Read(ctx)
149149
if err != nil {
@@ -156,7 +156,7 @@ func runServerTest(t *testing.T, test *conformanceTest) {
156156
return nil, err, false
157157
}
158158
serverMessages = append(serverMessages, msg)
159-
if req, ok := msg.(*jsonrpc2.Request); ok && req.ID.IsValid() {
159+
if req, ok := msg.(*JSONRPCRequest); ok && req.ID.IsValid() {
160160
// Pair up the next outgoing response with this request.
161161
// We assume requests arrive in the same order every time.
162162
if len(outResponses) == 0 {
@@ -167,7 +167,7 @@ func runServerTest(t *testing.T, test *conformanceTest) {
167167
outResponses = outResponses[1:]
168168
continue
169169
}
170-
return msg.(*jsonrpc2.Response), nil, true
170+
return msg.(*JSONRPCResponse), nil, true
171171
}
172172
}
173173

@@ -191,7 +191,7 @@ func runServerTest(t *testing.T, test *conformanceTest) {
191191
// There might be more notifications or requests, but there shouldn't be more
192192
// responses.
193193
// Run this in a goroutine so the current thread can wait for it.
194-
var extra *jsonrpc2.Response
194+
var extra *JSONRPCResponse
195195
go func() {
196196
extra, err, _ = nextResponse()
197197
}()
@@ -240,8 +240,8 @@ func runServerTest(t *testing.T, test *conformanceTest) {
240240
t.Fatalf("os.WriteFile(%q) failed: %v", test.path, err)
241241
}
242242
} else {
243-
// jsonrpc2.Messages are not comparable, so we instead compare lines of JSON.
244-
transform := cmpopts.AcyclicTransformer("toJSON", func(msg jsonrpc2.Message) []string {
243+
// JSONRPCMessages are not comparable, so we instead compare lines of JSON.
244+
transform := cmpopts.AcyclicTransformer("toJSON", func(msg JSONRPCMessage) []string {
245245
encoded, err := jsonrpc2.EncodeIndent(msg, "", "\t")
246246
if err != nil {
247247
t.Fatal(err)
@@ -271,9 +271,9 @@ func loadConformanceTest(dir, path string) (*conformanceTest, error) {
271271
}
272272

273273
// decodeMessages loads JSON-RPC messages from the archive file.
274-
decodeMessages := func(data []byte) ([]jsonrpc2.Message, error) {
274+
decodeMessages := func(data []byte) ([]JSONRPCMessage, error) {
275275
dec := json.NewDecoder(bytes.NewReader(data))
276-
var res []jsonrpc2.Message
276+
var res []JSONRPCMessage
277277
for dec.More() {
278278
var raw json.RawMessage
279279
if err := dec.Decode(&raw); err != nil {

internal/mcp/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ func (ss *ServerSession) receivingMethodHandler() methodHandler {
597597
func (ss *ServerSession) getConn() *jsonrpc2.Connection { return ss.conn }
598598

599599
// handle invokes the method described by the given JSON RPC request.
600-
func (ss *ServerSession) handle(ctx context.Context, req *jsonrpc2.Request) (any, error) {
600+
func (ss *ServerSession) handle(ctx context.Context, req *JSONRPCRequest) (any, error) {
601601
ss.mu.Lock()
602602
initialized := ss.initialized
603603
ss.mu.Unlock()

internal/mcp/shared.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func defaultReceivingMethodHandler[S Session](ctx context.Context, session S, me
108108
return info.handleMethod.(MethodHandler[S])(ctx, session, method, params)
109109
}
110110

111-
func handleReceive[S Session](ctx context.Context, session S, req *jsonrpc2.Request) (Result, error) {
111+
func handleReceive[S Session](ctx context.Context, session S, req *JSONRPCRequest) (Result, error) {
112112
info, ok := session.receivingMethodInfos()[req.Method]
113113
if !ok {
114114
return nil, jsonrpc2.ErrNotHandled

internal/mcp/sse.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func NewSSEHandler(getServer func(request *http.Request) *Server) *SSEHandler {
111111
// - Close terminates the hanging GET.
112112
type SSEServerTransport struct {
113113
endpoint string
114-
incoming chan jsonrpc2.Message // queue of incoming messages; never closed
114+
incoming chan JSONRPCMessage // queue of incoming messages; never closed
115115

116116
// We must guard both pushes to the incoming queue and writes to the response
117117
// writer, because incoming POST requests are arbitrarily concurrent and we
@@ -138,7 +138,7 @@ func NewSSEServerTransport(endpoint string, w http.ResponseWriter) *SSEServerTra
138138
return &SSEServerTransport{
139139
endpoint: endpoint,
140140
w: w,
141-
incoming: make(chan jsonrpc2.Message, 100),
141+
incoming: make(chan JSONRPCMessage, 100),
142142
done: make(chan struct{}),
143143
}
144144
}
@@ -264,7 +264,7 @@ type sseServerConn struct {
264264
}
265265

266266
// Read implements jsonrpc2.Reader.
267-
func (s sseServerConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
267+
func (s sseServerConn) Read(ctx context.Context) (JSONRPCMessage, error) {
268268
select {
269269
case <-ctx.Done():
270270
return nil, ctx.Err()
@@ -276,7 +276,7 @@ func (s sseServerConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
276276
}
277277

278278
// Write implements jsonrpc2.Writer.
279-
func (s sseServerConn) Write(ctx context.Context, msg jsonrpc2.Message) error {
279+
func (s sseServerConn) Write(ctx context.Context, msg JSONRPCMessage) error {
280280
if ctx.Err() != nil {
281281
return ctx.Err()
282282
}
@@ -507,7 +507,7 @@ func (c *sseClientConn) isDone() bool {
507507
return c.closed
508508
}
509509

510-
func (c *sseClientConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
510+
func (c *sseClientConn) Read(ctx context.Context) (JSONRPCMessage, error) {
511511
select {
512512
case <-ctx.Done():
513513
return nil, ctx.Err()
@@ -528,7 +528,7 @@ func (c *sseClientConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
528528
}
529529
}
530530

531-
func (c *sseClientConn) Write(ctx context.Context, msg jsonrpc2.Message) error {
531+
func (c *sseClientConn) Write(ctx context.Context, msg JSONRPCMessage) error {
532532
data, err := jsonrpc2.EncodeMessage(msg)
533533
if err != nil {
534534
return err

internal/mcp/streamable.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
151151
func NewStreamableServerTransport(sessionID string) *StreamableServerTransport {
152152
return &StreamableServerTransport{
153153
id: sessionID,
154-
incoming: make(chan jsonrpc2.Message, 10),
154+
incoming: make(chan JSONRPCMessage, 10),
155155
done: make(chan struct{}),
156156
outgoingMessages: make(map[streamID][]*streamableMsg),
157157
signals: make(map[streamID]chan struct{}),
@@ -166,7 +166,7 @@ type StreamableServerTransport struct {
166166
nextStreamID atomic.Int64 // incrementing next stream ID
167167

168168
id string
169-
incoming chan jsonrpc2.Message // messages from the client to the server
169+
incoming chan JSONRPCMessage // messages from the client to the server
170170

171171
mu sync.Mutex
172172

@@ -466,7 +466,7 @@ func parseEventID(eventID string) (conn streamID, idx int, ok bool) {
466466
}
467467

468468
// Read implements the [Connection] interface.
469-
func (t *StreamableServerTransport) Read(ctx context.Context) (jsonrpc2.Message, error) {
469+
func (t *StreamableServerTransport) Read(ctx context.Context) (JSONRPCMessage, error) {
470470
select {
471471
case <-ctx.Done():
472472
return nil, ctx.Err()
@@ -613,7 +613,7 @@ type streamableClientConn struct {
613613
}
614614

615615
// Read implements the [Connection] interface.
616-
func (s *streamableClientConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
616+
func (s *streamableClientConn) Read(ctx context.Context) (JSONRPCMessage, error) {
617617
select {
618618
case <-ctx.Done():
619619
return nil, ctx.Err()
@@ -625,7 +625,7 @@ func (s *streamableClientConn) Read(ctx context.Context) (jsonrpc2.Message, erro
625625
}
626626

627627
// Write implements the [Connection] interface.
628-
func (s *streamableClientConn) Write(ctx context.Context, msg jsonrpc2.Message) error {
628+
func (s *streamableClientConn) Write(ctx context.Context, msg JSONRPCMessage) error {
629629
s.mu.Lock()
630630
if s.err != nil {
631631
s.mu.Unlock()

internal/mcp/transport.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ type Transport interface {
3737
type (
3838
// JSONRPCID is a JSON-RPC request ID.
3939
JSONRPCID = jsonrpc2.ID
40-
// JSONRPCHandler is a JSON-RPC handler function.
41-
JSONRPCHandler = func(ctx context.Context, req *JSONRPCRequest) (result any, err error)
4240
// JSONRPCMessage is a JSON-RPC message.
4341
JSONRPCMessage = jsonrpc2.Message
4442
// JSONRPCRequest is a JSON-RPC request.
@@ -95,7 +93,7 @@ type binder[T handler] interface {
9593
}
9694

9795
type handler interface {
98-
handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
96+
handle(ctx context.Context, req *JSONRPCRequest) (any, error)
9997
}
10098

10199
func connect[H handler](ctx context.Context, t Transport, b binder[H]) (H, error) {
@@ -136,7 +134,7 @@ type canceller struct {
136134
}
137135

138136
// Preempt implements jsonrpc2.Preempter.
139-
func (c *canceller) Preempt(ctx context.Context, req *jsonrpc2.Request) (result any, err error) {
137+
func (c *canceller) Preempt(ctx context.Context, req *JSONRPCRequest) (result any, err error) {
140138
if req.Method == "notifications/cancelled" {
141139
var params CancelledParams
142140
if err := json.Unmarshal(req.Params, &params); err != nil {
@@ -203,7 +201,7 @@ type loggingConn struct {
203201
}
204202

205203
// loggingReader is a stream middleware that logs incoming messages.
206-
func (s *loggingConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
204+
func (s *loggingConn) Read(ctx context.Context) (JSONRPCMessage, error) {
207205
msg, err := s.delegate.Read(ctx)
208206
if err != nil {
209207
fmt.Fprintf(s.w, "read error: %v", err)
@@ -218,7 +216,7 @@ func (s *loggingConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
218216
}
219217

220218
// loggingWriter is a stream middleware that logs outgoing messages.
221-
func (s *loggingConn) Write(ctx context.Context, msg jsonrpc2.Message) error {
219+
func (s *loggingConn) Write(ctx context.Context, msg JSONRPCMessage) error {
222220
err := s.delegate.Write(ctx, msg)
223221
if err != nil {
224222
fmt.Fprintf(s.w, "write error: %v", err)
@@ -268,11 +266,11 @@ type ioConn struct {
268266

269267
// If outgoiBatch has a positive capacity, it will be used to batch requests
270268
// and notifications before sending.
271-
outgoingBatch []jsonrpc2.Message
269+
outgoingBatch []JSONRPCMessage
272270

273271
// Unread messages in the last batch. Since reads are serialized, there is no
274272
// need to guard here.
275-
queue []jsonrpc2.Message
273+
queue []JSONRPCMessage
276274

277275
// batches correlate incoming requests to the batch in which they arrived.
278276
// Since writes may be concurrent to reads, we need to guard this with a mutex.
@@ -314,7 +312,7 @@ func (t *ioConn) addBatch(batch *msgBatch) error {
314312
// The second result reports whether resp was part of a batch. If this is true,
315313
// the first result is nil if the batch is still incomplete, or the full set of
316314
// batch responses if resp completed the batch.
317-
func (t *ioConn) updateBatch(resp *jsonrpc2.Response) ([]*jsonrpc2.Response, bool) {
315+
func (t *ioConn) updateBatch(resp *JSONRPCResponse) ([]*JSONRPCResponse, bool) {
318316
t.batchMu.Lock()
319317
defer t.batchMu.Unlock()
320318

@@ -349,14 +347,14 @@ func (t *ioConn) updateBatch(resp *jsonrpc2.Response) ([]*jsonrpc2.Response, boo
349347
// When there are no unresolved calls, the response payload is sent.
350348
type msgBatch struct {
351349
unresolved map[jsonrpc2.ID]int
352-
responses []*jsonrpc2.Response
350+
responses []*JSONRPCResponse
353351
}
354352

355-
func (t *ioConn) Read(ctx context.Context) (jsonrpc2.Message, error) {
353+
func (t *ioConn) Read(ctx context.Context) (JSONRPCMessage, error) {
356354
return t.read(ctx, t.in)
357355
}
358356

359-
func (t *ioConn) read(ctx context.Context, in *json.Decoder) (jsonrpc2.Message, error) {
357+
func (t *ioConn) read(ctx context.Context, in *json.Decoder) (JSONRPCMessage, error) {
360358
select {
361359
case <-ctx.Done():
362360
return nil, ctx.Err()
@@ -381,7 +379,7 @@ func (t *ioConn) read(ctx context.Context, in *json.Decoder) (jsonrpc2.Message,
381379
if batch {
382380
var respBatch *msgBatch // track incoming requests in the batch
383381
for _, msg := range msgs {
384-
if req, ok := msg.(*jsonrpc2.Request); ok {
382+
if req, ok := msg.(*JSONRPCRequest); ok {
385383
if respBatch == nil {
386384
respBatch = &msgBatch{
387385
unresolved: make(map[jsonrpc2.ID]int),
@@ -406,7 +404,7 @@ func (t *ioConn) read(ctx context.Context, in *json.Decoder) (jsonrpc2.Message,
406404

407405
// readBatch reads batch data, which may be either a single JSON-RPC message,
408406
// or an array of JSON-RPC messages.
409-
func readBatch(data []byte) (msgs []jsonrpc2.Message, isBatch bool, _ error) {
407+
func readBatch(data []byte) (msgs []JSONRPCMessage, isBatch bool, _ error) {
410408
// Try to read an array of messages first.
411409
var rawBatch []json.RawMessage
412410
if err := json.Unmarshal(data, &rawBatch); err == nil {
@@ -424,10 +422,10 @@ func readBatch(data []byte) (msgs []jsonrpc2.Message, isBatch bool, _ error) {
424422
}
425423
// Try again with a single message.
426424
msg, err := jsonrpc2.DecodeMessage(data)
427-
return []jsonrpc2.Message{msg}, false, err
425+
return []JSONRPCMessage{msg}, false, err
428426
}
429427

430-
func (t *ioConn) Write(ctx context.Context, msg jsonrpc2.Message) error {
428+
func (t *ioConn) Write(ctx context.Context, msg JSONRPCMessage) error {
431429
select {
432430
case <-ctx.Done():
433431
return ctx.Err()
@@ -438,7 +436,7 @@ func (t *ioConn) Write(ctx context.Context, msg jsonrpc2.Message) error {
438436
// check that first. Otherwise, it is a request or notification, and we may
439437
// want to collect it into a batch before sending, if we're configured to use
440438
// outgoing batches.
441-
if resp, ok := msg.(*jsonrpc2.Response); ok {
439+
if resp, ok := msg.(*JSONRPCResponse); ok {
442440
if batch, ok := t.updateBatch(resp); ok {
443441
if len(batch) > 0 {
444442
data, err := marshalMessages(batch)
@@ -478,7 +476,7 @@ func (t *ioConn) Close() error {
478476
return t.rwc.Close()
479477
}
480478

481-
func marshalMessages[T jsonrpc2.Message](msgs []T) ([]byte, error) {
479+
func marshalMessages[T JSONRPCMessage](msgs []T) ([]byte, error) {
482480
var rawMsgs []json.RawMessage
483481
for _, msg := range msgs {
484482
raw, err := jsonrpc2.EncodeMessage(msg)

0 commit comments

Comments
 (0)