Skip to content

Commit 484f936

Browse files
committed
feat: refactor Peer communication and schedule incoming messages on sync
1 parent 477fd8a commit 484f936

38 files changed

+711
-1029
lines changed

packages/cojson-transport-ws/src/BatchedOutgoingMessages.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,34 @@ export class BatchedOutgoingMessages {
1212
push(msg: SyncMessage) {
1313
const payload = addMessageToBacklog(this.backlog, msg);
1414

15-
if (this.timeout) {
16-
clearTimeout(this.timeout);
17-
}
18-
1915
const maxChunkSizeReached =
2016
payload.length >= MAX_OUTGOING_MESSAGES_CHUNK_BYTES;
2117
const backlogExists = this.backlog.length > 0;
2218

2319
if (maxChunkSizeReached && backlogExists) {
2420
this.sendMessagesInBulk();
2521
this.backlog = addMessageToBacklog("", msg);
26-
this.timeout = setTimeout(() => {
27-
this.sendMessagesInBulk();
28-
}, 0);
2922
} else if (maxChunkSizeReached) {
3023
this.backlog = payload;
3124
this.sendMessagesInBulk();
3225
} else {
3326
this.backlog = payload;
27+
}
28+
29+
// Throttling the sending of messages to once every 10ms
30+
if (!this.timeout) {
3431
this.timeout = setTimeout(() => {
3532
this.sendMessagesInBulk();
36-
}, 0);
33+
this.timeout = null;
34+
}, 10);
3735
}
3836
}
3937

4038
sendMessagesInBulk() {
41-
this.send(this.backlog);
42-
this.backlog = "";
39+
if (this.backlog.length > 0) {
40+
this.send(this.backlog);
41+
this.backlog = "";
42+
}
4343
}
4444

4545
close() {

packages/cojson-transport-ws/src/createWebSocketPeer.ts

Lines changed: 96 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,4 @@
1-
import {
2-
type DisconnectedError,
3-
type Peer,
4-
type PingTimeoutError,
5-
type SyncMessage,
6-
cojsonInternals,
7-
logger,
8-
} from "cojson";
1+
import { type Peer, type SyncMessage, cojsonInternals, logger } from "cojson";
92
import { BatchedOutgoingMessages } from "./BatchedOutgoingMessages.js";
103
import { deserializeMessages } from "./serialization.js";
114
import type { AnyWebSocket } from "./types.js";
@@ -62,9 +55,13 @@ function waitForWebSocketOpen(websocket: AnyWebSocket) {
6255
});
6356
}
6457

