1
1
import { SSEClientTransport } from "./sse.js" ;
2
- import { createServer , type Server } from "http" ;
2
+ import { createServer , type Server , type IncomingMessage } from "http" ;
3
3
import { JSONRPCMessage } from "../types.js" ;
4
4
import { AddressInfo } from "net" ;
5
5
6
6
describe ( "SSEClientTransport" , ( ) => {
7
7
let server : Server ;
8
8
let transport : SSEClientTransport ;
9
9
let baseUrl : URL ;
10
+ let lastServerRequest : IncomingMessage ;
11
+ let sendServerMessage : ( ( message : string ) => void ) | null = null ;
10
12
11
13
beforeEach ( ( done ) => {
14
+ // Reset state
15
+ lastServerRequest = null as unknown as IncomingMessage ;
16
+ sendServerMessage = null ;
17
+
12
18
// Create a test server that will receive the EventSource connection
13
19
server = createServer ( ( req , res ) => {
14
- // Store the received headers for verification
15
- ( server as any ) . lastRequest = req ;
20
+ lastServerRequest = req ;
16
21
17
22
// Send SSE headers
18
23
res . writeHead ( 200 , {
19
24
"Content-Type" : "text/event-stream" ,
20
25
"Cache-Control" : "no-cache" ,
21
- " Connection" : "keep-alive"
26
+ Connection : "keep-alive" ,
22
27
} ) ;
23
28
24
29
// Send the endpoint event
25
30
res . write ( "event: endpoint\n" ) ;
26
31
res . write ( `data: ${ baseUrl . href } \n\n` ) ;
32
+
33
+ // Store reference to send function for tests
34
+ sendServerMessage = ( message : string ) => {
35
+ res . write ( `data: ${ message } \n\n` ) ;
36
+ } ;
37
+
38
+ // Handle request body for POST endpoints
39
+ if ( req . method === "POST" ) {
40
+ let body = "" ;
41
+ req . on ( "data" , ( chunk ) => {
42
+ body += chunk ;
43
+ } ) ;
44
+ req . on ( "end" , ( ) => {
45
+ ( req as IncomingMessage & { body : string } ) . body = body ;
46
+ res . end ( ) ;
47
+ } ) ;
48
+ }
27
49
} ) ;
28
50
29
51
// Start server on random port
@@ -40,68 +62,227 @@ describe("SSEClientTransport", () => {
40
62
} ) ;
41
63
} ) ;
42
64
43
- it ( "uses custom fetch implementation from EventSourceInit to add auth headers" , async ( ) => {
44
- const authToken = "Bearer test-token" ;
65
+ describe ( "connection handling" , ( ) => {
66
+ it ( "establishes SSE connection and receives endpoint" , async ( ) => {
67
+ transport = new SSEClientTransport ( baseUrl ) ;
68
+ await transport . start ( ) ;
45
69
46
- // Create a fetch wrapper that adds auth header
47
- const fetchWithAuth = ( url : string | URL , init ?: RequestInit ) => {
48
- const headers = new Headers ( init ?. headers ) ;
49
- headers . set ( "Authorization" , authToken ) ;
50
- return fetch ( url . toString ( ) , { ...init , headers } ) ;
51
- } ;
70
+ expect ( lastServerRequest . headers . accept ) . toBe ( "text/event-stream" ) ;
71
+ expect ( lastServerRequest . method ) . toBe ( "GET" ) ;
72
+ } ) ;
52
73
53
- transport = new SSEClientTransport ( baseUrl , {
54
- eventSourceInit : {
55
- fetch : fetchWithAuth
56
- }
74
+ it ( "rejects if server returns non-200 status" , async ( ) => {
75
+ // Create a server that returns 403
76
+ server . close ( ) ;
77
+ await new Promise ( ( resolve ) => server . on ( "close" , resolve ) ) ;
78
+
79
+ server = createServer ( ( req , res ) => {
80
+ res . writeHead ( 403 ) ;
81
+ res . end ( ) ;
82
+ } ) ;
83
+
84
+ await new Promise < void > ( ( resolve ) => {
85
+ server . listen ( 0 , "127.0.0.1" , ( ) => {
86
+ const addr = server . address ( ) as AddressInfo ;
87
+ baseUrl = new URL ( `http://127.0.0.1:${ addr . port } ` ) ;
88
+ resolve ( ) ;
89
+ } ) ;
90
+ } ) ;
91
+
92
+ transport = new SSEClientTransport ( baseUrl ) ;
93
+ await expect ( transport . start ( ) ) . rejects . toThrow ( ) ;
57
94
} ) ;
58
95
59
- await transport . start ( ) ;
96
+ it ( "closes EventSource connection on close()" , async ( ) => {
97
+ transport = new SSEClientTransport ( baseUrl ) ;
98
+ await transport . start ( ) ;
60
99
61
- // Verify the auth header was received by the server
62
- const headers = ( server as any ) . lastRequest . headers ;
63
- expect ( headers . authorization ) . toBe ( authToken ) ;
100
+ const closePromise = new Promise ( ( resolve ) => {
101
+ lastServerRequest . on ( "close" , resolve ) ;
102
+ } ) ;
103
+
104
+ await transport . close ( ) ;
105
+ await closePromise ;
106
+ } ) ;
64
107
} ) ;
65
108
66
- it ( "passes custom headers to fetch requests" , async ( ) => {
67
- const customHeaders = {
68
- Authorization : "Bearer test-token" ,
69
- "X-Custom-Header" : "custom-value"
70
- } ;
109
+ describe ( "message handling" , ( ) => {
110
+ it ( "receives and parses JSON-RPC messages" , async ( ) => {
111
+ const receivedMessages : JSONRPCMessage [ ] = [ ] ;
112
+ transport = new SSEClientTransport ( baseUrl ) ;
113
+ transport . onmessage = ( msg ) => receivedMessages . push ( msg ) ;
71
114
72
- transport = new SSEClientTransport ( baseUrl , {
73
- requestInit : {
74
- headers : customHeaders
75
- }
115
+ await transport . start ( ) ;
116
+
117
+ const testMessage : JSONRPCMessage = {
118
+ jsonrpc : "2.0" ,
119
+ id : "test-1" ,
120
+ method : "test" ,
121
+ params : { foo : "bar" } ,
122
+ } ;
123
+
124
+ sendServerMessage ! ( JSON . stringify ( testMessage ) ) ;
125
+
126
+ // Wait for message processing
127
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 50 ) ) ;
128
+
129
+ expect ( receivedMessages ) . toHaveLength ( 1 ) ;
130
+ expect ( receivedMessages [ 0 ] ) . toEqual ( testMessage ) ;
131
+ } ) ;
132
+
133
+ it ( "handles malformed JSON messages" , async ( ) => {
134
+ const errors : Error [ ] = [ ] ;
135
+ transport = new SSEClientTransport ( baseUrl ) ;
136
+ transport . onerror = ( err ) => errors . push ( err ) ;
137
+
138
+ await transport . start ( ) ;
139
+
140
+ sendServerMessage ! ( "invalid json" ) ;
141
+
142
+ // Wait for message processing
143
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 50 ) ) ;
144
+
145
+ expect ( errors ) . toHaveLength ( 1 ) ;
146
+ expect ( errors [ 0 ] . message ) . toMatch ( / J S O N / ) ;
76
147
} ) ;
77
148
78
- await transport . start ( ) ;
149
+ it ( "handles messages via POST requests" , async ( ) => {
150
+ transport = new SSEClientTransport ( baseUrl ) ;
151
+ await transport . start ( ) ;
152
+
153
+ const testMessage : JSONRPCMessage = {
154
+ jsonrpc : "2.0" ,
155
+ id : "test-1" ,
156
+ method : "test" ,
157
+ params : { foo : "bar" } ,
158
+ } ;
159
+
160
+ await transport . send ( testMessage ) ;
161
+
162
+ // Wait for request processing
163
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 50 ) ) ;
79
164
80
- // Mock fetch for the message sending test
81
- global . fetch = jest . fn ( ) . mockResolvedValue ( {
82
- ok : true
165
+ expect ( lastServerRequest . method ) . toBe ( "POST" ) ;
166
+ expect ( lastServerRequest . headers [ "content-type" ] ) . toBe (
167
+ "application/json" ,
168
+ ) ;
169
+ expect (
170
+ JSON . parse (
171
+ ( lastServerRequest as IncomingMessage & { body : string } ) . body ,
172
+ ) ,
173
+ ) . toEqual ( testMessage ) ;
83
174
} ) ;
84
175
85
- const message : JSONRPCMessage = {
86
- jsonrpc : "2.0" ,
87
- id : "1" ,
88
- method : "test" ,
89
- params : { }
90
- } ;
91
-
92
- await transport . send ( message ) ;
93
-
94
- // Verify fetch was called with correct headers
95
- expect ( global . fetch ) . toHaveBeenCalledWith (
96
- expect . any ( URL ) ,
97
- expect . objectContaining ( {
98
- headers : expect . any ( Headers )
99
- } )
100
- ) ;
101
-
102
- const calledHeaders = ( global . fetch as jest . Mock ) . mock . calls [ 0 ] [ 1 ] . headers ;
103
- expect ( calledHeaders . get ( "Authorization" ) ) . toBe ( customHeaders . Authorization ) ;
104
- expect ( calledHeaders . get ( "X-Custom-Header" ) ) . toBe ( customHeaders [ "X-Custom-Header" ] ) ;
105
- expect ( calledHeaders . get ( "content-type" ) ) . toBe ( "application/json" ) ;
176
+ it ( "handles POST request failures" , async ( ) => {
177
+ // Create a server that returns 500 for POST
178
+ server . close ( ) ;
179
+ await new Promise ( ( resolve ) => server . on ( "close" , resolve ) ) ;
180
+
181
+ server = createServer ( ( req , res ) => {
182
+ if ( req . method === "GET" ) {
183
+ res . writeHead ( 200 , {
184
+ "Content-Type" : "text/event-stream" ,
185
+ "Cache-Control" : "no-cache" ,
186
+ Connection : "keep-alive" ,
187
+ } ) ;
188
+ res . write ( "event: endpoint\n" ) ;
189
+ res . write ( `data: ${ baseUrl . href } \n\n` ) ;
190
+ } else {
191
+ res . writeHead ( 500 ) ;
192
+ res . end ( "Internal error" ) ;
193
+ }
194
+ } ) ;
195
+
196
+ await new Promise < void > ( ( resolve ) => {
197
+ server . listen ( 0 , "127.0.0.1" , ( ) => {
198
+ const addr = server . address ( ) as AddressInfo ;
199
+ baseUrl = new URL ( `http://127.0.0.1:${ addr . port } ` ) ;
200
+ resolve ( ) ;
201
+ } ) ;
202
+ } ) ;
203
+
204
+ transport = new SSEClientTransport ( baseUrl ) ;
205
+ await transport . start ( ) ;
206
+
207
+ const testMessage : JSONRPCMessage = {
208
+ jsonrpc : "2.0" ,
209
+ id : "test-1" ,
210
+ method : "test" ,
211
+ params : { } ,
212
+ } ;
213
+
214
+ await expect ( transport . send ( testMessage ) ) . rejects . toThrow ( / 5 0 0 / ) ;
215
+ } ) ;
216
+ } ) ;
217
+
218
+ describe ( "header handling" , ( ) => {
219
+ it ( "uses custom fetch implementation from EventSourceInit to add auth headers" , async ( ) => {
220
+ const authToken = "Bearer test-token" ;
221
+
222
+ // Create a fetch wrapper that adds auth header
223
+ const fetchWithAuth = ( url : string | URL , init ?: RequestInit ) => {
224
+ const headers = new Headers ( init ?. headers ) ;
225
+ headers . set ( "Authorization" , authToken ) ;
226
+ return fetch ( url . toString ( ) , { ...init , headers } ) ;
227
+ } ;
228
+
229
+ transport = new SSEClientTransport ( baseUrl , {
230
+ eventSourceInit : {
231
+ fetch : fetchWithAuth ,
232
+ } ,
233
+ } ) ;
234
+
235
+ await transport . start ( ) ;
236
+
237
+ // Verify the auth header was received by the server
238
+ expect ( lastServerRequest . headers . authorization ) . toBe ( authToken ) ;
239
+ } ) ;
240
+
241
+ it ( "passes custom headers to fetch requests" , async ( ) => {
242
+ const customHeaders = {
243
+ Authorization : "Bearer test-token" ,
244
+ "X-Custom-Header" : "custom-value" ,
245
+ } ;
246
+
247
+ transport = new SSEClientTransport ( baseUrl , {
248
+ requestInit : {
249
+ headers : customHeaders ,
250
+ } ,
251
+ } ) ;
252
+
253
+ await transport . start ( ) ;
254
+
255
+ // Mock fetch for the message sending test
256
+ global . fetch = jest . fn ( ) . mockResolvedValue ( {
257
+ ok : true ,
258
+ } ) ;
259
+
260
+ const message : JSONRPCMessage = {
261
+ jsonrpc : "2.0" ,
262
+ id : "1" ,
263
+ method : "test" ,
264
+ params : { } ,
265
+ } ;
266
+
267
+ await transport . send ( message ) ;
268
+
269
+ // Verify fetch was called with correct headers
270
+ expect ( global . fetch ) . toHaveBeenCalledWith (
271
+ expect . any ( URL ) ,
272
+ expect . objectContaining ( {
273
+ headers : expect . any ( Headers ) ,
274
+ } ) ,
275
+ ) ;
276
+
277
+ const calledHeaders = ( global . fetch as jest . Mock ) . mock . calls [ 0 ] [ 1 ]
278
+ . headers ;
279
+ expect ( calledHeaders . get ( "Authorization" ) ) . toBe (
280
+ customHeaders . Authorization ,
281
+ ) ;
282
+ expect ( calledHeaders . get ( "X-Custom-Header" ) ) . toBe (
283
+ customHeaders [ "X-Custom-Header" ] ,
284
+ ) ;
285
+ expect ( calledHeaders . get ( "content-type" ) ) . toBe ( "application/json" ) ;
286
+ } ) ;
106
287
} ) ;
107
- } ) ;
288
+ } ) ;
0 commit comments