Skip to content

Commit db121d5

Browse files
committed
[ws] Allow server to manipulate store data after persistence starts
1 parent 31b1efe commit db121d5

File tree

5 files changed

+187
-10
lines changed

5 files changed

+187
-10
lines changed

src/@types/synchronizers/synchronizer-ws-server/docs.js

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -493,10 +493,20 @@
493493
* by the WsServer. As a result, the server MergeableStore will be kept in sync
494494
* with the clients on that path, and in turn with whatever persistence layer
495495
* you have configured. See the example below.
496+
*
497+
* It is not safe to add or manipulate data in the MergeableStore during the
498+
* `createPersisterForPath` function, since changes will probably be overwritten
499+
* when the Persister starts. If you wish to modify data - or upgrade a schema,
500+
* for example - you can have that function instead return an array containing
501+
* the Persister _and_ a callback that takes the MergeableStore. That callback
502+
* will get called after the Persister has started, and is an appropriate place
503+
* to manipulate data in a way that will be transmitted to clients. Again, see
504+
* the example below.
496505
* @param webSocketServer A WebSocketServer object from your server environment.
497506
* @param createPersisterForPath An optional function that will create a
498-
* Persister (with a MergeableStore) to synchronize with the clients on a given
499-
* path.
507+
* Persister to synchronize with the clients on a given path (or a two-item
508+
* array of Persister and callback that lets you handle data after persistence
509+
* has started).
500510
* @returns A reference to the new WsServer object.
501511
* @example
502512
* This example creates a WsServer that synchronizes two clients on a shared
@@ -610,6 +620,46 @@
610620
* rmSync('petShop.json');
611621
* ```
612622
* @example
623+
* This example creates a WsServer that persists a MergeableStore to file that
624+
* is synchronized with two clients on a shared path, but also which updates its
625+
* data once synchronization has started.
626+
*
627+
* ```js
628+
* import {WebSocketServer} from 'ws';
629+
* import {createFilePersister} from 'tinybase/persisters/persister-file';
630+
* import {createMergeableStore} from 'tinybase';
631+
* import {createWsServer} from 'tinybase/synchronizers/synchronizer-ws-server';
632+
* import {createWsSynchronizer} from 'tinybase/synchronizers/synchronizer-ws-client';
633+
* import {rmSync} from 'fs';
634+
*
635+
* // Server
636+
* const server = createWsServer(
637+
* new WebSocketServer({port: 8047}),
638+
* (pathId) => [
639+
* createFilePersister(createMergeableStore(), pathId + '.json'),
640+
* (store) => store.setValue('pathId', pathId),
641+
* ],
642+
* );
643+
*
644+
* const clientStore = createMergeableStore();
645+
* clientStore.setCell('pets', 'fido', 'species', 'dog');
646+
* const synchronizer = await createWsSynchronizer(
647+
* clientStore,
648+
* new WebSocket('ws://localhost:8047/petShop'),
649+
* );
650+
* await synchronizer.startSync();
651+
* // ...
652+
*
653+
* console.log(clientStore.getContent());
654+
* // -> [{pets: {fido: {species: 'dog'}}}, {"pathId": "petShop"}]
655+
*
656+
* synchronizer.destroy();
657+
* server.destroy();
658+
*
659+
* // Remove file for the purposes of this demo.
660+
* rmSync('petShop.json');
661+
* ```
662+
* @example
613663
* This example creates a WsServer with a custom listener that displays
614664
* information about the address of the client that connects to it.
615665
*

src/@types/synchronizers/synchronizer-ws-server/index.d.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import type {Id, IdOrNull, Ids} from '../../common/index.d.ts';
44
import type {Persister, Persists} from '../../persisters/index.d.ts';
55
import type {IdAddedOrRemoved} from '../../store/index.d.ts';
6+
import type {MergeableStore} from '../../mergeable-store/index.d.ts';
67
import type {WebSocketServer} from 'ws';
78

89
/// PathIdsListener
@@ -55,5 +56,10 @@ export function createWsServer<
5556
>,
5657
>(
5758
webSocketServer: WebSocketServer,
58-
createPersisterForPath?: (pathId: Id) => PathPersister | undefined,
59+
createPersisterForPath?: (
60+
pathId: Id,
61+
) =>
62+
| PathPersister
63+
| [PathPersister, (store: MergeableStore) => void]
64+
| undefined,
5965
): WsServer;

src/@types/synchronizers/synchronizer-ws-server/with-schemas/index.d.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type {
99
Persister,
1010
Persists,
1111
} from '../../../persisters/with-schemas/index.d.ts';
12+
import type {MergeableStore} from '../../../mergeable-store/with-schemas/index.d.ts';
1213
import type {WebSocketServer} from 'ws';
1314

1415
/// PathIdsListener
@@ -63,5 +64,10 @@ export function createWsServer<
6364
>,
6465
>(
6566
webSocketServer: WebSocketServer,
66-
createPersisterForPath?: (pathId: Id) => PathPersister | undefined,
67+
createPersisterForPath?: (
68+
pathId: Id,
69+
) =>
70+
| PathPersister
71+
| [PathPersister, (store: MergeableStore<Schemas>) => void]
72+
| undefined,
6773
): WsServer;

src/synchronizers/synchronizer-ws-server/index.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ import {
3737
ifPayloadValid,
3838
receivePayload,
3939
} from '../common.ts';
40+
import {ifNotUndefined, isArray} from '../../common/other.ts';
4041
import {IdSet2} from '../../common/set.ts';
4142
import type {MergeableStore} from '../../@types/mergeable-store/index.d.ts';
4243
import {createCustomSynchronizer} from '../index.ts';
4344
import {getListenerFunctions} from '../../common/listeners.ts';
44-
import {ifNotUndefined} from '../../common/other.ts';
4545
import {objFreeze} from '../../common/obj.ts';
4646

4747
enum Sc {
@@ -50,6 +50,7 @@ enum Sc {
5050
Synchronizer = 2,
5151
Send = 3,
5252
Buffer = 4,
53+
Then = 5,
5354
}
5455
enum ScState {
5556
Ready,
@@ -66,14 +67,20 @@ export const createWsServer = (<
6667
>,
6768
>(
6869
webSocketServer: WebSocketServer,
69-
createPersisterForPath?: (pathId: Id) => PathPersister | undefined,
70+
createPersisterForPath?: (
71+
pathId: Id,
72+
) =>
73+
| [PathPersister, (store: MergeableStore) => void]
74+
| PathPersister
75+
| undefined,
7076
) => {
7177
type ServerClient = [
7278
state: ScState,
7379
persister: PathPersister,
7480
synchronizer: Synchronizer,
7581
send: (payload: string) => void,
7682
buffer: [],
83+
then: (store: MergeableStore) => void,
7784
];
7885

7986
const pathIdListeners: IdSet2 = mapNew();
@@ -90,16 +97,18 @@ export const createWsServer = (<
9097
pathId: Id,
9198
clients: IdMap<WebSocket>,
9299
) =>
93-
ifNotUndefined(createPersisterForPath?.(pathId), (persister) => {
100+
ifNotUndefined(createPersisterForPath?.(pathId), (persisterMaybeThen) => {
94101
serverClient[Sc.State] = 1;
95-
serverClient[Sc.Persister] = persister;
102+
serverClient[Sc.Persister] = isArray(persisterMaybeThen)
103+
? persisterMaybeThen[0]
104+
: persisterMaybeThen;
96105
const messageHandler = getMessageHandler(
97106
SERVER_CLIENT_ID,
98107
clients,
99108
serverClient,
100109
);
101110
serverClient[Sc.Synchronizer] = createCustomSynchronizer(
102-
persister.getStore() as MergeableStore,
111+
serverClient[Sc.Persister].getStore() as MergeableStore,
103112
(toClientId, requestId, message, body) =>
104113
messageHandler(createPayload(toClientId, requestId, message, body)),
105114
(receive: Receive) =>
@@ -109,6 +118,9 @@ export const createWsServer = (<
109118
1,
110119
);
111120
serverClient[Sc.Buffer] = [];
121+
serverClient[Sc.Then] = isArray(persisterMaybeThen)
122+
? persisterMaybeThen[1]
123+
: (_) => 0;
112124
});
113125

114126
const startServerClient = async (serverClient: ServerClient) => {
@@ -118,6 +130,9 @@ export const createWsServer = (<
118130
serverClient[Sc.Persister].startAutoSave,
119131
serverClient[Sc.Synchronizer].startSync,
120132
);
133+
serverClient[Sc.Then](
134+
serverClient[Sc.Persister].getStore() as MergeableStore,
135+
);
121136
serverClient[Sc.State] = ScState.Ready;
122137
};
123138

test/unit/mergeables/synchronizer-ws-server.test.ts

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import type {Id, MergeableStore} from 'tinybase';
44
import {WebSocket, WebSocketServer} from 'ws';
5+
import {readFileSync, writeFileSync} from 'fs';
56
import type {WsServer} from 'tinybase/synchronizers/synchronizer-ws-server';
67
import type {WsSynchronizer} from 'tinybase/synchronizers/synchronizer-ws-client';
78
import {createFilePersister} from 'tinybase/persisters/persister-file';
@@ -10,7 +11,6 @@ import {createWsServer} from 'tinybase/synchronizers/synchronizer-ws-server';
1011
import {createWsSynchronizer} from 'tinybase/synchronizers/synchronizer-ws-client';
1112
import {join} from 'path';
1213
import {pause} from '../common/other.ts';
13-
import {readFileSync} from 'fs';
1414
import {resetHlc} from '../common/mergeable.ts';
1515
import tmp from 'tmp';
1616

@@ -259,6 +259,106 @@ describe('Persistence', () => {
259259
wsServer.destroy();
260260
});
261261

262+
describe('single client to existing path', () => {
263+
let serverStore: MergeableStore;
264+
265+
beforeEach(() => {
266+
serverStore = createMergeableStore('ss');
267+
writeFileSync(
268+
join(tmpDir, 'p1.json'),
269+
JSON.stringify([
270+
[
271+
{
272+
t1: [
273+
{
274+
r1: [
275+
{c1: [1, 'Nn1JUF-----7JQY8', 1003668370]},
276+
'',
277+
550994372,
278+
],
279+
},
280+
'',
281+
1072852846,
282+
],
283+
},
284+
'',
285+
1771939739,
286+
],
287+
[{}, '', 0],
288+
]),
289+
'utf-8',
290+
);
291+
});
292+
293+
test('alters data prematurely', async () => {
294+
const wsServer = createWsServer(
295+
new WebSocketServer({port: 8049}),
296+
(pathId) => {
297+
serverStore.setValue('p', pathId);
298+
return createPersister(serverStore, pathId);
299+
},
300+
);
301+
302+
const clientStore = createMergeableStore('s1');
303+
const synchronizer = await createWsSynchronizer(
304+
clientStore,
305+
new WebSocket('ws://localhost:8049/p1'),
306+
);
307+
await synchronizer.startSync();
308+
309+
await pause();
310+
expect(serverStore.getContent()).toEqual([{t1: {r1: {c1: 1}}}, {}]);
311+
312+
synchronizer.destroy();
313+
wsServer.destroy();
314+
});
315+
316+
test('alters data after path first persisted', async () => {
317+
const wsServer = createWsServer(
318+
new WebSocketServer({port: 8049}),
319+
(pathId) => {
320+
serverStore.setValue('p', pathId);
321+
return [
322+
createPersister(serverStore, pathId),
323+
(store) => store.setValue('p', pathId),
324+
];
325+
},
326+
);
327+
328+
const clientStore = createMergeableStore('s1');
329+
const synchronizer = await createWsSynchronizer(
330+
clientStore,
331+
new WebSocket('ws://localhost:8049/p1'),
332+
);
333+
await synchronizer.startSync();
334+
335+
await pause();
336+
expect(serverStore.getContent()).toEqual([
337+
{t1: {r1: {c1: 1}}},
338+
{p: 'p1'},
339+
]);
340+
expect(
341+
JSON.parse(readFileSync(join(tmpDir, 'p1.json'), 'utf-8')),
342+
).toEqual([
343+
[
344+
{
345+
t1: [
346+
{r1: [{c1: [1, 'Nn1JUF-----7JQY8', 1003668370]}, '', 550994372]},
347+
'',
348+
1072852846,
349+
],
350+
},
351+
'',
352+
1771939739,
353+
],
354+
[{p: ['p1', 'Nn1JUF----05JWdY', 328213929]}, '', 1622699135],
355+
]);
356+
357+
synchronizer.destroy();
358+
wsServer.destroy();
359+
});
360+
});
361+
262362
test('multiple clients, one path', async () => {
263363
const serverStore = createMergeableStore('ss');
264364
const wsServer = createWsServer(

0 commit comments

Comments
 (0)