diff --git a/mcp/streamable.go b/mcp/streamable.go index 12e24ffa..67cdc390 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -1656,6 +1656,12 @@ func (c *streamableClientConn) processStream(requestSummary string, resp *http.R lastEventID = evt.ID } + // Skip non-message events (e.g., "ping" events used for keep-alive) + // According to SSE spec, events with no name default to "message" + if evt.Name != "" && evt.Name != "message" { + continue + } + msg, err := jsonrpc.DecodeMessage(evt.Data) if err != nil { c.fail(fmt.Errorf("%s: failed to decode event: %v", requestSummary, err)) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index 0579f0cb..e9c0cbda 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -1746,3 +1746,153 @@ func mustNotPanic(t *testing.T, h http.Handler) http.Handler { h.ServeHTTP(w, req) }) } + +// TestPingEventFiltering verifies that the streamable client correctly filters +// out SSE "ping" events, which are used for keep-alive but should not be +// treated as JSON-RPC messages. +// +// This test addresses issue #636: the client should skip non-"message" events +// according to the SSE specification. It tests the fix in processStream where +// events with evt.Name != "" && evt.Name != "message" are skipped. +func TestPingEventFiltering(t *testing.T) { + // This test verifies the low-level processStream filtering. + // We create a mock response with ping and message events. + + sseData := `event: ping +data: ping + +event: message +id: 1 +data: {"jsonrpc":"2.0","id":1,"result":{}} + +event: ping +data: keepalive + +` + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(bytes.NewReader([]byte(sseData))), + } + + // Create a minimal streamableClientConn for testing + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + incoming := make(chan jsonrpc.Message, 10) + done := make(chan struct{}) + + conn := &streamableClientConn{ + ctx: ctx, + done: done, + incoming: incoming, + } + + // Create a test request + testReq := &jsonrpc.Request{ + ID: jsonrpc2.Int64ID(1), + Method: "test", + } + + // Process the stream + go conn.processStream("test", resp, testReq) + + // Collect messages with timeout + var messages []jsonrpc.Message + timeout := time.After(1 * time.Second) + +collectLoop: + for { + select { + case msg := <-incoming: + messages = append(messages, msg) + // We expect only 1 message (the response), not the ping events + if len(messages) >= 1 { + break collectLoop + } + case <-timeout: + break collectLoop + } + } + + // Verify we only received the actual message, not the ping events + if len(messages) != 1 { + t.Errorf("got %d messages, want 1 (ping events should be filtered)", len(messages)) + for i, msg := range messages { + t.Logf("message %d: %T", i, msg) + } + } + + // Verify the message is the response + if len(messages) > 0 { + resp, ok := messages[0].(*jsonrpc.Response) + if !ok { + t.Errorf("first message is %T, want *jsonrpc.Response", messages[0]) + } else if resp.ID.Raw() != int64(1) { + t.Errorf("response ID is %v, want 1", resp.ID.Raw()) + } + } +} + +// TestScanEventsPingFiltering is a unit test for the low-level event scanning +// with ping events to verify scanEvents properly parses all event types. +func TestScanEventsPingFiltering(t *testing.T) { + // Create SSE stream with mixed events + sseData := `event: ping +data: ping + +event: message +data: {"jsonrpc":"2.0","method":"test","params":{}} + +event: ping +data: keepalive + +event: message +data: {"jsonrpc":"2.0","method":"test2","params":{}} + +` + + reader := strings.NewReader(sseData) + var events []Event + + // Scan all events + for evt, err := range scanEvents(reader) { + if err != nil { + if err != io.EOF { + t.Fatalf("scanEvents error: %v", err) + } + break + } + events = append(events, evt) + } + + // Verify we got all 4 events + if len(events) != 4 { + t.Fatalf("got %d events, want 4", len(events)) + } + + // Verify event types + expectedNames := []string{"ping", "message", "ping", "message"} + for i, evt := range events { + if evt.Name != expectedNames[i] { + t.Errorf("event %d: got name %q, want %q", i, evt.Name, expectedNames[i]) + } + } + + // Verify that we can decode the message events but would fail on ping events + for i, evt := range events { + if evt.Name == "message" { + _, err := jsonrpc.DecodeMessage(evt.Data) + if err != nil { + t.Errorf("event %d: failed to decode message event: %v", i, err) + } + } else if evt.Name == "ping" { + // Ping events have non-JSON data and should fail decoding + _, err := jsonrpc.DecodeMessage(evt.Data) + if err == nil { + t.Errorf("event %d: ping event unexpectedly decoded as valid JSON-RPC", i) + } + } + } +}