Skip to content

Commit af246dd

Browse files
committed
Fix SSE multiline encoding and add test helper
Reviewer feedback fixes: 1. SSE multiline payload encoding: - Per SSE spec, each line in payload needs its own "data:" prefix - Added encodeSSEData() helper that splits on newlines - "line1\nline2" now becomes "data: line1\ndata: line2\n\n" 2. Test helper for SSE fetch pattern: - Added fetchSSE() helper to reduce boilerplate - Handles AbortController, timeout, chunked reading, cleanup - Refactored cursor, JSON, and newline tests to use helper 3. Strengthened newline test: - Now verifies each line has its own "data:" prefix - Catches SSE spec violations
1 parent b23e560 commit af246dd

File tree

2 files changed

+104
-132
lines changed

2 files changed

+104
-132
lines changed

packages/conformance-tests/src/index.ts

Lines changed: 89 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,61 @@ export interface ConformanceTestOptions {
1818
baseUrl: string
1919
}
2020

21+
/**
22+
* Helper to fetch SSE stream and read until a condition is met.
23+
* Handles AbortController, timeout, and cleanup automatically.
24+
*/
25+
async function fetchSSE(
26+
url: string,
27+
opts: {
28+
timeoutMs?: number
29+
maxChunks?: number
30+
untilContent?: string
31+
headers?: Record<string, string>
32+
} = {}
33+
): Promise<{ response: Response; received: string }> {
34+
const { timeoutMs = 2000, maxChunks = 10, untilContent, headers = {} } = opts
35+
36+
const controller = new AbortController()
37+
const timeoutId = setTimeout(() => controller.abort(), timeoutMs)
38+
39+
try {
40+
const response = await fetch(url, {
41+
method: `GET`,
42+
headers,
43+
signal: controller.signal,
44+
})
45+
46+
if (!response.body) {
47+
clearTimeout(timeoutId)
48+
return { response, received: `` }
49+
}
50+
51+
const reader = response.body.getReader()
52+
const decoder = new TextDecoder()
53+
let received = ``
54+
55+
for (let i = 0; i < maxChunks; i++) {
56+
const { done, value } = await reader.read()
57+
if (done) break
58+
received += decoder.decode(value, { stream: true })
59+
if (untilContent && received.includes(untilContent)) break
60+
}
61+
62+
clearTimeout(timeoutId)
63+
reader.cancel()
64+
65+
return { response, received }
66+
} catch (e) {
67+
clearTimeout(timeoutId)
68+
if (e instanceof Error && e.name === `AbortError`) {
69+
// Return empty result on timeout/abort
70+
return { response: new Response(), received: `` }
71+
}
72+
throw e
73+
}
74+
}
75+
2176
/**
2277
* Run the full conformance test suite against a server
2378
*/
@@ -2418,43 +2473,14 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
24182473
body: `test data`,
24192474
})
24202475

2421-
// SSE request with cursor should be accepted
2422-
const controller = new AbortController()
2423-
const timeoutId = setTimeout(() => controller.abort(), 1000)
2424-
2425-
try {
2426-
const response = await fetch(
2427-
`${getBaseUrl()}${streamPath}?offset=-1&live=sse&cursor=test-cursor-456`,
2428-
{
2429-
method: `GET`,
2430-
}
2431-
)
2432-
2433-
clearTimeout(timeoutId)
2434-
expect(response.status).toBe(200)
2435-
2436-
// Read a bit and verify cursor is echoed in control event
2437-
const reader = response.body!.getReader()
2438-
const decoder = new TextDecoder()
2439-
let received = ``
2440-
2441-
for (let i = 0; i < 10; i++) {
2442-
const { done, value } = await reader.read()
2443-
if (done) break
2444-
received += decoder.decode(value, { stream: true })
2445-
if (received.includes(`Stream-Cursor`)) break
2446-
}
2447-
2448-
reader.cancel()
2476+
const { response, received } = await fetchSSE(
2477+
`${getBaseUrl()}${streamPath}?offset=-1&live=sse&cursor=test-cursor-456`,
2478+
{ untilContent: `Stream-Cursor` }
2479+
)
24492480

2450-
// Cursor should be echoed in control events
2451-
expect(received).toContain(`test-cursor-456`)
2452-
} catch (e) {
2453-
clearTimeout(timeoutId)
2454-
if (e instanceof Error && e.name !== `AbortError`) {
2455-
throw e
2456-
}
2457-
}
2481+
expect(response.status).toBe(200)
2482+
// Cursor should be echoed in control events
2483+
expect(received).toContain(`test-cursor-456`)
24582484
})
24592485

