Skip to content

Commit e4462b3

Browse files
authored
Merge pull request durable-streams#20 from durable-streams/json-mode-and-batching
Add JSON mode and batching writer
2 parents d302def + d10f449 commit e4462b3

File tree

19 files changed

+1513
-327
lines changed

19 files changed

+1513
-327
lines changed

PROTOCOL.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,19 +400,62 @@ Clients **MUST** use the `Stream-Next-Offset` value returned in responses for su
400400

401401
## 7. Content Types
402402

403-
The protocol supports arbitrary MIME content types. The system operates at the byte level, leaving message framing and interpretation to clients.
403+
The protocol supports arbitrary MIME content types. Most content types operate at the byte level, leaving message framing and interpretation to clients. The `application/json` content type has special semantics defined below.
404404

405405
**Restriction:**
406406

407407
- SSE mode (Section 5.7) **REQUIRES** `content-type: text/*` or `application/json`
408408

409409
Clients **MAY** use any content type for their streams, including:
410410

411+
- `application/json` for JSON mode with message boundary preservation
411412
- `application/ndjson` for newline-delimited JSON
412413
- `application/x-protobuf` for Protocol Buffer messages
413414
- `text/plain` for plain text
414415
- Custom types for application-specific formats
415416

417+
### 7.1. JSON Mode
418+
419+
Streams created with `Content-Type: application/json` have special semantics for message boundaries and batch operations.
420+
421+
#### Message Boundaries
422+
423+
For `application/json` streams, servers **MUST** preserve message boundaries. Each POST request stores messages as a distinct unit, and GET responses **MUST** return data as a JSON array containing all messages from the requested offset range.
424+
425+
#### Array Flattening for Batch Operations
426+
427+
When a POST request body contains a JSON array, servers **MUST** flatten exactly one level of the array, treating each element as a separate message. This enables clients to batch multiple messages in a single HTTP request while preserving individual message semantics.
428+
429+
**Examples (direct POST to server):**
430+
431+
- POST body `{"event": "created"}` stores one message: `{"event": "created"}`
432+
- POST body `[{"event": "a"}, {"event": "b"}]` stores two messages: `{"event": "a"}`, `{"event": "b"}`
433+
- POST body `[[1,2], [3,4]]` stores two messages: `[1,2]`, `[3,4]`
434+
- POST body `[[[1,2,3]]]` stores one message: `[[1,2,3]]`
435+
436+
**Note:** Client libraries **MAY** automatically wrap individual values in arrays for batching. For example, a client calling `append({"x": 1})` might send POST body `[{"x": 1}]` to the server, which flattens it to store one message: `{"x": 1}`.
437+
438+
#### Empty Arrays
439+
440+
Servers **MUST** reject POST requests containing empty JSON arrays (`[]`) with `400 Bad Request`. Empty arrays represent no-op operations with no semantic meaning.
441+
442+
#### JSON Validation
443+
444+
Servers **MUST** validate that appended data is valid JSON. If validation fails, servers **MUST** return `400 Bad Request` with an appropriate error message.
445+
446+
#### Response Format
447+
448+
GET responses for `application/json` streams **MUST** return `Content-Type: application/json` with a body containing a JSON array of all messages in the requested range:
449+
450+
```http
451+
HTTP/1.1 200 OK
452+
Content-Type: application/json
453+
454+
[{"event":"created"},{"event":"updated"}]
455+
```
456+
457+
If no messages exist in the range, servers **MUST** return an empty JSON array `[]`.
458+
416459
## 8. Caching and Collapsing
417460

418461
### 8.1. Catch-up and Long-poll Reads

README.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -270,22 +270,23 @@ const stream = await DurableStream.create({
270270
})
271271

