Skip to content

Commit e284add

Browse files
committed
[ws] create and destroy PersisterForPath
1 parent f241d95 commit e284add

File tree

4 files changed

+67
-61
lines changed

4 files changed

+67
-61
lines changed

src/@types/synchronizers/synchronizer-ws-server/index.d.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,12 @@ export interface WsServer {
4949
}
5050

5151
/// createWsServer
52-
export function createWsServer(
52+
export function createWsServer<
53+
PathPersister extends Persister<
54+
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
55+
>,
56+
>(
5357
webSocketServer: WebSocketServer,
54-
createPersister?: (
55-
pathId: Id,
56-
) =>
57-
| Persister<Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore>
58-
| undefined,
58+
createPersisterForPath?: (pathId: Id) => Promise<PathPersister | undefined>,
59+
destroyPersisterForPath?: (pathId: Id, persister: PathPersister) => void,
5960
): WsServer;

src/@types/synchronizers/synchronizer-ws-server/with-schemas/index.d.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ export interface WsServer {
5555
}
5656

5757
/// createWsServer
58-
export function createWsServer<Schemas extends OptionalSchemas>(
58+
export function createWsServer<
59+
Schemas extends OptionalSchemas,
60+
PathPersister extends Persister<
61+
Schemas,
62+
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
63+
>,
64+
>(
5965
webSocketServer: WebSocketServer,
60-
createPersister?: (
61-
pathId: Id,
62-
) =>
63-
| Persister<
64-
Schemas,
65-
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
66-
>
67-
| undefined,
66+
createPersisterForPath?: (pathId: Id) => Promise<PathPersister | undefined>,
67+
destroyPersisterForPath?: (pathId: Id, persister: PathPersister) => void,
6868
): WsServer;

src/synchronizers/synchronizer-ws-server/index.ts

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -43,25 +43,24 @@ import {getListenerFunctions} from '../../common/listeners.ts';
4343
import {ifNotUndefined} from '../../common/other.ts';
4444
import {objFreeze} from '../../common/obj.ts';
4545

46-
type ServerClient = {
47-
persister: Persister<
48-
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
49-
>;
50-
synchronizer: Synchronizer;
51-
send: (payload: string) => void;
52-
};
53-
5446
const PATH_REGEX = /\/([^?]*)/;
5547
const SERVER_CLIENT_ID = 'S';
5648

57-
export const createWsServer = ((
49+
export const createWsServer = (<
50+
PathPersister extends Persister<
51+
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
52+
>,
53+
>(
5854
webSocketServer: WebSocketServer,
59-
createPersister?: (
60-
path: Id,
61-
) =>
62-
| Persister<Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore>
63-
| undefined,
55+
createPersisterForPath?: (pathId: Id) => Promise<PathPersister | undefined>,
56+
destroyPersisterForPath?: (pathId: Id, persister: PathPersister) => void,
6457
) => {
58+
type ServerClient = {
59+
persister: PathPersister;
60+
synchronizer: Synchronizer;
61+
send: (payload: string) => void;
62+
};
63+
6564
const pathIdListeners: IdSet2 = mapNew();
6665
const clientIdListeners: IdSet2 = mapNew();
6766
const clientsByPath: IdMap2<WebSocket> = mapNew();
@@ -71,34 +70,34 @@ export const createWsServer = ((
7170
() => wsServer,
7271
);
7372

74-
const startServerClient = (pathId: Id) =>
75-
ifNotUndefined(createPersister?.(pathId), async (persister) => {
76-
const serverClient = mapEnsure(
77-
serverClientsByPath,
78-
pathId,
79-
() => ({persister}) as ServerClient,
80-
);
81-
const messageHandler = getMessageHandler(SERVER_CLIENT_ID, pathId);
82-
83-
serverClient.synchronizer = await createCustomSynchronizer(
84-
persister.getStore() as MergeableStore,
85-
(toClientId, requestId, message, body) =>
86-
messageHandler(createPayload(toClientId, requestId, message, body)),
87-
(receive: Receive) =>
88-
(serverClient.send = (payload) => receivePayload(payload, receive)),
89-
() => {},
90-
0.1,
91-
).startSync();
92-
await persister.startAutoLoad();
93-
await persister.startAutoSave();
94-
});
73+
const startServerClient = async (pathId: Id) =>
74+
ifNotUndefined(
75+
await createPersisterForPath?.(pathId),
76+
async (persister) => {
77+
const serverClient = mapEnsure(
78+
serverClientsByPath,
79+
pathId,
80+
() => ({persister}) as ServerClient,
81+
);
82+
const messageHandler = getMessageHandler(SERVER_CLIENT_ID, pathId);
83+
serverClient.synchronizer = await createCustomSynchronizer(
84+
persister.getStore() as MergeableStore,
85+
(toClientId, requestId, message, body) =>
86+
messageHandler(createPayload(toClientId, requestId, message, body)),
87+
(receive: Receive) =>
88+
(serverClient.send = (payload) => receivePayload(payload, receive)),
89+
() => {},
90+
0.1,
91+
).startSync();
92+
},
93+
);
9594

9695
const stopServerClient = (pathId: Id) =>
9796
ifNotUndefined(
9897
mapGet(serverClientsByPath, pathId),
9998
({persister, synchronizer}) => {
100-
persister.destroy();
10199
synchronizer?.destroy();
100+
destroyPersisterForPath?.(pathId, persister);
102101
collDel(serverClientsByPath, pathId);
103102
},
104103
);
@@ -129,11 +128,11 @@ export const createWsServer = ((
129128

130129
webSocketServer.on('connection', (webSocket, request) =>
131130
ifNotUndefined(request.url?.match(PATH_REGEX), ([, pathId]) =>
132-
ifNotUndefined(request.headers['sec-websocket-key'], (clientId) => {
131+
ifNotUndefined(request.headers['sec-websocket-key'], async (clientId) => {
133132
const clients = mapEnsure(clientsByPath, pathId, mapNew<Id, WebSocket>);
134133
if (collIsEmpty(clients)) {
135134
callListeners(pathIdListeners, undefined, pathId, 1);
136-
startServerClient(pathId);
135+
await startServerClient(pathId);
137136
}
138137
mapSet(clients, clientId, webSocket);
139138
callListeners(clientIdListeners, [pathId], clientId, 1);

test/unit/other/synchronizer-ws-server.test.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,22 @@ describe('Persistence', () => {
202202
const serverStore = createMergeableStore('ss');
203203
const wsServer = createWsServer(
204204
new WebSocketServer({port: 8050}),
205-
(pathId) => createPersister(serverStore, pathId),
205+
async (pathId) => {
206+
const persister = createPersister(serverStore, pathId);
207+
await persister.startAutoLoad();
208+
await persister.startAutoSave();
209+
return persister;
210+
},
211+
(_, persister) => persister.destroy(),
206212
);
207213

208-
const store1 = createMergeableStore('s1');
209-
const synchronizer1 = await createWsSynchronizer(
210-
store1,
214+
const clientStore = createMergeableStore('s1');
215+
const synchronizer = await createWsSynchronizer(
216+
clientStore,
211217
new WebSocket('ws://localhost:8050'),
212218
);
213-
await synchronizer1.startSync();
214-
store1.setCell('pets', 'fido', 'legs', 4);
219+
await synchronizer.startSync();
220+
clientStore.setCell('pets', 'fido', 'legs', 4);
215221

216222
await pause();
217223
expect(serverStore.getTables()).toEqual({pets: {fido: {legs: 4}}});
@@ -238,7 +244,7 @@ describe('Persistence', () => {
238244

239245
serverStore.setCell('pets', 'felix', 'legs', 3);
240246
await pause();
241-
expect(store1.getTables()).toEqual({
247+
expect(clientStore.getTables()).toEqual({
242248
pets: {fido: {legs: 4}, felix: {legs: 3}},
243249
});
244250
expect(JSON.parse(readFileSync(join(tmpDir, '.json'), 'utf-8'))).toEqual([
@@ -267,7 +273,7 @@ describe('Persistence', () => {
267273
[{}, '', 0],
268274
]);
269275

270-
synchronizer1.destroy();
276+
synchronizer.destroy();
271277
wsServer.destroy();
272278
});
273279
});

0 commit comments

Comments
 (0)