Skip to content

Commit fd3424a

Browse files
committed
[scm] Stash 5.1
1 parent e284add commit fd3424a

File tree

9 files changed

+82
-326
lines changed

9 files changed

+82
-326
lines changed

src/@types/synchronizers/synchronizer-ws-server/index.d.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
/// synchronizer-ws-server
22

33
import type {Id, IdOrNull, Ids} from '../../common/index.d.ts';
4-
import type {Persister, Persists} from '../../persisters/index.d.ts';
54
import type {IdAddedOrRemoved} from '../../store/index.d.ts';
65
import type {WebSocketServer} from 'ws';
76

@@ -49,12 +48,4 @@ export interface WsServer {
4948
}
5049

5150
/// createWsServer
52-
export function createWsServer<
53-
PathPersister extends Persister<
54-
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
55-
>,
56-
>(
57-
webSocketServer: WebSocketServer,
58-
createPersisterForPath?: (pathId: Id) => Promise<PathPersister | undefined>,
59-
destroyPersisterForPath?: (pathId: Id, persister: PathPersister) => void,
60-
): WsServer;
51+
export function createWsServer(webSocketServer: WebSocketServer): WsServer;

src/@types/synchronizers/synchronizer-ws-server/with-schemas/index.d.ts

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
/// synchronizer-ws-server
22

33
import type {Id, IdOrNull, Ids} from '../../../common/with-schemas/index.d.ts';
4-
import type {
5-
IdAddedOrRemoved,
6-
OptionalSchemas,
7-
} from '../../../store/with-schemas/index.d.ts';
8-
import type {
9-
Persister,
10-
Persists,
11-
} from '../../../persisters/with-schemas/index.d.ts';
4+
import type {IdAddedOrRemoved} from '../../../store/with-schemas/index.d.ts';
125
import type {WebSocketServer} from 'ws';
136

147
/// PathIdsListener
@@ -55,14 +48,4 @@ export interface WsServer {
5548
}
5649

5750
/// createWsServer
58-
export function createWsServer<
59-
Schemas extends OptionalSchemas,
60-
PathPersister extends Persister<
61-
Schemas,
62-
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
63-
>,
64-
>(
65-
webSocketServer: WebSocketServer,
66-
createPersisterForPath?: (pathId: Id) => Promise<PathPersister | undefined>,
67-
destroyPersisterForPath?: (pathId: Id, persister: PathPersister) => void,
68-
): WsServer;
51+
export function createWsServer(webSocketServer: WebSocketServer): WsServer;