272272
// Append individual JSON values
273-
await stream.append(JSON.stringify({ event: "user.created", userId: "123" }))
274-
await stream.append(JSON.stringify({ event: "user.updated", userId: "123" }))
275-
276-
// Read returns parsed JSON array automatically
277-
const result = await stream.read({ live: "catchup" })
278-
// result.data = [
279-
// { event: "user.created", userId: "123" },
280-
// { event: "user.updated", userId: "123" }
281-
// ]
273+
await stream.append({ event: "user.created", userId: "123" })
274+
await stream.append({ event: "user.updated", userId: "123" })
275+
276+
// Read returns parsed JSON array
277+
for await (const message of stream.json({ live: "catchup" })) {
278+
console.log(message)
279+
// { event: "user.created", userId: "123" }
280+
// { event: "user.updated", userId: "123" }
281+
}
282282
```
283283

284284
In JSON mode:
285285

286-
- Each append must be a valid JSON value
287-
- The server batches appends into JSON arrays for reads
288-
- Message boundaries are preserved
286+
- Each `append()` stores one message
287+
- Supports all JSON types: objects, arrays, strings, numbers, booleans, null
288+
- Message boundaries are preserved across reads
289+
- Reads return JSON arrays of all messages
289290
- Ideal for structured event streams
290291

291292
## Offset Semantics

packages/benchmarks/src/index.ts

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -312,21 +312,11 @@ export function runBenchmarks(options: BenchmarkOptions): void {
312312

313313
const startTime = performance.now()
314314

315-
// Create a readable stream that generates a fixed number of chunks
316-
let chunksGenerated = 0
317-
const messageStream = new ReadableStream({
318-
pull(controller) {
319-
if (chunksGenerated >= totalChunks) {
320-
controller.close()
321-
return
322-
}
323-
controller.enqueue(chunk)
324-
chunksGenerated++
325-
},
326-
})
327-
328-
// Stream all data through a single connection
329-
await stream.appendStream(messageStream)
315+
const appends = []
316+
for (let i = 0; i < totalChunks; i++) {
317+
appends.push(stream.append(chunk))
318+
}
319+
await Promise.all(appends)
330320

331321
const endTime = performance.now()
332322

packages/client/test/stream.test.ts

Lines changed: 0 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -305,204 +305,7 @@ describe(`DurableStream`, () => {
305305
})
306306
})
307307

308-
describe(`create`, () => {
309-
let mockFetch: Mock<typeof fetch>
310-
311-
beforeEach(() => {
312-
mockFetch = vi.fn()
313-
})
314-
315-
it(`should create stream with PUT`, async () => {
316-
mockFetch.mockResolvedValue(
317-
new Response(null, {
318-
status: 201,
319-
headers: { "content-type": `application/json` },
320-
})
321-
)
322-
323-
const stream = new DurableStream({
324-
url: `https://example.com/stream`,
325-
fetch: mockFetch,
326-
})
327-
328-
await stream.create({ contentType: `application/json` })
329-
330-
expect(mockFetch).toHaveBeenCalledWith(
331-
`https://example.com/stream`,
332-
expect.objectContaining({
333-
method: `PUT`,
334-
headers: expect.objectContaining({
335-
"content-type": `application/json`,
336-
}),
337-
})
338-
)
339-
})
340-
341-
it(`should throw FetchError on 409`, async () => {
342-
mockFetch.mockResolvedValue(
343-
new Response(`Already exists`, {
344-
status: 409,
345-
statusText: `Conflict`,
346-
})
347-
)
348-
349-
const stream = new DurableStream({
350-
url: `https://example.com/stream`,
351-
fetch: mockFetch,
352-
})
353-
354-
await expect(stream.create()).rejects.toThrow(FetchError)
355-
})
356-
})
357-
358-
describe(`delete`, () => {
359-
let mockFetch: Mock<typeof fetch>
360-
361-
beforeEach(() => {
362-
mockFetch = vi.fn()
363-
})
364-
365-
it(`should delete stream with DELETE`, async () => {
366-
mockFetch.mockResolvedValue(new Response(null, { status: 204 }))
367-
368-
const stream = new DurableStream({
369-
url: `https://example.com/stream`,
370-
fetch: mockFetch,
371-
})
372-
373-
await stream.delete()
374-
375-
expect(mockFetch).toHaveBeenCalledWith(
376-
`https://example.com/stream`,
377-
expect.objectContaining({ method: `DELETE` })
378-
)
379-
})
380-
381-
it(`should throw FetchError on 404`, async () => {
382-
mockFetch.mockResolvedValue(
383-
new Response(`Not found`, {
384-
status: 404,
385-
statusText: `Not Found`,
386-
})
387-
)
388-
389-
const stream = new DurableStream({
390-
url: `https://example.com/stream`,
391-
fetch: mockFetch,
392-
})
393-
394-
await expect(stream.delete()).rejects.toThrow(FetchError)
395-
})
396-
})
397-
398-
describe(`append`, () => {
399-
let mockFetch: Mock<typeof fetch>
400-
401-
beforeEach(() => {
402-
mockFetch = vi.fn()
403-
})
404-
405-
it(`should append string data as UTF-8`, async () => {
406-
mockFetch.mockResolvedValue(new Response(null, { status: 200 }))
407-
408-
const stream = new DurableStream({
409-
url: `https://example.com/stream`,
410-
fetch: mockFetch,
411-
})
412-
413-
await stream.append(`hello world`)
414-
415-
expect(mockFetch).toHaveBeenCalledWith(
416-
`https://example.com/stream`,
417-
expect.objectContaining({
418-
method: `POST`,
419-
body: expect.any(Uint8Array),
420-
})
421-
)
422-
423-
const call = mockFetch.mock.calls[0]!
424-
const body = call[1]?.body as Uint8Array
425-
expect(new TextDecoder().decode(body)).toBe(`hello world`)
426-
})
427-
428-
it(`should append Uint8Array directly`, async () => {
429-
mockFetch.mockResolvedValue(new Response(null, { status: 200 }))
430-
431-
const stream = new DurableStream({
432-
url: `https://example.com/stream`,
433-
fetch: mockFetch,
434-
})
435-
436-
const data = new Uint8Array([1, 2, 3, 4])
437-
await stream.append(data)
438-
439-
const call = mockFetch.mock.calls[0]!
440-
const body = call[1]?.body as Uint8Array
441-
expect(body).toEqual(data)
442-
})
443-
444-
it(`should include seq header when provided`, async () => {
445-
mockFetch.mockResolvedValue(new Response(null, { status: 200 }))
446-
447-
const stream = new DurableStream({
448-
url: `https://example.com/stream`,
449-
fetch: mockFetch,
450-
})
451-
452-
await stream.append(`data`, { seq: `123` })
453-
454-
expect(mockFetch).toHaveBeenCalledWith(
455-
expect.anything(),
456-
expect.objectContaining({
457-
headers: expect.objectContaining({
458-
"Stream-Seq": `123`,
459-
}),
460-
})
461-
)
462-
})
463-
464-
it(`should throw FetchError on 409`, async () => {
465-
mockFetch.mockResolvedValue(
466-
new Response(`Sequence conflict`, {
467-
status: 409,
468-
statusText: `Conflict`,
469-
})
470-
)
471-
472-
const stream = new DurableStream({
473-
url: `https://example.com/stream`,
474-
fetch: mockFetch,
475-
})
476-
477-
await expect(stream.append(`data`, { seq: `1` })).rejects.toThrow(
478-
FetchError
479-
)
480-
})
481-
})
482-
483308
describe(`static methods`, () => {
484-
it(`DurableStream.create should create and return handle`, async () => {
485-
const mockFetch = vi.fn().mockResolvedValue(
486-
new Response(null, {
487-
status: 201,
488-
headers: { "content-type": `text/plain` },
489-
})
490-
)
491-
492-
const stream = await DurableStream.create({
493-
url: `https://example.com/stream`,
494-
fetch: mockFetch,
495-
contentType: `text/plain`,
496-
})
497-
498-
expect(stream).toBeInstanceOf(DurableStream)
499-
expect(stream.url).toBe(`https://example.com/stream`)
500-
expect(mockFetch).toHaveBeenCalledWith(
501-
expect.anything(),
502-
expect.objectContaining({ method: `PUT` })
503-
)
504-
})
505-
506309
it(`DurableStream.connect should validate and return handle`, async () => {
507310
const mockFetch = vi.fn().mockResolvedValue(
508311
new Response(null, {
@@ -516,7 +319,6 @@ describe(`DurableStream`, () => {
516319
fetch: mockFetch,
517320
})
518321

519-
expect(stream).toBeInstanceOf(DurableStream)
520322
expect(stream.contentType).toBe(`application/json`)
521323
expect(mockFetch).toHaveBeenCalledWith(
522324
expect.anything(),
@@ -544,22 +346,6 @@ describe(`DurableStream`, () => {
544346
expect(result.contentType).toBe(`text/plain`)
545347
expect(result.offset).toBe(`5_100`)
546348
})
547-
548-
it(`DurableStream.delete should delete without returning handle`, async () => {
549-
const mockFetch = vi
550-
.fn()
551-
.mockResolvedValue(new Response(null, { status: 204 }))
552-
553-
await DurableStream.delete({
554-
url: `https://example.com/stream`,
555-
fetch: mockFetch,
556-
})
557-
558-
expect(mockFetch).toHaveBeenCalledWith(
559-
expect.anything(),
560-
expect.objectContaining({ method: `DELETE` })
561-
)
562-
})
563349
})
564350

565351
describe(`auth`, () => {

0 commit comments

Comments
 (0)