Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions mcp/streamable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1656,6 +1656,12 @@ func (c *streamableClientConn) processStream(requestSummary string, resp *http.R
lastEventID = evt.ID
}

// Skip non-message events (e.g., "ping" events used for keep-alive)
// According to SSE spec, events with no name default to "message"
if evt.Name != "" && evt.Name != "message" {
continue
}

msg, err := jsonrpc.DecodeMessage(evt.Data)
if err != nil {
c.fail(fmt.Errorf("%s: failed to decode event: %v", requestSummary, err))
Expand Down
150 changes: 150 additions & 0 deletions mcp/streamable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1746,3 +1746,153 @@ func mustNotPanic(t *testing.T, h http.Handler) http.Handler {
h.ServeHTTP(w, req)
})
}

// TestPingEventFiltering verifies that the streamable client correctly filters
// out SSE "ping" events, which are used for keep-alive but should not be
// treated as JSON-RPC messages.
//
// This test addresses issue #636: the client should skip non-"message" events
// according to the SSE specification. It tests the fix in processStream where
// events with evt.Name != "" && evt.Name != "message" are skipped.
func TestPingEventFiltering(t *testing.T) {
// This test verifies the low-level processStream filtering.
// We create a mock response with ping and message events.

sseData := `event: ping
data: ping

event: message
id: 1
data: {"jsonrpc":"2.0","id":1,"result":{}}

event: ping
data: keepalive

`

resp := &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
Body: io.NopCloser(bytes.NewReader([]byte(sseData))),
}

// Create a minimal streamableClientConn for testing
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

incoming := make(chan jsonrpc.Message, 10)
done := make(chan struct{})

conn := &streamableClientConn{
ctx: ctx,
done: done,
incoming: incoming,
}

// Create a test request
testReq := &jsonrpc.Request{
ID: jsonrpc2.Int64ID(1),
Method: "test",
}

// Process the stream
go conn.processStream("test", resp, testReq)

// Collect messages with timeout
var messages []jsonrpc.Message
timeout := time.After(1 * time.Second)

collectLoop:
for {
select {
case msg := <-incoming:
messages = append(messages, msg)
// We expect only 1 message (the response), not the ping events
if len(messages) >= 1 {
break collectLoop
}
case <-timeout:
break collectLoop
}
}

// Verify we only received the actual message, not the ping events
if len(messages) != 1 {
t.Errorf("got %d messages, want 1 (ping events should be filtered)", len(messages))
for i, msg := range messages {
t.Logf("message %d: %T", i, msg)
}
}

// Verify the message is the response
if len(messages) > 0 {
resp, ok := messages[0].(*jsonrpc.Response)
if !ok {
t.Errorf("first message is %T, want *jsonrpc.Response", messages[0])
} else if resp.ID.Raw() != int64(1) {
t.Errorf("response ID is %v, want 1", resp.ID.Raw())
}
}
}

// TestScanEventsPingFiltering is a unit test for the low-level event scanning
// with ping events to verify scanEvents properly parses all event types.
func TestScanEventsPingFiltering(t *testing.T) {
// Create SSE stream with mixed events
sseData := `event: ping
data: ping

event: message
data: {"jsonrpc":"2.0","method":"test","params":{}}

event: ping
data: keepalive

event: message
data: {"jsonrpc":"2.0","method":"test2","params":{}}

`

reader := strings.NewReader(sseData)
var events []Event

// Scan all events
for evt, err := range scanEvents(reader) {
if err != nil {
if err != io.EOF {
t.Fatalf("scanEvents error: %v", err)
}
break
}
events = append(events, evt)
}

// Verify we got all 4 events
if len(events) != 4 {
t.Fatalf("got %d events, want 4", len(events))
}

// Verify event types
expectedNames := []string{"ping", "message", "ping", "message"}
for i, evt := range events {
if evt.Name != expectedNames[i] {
t.Errorf("event %d: got name %q, want %q", i, evt.Name, expectedNames[i])
}
}

// Verify that we can decode the message events but would fail on ping events
for i, evt := range events {
if evt.Name == "message" {
_, err := jsonrpc.DecodeMessage(evt.Data)
if err != nil {
t.Errorf("event %d: failed to decode message event: %v", i, err)
}
} else if evt.Name == "ping" {
// Ping events have non-JSON data and should fail decoding
_, err := jsonrpc.DecodeMessage(evt.Data)
if err == nil {
t.Errorf("event %d: ping event unexpectedly decoded as valid JSON-RPC", i)
}
}
}
}