src/persisters/index.ts

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -179,24 +179,19 @@ export const createCustomPersister = <
179179
initialContent?: Content,
180180
): Promise<Persister<Persist>> => {
181181
await stopAutoLoad().load(initialContent);
182-
try {
183-
autoLoadHandle = addPersisterListener(async (content, changes) => {
184-
if (changes || content) {
185-
/*! istanbul ignore else */
186-
if (loadSave != 2) {
187-
loadSave = 1;
188-
loads++;
189-
setContentOrChanges(changes ?? content);
190-
loadSave = 0;
191-
}
192-
} else {
193-
await load();
182+
autoLoadHandle = addPersisterListener(async (content, changes) => {
183+
if (changes || content) {
184+
/*! istanbul ignore else */
185+
if (loadSave != 2) {
186+
loadSave = 1;
187+
loads++;
188+
setContentOrChanges(changes ?? content);
189+
loadSave = 0;
194190
}
195-
});
196-
} catch (error) {
197-
/*! istanbul ignore next */
198-
onIgnoredError?.(error);
199-
}
191+
} else {
192+
await load();
193+
}
194+
});
200195
return persister;
201196
};
202197

src/persisters/persister-file/index.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import {EMPTY_STRING, UTF8} from '../../common/strings.ts';
2-
import {FSWatcher, existsSync, watch, writeFileSync} from 'fs';
1+
import {FSWatcher, watch} from 'fs';
32
import type {
43
FilePersister,
54
createFilePersister as createFilePersisterDecl,
@@ -17,6 +16,7 @@ import {
1716
import {readFile, writeFile} from 'fs/promises';
1817
import type {MergeableStore} from '../../@types/mergeable-store/index.d.ts';
1918
import type {Store} from '../../@types/store/index.d.ts';
19+
import {UTF8} from '../../common/strings.ts';
2020

2121
export const createFilePersister = ((
2222
store: Store | MergeableStore,
@@ -34,12 +34,7 @@ export const createFilePersister = ((
3434

3535
const addPersisterListener = (
3636
listener: PersisterListener<PersistsType.StoreOrMergeableStore>,
37-
): FSWatcher => {
38-
if (!existsSync(filePath)) {
39-
writeFileSync(filePath, EMPTY_STRING, UTF8);
40-
}
41-
return watch(filePath, () => listener());
42-
};
37+
): FSWatcher => watch(filePath, () => listener());
4338

4439
const delPersisterListener = (watcher: FSWatcher): void => watcher?.close();
4540

src/synchronizers/common.ts

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1 @@
1-
import type {Id, IdOrNull} from '../@types/common/index.d.ts';
2-
import type {Message, Receive} from '../@types/synchronizers/index.d.ts';
3-
import {
4-
jsonParseWithUndefined,
5-
jsonStringWithUndefined,
6-
} from '../common/json.ts';
7-
import {EMPTY_STRING} from '../common/strings.ts';
8-
import {slice} from '../common/other.ts';
9-
10-
const MESSAGE_SEPARATOR = '\n';
11-
12-
export const ifPayloadValid = (
13-
payload: string,
14-
then: (clientId: string, remainder: string) => void,
15-
) => {
16-
const splitAt = payload.indexOf(MESSAGE_SEPARATOR);
17-
splitAt !== -1
18-
? then(slice(payload, 0, splitAt), slice(payload, splitAt + 1))
19-
: 0;
20-
};
21-
22-
export const receivePayload = (payload: string, receive: Receive) =>
23-
ifPayloadValid(payload, (fromClientId, remainder) =>
24-
receive(
25-
fromClientId,
26-
...(jsonParseWithUndefined(remainder) as [
27-
requestId: IdOrNull,
28-
message: Message,
29-
body: any,
30-
]),
31-
),
32-
);
33-
34-
export const createPayload = (
35-
toClientId: IdOrNull,
36-
...args: [requestId: IdOrNull, message: Message, body: any]
37-
): string =>
38-
createRawPayload(toClientId ?? EMPTY_STRING, jsonStringWithUndefined(args));
39-
40-
export const createRawPayload = (toClientId: Id, remainder: string): string =>
41-
toClientId + MESSAGE_SEPARATOR + remainder;
1+
export const MESSAGE_SEPARATOR = '\n';

src/synchronizers/synchronizer-ws-client/index.ts

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1+
import {EMPTY_STRING, UTF8} from '../../common/strings.ts';
12
import type {Message, Receive} from '../../@types/synchronizers/index.d.ts';
23
import type {
34
WebSocketTypes,
45
createWsSynchronizer as createWsSynchronizerDecl,
56
} from '../../@types/synchronizers/synchronizer-ws-client/index.d.ts';
6-
import {createPayload, receivePayload} from '../common.ts';
7+
import {
8+
jsonParseWithUndefined,
9+
jsonStringWithUndefined,
10+
} from '../../common/json.ts';
11+
import {promiseNew, slice} from '../../common/other.ts';
712
import type {IdOrNull} from '../../@types/common/index.d.ts';
13+
import {MESSAGE_SEPARATOR} from '../common.ts';
814
import type {MergeableStore} from '../../@types/mergeable-store/index.d.ts';
9-
import {UTF8} from '../../common/strings.ts';
1015
import {createCustomSynchronizer} from '../index.ts';
11-
import {promiseNew} from '../../common/other.ts';
1216

1317
export const createWsSynchronizer = (async <
1418
WebSocketType extends WebSocketTypes,
@@ -22,14 +26,30 @@ export const createWsSynchronizer = (async <
2226
(webSocket.addEventListener as any)(event, handler);
2327

2428
const registerReceive = (receive: Receive): void =>
25-
addEventListener('message', ({data}) =>
26-
receivePayload(data.toString(UTF8), receive),
27-
);
29+
addEventListener('message', ({data}) => {
30+
const payload = data.toString(UTF8);
31+
const splitAt = payload.indexOf(MESSAGE_SEPARATOR);
32+
if (splitAt !== -1) {
33+
receive(
34+
slice(data, 0, splitAt),
35+
...(jsonParseWithUndefined(slice(data, splitAt + 1)) as [
36+
requestId: IdOrNull,
37+
message: Message,
38+
body: any,
39+
]),
40+
);
41+
}
42+
});
2843

2944
const send = (
3045
toClientId: IdOrNull,
3146
...args: [requestId: IdOrNull, message: Message, body: any]
32-
): void => webSocket.send(createPayload(toClientId, ...args));
47+
): void =>
48+
webSocket.send(
49+
(toClientId ?? EMPTY_STRING) +
50+
MESSAGE_SEPARATOR +
51+
jsonStringWithUndefined(args),
52+
);
3353

3454
const destroy = (): void => {
3555
webSocket.close();

src/synchronizers/synchronizer-ws-server/index.ts

Lines changed: 26 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import type {
88
import {EMPTY_STRING, UTF8} from '../../common/strings.ts';
99
import type {Id, IdOrNull, Ids} from '../../@types/common/index.d.ts';
1010
import {
11-
IdMap,
1211
IdMap2,
1312
mapEnsure,
1413
mapForEach,
@@ -17,11 +16,6 @@ import {
1716
mapNew,
1817
mapSet,
1918
} from '../../common/map.ts';
20-
import type {Persister, Persists} from '../../@types/persisters/index.d.ts';
21-
import type {
22-
Receive,
23-
Synchronizer,
24-
} from '../../@types/synchronizers/index.d.ts';
2519
import {WebSocket, WebSocketServer} from 'ws';
2620
import {
2721
collClear,
@@ -30,122 +24,59 @@ import {
3024
collSize,
3125
collSize2,
3226
} from '../../common/coll.ts';
33-
import {
34-
createPayload,
35-
createRawPayload,
36-
ifPayloadValid,
37-
receivePayload,
38-
} from '../common.ts';
27+
import {ifNotUndefined, slice} from '../../common/other.ts';
3928
import {IdSet2} from '../../common/set.ts';
40-
import type {MergeableStore} from '../../@types/mergeable-store/index.d.ts';
41-
import {createCustomSynchronizer} from '../index.ts';
29+
import {MESSAGE_SEPARATOR} from '../common.ts';
4230
import {getListenerFunctions} from '../../common/listeners.ts';
43-
import {ifNotUndefined} from '../../common/other.ts';
4431
import {objFreeze} from '../../common/obj.ts';
4532

4633
const PATH_REGEX = /\/([^?]*)/;
47-
const SERVER_CLIENT_ID = 'S';
48-
49-
export const createWsServer = (<
50-
PathPersister extends Persister<
51-
Persists.MergeableStoreOnly | Persists.StoreOrMergeableStore
52-
>,
53-
>(
54-
webSocketServer: WebSocketServer,
55-
createPersisterForPath?: (pathId: Id) => Promise<PathPersister | undefined>,
56-
destroyPersisterForPath?: (pathId: Id, persister: PathPersister) => void,
57-
) => {
58-
type ServerClient = {
59-
persister: PathPersister;
60-
synchronizer: Synchronizer;
61-
send: (payload: string) => void;
62-
};
6334

35+
export const createWsServer = ((webSocketServer: WebSocketServer) => {
6436
const pathIdListeners: IdSet2 = mapNew();
6537
const clientIdListeners: IdSet2 = mapNew();
6638
const clientsByPath: IdMap2<WebSocket> = mapNew();
67-
const serverClientsByPath: IdMap<ServerClient> = mapNew();
6839

6940
const [addListener, callListeners, delListenerImpl] = getListenerFunctions(
7041
() => wsServer,
7142
);
7243

73-
const startServerClient = async (pathId: Id) =>
74-
ifNotUndefined(
75-
await createPersisterForPath?.(pathId),
76-
async (persister) => {
77-
const serverClient = mapEnsure(
78-
serverClientsByPath,
79-
pathId,
80-
() => ({persister}) as ServerClient,
81-
);
82-
const messageHandler = getMessageHandler(SERVER_CLIENT_ID, pathId);
83-
serverClient.synchronizer = await createCustomSynchronizer(
84-
persister.getStore() as MergeableStore,
85-
(toClientId, requestId, message, body) =>
86-
messageHandler(createPayload(toClientId, requestId, message, body)),
87-
(receive: Receive) =>
88-
(serverClient.send = (payload) => receivePayload(payload, receive)),
89-
() => {},
90-
0.1,
91-
).startSync();
92-
},
93-
);
94-
95-
const stopServerClient = (pathId: Id) =>
96-
ifNotUndefined(
97-
mapGet(serverClientsByPath, pathId),
98-
({persister, synchronizer}) => {
99-
synchronizer?.destroy();
100-
destroyPersisterForPath?.(pathId, persister);
101-
collDel(serverClientsByPath, pathId);
102-
},
103-
);
104-
105-
const getMessageHandler = (clientId: Id, pathId: Id) => {
106-
const clients = mapGet(clientsByPath, pathId);
107-
const serverClient = mapGet(serverClientsByPath, pathId);
108-
return (payload: string) =>
109-
ifPayloadValid(payload, (toClientId, remainder) => {
110-
const forwardedPayload = createRawPayload(clientId, remainder);
111-
if (toClientId === EMPTY_STRING) {
112-
clientId !== SERVER_CLIENT_ID
113-
? serverClient?.send(forwardedPayload)
114-
: 0;
115-
mapForEach(clients, (otherClientId, otherWebSocket) =>
116-
otherClientId !== clientId
117-
? otherWebSocket.send(forwardedPayload)
118-
: 0,
119-
);
120-
} else {
121-
(toClientId === SERVER_CLIENT_ID
122-
? serverClient
123-
: mapGet(clients, toClientId)
124-
)?.send(forwardedPayload);
125-
}
126-
});
127-
};
128-
12944
webSocketServer.on('connection', (webSocket, request) =>
13045
ifNotUndefined(request.url?.match(PATH_REGEX), ([, pathId]) =>
131-
ifNotUndefined(request.headers['sec-websocket-key'], async (clientId) => {
46+
ifNotUndefined(request.headers['sec-websocket-key'], (clientId) => {
13247
const clients = mapEnsure(clientsByPath, pathId, mapNew<Id, WebSocket>);
133-
if (collIsEmpty(clients)) {
48+
mapSet(clients, clientId, webSocket);
49+
50+
if (clients.size == 1) {
13451
callListeners(pathIdListeners, undefined, pathId, 1);
135-
await startServerClient(pathId);
13652
}
137-
mapSet(clients, clientId, webSocket);
13853
callListeners(clientIdListeners, [pathId], clientId, 1);
13954

140-
const messageHandler = getMessageHandler(clientId, pathId);
141-
webSocket.on('message', (data) => messageHandler(data.toString(UTF8)));
55+
webSocket.on('message', (data) => {
56+
const payload = data.toString(UTF8);
57+
const splitAt = payload.indexOf(MESSAGE_SEPARATOR);
58+
if (splitAt !== -1) {
59+
const toClientId = slice(payload, 0, splitAt);
60+
const message = slice(payload, splitAt + 1);
61+
toClientId === EMPTY_STRING
62+
? mapForEach(clients, (otherClientId, otherWebSocket) =>
63+
otherClientId != clientId
64+
? otherWebSocket.send(
65+
clientId + MESSAGE_SEPARATOR + message,
66+
)
67+
: 0,
68+
)
69+
: mapGet(clients, toClientId)?.send(
70+
clientId + MESSAGE_SEPARATOR + message,
71+
);
72+
}
73+
});
14274

14375
webSocket.on('close', () => {
14476
collDel(clients, clientId);
14577
callListeners(clientIdListeners, [pathId], clientId, -1);
14678
if (collIsEmpty(clients)) {
14779
collDel(clientsByPath, pathId);
148-
stopServerClient(pathId);
14980
callListeners(pathIdListeners, undefined, pathId, -1);
15081
}
15182
});

0 commit comments

Comments
 (0)