Skip to content

Commit b23e560

Browse files
committed
Add SSE edge case conformance tests
Additional tests to catch production edge cases: 1. Header hygiene test: - Verifies no Content-Length (streaming response) - Verifies Cache-Control: no-cache (prevents proxy buffering) 2. Newlines in text/plain payloads: - Tests that content with embedded newlines is handled correctly - SSE spec requires newlines to be encoded properly 3. Monotonic offset test: - Verifies Stream-Next-Offset values are monotonically increasing - Catches off-by-one bugs in offset tracking 4. Reconnection test: - Simulates disconnect/reconnect with last known offset - Verifies no duplicates and no gaps in data - Critical for real-world SSE reliability
1 parent 5752eb4 commit b23e560

File tree

1 file changed

+288
-0
lines changed

1 file changed

+288
-0
lines changed

packages/conformance-tests/src/index.ts

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2588,6 +2588,294 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
25882588
}
25892589
}
25902590
})
2591+
2592+
test(`should have correct SSE headers (no Content-Length, proper Cache-Control)`, async () => {
2593+
const streamPath = `/v1/stream/sse-headers-test-${Date.now()}`
2594+
2595+
// Create stream with data
2596+
await fetch(`${getBaseUrl()}${streamPath}`, {
2597+
method: `PUT`,
2598+
headers: { "Content-Type": `text/plain` },
2599+
body: `test data`,
2600+
})
2601+
2602+
// Make SSE request
2603+
const controller = new AbortController()
2604+
const timeoutId = setTimeout(() => controller.abort(), 1000)
2605+
2606+
try {
2607+
const response = await fetch(
2608+
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2609+
{
2610+
method: `GET`,
2611+
signal: controller.signal,
2612+
}
2613+
)
2614+
2615+
expect(response.status).toBe(200)
2616+
2617+
// SSE MUST have text/event-stream content type
2618+
expect(response.headers.get(`content-type`)).toBe(`text/event-stream`)
2619+
2620+
// SSE MUST NOT have Content-Length (it's a streaming response)
2621+
expect(response.headers.get(`content-length`)).toBeNull()
2622+
2623+
// SSE SHOULD have Cache-Control: no-cache to prevent proxy buffering
2624+
const cacheControl = response.headers.get(`cache-control`)
2625+
expect(cacheControl).toContain(`no-cache`)
2626+
2627+
clearTimeout(timeoutId)
2628+
} catch (e) {
2629+
clearTimeout(timeoutId)
2630+
if (e instanceof Error && e.name !== `AbortError`) {
2631+
throw e
2632+
}
2633+
}
2634+
})
2635+
2636+
test(`should handle newlines in text/plain payloads`, async () => {
2637+
const streamPath = `/v1/stream/sse-newline-test-${Date.now()}`
2638+
2639+
// Create stream with text containing newlines
2640+
await fetch(`${getBaseUrl()}${streamPath}`, {
2641+
method: `PUT`,
2642+
headers: { "Content-Type": `text/plain` },
2643+
body: `line1\nline2\nline3`,
2644+
})
2645+
2646+
// Make SSE request
2647+
const controller = new AbortController()
2648+
const timeoutId = setTimeout(() => controller.abort(), 2000)
2649+
2650+
try {
2651+
const response = await fetch(
2652+
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2653+
{
2654+
method: `GET`,
2655+
signal: controller.signal,
2656+
}
2657+
)
2658+
2659+
expect(response.status).toBe(200)
2660+
2661+
// Read response
2662+
const reader = response.body!.getReader()
2663+
const decoder = new TextDecoder()
2664+
let received = ``
2665+
2666+
for (let i = 0; i < 10; i++) {
2667+
const { done, value } = await reader.read()
2668+
if (done) break
2669+
received += decoder.decode(value, { stream: true })
2670+
if (received.includes(`event: control`)) break
2671+
}
2672+
2673+
clearTimeout(timeoutId)
2674+
reader.cancel()
2675+
2676+
// The SSE data field should contain the text
2677+
// Note: SSE spec says newlines in data need multiple data: lines
2678+
// or the newlines become field separators
2679+
expect(received).toContain(`event: data`)
2680+
2681+
// At minimum, the original content should be recoverable
2682+
// (server may encode newlines as multiple data: lines per SSE spec)
2683+
const hasLine1 = received.includes(`line1`)
2684+
const hasLine2 = received.includes(`line2`)
2685+
const hasLine3 = received.includes(`line3`)
2686+
expect(hasLine1 && hasLine2 && hasLine3).toBe(true)
2687+
} catch (e) {
2688+
clearTimeout(timeoutId)
2689+
if (e instanceof Error && e.name !== `AbortError`) {
2690+
throw e
2691+
}
2692+
}
2693+
})
2694+
2695+
test(`should maintain monotonic offsets over multiple messages`, async () => {
2696+
const streamPath = `/v1/stream/sse-monotonic-offset-test-${Date.now()}`
2697+
2698+
// Create stream
2699+
await fetch(`${getBaseUrl()}${streamPath}`, {
2700+
method: `PUT`,
2701+
headers: { "Content-Type": `text/plain` },
2702+
})
2703+
2704+
// Append multiple messages
2705+
for (let i = 0; i < 5; i++) {
2706+
await fetch(`${getBaseUrl()}${streamPath}`, {
2707+
method: `POST`,
2708+
headers: { "Content-Type": `text/plain` },
2709+
body: `message ${i}`,
2710+
})
2711+
}
2712+
2713+
// Make SSE request
2714+
const controller = new AbortController()
2715+
const timeoutId = setTimeout(() => controller.abort(), 2000)
2716+
2717+
try {
2718+
const response = await fetch(
2719+
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2720+
{
2721+
method: `GET`,
2722+
signal: controller.signal,
2723+
}
2724+
)
2725+
2726+
expect(response.status).toBe(200)
2727+
2728+
// Read response
2729+
const reader = response.body!.getReader()
2730+
const decoder = new TextDecoder()
2731+
let received = ``
2732+
2733+
for (let i = 0; i < 20; i++) {
2734+
const { done, value } = await reader.read()
2735+
if (done) break
2736+
received += decoder.decode(value, { stream: true })
2737+
// Wait until we've seen at least 5 data events and a control
2738+
const dataCount = (received.match(/event: data/g) || []).length
2739+
if (dataCount >= 5 && received.includes(`event: control`)) break
2740+
}
2741+
2742+
clearTimeout(timeoutId)
2743+
reader.cancel()
2744+
2745+
// Extract all control event offsets
2746+
const controlLines = received
2747+
.split(`\n`)
2748+
.filter(
2749+
(l) => l.startsWith(`data: `) && l.includes(`Stream-Next-Offset`)
2750+
)
2751+
2752+
const offsets: Array<string> = []
2753+
for (const line of controlLines) {
2754+
const payload = line.slice(`data: `.length)
2755+
const data = JSON.parse(payload)
2756+
offsets.push(data[`Stream-Next-Offset`])
2757+
}
2758+
2759+
// Verify offsets are monotonically increasing (lexicographically)
2760+
for (let i = 1; i < offsets.length; i++) {
2761+
expect(offsets[i]! >= offsets[i - 1]!).toBe(true)
2762+
}
2763+
} catch (e) {
2764+
clearTimeout(timeoutId)
2765+
if (e instanceof Error && e.name !== `AbortError`) {
2766+
throw e
2767+
}
2768+
}
2769+
})
2770+
2771+
test(`should support reconnection with last known offset`, async () => {
2772+
const streamPath = `/v1/stream/sse-reconnect-test-${Date.now()}`
2773+
2774+
// Create stream with initial data
2775+
await fetch(`${getBaseUrl()}${streamPath}`, {
2776+
method: `PUT`,
2777+
headers: { "Content-Type": `text/plain` },
2778+
body: `message 1`,
2779+
})
2780+
2781+
await fetch(`${getBaseUrl()}${streamPath}`, {
2782+
method: `POST`,
2783+
headers: { "Content-Type": `text/plain` },
2784+
body: `message 2`,
2785+
})
2786+
2787+
// First SSE connection - get initial data and offset
2788+
const controller1 = new AbortController()
2789+
const timeoutId1 = setTimeout(() => controller1.abort(), 2000)
2790+
2791+
let lastOffset: string | null = null
2792+
2793+
try {
2794+
const response1 = await fetch(
2795+
`${getBaseUrl()}${streamPath}?offset=-1&live=sse`,
2796+
{
2797+
method: `GET`,
2798+
signal: controller1.signal,
2799+
}
2800+
)
2801+
2802+
const reader1 = response1.body!.getReader()
2803+
const decoder = new TextDecoder()
2804+
let received1 = ``
2805+
2806+
for (let i = 0; i < 10; i++) {
2807+
const { done, value } = await reader1.read()
2808+
if (done) break
2809+
received1 += decoder.decode(value, { stream: true })
2810+
if (received1.includes(`event: control`)) break
2811+
}
2812+
2813+
clearTimeout(timeoutId1)
2814+
reader1.cancel()
2815+
2816+
// Extract offset from control event
2817+
const controlLine = received1
2818+
.split(`\n`)
2819+
.find(
2820+
(l) => l.startsWith(`data: `) && l.includes(`Stream-Next-Offset`)
2821+
)
2822+
const controlPayload = controlLine!.slice(`data: `.length)
2823+
lastOffset = JSON.parse(controlPayload)[`Stream-Next-Offset`]
2824+
} catch (e) {
2825+
clearTimeout(timeoutId1)
2826+
if (e instanceof Error && e.name !== `AbortError`) {
2827+
throw e
2828+
}
2829+
}
2830+
2831+
expect(lastOffset).toBeDefined()
2832+
2833+
// Append more data while "disconnected"
2834+
await fetch(`${getBaseUrl()}${streamPath}`, {
2835+
method: `POST`,
2836+
headers: { "Content-Type": `text/plain` },
2837+
body: `message 3`,
2838+
})
2839+
2840+
// Reconnect with last known offset
2841+
const controller2 = new AbortController()
2842+
const timeoutId2 = setTimeout(() => controller2.abort(), 2000)
2843+
2844+
try {
2845+
const response2 = await fetch(
2846+
`${getBaseUrl()}${streamPath}?offset=${lastOffset}&live=sse`,
2847+
{
2848+
method: `GET`,
2849+
signal: controller2.signal,
2850+
}
2851+
)
2852+
2853+
const reader2 = response2.body!.getReader()
2854+
const decoder = new TextDecoder()
2855+
let received2 = ``
2856+
2857+
for (let i = 0; i < 10; i++) {
2858+
const { done, value } = await reader2.read()
2859+
if (done) break
2860+
received2 += decoder.decode(value, { stream: true })
2861+
if (received2.includes(`message 3`)) break
2862+
}
2863+
2864+
clearTimeout(timeoutId2)
2865+
reader2.cancel()
2866+
2867+
// Should receive message 3 (the new one), not duplicates of 1 and 2
2868+
expect(received2).toContain(`message 3`)
2869+
// Should NOT contain message 1 or 2 (already received before disconnect)
2870+
expect(received2).not.toContain(`message 1`)
2871+
expect(received2).not.toContain(`message 2`)
2872+
} catch (e) {
2873+
clearTimeout(timeoutId2)
2874+
if (e instanceof Error && e.name !== `AbortError`) {
2875+
throw e
2876+
}
2877+
}
2878+
})
25912879
})
25922880

25932881
// ============================================================================

0 commit comments

Comments
 (0)