24602486
test(`should wrap JSON data in arrays for SSE and produce valid JSON`, async () => {
@@ -2467,57 +2493,26 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
24672493
body: JSON.stringify({ id: 1, message: `hello` }),
24682494
})
24692495

2470-
// Make SSE request
2471-
const controller = new AbortController()
2472-
const timeoutId = setTimeout(() => controller.abort(), 2000)
2473-
2474-
try {
2475-
const response = await fetch(
2476-
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2477-
{
2478-
method: `GET`,
2479-
signal: controller.signal,
2480-
}
2481-
)
2482-
2483-
expect(response.status).toBe(200)
2484-
2485-
// Read response
2486-
const reader = response.body!.getReader()
2487-
const decoder = new TextDecoder()
2488-
let received = ``
2489-
2490-
for (let i = 0; i < 10; i++) {
2491-
const { done, value } = await reader.read()
2492-
if (done) break
2493-
received += decoder.decode(value, { stream: true })
2494-
if (received.includes(`event: data`)) break
2495-
}
2496-
2497-
clearTimeout(timeoutId)
2498-
reader.cancel()
2496+
const { response, received } = await fetchSSE(
2497+
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2498+
{ untilContent: `event: data` }
2499+
)
24992500

2500-
// JSON data should be wrapped in array brackets
2501-
expect(received).toContain(`event: data`)
2501+
expect(response.status).toBe(200)
2502+
expect(received).toContain(`event: data`)
25022503

2503-
// Extract and parse the JSON payload to verify it's valid
2504-
const dataLine = received
2505-
.split(`\n`)
2506-
.find((l) => l.startsWith(`data: `) && l.includes(`[`))
2507-
expect(dataLine).toBeDefined()
2504+
// Extract and parse the JSON payload to verify it's valid
2505+
const dataLine = received
2506+
.split(`\n`)
2507+
.find((l) => l.startsWith(`data: `) && l.includes(`[`))
2508+
expect(dataLine).toBeDefined()
25082509

2509-
const payload = dataLine!.slice(`data: `.length)
2510-
// This will throw if JSON is invalid (e.g., trailing comma)
2511-
const parsed = JSON.parse(payload)
2510+
const payload = dataLine!.slice(`data: `.length)
2511+
// This will throw if JSON is invalid (e.g., trailing comma)
2512+
const parsed = JSON.parse(payload)
25122513

2513-
// Verify the structure matches what we sent
2514-
expect(parsed).toEqual([{ id: 1, message: `hello` }])
2515-
} catch (e) {
2516-
clearTimeout(timeoutId)
2517-
if (e instanceof Error && e.name !== `AbortError`) {
2518-
throw e
2519-
}
2520-
}
2514+
// Verify the structure matches what we sent
2515+
expect(parsed).toEqual([{ id: 1, message: `hello` }])
25212516
})
25222517

25232518
test(`should handle SSE for empty stream with correct offset`, async () => {
@@ -2643,53 +2638,19 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
26432638
body: `line1\nline2\nline3`,
26442639
})
26452640

