Skip to content

Commit 9ef2b27

Browse files
Fix: Skip non-message SSE events in processStream (#637)
Fixes #636 ## Problem The StreamableClientTransport tries to JSON-decode ALL SSE events, including ping events sent by servers for keep-alive purposes. This causes errors like 'invalid character 'p' looking for beginning of value' when encountering non-JSON event data. ## Solution Modified processStream to check evt.Name and skip events that are not 'message' events (or unnamed events, which default to 'message' per SSE spec). ## Changes - Added event name check in processStream before attempting JSON decode - Added test case for ping event filtering - Added test for scanning multiple events including ping events ## Testing - All existing tests pass - New tests verify ping events are properly filtered - Tested with DeepWiki MCP server which sends ping events
1 parent faddd76 commit 9ef2b27

File tree

2 files changed

+156
-0
lines changed

2 files changed

+156
-0
lines changed

mcp/streamable.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,6 +1656,12 @@ func (c *streamableClientConn) processStream(requestSummary string, resp *http.R
16561656
lastEventID = evt.ID
16571657
}
16581658

1659+
// Skip non-message events (e.g., "ping" events used for keep-alive)
1660+
// According to SSE spec, events with no name default to "message"
1661+
if evt.Name != "" && evt.Name != "message" {
1662+
continue
1663+
}
1664+
16591665
msg, err := jsonrpc.DecodeMessage(evt.Data)
16601666
if err != nil {
16611667
c.fail(fmt.Errorf("%s: failed to decode event: %v", requestSummary, err))

mcp/streamable_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,3 +1746,153 @@ func mustNotPanic(t *testing.T, h http.Handler) http.Handler {
17461746
h.ServeHTTP(w, req)
17471747
})
17481748
}
1749+
1750+
// TestPingEventFiltering verifies that the streamable client correctly filters
1751+
// out SSE "ping" events, which are used for keep-alive but should not be
1752+
// treated as JSON-RPC messages.
1753+
//
1754+
// This test addresses issue #636: the client should skip non-"message" events
1755+
// according to the SSE specification. It tests the fix in processStream where
1756+
// events with evt.Name != "" && evt.Name != "message" are skipped.
1757+
func TestPingEventFiltering(t *testing.T) {
1758+
// This test verifies the low-level processStream filtering.
1759+
// We create a mock response with ping and message events.
1760+
1761+
sseData := `event: ping
1762+
data: ping
1763+
1764+
event: message
1765+
id: 1
1766+
data: {"jsonrpc":"2.0","id":1,"result":{}}
1767+
1768+
event: ping
1769+
data: keepalive
1770+
1771+
`
1772+
1773+
resp := &http.Response{
1774+
StatusCode: http.StatusOK,
1775+
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
1776+
Body: io.NopCloser(bytes.NewReader([]byte(sseData))),
1777+
}
1778+
1779+
// Create a minimal streamableClientConn for testing
1780+
ctx, cancel := context.WithCancel(context.Background())
1781+
defer cancel()
1782+
1783+
incoming := make(chan jsonrpc.Message, 10)
1784+
done := make(chan struct{})
1785+
1786+
conn := &streamableClientConn{
1787+
ctx: ctx,
1788+
done: done,
1789+
incoming: incoming,
1790+
}
1791+
1792+
// Create a test request
1793+
testReq := &jsonrpc.Request{
1794+
ID: jsonrpc2.Int64ID(1),
1795+
Method: "test",
1796+
}
1797+
1798+
// Process the stream
1799+
go conn.processStream("test", resp, testReq)
1800+
1801+
// Collect messages with timeout
1802+
var messages []jsonrpc.Message
1803+
timeout := time.After(1 * time.Second)
1804+
1805+
collectLoop:
1806+
for {
1807+
select {
1808+
case msg := <-incoming:
1809+
messages = append(messages, msg)
1810+
// We expect only 1 message (the response), not the ping events
1811+
if len(messages) >= 1 {
1812+
break collectLoop
1813+
}
1814+
case <-timeout:
1815+
break collectLoop
1816+
}
1817+
}
1818+
1819+
// Verify we only received the actual message, not the ping events
1820+
if len(messages) != 1 {
1821+
t.Errorf("got %d messages, want 1 (ping events should be filtered)", len(messages))
1822+
for i, msg := range messages {
1823+
t.Logf("message %d: %T", i, msg)
1824+
}
1825+
}
1826+
1827+
// Verify the message is the response
1828+
if len(messages) > 0 {
1829+
resp, ok := messages[0].(*jsonrpc.Response)
1830+
if !ok {
1831+
t.Errorf("first message is %T, want *jsonrpc.Response", messages[0])
1832+
} else if resp.ID.Raw() != int64(1) {
1833+
t.Errorf("response ID is %v, want 1", resp.ID.Raw())
1834+
}
1835+
}
1836+
}
1837+
1838+
// TestScanEventsPingFiltering is a unit test for the low-level event scanning
1839+
// with ping events to verify scanEvents properly parses all event types.
1840+
func TestScanEventsPingFiltering(t *testing.T) {
1841+
// Create SSE stream with mixed events
1842+
sseData := `event: ping
1843+
data: ping
1844+
1845+
event: message
1846+
data: {"jsonrpc":"2.0","method":"test","params":{}}
1847+
1848+
event: ping
1849+
data: keepalive
1850+
1851+
event: message
1852+
data: {"jsonrpc":"2.0","method":"test2","params":{}}
1853+
1854+
`
1855+
1856+
reader := strings.NewReader(sseData)
1857+
var events []Event
1858+
1859+
// Scan all events
1860+
for evt, err := range scanEvents(reader) {
1861+
if err != nil {
1862+
if err != io.EOF {
1863+
t.Fatalf("scanEvents error: %v", err)
1864+
}
1865+
break
1866+
}
1867+
events = append(events, evt)
1868+
}
1869+
1870+
// Verify we got all 4 events
1871+
if len(events) != 4 {
1872+
t.Fatalf("got %d events, want 4", len(events))
1873+
}
1874+
1875+
// Verify event types
1876+
expectedNames := []string{"ping", "message", "ping", "message"}
1877+
for i, evt := range events {
1878+
if evt.Name != expectedNames[i] {
1879+
t.Errorf("event %d: got name %q, want %q", i, evt.Name, expectedNames[i])
1880+
}
1881+
}
1882+
1883+
// Verify that we can decode the message events but would fail on ping events
1884+
for i, evt := range events {
1885+
if evt.Name == "message" {
1886+
_, err := jsonrpc.DecodeMessage(evt.Data)
1887+
if err != nil {
1888+
t.Errorf("event %d: failed to decode message event: %v", i, err)
1889+
}
1890+
} else if evt.Name == "ping" {
1891+
// Ping events have non-JSON data and should fail decoding
1892+
_, err := jsonrpc.DecodeMessage(evt.Data)
1893+
if err == nil {
1894+
t.Errorf("event %d: ping event unexpectedly decoded as valid JSON-RPC", i)
1895+
}
1896+
}
1897+
}
1898+
}

0 commit comments

Comments
 (0)