Skip to content

Commit 453d8dd

Browse files
committed
[ws] Server persistence pt2
1 parent 1c38385 commit 453d8dd

File tree

4 files changed

+143
-72
lines changed

4 files changed

+143
-72
lines changed

src/synchronizers/common.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
1+
import type {Id, IdOrNull} from '../@types/common/index.d.ts';
12
import type {Message, Receive} from '../@types/synchronizers/index.d.ts';
23
import {
34
jsonParseWithUndefined,
45
jsonStringWithUndefined,
56
} from '../common/json.ts';
67
import {EMPTY_STRING} from '../common/strings.ts';
7-
import type {IdOrNull} from '../@types/common/index.d.ts';
88
import {slice} from '../common/other.ts';
99

10-
export const MESSAGE_SEPARATOR = '\n';
10+
const MESSAGE_SEPARATOR = '\n';
1111

1212
export const ifPayloadValid = (
1313
payload: string,
14-
then: (fromClientId: string, remainder: string) => void,
14+
then: (clientId: string, remainder: string) => void,
1515
) => {
1616
const splitAt = payload.indexOf(MESSAGE_SEPARATOR);
1717
splitAt !== -1
1818
? then(slice(payload, 0, splitAt), slice(payload, splitAt + 1))
1919
: 0;
2020
};
2121

22-
export const unpackAndReceiveWsPayload = (payload: string, receive: Receive) =>
22+
export const receivePayload = (payload: string, receive: Receive) =>
2323
ifPayloadValid(payload, (fromClientId, remainder) =>
2424
receive(
2525
fromClientId,
@@ -31,10 +31,11 @@ export const unpackAndReceiveWsPayload = (payload: string, receive: Receive) =>
3131
),
3232
);
3333