58+
const { PriorityBasedMessageQueue, CO_VALUE_PRIORITY, ConnectedPeerChannel } =
59+
cojsonInternals;
60+
6561
function createOutgoingMessagesManager(
6662
websocket: AnyWebSocket,
6763
batchingByDefault: boolean,
64+
role: Peer["role"],
6865
) {
6966
let closed = false;
7067
const outgoingMessages = new BatchedOutgoingMessages((messages) => {
@@ -74,34 +71,63 @@ function createOutgoingMessagesManager(
7471
});
7572

7673
let batchingEnabled = batchingByDefault;
74+
const queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.HIGH, {
75+
peerRole: role,
76+
});
77+
let processing = false;
7778

78-
async function sendMessage(msg: SyncMessage) {
79-
if (closed) {
80-
return Promise.reject(new Error("WebSocket closed"));
79+
async function processQueue() {
80+
if (processing) {
81+
return;
8182
}
8283

83-
if (websocket.readyState !== 1) {
84-
await waitForWebSocketOpen(websocket);
85-
}
84+
processing = true;
8685

87-
while (
88-
websocket.bufferedAmount > BUFFER_LIMIT &&
89-
websocket.readyState === 1
90-
) {
91-
await new Promise<void>((resolve) =>
92-
setTimeout(resolve, BUFFER_LIMIT_POLLING_INTERVAL),
93-
);
94-
}
86+
let msg = queue.pull();
9587

96-
if (websocket.readyState !== 1) {
97-
return;
88+
while (msg) {
89+
if (closed) {
90+
return;
91+
}
92+
93+
if (websocket.readyState !== 1) {
94+
await waitForWebSocketOpen(websocket);
95+
}
96+
97+
while (
98+
websocket.bufferedAmount > BUFFER_LIMIT &&
99+
websocket.readyState === 1
100+
) {
101+
await new Promise<void>((resolve) =>
102+
setTimeout(resolve, BUFFER_LIMIT_POLLING_INTERVAL),
103+
);
104+
}
105+
106+
if (websocket.readyState !== 1) {
107+
return;
108+
}
109+
110+
if (!batchingEnabled) {
111+
websocket.send(JSON.stringify(msg));
112+
} else {
113+
outgoingMessages.push(msg);
114+
}
115+
116+
msg = queue.pull();
98117
}
99118

100-
if (!batchingEnabled) {
101-
websocket.send(JSON.stringify(msg));
102-
} else {
103-
outgoingMessages.push(msg);
119+
processing = false;
120+
}
121+
122+
function sendMessage(msg: SyncMessage) {
123+
if (closed) {
124+
return Promise.reject(new Error("WebSocket closed"));
104125
}
126+
127+
queue.push(msg);
128+
processQueue().catch((e) => {
129+
logger.error("Error while processing sendMessage queue", { err: e });
130+
});
105131
}
106132

107133
return {
@@ -137,17 +163,11 @@ export function createWebSocketPeer({
137163
onSuccess,
138164
onClose,
139165
}: CreateWebSocketPeerOpts): Peer {
140-
const incoming = new cojsonInternals.Channel<
141-
SyncMessage | DisconnectedError | PingTimeoutError
142-
>();
166+
const incoming = new ConnectedPeerChannel();
143167
const emitClosedEvent = createClosedEventEmitter(onClose);
144168

145169
function handleClose() {
146-
incoming
147-
.push("Disconnected")
148-
.catch((e) =>
149-
logger.error("Error while pushing disconnect msg", { err: e }),
150-
);
170+
incoming.push("Disconnected");
151171
emitClosedEvent();
152172
}
153173

@@ -166,18 +186,19 @@ export function createWebSocketPeer({
166186
expectPings,
167187
pingTimeout,
168188
() => {
169-
incoming
170-
.push("PingTimeout")
171-
.catch((e) =>
172-
logger.error("Error while pushing ping timeout", { err: e }),
173-
);
189+
incoming.push("Disconnected");
190+
logger.error("Ping timeout from peer", {
191+
peerId: id,
192+
peerRole: role,
193+
});
174194
emitClosedEvent();
175195
},
176196
);
177197

178198
const outgoingMessages = createOutgoingMessagesManager(
179199
websocket,
180200
batchingByDefault,
201+
role,
181202
);
182203
let isFirstMessage = true;
183204

@@ -211,45 +232,50 @@ export function createWebSocketPeer({
211232

212233
for (const msg of messages) {
213234
if (msg && "action" in msg) {
214-
incoming
215-
.push(msg)
216-
.catch((e) =>
217-
logger.error("Error while pushing incoming msg", { err: e }),
218-
);
235+
incoming.push(msg);
219236
}
220237
}
221238
}
222239

223240
websocket.addEventListener("message", handleIncomingMsg);
224241

242+
const outgoing = new ConnectedPeerChannel();
243+
244+
outgoing.onMessage((msg) => {
245+
if (msg === "Disconnected") {
246+
handleClose();
247+
return;
248+
}
249+
250+
outgoingMessages.sendMessage(msg);
251+
});
252+
253+
outgoing.onClose(() => {
254+
outgoingMessages.close();
255+
256+
websocket.removeEventListener("message", handleIncomingMsg);
257+
websocket.removeEventListener("close", handleClose);
258+
pingTimeoutListener.clear();
259+
emitClosedEvent();
260+
261+
if (websocket.readyState === 0) {
262+
websocket.addEventListener(
263+
"open",
264+
function handleClose() {
265+
websocket.close();
266+
},
267+
{ once: true },
268+
);
269+
} else if (websocket.readyState === 1) {
270+
websocket.close();
271+
}
272+
});
273+
225274
return {
226275
id,
227276
incoming,
228-
outgoing: {
229-
push: outgoingMessages.sendMessage,
230-
close() {
231-
outgoingMessages.close();
232-
233-
websocket.removeEventListener("message", handleIncomingMsg);
234-
websocket.removeEventListener("close", handleClose);
235-
pingTimeoutListener.clear();
236-
emitClosedEvent();
237-
238-
if (websocket.readyState === 0) {
239-
websocket.addEventListener(
240-
"open",
241-
function handleClose() {
242-
websocket.close();
243-
},
244-
{ once: true },
245-
);
246-
} else if (websocket.readyState === 1) {
247-
websocket.close();
248-
}
249-
},
250-
},
277+
outgoing,
251278
role,
252-
crashOnClose: false,
253279
deletePeerStateOnClose,
254280
};
255281
}

packages/cojson-transport-ws/src/tests/BatchedOutgoingMessages.test.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ describe("BatchedOutgoingMessages", () => {
113113
expect(sendMock).toHaveBeenCalledWith(JSON.stringify(message));
114114
});
115115

116-
test("should clear timeout when pushing new messages", () => {
116+
test("should throttle the messages send", () => {
117117
const { sendMock, batchedMessages } = setup();
118118
const message1: SyncMessage = {
119119
action: "known",
@@ -129,13 +129,8 @@ describe("BatchedOutgoingMessages", () => {
129129
};
130130

131131
batchedMessages.push(message1);
132-
133-
const clearTimeoutSpy = vi.spyOn(global, "clearTimeout");
134-
135132
batchedMessages.push(message2);
136133

137-
expect(clearTimeoutSpy).toHaveBeenCalled();
138-
139134
vi.runAllTimers();
140135

141136
expect(sendMock).toHaveBeenCalledTimes(1);

packages/cojson-transport-ws/src/tests/createWebSocketPeer.test.ts

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,14 @@ describe("createWebSocketPeer", () => {
4848
expect(peer).toHaveProperty("incoming");
4949
expect(peer).toHaveProperty("outgoing");
5050
expect(peer).toHaveProperty("role", "client");
51-
expect(peer).toHaveProperty("crashOnClose", false);
5251
});
5352

5453
test("should handle disconnection", async () => {
5554
expect.assertions(1);
5655

5756
const { listeners, peer } = setup();
5857

59-
const incoming = peer.incoming as Channel<
60-
SyncMessage | "Disconnected" | "PingTimeout"
61-
>;
62-
const pushSpy = vi.spyOn(incoming, "push");
58+
const pushSpy = vi.spyOn(peer.incoming, "push");
6359

6460
const closeHandler = listeners.get("close");
6561

@@ -72,18 +68,15 @@ describe("createWebSocketPeer", () => {
7268
vi.useFakeTimers();
7369
const { listeners, peer } = setup();
7470

75-
const incoming = peer.incoming as Channel<
76-
SyncMessage | "Disconnected" | "PingTimeout"
77-
>;
78-
const pushSpy = vi.spyOn(incoming, "push");
71+
const pushSpy = vi.spyOn(peer.incoming, "push");
7972

8073
const messageHandler = listeners.get("message");
8174

8275
messageHandler?.(new MessageEvent("message", { data: "{}" }));
8376

8477
await vi.advanceTimersByTimeAsync(10_000);
8578

86-
expect(pushSpy).toHaveBeenCalledWith("PingTimeout");
79+
expect(pushSpy).toHaveBeenCalledWith("Disconnected");
8780

8881
vi.useRealTimers();
8982
});
@@ -97,15 +90,14 @@ describe("createWebSocketPeer", () => {
9790
header: false,
9891
sessions: {},
9992
};
100-
const promise = peer.outgoing.push(testMessage);
93+
94+
peer.outgoing.push(testMessage);
10195

10296
await waitFor(() => {
10397
expect(mockWebSocket.send).toHaveBeenCalledWith(
10498
JSON.stringify(testMessage),
10599
);
106100
});
107-
108-
await expect(promise).resolves.toBeUndefined();
109101
});
110102

111103
test("should stop sending messages when the websocket is closed", async () => {
@@ -153,23 +145,6 @@ describe("createWebSocketPeer", () => {
153145
expect(mockWebSocket.close).toHaveBeenCalled();
154146
});
155147

156-
test("should return a rejection if a message is sent after the peer is closed", async () => {
157-
const { peer } = setup();
158-
159-
peer.outgoing.close();
160-
161-
const message: SyncMessage = {
162-
action: "known",
163-
id: "co_ztest",
164-
header: false,
165-
sessions: {},
166-
};
167-
168-
await expect(peer.outgoing.push(message)).rejects.toThrow(
169-
"WebSocket closed",
170-
);
171-
});
172-
173148
test("should call onSuccess handler after receiving first message", () => {
174149
const onSuccess = vi.fn();
175150
const { listeners } = setup({ onSuccess });

packages/cojson/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
"@scure/base": "1.2.1",
4040
"jazz-crypto-rs": "0.0.7",
4141
"neverthrow": "^7.0.1",
42-
"queueueue": "^4.1.2",
4342
"unicode-segmenter": "^0.12.0"
4443
},
4544
"scripts": {

0 commit comments

Comments
 (0)