Skip to content

Commit 2c34275

Browse files
authored
Merge pull request #54 from MatthewWid/batching
Session event batching
2 parents e40374b + be6e62f commit 2c34275

File tree

6 files changed

+129
-11
lines changed

6 files changed

+129
-11
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
## 0.10.0 - 2023-09-28
11+
1012
### Added
1113

14+
* Added the [`Session#batch`](./docs/api.md#sessionbatch-batcher-eventbuffer--buffer-eventbuffer--void--promisevoid--promisevoid) method that can be used to batch multiple events into a single transmission over the wire.
1215
* Added the [`EventBuffer`](./docs/api.md#eventbuffer) class that can be used to write raw spec-compliant SSE fields into a text buffer that can be sent directly over the wire.
1316

1417
### Deprecated

docs/api.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,17 @@ This uses the [`push`](#session%23push%3A-(event%3A-string%2C-data%3A-any)-%3D>-
9595
|-|-|-|-|
9696
|`eventName`|`string`|`"iteration"`|Event name to use when dispatching a data event from the yielded value to the client.|
9797

98+
#### `Session#batch`: `(batcher: EventBuffer | ((buffer: EventBuffer) => void | Promise<void>)) => Promise<void>`
99+
100+
Batch and send multiple events at once.
101+
102+
If given an [`EventBuffer`](#eventbuffer) instance, its contents will be sent to the client.
103+
104+
If given a callback, it will be passed an instance of [`EventBuffer`](#eventbuffer) which uses the same serializer and sanitizer as the session.
105+
Once its execution completes - or once it resolves if it returns a promise - the contents of the passed [`EventBuffer`](#eventbuffer) will be sent to the client.
106+
107+
Returns a promise that resolves once all data from the event buffer has been successfully sent to the client.
108+
98109
#### `Session#event`: `(type: string) => this`
99110

100111
**⚠ DEPRECATED:** This method is deprecated. [See here](https://github.com/MatthewWid/better-sse/issues/52).
@@ -228,8 +239,6 @@ Takes the [same arguments as the Channel class constructor](#new-channel()).
228239

229240
An `EventBuffer` allows you to write [raw spec-compliant SSE fields](https://html.spec.whatwg.org/multipage/server-sent-events.html#processField) into a text buffer that can be sent directly over the wire.
230241

231-
This is made available for users with more advanced use-cases who need to create an event text stream from scratch themselves. Most users will not need to access this directly and can use the simplified helper methods provided by the [`Session`](#session) class instead.
232-
233242
#### `new EventBuffer([options = {}])`
234243

235244
`options` is an object with the following properties:

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "better-sse",
33
"description": "Dead simple, dependency-less, spec-compliant server-side events implementation for Node, written in TypeScript.",
4-
"version": "0.9.0",
4+
"version": "0.10.0",
55
"main": "./build/index.js",
66
"types": "./build/index.d.ts",
77
"license": "MIT",

src/EventBuffer.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ interface EventBufferOptions {
2222

2323
/**
2424
* An `EventBuffer` allows you to write raw spec-compliant SSE fields into a text buffer that can be sent directly over the wire.
25-
*
26-
* This is made available for users with more advanced use-cases who need to create an event text stream from scratch themselves. Most users will not need to access this directly and can use the simplified helper methods provided by the `Session` class instead.
2725
*/
2826
class EventBuffer {
2927
private buffer = "";
@@ -121,7 +119,7 @@ class EventBuffer {
121119
};
122120

123121
/**
124-
* Create, write and dispatch an event with the given data to the client all at once.
122+
* Create, write and dispatch an event with the given data all at once.
125123
*
126124
* This is equivalent to calling the methods `event`, `id`, `data` and `dispatch` in that order.
127125
*
@@ -151,7 +149,7 @@ class EventBuffer {
151149
* If no event name is given in the `options` object, the event name is set to `"stream"`.
152150
*
153151
* @param stream - Readable stream to consume data from.
154-
* @param options - Options to alter how the stream is flushed to the client.
152+
* @param options - Event name to use for each event created.
155153
*
156154
* @returns A promise that resolves or rejects based on the success of the stream write finishing.
157155
*/

src/Session.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
getBuffer,
1313
} from "./lib/testUtils";
1414
import {Session} from "./Session";
15+
import {EventBuffer} from "./EventBuffer";
1516

1617
let server: http.Server;
1718
let url: string;
@@ -521,6 +522,75 @@ describe("push", () => {
521522
}));
522523
});
523524

525+
describe("batching", () => {
526+
const data = "test-data";
527+
528+
it("given a synchronous callback, creates a new event buffer and writes its contents to the response after execution has finished", () =>
529+
new Promise<void>((done) => {
530+
server.on("request", async (req, res) => {
531+
const session = new Session(req, res);
532+
533+
await waitForConnect(session);
534+
535+
const write = vi.spyOn(res, "write");
536+
537+
await session.batch((buffer) => {
538+
buffer.push(data);
539+
});
540+
541+
expect(write.mock.calls[0][0]).toContain(data);
542+
543+
done();
544+
});
545+
546+
eventsource = new EventSource(url);
547+
}));
548+
549+
it("given an asynchronous callback, creates a new event buffer and writes its contents to the response after the returned promise has resolved", () =>
550+
new Promise<void>((done) => {
551+
server.on("request", async (req, res) => {
552+
const session = new Session(req, res);
553+
554+
await waitForConnect(session);
555+
556+
const write = vi.spyOn(res, "write");
557+
558+
await session.batch(async (buffer) => {
559+
buffer.push(data);
560+
});
561+
562+
expect(write.mock.calls[0][0]).toContain(data);
563+
564+
done();
565+
});
566+
567+
eventsource = new EventSource(url);
568+
}));
569+
570+
it("given an event buffer, writes its contents to the response", () =>
571+
new Promise<void>((done) => {
572+
server.on("request", async (req, res) => {
573+
const session = new Session(req, res);
574+
575+
await waitForConnect(session);
576+
577+
const write = vi.spyOn(res, "write");
578+
579+
const buffer = new EventBuffer();
580+
581+
buffer.push(data);
582+
583+
await session.batch(buffer);
584+
585+
expect(write.mock.calls[0][0]).toContain(data);
586+
587+
done();
588+
});
589+
590+
eventsource = new EventSource(url);
591+
}));
592+
});
593+
524594
describe("polyfill support", () => {
525595
const lastEventId = "123456";
526596

src/Session.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {TypedEmitter, EventMap} from "./lib/TypedEmitter";
99
import {generateId} from "./lib/generateId";
1010
import {createPushFromStream} from "./lib/createPushFromStream";
1111
import {createPushFromIterable} from "./lib/createPushFromIterable";
12+
import {serialize, SerializerFunction} from "./lib/serialize";
13+
import {sanitize, SanitizerFunction} from "./lib/sanitize";
1214

1315
interface SessionOptions
1416
extends Pick<EventBufferOptions, "serializer" | "sanitizer"> {
@@ -135,6 +137,8 @@ class Session<
135137
write: (chunk: string) => void;
136138
};
137139

140+
private serialize: SerializerFunction;
141+
private sanitize: SanitizerFunction;
138142
private trustClientEventId: boolean;
139143
private initialRetry: number | null;
140144
private keepAliveInterval: number | null;
@@ -153,10 +157,13 @@ class Session<
153157

154158
this.res = res;
155159

156-
this.buffer = new EventBuffer({
157-
serializer: options.serializer,
158-
sanitizer: options.sanitizer,
159-
});
160+
const serializer = options.serializer ?? serialize;
161+
const sanitizer = options.sanitizer ?? sanitize;
162+
163+
this.serialize = serializer;
164+
this.sanitize = sanitizer;
165+
166+
this.buffer = new EventBuffer({serializer, sanitizer});
160167

161168
this.trustClientEventId = options.trustClientEventId ?? true;
162169

@@ -377,6 +384,37 @@ class Session<
377384
* @returns A promise that resolves once all data has been successfully yielded from the iterable.
378385
*/
379386
iterate = createPushFromIterable(this.push);
387+
388+
/**
389+
* Batch and send multiple events at once.
390+
*
391+
* If given an `EventBuffer` instance, its contents will be sent to the client.
392+
*
393+
* If given a callback, it will be passed an instance of `EventBuffer` which uses the same serializer and sanitizer as the session.
394+
* Once its execution completes - or once it resolves if it returns a promise - the contents of the passed `EventBuffer` will be sent to the client.
395+
*
396+
* @param batcher - Event buffer to get contents from, or callback that takes an event buffer to write to.
397+
*
398+
* @returns A promise that resolves once all data from the event buffer has been successfully sent to the client.
399+
*
400+
* @see EventBuffer
401+
*/
402+
batch = async (
403+
batcher: EventBuffer | ((buffer: EventBuffer) => void | Promise<void>)
404+
) => {
405+
if (batcher instanceof EventBuffer) {
406+
this.res.write(batcher.read());
407+
} else {
408+
const buffer = new EventBuffer({
409+
serializer: this.serialize,
410+
sanitizer: this.sanitize,
411+
});
412+
413+
await batcher(buffer);
414+
415+
this.res.write(buffer.read());
416+
}
417+
};
380418
}
381419

382420
export type {SessionOptions, SessionEvents, DefaultSessionState};

0 commit comments

Comments
 (0)