34-
export const packWsPayload = (
34+
export const createPayload = (
3535
toClientId: IdOrNull,
3636
...args: [requestId: IdOrNull, message: Message, body: any]
3737
): string =>
38-
(toClientId ?? EMPTY_STRING) +
39-
MESSAGE_SEPARATOR +
40-
jsonStringWithUndefined(args);
38+
createRawPayload(toClientId ?? EMPTY_STRING, jsonStringWithUndefined(args));
39+
40+
export const createRawPayload = (toClientId: Id, remainder: string): string =>
41+
toClientId + MESSAGE_SEPARATOR + remainder;

src/synchronizers/synchronizer-ws-client/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type {
33
WebSocketTypes,
44
createWsSynchronizer as createWsSynchronizerDecl,
55
} from '../../@types/synchronizers/synchronizer-ws-client/index.d.ts';
6-
import {packWsPayload, unpackAndReceiveWsPayload} from '../common.ts';
6+
import {createPayload, receivePayload} from '../common.ts';
77
import type {IdOrNull} from '../../@types/common/index.d.ts';
88
import type {MergeableStore} from '../../@types/mergeable-store/index.d.ts';
99
import {UTF8} from '../../common/strings.ts';
@@ -23,13 +23,13 @@ export const createWsSynchronizer = (async <
2323

2424
const registerReceive = (receive: Receive): void =>
2525
addEventListener('message', ({data}) =>
26-
unpackAndReceiveWsPayload(data.toString(UTF8), receive),
26+
receivePayload(data.toString(UTF8), receive),
2727
);
2828

2929
const send = (
3030
toClientId: IdOrNull,
3131
...args: [requestId: IdOrNull, message: Message, body: any]
32-
): void => webSocket.send(packWsPayload(toClientId, ...args));
32+
): void => webSocket.send(createPayload(toClientId, ...args));
3333

3434
const destroy = (): void => {
3535
webSocket.close();

src/synchronizers/synchronizer-ws-server/index.ts

Lines changed: 86 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ import {
1717
mapNew,
1818
mapSet,
1919
} from '../../common/map.ts';
20+
import type {Persister, Persists} from '../../@types/persisters/index.d.ts';
21+
import type {
22+
Receive,
23+
Synchronizer,
24+
} from '../../@types/synchronizers/index.d.ts';
2025
import {WebSocket, WebSocketServer} from 'ws';
2126
import {
2227
collClear,
@@ -25,86 +30,124 @@ import {
2530
collSize,
2631
collSize2,
2732
} from '../../common/coll.ts';
28-
import {ifNotUndefined, slice} from '../../common/other.ts';
33+
import {
34+
createPayload,
35+
createRawPayload,
36+
ifPayloadValid,
37+
receivePayload,
38+
} from '../common.ts';
2939
import {IdSet2} from '../../common/set.ts';
30-
import {MESSAGE_SEPARATOR} from '../common.ts';
3140
import type {MergeableStore} from '../../@types/mergeable-store/index.d.ts';
32-
import type {Persister} from '../../@types/persisters/index.d.ts';
33-
import {createMergeableStore} from '../../mergeable-store/index.ts';
41+
import {createCustomSynchronizer} from '../index.ts';
3442
import {getListenerFunctions} from '../../common/listeners.ts';
43+
import {ifNotUndefined} from '../../common/other.ts';
3544
import {objFreeze} from '../../common/obj.ts';
3645

46+
type ServerClient = {
47+
persister: Persister<
48+
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
49+
>;
50+
synchronizer: Synchronizer;
51+
send: (payload: string) => void;
52+
};
53+
3754
const PATH_REGEX = /\/([^?]*)/;
55+
const SERVER_CLIENT_ID = 'S';
3856

3957
export const createWsServer = ((
4058
webSocketServer: WebSocketServer,
41-
createPersister?: (store: MergeableStore, path: Id) => Persister | undefined,
59+
createPersister?: (
60+
path: Id,
61+
) =>
62+
| Persister<Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore>
63+
| undefined,
4264
) => {
4365
const pathIdListeners: IdSet2 = mapNew();
4466
const clientIdListeners: IdSet2 = mapNew();
4567
const clientsByPath: IdMap2<WebSocket> = mapNew();
46-
const persistersByPath: IdMap<Persister> = mapNew();
68+
const serverClientsByPath: IdMap<ServerClient> = mapNew();
4769

4870
const [addListener, callListeners, delListenerImpl] = getListenerFunctions(
4971
() => wsServer,
5072
);
5173

52-
const startPath = (pathId: Id) => {
53-
callListeners(pathIdListeners, undefined, pathId, 1);
74+
const startServerClient = (pathId: Id) =>
75+
ifNotUndefined(createPersister?.(pathId), async (persister) => {
76+
const serverClient = mapEnsure(
77+
serverClientsByPath,
78+
pathId,
79+
() => ({persister}) as ServerClient,
80+
);
81+
const messageHandler = getMessageHandler(SERVER_CLIENT_ID, pathId);
82+
83+
serverClient.synchronizer = await createCustomSynchronizer(
84+
persister.getStore() as MergeableStore,
85+
(toClientId, requestId, message, body) =>
86+
messageHandler(createPayload(toClientId, requestId, message, body)),
87+
(receive: Receive) =>
88+
(serverClient.send = (payload) => receivePayload(payload, receive)),
89+
() => {},
90+
0.1,
91+
).startSync();
92+
await persister.startAutoLoad();
93+
await persister.startAutoSave();
94+
});
95+
96+
const stopServerClient = (pathId: Id) =>
5497
ifNotUndefined(
55-
createPersister?.(createMergeableStore(), pathId),
56-
async (persister) => {
57-
mapSet(persistersByPath, pathId, persister);
58-
await persister.startAutoLoad();
59-
await persister.startAutoSave();
98+
mapGet(serverClientsByPath, pathId),
99+
({persister, synchronizer}) => {
100+
persister.destroy();
101+
synchronizer?.destroy();
102+
collDel(serverClientsByPath, pathId);
60103
},
61104
);
62-
};
63105

64-
const finishPath = (pathId: Id) => {
65-
ifNotUndefined(mapGet(persistersByPath, pathId), (persister) =>
66-
persister.destroy(),
67-
);
68-
callListeners(pathIdListeners, undefined, pathId, -1);
106+
const getMessageHandler = (clientId: Id, pathId: Id) => {
107+
const clients = mapGet(clientsByPath, pathId);
108+
const serverClient = mapGet(serverClientsByPath, pathId);
109+
return (payload: string) =>
110+
ifPayloadValid(payload, (toClientId, remainder) => {
111+
const forwardedPayload = createRawPayload(clientId, remainder);
112+
if (toClientId === EMPTY_STRING) {
113+
clientId !== SERVER_CLIENT_ID
114+
? serverClient?.send(forwardedPayload)
115+
: 0;
116+
mapForEach(clients, (otherClientId, otherWebSocket) =>
117+
otherClientId !== clientId
118+
? otherWebSocket.send(forwardedPayload)
119+
: 0,
120+
);
121+
} else {
122+
(toClientId === SERVER_CLIENT_ID
123+
? serverClient
124+
: mapGet(clients, toClientId)
125+
)?.send(forwardedPayload);
126+
}
127+
});
69128
};
70129

71130
webSocketServer.on('connection', (webSocket, request) =>
72131
ifNotUndefined(request.url?.match(PATH_REGEX), ([, pathId]) =>
73132
ifNotUndefined(request.headers['sec-websocket-key'], (clientId) => {
74133
const clients = mapEnsure(clientsByPath, pathId, mapNew<Id, WebSocket>);
75-
mapSet(clients, clientId, webSocket);
76-
77-
if (clients.size == 1) {
78-
startPath(pathId);
134+
if (collIsEmpty(clients)) {
135+
callListeners(pathIdListeners, undefined, pathId, 1);
136+
startServerClient(pathId);
79137
}
138+
mapSet(clients, clientId, webSocket);
80139
callListeners(clientIdListeners, [pathId], clientId, 1);
81140

82-
webSocket.on('message', (data) => {
83-
const payload = data.toString(UTF8);
84-
const splitAt = payload.indexOf(MESSAGE_SEPARATOR);
85-
if (splitAt !== -1) {
86-
const toClientId = slice(payload, 0, splitAt);
87-
const message = slice(payload, splitAt + 1);
88-
toClientId === EMPTY_STRING
89-
? mapForEach(clients, (otherClientId, otherWebSocket) =>
90-
otherClientId != clientId
91-
? otherWebSocket.send(
92-
clientId + MESSAGE_SEPARATOR + message,
93-
)
94-
: 0,
95-
)
96-
: mapGet(clients, toClientId)?.send(
97-
clientId + MESSAGE_SEPARATOR + message,
98-
);
99-
}
100-
});
141+
const messageHandler = getMessageHandler(clientId, pathId);
142+
webSocket.on('message', (data) => messageHandler(data.toString(UTF8)));
101143

102144
webSocket.on('close', () => {
103145
collDel(clients, clientId);
104146
callListeners(clientIdListeners, [pathId], clientId, -1);
105147
if (collIsEmpty(clients)) {
106148
collDel(clientsByPath, pathId);
107-
finishPath(pathId);
149+
stopServerClient(pathId);
150+
callListeners(pathIdListeners, undefined, pathId, -1);
108151
}
109152
});
110153
}),

0 commit comments

Comments
 (0)