Skip to content

Commit 5752eb4

Browse files
committed
Fix SSE JSON payload and empty stream offset bugs
Fixes identified during code review: 1. JSON SSE payload trailing comma bug: - processJsonAppend adds trailing commas for storage - SSE was wrapping raw data in [], producing invalid JSON like [{"id":1},] - Now uses formatResponse() which properly strips trailing commas 2. Empty stream offset bug: - SSE control events were using initialOffset (-1) for empty streams - HTTP GET uses stream.currentOffset for empty results - SSE now computes controlOffset the same way as HTTP GET 3. Strengthened conformance tests: - JSON SSE test now parses payload and verifies it's valid JSON - Empty stream test verifies SSE offset matches HTTP GET offset
1 parent 6103662 commit 5752eb4

File tree

2 files changed

+48
-8
lines changed

2 files changed

+48
-8
lines changed

packages/conformance-tests/src/index.ts

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2457,7 +2457,7 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
24572457
}
24582458
})
24592459

2460-
test(`should wrap JSON data in arrays for SSE`, async () => {
2460+
test(`should wrap JSON data in arrays for SSE and produce valid JSON`, async () => {
24612461
const streamPath = `/v1/stream/sse-json-wrap-test-${Date.now()}`
24622462

24632463
// Create JSON stream
@@ -2499,8 +2499,19 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
24992499

25002500
// JSON data should be wrapped in array brackets
25012501
expect(received).toContain(`event: data`)
2502-
// The data line should contain the JSON wrapped in []
2503-
expect(received).toMatch(/data:.*\[.*\{.*"id".*:.*1.*\}.*\]/)
2502+
2503+
// Extract and parse the JSON payload to verify it's valid
2504+
const dataLine = received
2505+
.split(`\n`)
2506+
.find((l) => l.startsWith(`data: `) && l.includes(`[`))
2507+
expect(dataLine).toBeDefined()
2508+
2509+
const payload = dataLine!.slice(`data: `.length)
2510+
// This will throw if JSON is invalid (e.g., trailing comma)
2511+
const parsed = JSON.parse(payload)
2512+
2513+
// Verify the structure matches what we sent
2514+
expect(parsed).toEqual([{ id: 1, message: `hello` }])
25042515
} catch (e) {
25052516
clearTimeout(timeoutId)
25062517
if (e instanceof Error && e.name !== `AbortError`) {
@@ -2509,7 +2520,7 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
25092520
}
25102521
})
25112522

2512-
test(`should handle SSE for empty stream`, async () => {
2523+
test(`should handle SSE for empty stream with correct offset`, async () => {
25132524
const streamPath = `/v1/stream/sse-empty-test-${Date.now()}`
25142525

25152526
// Create empty stream
@@ -2518,6 +2529,12 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
25182529
headers: { "Content-Type": `text/plain` },
25192530
})
25202531

2532+
// First, get the offset from HTTP GET (the canonical source)
2533+
const httpResponse = await fetch(`${getBaseUrl()}${streamPath}`)
2534+
const httpOffset = httpResponse.headers.get(`Stream-Next-Offset`)
2535+
expect(httpOffset).toBeDefined()
2536+
expect(httpOffset).not.toBe(`-1`) // Should be the stream's actual offset, not -1
2537+
25212538
// Make SSE request
25222539
const controller = new AbortController()
25232540
const timeoutId = setTimeout(() => controller.abort(), 1000)
@@ -2550,6 +2567,20 @@ export function runConformanceTests(options: ConformanceTestOptions): void {
25502567

25512568
// Should get a control event even for empty stream
25522569
expect(received).toContain(`event: control`)
2570+
2571+
// Parse the control event and verify offset matches HTTP GET
2572+
const controlLine = received
2573+
.split(`\n`)
2574+
.find(
2575+
(l) => l.startsWith(`data: `) && l.includes(`Stream-Next-Offset`)
2576+
)
2577+
expect(controlLine).toBeDefined()
2578+
2579+
const controlPayload = controlLine!.slice(`data: `.length)
2580+
const controlData = JSON.parse(controlPayload)
2581+
2582+
// SSE control offset should match HTTP GET offset (not -1)
2583+
expect(controlData[`Stream-Next-Offset`]).toBe(httpOffset)
25532584
} catch (e) {
25542585
clearTimeout(timeoutId)
25552586
if (e instanceof Error && e.name !== `AbortError`) {

packages/server/src/server.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ export class DurableStreamTestServer {
515515

516516
let currentOffset = initialOffset
517517
let isConnected = true
518+
const decoder = new TextDecoder()
518519

519520
// Handle client disconnect
520521
res.on(`close`, () => {
@@ -537,10 +538,11 @@ export class DurableStreamTestServer {
537538
// Format data based on content type
538539
let dataPayload: string
539540
if (isJsonStream) {
540-
// Wrap JSON in array brackets for SSE
541-
dataPayload = `[${new TextDecoder().decode(message.data)}]`
541+
// Use formatResponse to get properly formatted JSON (strips trailing commas)
542+
const jsonBytes = this.store.formatResponse(path, [message])
543+
dataPayload = decoder.decode(jsonBytes)
542544
} else {
543-
dataPayload = new TextDecoder().decode(message.data)
545+
dataPayload = decoder.decode(message.data)
544546
}
545547

546548
// Send data event
@@ -550,9 +552,13 @@ export class DurableStreamTestServer {
550552
currentOffset = message.offset
551553
}
552554

555+
// Compute offset the same way as HTTP GET: last message's offset, or stream's current offset
556+
const controlOffset =
557+
messages[messages.length - 1]?.offset ?? stream!.currentOffset
558+
553559
// Send control event with current offset/cursor
554560
const controlData: Record<string, string> = {
555-
[STREAM_OFFSET_HEADER]: currentOffset,
561+
[STREAM_OFFSET_HEADER]: controlOffset,
556562
}
557563
if (cursor) {
558564
controlData[STREAM_CURSOR_HEADER] = cursor
@@ -561,6 +567,9 @@ export class DurableStreamTestServer {
561567
res.write(`event: control\n`)
562568
res.write(`data: ${JSON.stringify(controlData)}\n\n`)
563569

570+
// Update currentOffset for next iteration (use controlOffset for consistency)
571+
currentOffset = controlOffset
572+
564573
// If caught up, wait for new messages
565574
if (upToDate) {
566575
const result = await this.store.waitForMessages(

0 commit comments

Comments
 (0)