Skip to content

Commit 0fbe06f

Browse files
committed
[ws] ServerClient refactor
1 parent 59dc2c9 commit 0fbe06f

File tree

1 file changed

+78
-54
lines changed
  • src/synchronizers/synchronizer-ws-server

1 file changed

+78
-54
lines changed

src/synchronizers/synchronizer-ws-server/index.ts

Lines changed: 78 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import type {
2323
Synchronizer,
2424
} from '../../@types/synchronizers/index.d.ts';
2525
import {WebSocket, WebSocketServer} from 'ws';
26+
import {arrayForEach, arrayPush} from '../../common/array.ts';
2627
import {
2728
collClear,
2829
collDel,
@@ -43,6 +44,19 @@ import {getListenerFunctions} from '../../common/listeners.ts';
4344
import {ifNotUndefined} from '../../common/other.ts';
4445
import {objFreeze} from '../../common/obj.ts';
4546

47+
enum SC {
48+
State = 0,
49+
Persister = 1,
50+
Synchronizer = 2,
51+
Send = 3,
52+
Buffer = 4,
53+
}
54+
enum SCState {
55+
Ready,
56+
Configured,
57+
Starting,
58+
}
59+
4660
const PATH_REGEX = /\/([^?]*)/;
4761
const SERVER_CLIENT_ID = 'S';
4862

@@ -54,11 +68,13 @@ export const createWsServer = (<
5468
webSocketServer: WebSocketServer,
5569
createPersisterForPath?: (pathId: Id) => PathPersister | undefined,
5670
) => {
57-
type ServerClient = {
58-
persister: PathPersister;
59-
synchronizer: Synchronizer;
60-
send: (payload: string) => void;
61-
};
71+
type ServerClient = [
72+
state: SCState,
73+
persister: PathPersister,
74+
synchronizer: Synchronizer,
75+
send: (payload: string) => void,
76+
buffer: [],
77+
];
6278

6379
const pathIdListeners: IdSet2 = mapNew();
6480
const clientIdListeners: IdSet2 = mapNew();
@@ -69,102 +85,110 @@ export const createWsServer = (<
6985
() => wsServer,
7086
);
7187

72-
const createServerClient = (pathId: Id) =>
88+
const configureServerClient = (
89+
serverClient: ServerClient,
90+
pathId: Id,
91+
clients: IdMap<WebSocket>,
92+
) =>
7393
ifNotUndefined(createPersisterForPath?.(pathId), (persister) => {
74-
const serverClient = mapEnsure(
75-
serverClientsByPath,
76-
pathId,
77-
() => ({persister}) as ServerClient,
94+
serverClient[SC.State] = 1;
95+
serverClient[SC.Persister] = persister;
96+
const messageHandler = getMessageHandler(
97+
SERVER_CLIENT_ID,
98+
clients,
99+
serverClient,
78100
);
79-
const messageHandler = getMessageHandler(SERVER_CLIENT_ID, pathId);
80-
serverClient.synchronizer = createCustomSynchronizer(
101+
serverClient[SC.Synchronizer] = createCustomSynchronizer(
81102
persister.getStore() as MergeableStore,
82103
(toClientId, requestId, message, body) =>
83104
messageHandler(createPayload(toClientId, requestId, message, body)),
84105
(receive: Receive) =>
85-
(serverClient.send = (payload) => receivePayload(payload, receive)),
106+
(serverClient[SC.Send] = (payload: string) =>
107+
receivePayload(payload, receive)),
86108
() => {},
87109
1,
88110
);
111+
serverClient[SC.Buffer] = [];
89112
});
90113

91-
const startServerClient = async (pathId: Id) =>
92-
await ifNotUndefined(
93-
mapGet(serverClientsByPath, pathId),
94-
async ({persister, synchronizer}) => {
95-
await persister.schedule(
96-
persister.startAutoLoad,
97-
persister.startAutoSave,
98-
synchronizer.startSync,
99-
);
100-
},
114+
const startServerClient = async (serverClient: ServerClient) => {
115+
serverClient[SC.State] = SCState.Starting;
116+
await serverClient[SC.Persister].schedule(
117+
serverClient[SC.Persister].startAutoLoad,
118+
serverClient[SC.Persister].startAutoSave,
119+
serverClient[SC.Synchronizer].startSync,
101120
);
121+
serverClient[SC.State] = SCState.Ready;
122+
};
102123

103-
const stopServerClient = (pathId: Id) =>
104-
ifNotUndefined(
105-
mapGet(serverClientsByPath, pathId),
106-
({persister, synchronizer}) => {
107-
synchronizer?.destroy();
108-
persister?.destroy();
109-
collDel(serverClientsByPath, pathId);
110-
},
111-
);
124+
const stopServerClient = (serverClient: ServerClient) => {
125+
serverClient[SC.Persister]?.destroy();
126+
serverClient[SC.Synchronizer]?.destroy();
127+
};
112128

113-
const getMessageHandler = (clientId: Id, pathId: Id) => {
114-
const clients = mapGet(clientsByPath, pathId);
115-
const serverClient = mapGet(serverClientsByPath, pathId);
116-
const handler = (payload: string) =>
129+
const getMessageHandler =
130+
(clientId: Id, clients: IdMap<WebSocket>, serverClient: ServerClient) =>
131+
(payload: string) =>
117132
ifPayloadValid(payload, (toClientId, remainder) => {
118133
const forwardedPayload = createRawPayload(clientId, remainder);
119134
if (toClientId === EMPTY_STRING) {
120135
clientId !== SERVER_CLIENT_ID
121-
? serverClient?.send(forwardedPayload)
136+
? serverClient[SC.Send]?.(forwardedPayload)
122137
: 0;
123138
mapForEach(clients, (otherClientId, otherWebSocket) =>
124139
otherClientId !== clientId
125140
? otherWebSocket.send(forwardedPayload)
126141
: 0,
127142
);
128143
} else {
129-
(toClientId === SERVER_CLIENT_ID
130-
? serverClient
131-
: mapGet(clients, toClientId)
132-
)?.send(forwardedPayload);
144+
toClientId === SERVER_CLIENT_ID
145+
? serverClient[SC.Send]?.(forwardedPayload)
146+
: mapGet(clients, toClientId)?.send(forwardedPayload);
133147
}
134148
});
135-
return serverClient?.persister
136-
? (payload: string) =>
137-
serverClient.persister.schedule(async () => handler(payload))
138-
: handler;
139-
};
140149

141150
webSocketServer.on('connection', (webSocket, request) =>
142151
ifNotUndefined(request.url?.match(PATH_REGEX), ([, pathId]) =>
143152
ifNotUndefined(request.headers['sec-websocket-key'], async (clientId) => {
144153
const clients = mapEnsure(clientsByPath, pathId, mapNew<Id, WebSocket>);
145-
let shouldStartServerClient = 0;
154+
const serverClient: ServerClient = mapEnsure(
155+
serverClientsByPath,
156+
pathId,
157+
() => [SCState.Ready] as any,
158+
);
159+
const messageHandler = getMessageHandler(
160+
clientId,
161+
clients,
162+
serverClient,
163+
);
146164

147165
if (collIsEmpty(clients)) {
148166
callListeners(pathIdListeners, undefined, pathId, 1);
149-
createServerClient(pathId);
150-
shouldStartServerClient = 1;
167+
configureServerClient(serverClient, pathId, clients);
151168
}
152169
mapSet(clients, clientId, webSocket);
153170
callListeners(clientIdListeners, [pathId], clientId, 1);
154171

155-
const messageHandler = getMessageHandler(clientId, pathId);
156-
webSocket.on('message', (data) => messageHandler(data.toString(UTF8)));
172+
webSocket.on('message', (data) => {
173+
const payload = data.toString(UTF8);
174+
serverClient[SC.State] == SCState.Ready
175+
? messageHandler(payload)
176+
: arrayPush(serverClient[SC.Buffer], payload);
177+
});
157178

158-
if (shouldStartServerClient) {
159-
await startServerClient(pathId);
179+
if (serverClient[SC.State] == SCState.Configured) {
180+
await startServerClient(serverClient);
181+
arrayForEach(serverClient[SC.Buffer], messageHandler);
182+
serverClient[SC.Buffer] = [];
160183
}
161184

162185
webSocket.on('close', () => {
163186
collDel(clients, clientId);
164187
callListeners(clientIdListeners, [pathId], clientId, -1);
165188
if (collIsEmpty(clients)) {
189+
stopServerClient(serverClient);
190+
collDel(serverClientsByPath, pathId);
166191
collDel(clientsByPath, pathId);
167-
stopServerClient(pathId);
168192
callListeners(pathIdListeners, undefined, pathId, -1);
169193
}
170194
});

0 commit comments

Comments
 (0)