Skip to content

Commit a519c18

Browse files
authored
mcp: export Event (#145)
Export the Event type in preparation for providing user-definable storage for resumable streams. Also, move event code to a separate file. This PR is just renaming and code motion.
1 parent 3bbe74f commit a519c18

File tree

6 files changed

+256
-227
lines changed

6 files changed

+256
-227
lines changed

mcp/event.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2025 The Go MCP SDK Authors. All rights reserved.
2+
// Use of this source code is governed by an MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
// This file is for SSE events.
6+
// See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events.
7+
8+
package mcp
9+
10+
import (
11+
"bufio"
12+
"bytes"
13+
"errors"
14+
"fmt"
15+
"io"
16+
"iter"
17+
"net/http"
18+
"strings"
19+
)
20+
21+
// An Event is a server-sent event.
22+
// See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields.
23+
type Event struct {
24+
Name string // the "event" field
25+
ID string // the "id" field
26+
Data []byte // the "data" field
27+
}
28+
29+
// Empty reports whether the Event is empty.
30+
func (e Event) Empty() bool {
31+
return e.Name == "" && e.ID == "" && len(e.Data) == 0
32+
}
33+
34+
// writeEvent writes the event to w, and flushes.
35+
func writeEvent(w io.Writer, evt Event) (int, error) {
36+
var b bytes.Buffer
37+
if evt.Name != "" {
38+
fmt.Fprintf(&b, "event: %s\n", evt.Name)
39+
}
40+
if evt.ID != "" {
41+
fmt.Fprintf(&b, "id: %s\n", evt.ID)
42+
}
43+
fmt.Fprintf(&b, "data: %s\n\n", string(evt.Data))
44+
n, err := w.Write(b.Bytes())
45+
if f, ok := w.(http.Flusher); ok {
46+
f.Flush()
47+
}
48+
return n, err
49+
}
50+
51+
// scanEvents iterates SSE events in the given scanner. The iterated error is
52+
// terminal: if encountered, the stream is corrupt or broken and should no
53+
// longer be used.
54+
//
55+
// TODO(rfindley): consider a different API here that makes failure modes more
56+
// apparent.
57+
func scanEvents(r io.Reader) iter.Seq2[Event, error] {
58+
scanner := bufio.NewScanner(r)
59+
const maxTokenSize = 1 * 1024 * 1024 // 1 MiB max line size
60+
scanner.Buffer(nil, maxTokenSize)
61+
62+
// TODO: investigate proper behavior when events are out of order, or have
63+
// non-standard names.
64+
var (
65+
eventKey = []byte("event")
66+
idKey = []byte("id")
67+
dataKey = []byte("data")
68+
)
69+
70+
return func(yield func(Event, error) bool) {
71+
// iterate event from the wire.
72+
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#examples
73+
//
74+
// - `key: value` line records.
75+
// - Consecutive `data: ...` fields are joined with newlines.
76+
// - Unrecognized fields are ignored. Since we only care about 'event', 'id', and
77+
// 'data', these are the only three we consider.
78+
// - Lines starting with ":" are ignored.
79+
// - Records are terminated with two consecutive newlines.
80+
var (
81+
evt Event
82+
dataBuf *bytes.Buffer // if non-nil, preceding field was also data
83+
)
84+
flushData := func() {
85+
if dataBuf != nil {
86+
evt.Data = dataBuf.Bytes()
87+
dataBuf = nil
88+
}
89+
}
90+
for scanner.Scan() {
91+
line := scanner.Bytes()
92+
if len(line) == 0 {
93+
flushData()
94+
// \n\n is the record delimiter
95+
if !evt.Empty() && !yield(evt, nil) {
96+
return
97+
}
98+
evt = Event{}
99+
continue
100+
}
101+
before, after, found := bytes.Cut(line, []byte{':'})
102+
if !found {
103+
yield(Event{}, fmt.Errorf("malformed line in SSE stream: %q", string(line)))
104+
return
105+
}
106+
if !bytes.Equal(before, dataKey) {
107+
flushData()
108+
}
109+
switch {
110+
case bytes.Equal(before, eventKey):
111+
evt.Name = strings.TrimSpace(string(after))
112+
case bytes.Equal(before, idKey):
113+
evt.ID = strings.TrimSpace(string(after))
114+
case bytes.Equal(before, dataKey):
115+
data := bytes.TrimSpace(after)
116+
if dataBuf != nil {
117+
dataBuf.WriteByte('\n')
118+
dataBuf.Write(data)
119+
} else {
120+
dataBuf = new(bytes.Buffer)
121+
dataBuf.Write(data)
122+
}
123+
}
124+
}
125+
if err := scanner.Err(); err != nil {
126+
if errors.Is(err, bufio.ErrTooLong) {
127+
err = fmt.Errorf("event exceeded max line length of %d", maxTokenSize)
128+
}
129+
if !yield(Event{}, err) {
130+
return
131+
}
132+
}
133+
flushData()
134+
if !evt.Empty() {
135+
yield(evt, nil)
136+
}
137+
}
138+
}

mcp/event_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2025 The Go MCP SDK Authors. All rights reserved.
2+
// Use of this source code is governed by an MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package mcp
6+
7+
import (
8+
"strings"
9+
"testing"
10+
)
11+
12+
func TestScanEvents(t *testing.T) {
13+
tests := []struct {
14+
name string
15+
input string
16+
want []Event
17+
wantErr string
18+
}{
19+
{
20+
name: "simple event",
21+
input: "event: message\nid: 1\ndata: hello\n\n",
22+
want: []Event{
23+
{Name: "message", ID: "1", Data: []byte("hello")},
24+
},
25+
},
26+
{
27+
name: "multiple data lines",
28+
input: "data: line 1\ndata: line 2\n\n",
29+
want: []Event{
30+
{Data: []byte("line 1\nline 2")},
31+
},
32+
},
33+
{
34+
name: "multiple events",
35+
input: "data: first\n\nevent: second\ndata: second\n\n",
36+
want: []Event{
37+
{Data: []byte("first")},
38+
{Name: "second", Data: []byte("second")},
39+
},
40+
},
41+
{
42+
name: "no trailing newline",
43+
input: "data: hello",
44+
want: []Event{
45+
{Data: []byte("hello")},
46+
},
47+
},
48+
{
49+
name: "malformed line",
50+
input: "invalid line\n\n",
51+
wantErr: "malformed line",
52+
},
53+
}
54+
55+
for _, tt := range tests {
56+
t.Run(tt.name, func(t *testing.T) {
57+
r := strings.NewReader(tt.input)
58+
var got []Event
59+
var err error
60+
for e, err2 := range scanEvents(r) {
61+
if err2 != nil {
62+
err = err2
63+
break
64+
}
65+
got = append(got, e)
66+
}
67+
68+
if tt.wantErr != "" {
69+
if err == nil {
70+
t.Fatalf("scanEvents() got nil error, want error containing %q", tt.wantErr)
71+
}
72+
if !strings.Contains(err.Error(), tt.wantErr) {
73+
t.Fatalf("scanEvents() error = %q, want containing %q", err, tt.wantErr)
74+
}
75+
return
76+
}
77+
78+
if err != nil {
79+
t.Fatalf("scanEvents() returned unexpected error: %v", err)
80+
}
81+
82+
if len(got) != len(tt.want) {
83+
t.Fatalf("scanEvents() got %d events, want %d", len(got), len(tt.want))
84+
}
85+
86+
for i := range got {
87+
if g, w := got[i].Name, tt.want[i].Name; g != w {
88+
t.Errorf("event %d: name = %q, want %q", i, g, w)
89+
}
90+
if g, w := got[i].ID, tt.want[i].ID; g != w {
91+
t.Errorf("event %d: id = %q, want %q", i, g, w)
92+
}
93+
if g, w := string(got[i].Data), string(tt.want[i].Data); g != w {
94+
t.Errorf("event %d: data = %q, want %q", i, g, w)
95+
}
96+
}
97+
})
98+
}
99+
}

0 commit comments

Comments
 (0)