2646-
// Make SSE request
2647-
const controller = new AbortController()
2648-
const timeoutId = setTimeout(() => controller.abort(), 2000)
2649-
2650-
try {
2651-
const response = await fetch(
2652-
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2653-
{
2654-
method: `GET`,
2655-
signal: controller.signal,
2656-
}
2657-
)
2658-
2659-
expect(response.status).toBe(200)
2660-
2661-
// Read response
2662-
const reader = response.body!.getReader()
2663-
const decoder = new TextDecoder()
2664-
let received = ``
2665-
2666-
for (let i = 0; i < 10; i++) {
2667-
const { done, value } = await reader.read()
2668-
if (done) break
2669-
received += decoder.decode(value, { stream: true })
2670-
if (received.includes(`event: control`)) break
2671-
}
2641+
const { response, received } = await fetchSSE(
2642+
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2643+
{ untilContent: `event: control` }
2644+
)
26722645

2673-
clearTimeout(timeoutId)
2674-
reader.cancel()
2646+
expect(response.status).toBe(200)
2647+
expect(received).toContain(`event: data`)
26752648

2676-
// The SSE data field should contain the text
2677-
// Note: SSE spec says newlines in data need multiple data: lines
2678-
// or the newlines become field separators
2679-
expect(received).toContain(`event: data`)
2680-
2681-
// At minimum, the original content should be recoverable
2682-
// (server may encode newlines as multiple data: lines per SSE spec)
2683-
const hasLine1 = received.includes(`line1`)
2684-
const hasLine2 = received.includes(`line2`)
2685-
const hasLine3 = received.includes(`line3`)
2686-
expect(hasLine1 && hasLine2 && hasLine3).toBe(true)
2687-
} catch (e) {
2688-
clearTimeout(timeoutId)
2689-
if (e instanceof Error && e.name !== `AbortError`) {
2690-
throw e
2691-
}
2692-
}
2649+
// Per SSE spec, multiline data must use multiple "data:" lines
2650+
// Each line should have its own data: prefix
2651+
expect(received).toContain(`data: line1`)
2652+
expect(received).toContain(`data: line2`)
2653+
expect(received).toContain(`data: line3`)
26932654
})
26942655

26952656
test(`should maintain monotonic offsets over multiple messages`, async () => {

packages/server/src/server.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ const OFFSET_QUERY_PARAM = `offset`
2121
const LIVE_QUERY_PARAM = `live`
2222
const CURSOR_QUERY_PARAM = `cursor`
2323

24+
/**
25+
* Encode data for SSE format.
26+
* Per SSE spec, each line in the payload needs its own "data:" prefix.
27+
* Newlines in the payload become separate data: lines.
28+
*/
29+
function encodeSSEData(payload: string): string {
30+
const lines = payload.split(`\n`)
31+
return lines.map((line) => `data: ${line}`).join(`\n`) + `\n\n`
32+
}
33+
2434
/**
2535
* HTTP server for testing durable streams.
2636
* Supports both in-memory and file-backed storage modes.
@@ -545,9 +555,10 @@ export class DurableStreamTestServer {
545555
dataPayload = decoder.decode(message.data)
546556
}
547557

548-
// Send data event
558+
// Send data event - encode multiline payloads per SSE spec
559+
// Each line in the payload needs its own "data:" prefix
549560
res.write(`event: data\n`)
550-
res.write(`data: ${dataPayload}\n\n`)
561+
res.write(encodeSSEData(dataPayload))
551562

552563
currentOffset = message.offset
553564
}
@@ -565,7 +576,7 @@ export class DurableStreamTestServer {
565576
}
566577

567578
res.write(`event: control\n`)
568-
res.write(`data: ${JSON.stringify(controlData)}\n\n`)
579+
res.write(encodeSSEData(JSON.stringify(controlData)))
569580

570581
// Update currentOffset for next iteration (use controlOffset for consistency)
571582
currentOffset = controlOffset
@@ -591,7 +602,7 @@ export class DurableStreamTestServer {
591602
keepAliveData[STREAM_CURSOR_HEADER] = cursor
592603
}
593604
res.write(`event: control\n`)
594-
res.write(`data: ${JSON.stringify(keepAliveData)}\n\n`)
605+
res.write(encodeSSEData(JSON.stringify(keepAliveData)))
595606
}
596607
// Loop will continue and read new messages
597608
}

0 commit comments

Comments
 (0)