Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 124 additions & 16 deletions packages/cojson-transport-ws/src/BatchedOutgoingMessages.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,156 @@
import type { SyncMessage } from "cojson";
import type { DisconnectedError, SyncMessage } from "cojson";
import type { Peer } from "cojson";
import {
type CojsonInternalTypes,
PriorityBasedMessageQueue,
cojsonInternals,
logger,
} from "cojson";
import { addMessageToBacklog } from "./serialization.js";
import type { AnyWebSocket } from "./types.js";
import {
hasWebSocketTooMuchBufferedData,
isWebSocketOpen,
waitForWebSocketBufferedAmount,
waitForWebSocketOpen,
} from "./utils.js";

const { CO_VALUE_PRIORITY } = cojsonInternals;

export const MAX_OUTGOING_MESSAGES_CHUNK_BYTES = 25_000;

export class BatchedOutgoingMessages {
export class BatchedOutgoingMessages
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the PriorityBasedMessageQueue inside this class, so outgoing message prioritization is now done here.

Now we can fully control the outgoing message scheduling from this class, and I've changed the scheduling from debouncing to a simple 10ms delay before starting to process the messages.

This makes the batching and prioritization more effective, since subsequent operations are more likely to be synced together, and it reduces overhead because we no longer call setTimeout/clearTimeout on every outgoing.push.

implements CojsonInternalTypes.OutgoingPeerChannel
{
private backlog = "";
private timeout: ReturnType<typeof setTimeout> | null = null;
private queue: PriorityBasedMessageQueue;
private processing = false;
private closed = false;

constructor(private send: (messages: string) => void) {}
constructor(
private websocket: AnyWebSocket,
private batching: boolean,
peerRole: Peer["role"],
) {
this.queue = new PriorityBasedMessageQueue(
CO_VALUE_PRIORITY.HIGH,
"outgoing",
{
peerRole: peerRole,
},
);
}

push(msg: SyncMessage) {
const payload = addMessageToBacklog(this.backlog, msg);
push(msg: SyncMessage | DisconnectedError) {
if (msg === "Disconnected") {
this.close();
return;
}

this.queue.push(msg);

if (this.timeout) {
clearTimeout(this.timeout);
if (this.processing) {
return;
}

this.processQueue().catch((e) => {
logger.error("Error while processing sendMessage queue", { err: e });
});
}

private async processQueue() {
const { websocket } = this;

this.processing = true;

// Delay the initiation of the queue processing to accumulate messages
// before sending them, in order to do prioritization and batching
await new Promise<void>((resolve) => setTimeout(resolve));

let msg = this.queue.pull();

while (msg) {
if (this.closed) {
return;
}

if (!isWebSocketOpen(websocket)) {
await waitForWebSocketOpen(websocket);
}

if (hasWebSocketTooMuchBufferedData(websocket)) {
await waitForWebSocketBufferedAmount(websocket);
}

if (isWebSocketOpen(websocket)) {
this.processMessage(msg);

msg = this.queue.pull();
}
}

this.sendMessagesInBulk();
this.processing = false;
}

processMessage(msg: SyncMessage) {
if (!this.batching) {
this.websocket.send(JSON.stringify(msg));
return;
}

const payload = addMessageToBacklog(this.backlog, msg);

const maxChunkSizeReached =
payload.length >= MAX_OUTGOING_MESSAGES_CHUNK_BYTES;
const backlogExists = this.backlog.length > 0;

if (maxChunkSizeReached && backlogExists) {
this.sendMessagesInBulk();
this.backlog = addMessageToBacklog("", msg);
this.timeout = setTimeout(() => {
this.sendMessagesInBulk();
}, 0);
} else if (maxChunkSizeReached) {
this.backlog = payload;
this.sendMessagesInBulk();
} else {
this.backlog = payload;
this.timeout = setTimeout(() => {
this.sendMessagesInBulk();
}, 0);
}
}

sendMessagesInBulk() {
this.send(this.backlog);
this.backlog = "";
if (this.backlog.length > 0 && isWebSocketOpen(this.websocket)) {
this.websocket.send(this.backlog);
this.backlog = "";
}
}

setBatching(enabled: boolean) {
this.batching = enabled;
}

private closeListeners = new Set<() => void>();
onClose(callback: () => void) {
this.closeListeners.add(callback);
}

close() {
if (this.closed) {
return;
}

let msg = this.queue.pull();

while (msg) {
this.processMessage(msg);
msg = this.queue.pull();
}

this.closed = true;
this.sendMessagesInBulk();

for (const listener of this.closeListeners) {
listener();
}

this.closeListeners.clear();
}
}
151 changes: 33 additions & 118 deletions packages/cojson-transport-ws/src/createWebSocketPeer.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
import {
type DisconnectedError,
type Peer,
type PingTimeoutError,
type SyncMessage,
cojsonInternals,
logger,
} from "cojson";
import { type Peer, type SyncMessage, cojsonInternals, logger } from "cojson";
import { BatchedOutgoingMessages } from "./BatchedOutgoingMessages.js";
import { deserializeMessages } from "./serialization.js";
import type { AnyWebSocket } from "./types.js";

export const BUFFER_LIMIT = 100_000;
export const BUFFER_LIMIT_POLLING_INTERVAL = 10;
const { ConnectedPeerChannel } = cojsonInternals;

export type CreateWebSocketPeerOpts = {
id: string;
Expand Down Expand Up @@ -52,70 +44,6 @@ function createPingTimeoutListener(
};
}

function waitForWebSocketOpen(websocket: AnyWebSocket) {
return new Promise<void>((resolve) => {
if (websocket.readyState === 1) {
resolve();
} else {
websocket.addEventListener("open", () => resolve(), { once: true });
}
});
}

function createOutgoingMessagesManager(
websocket: AnyWebSocket,
batchingByDefault: boolean,
) {
let closed = false;
const outgoingMessages = new BatchedOutgoingMessages((messages) => {
if (websocket.readyState === 1) {
websocket.send(messages);
}
});

let batchingEnabled = batchingByDefault;

async function sendMessage(msg: SyncMessage) {
if (closed) {
return Promise.reject(new Error("WebSocket closed"));
}

if (websocket.readyState !== 1) {
await waitForWebSocketOpen(websocket);
}

while (
websocket.bufferedAmount > BUFFER_LIMIT &&
websocket.readyState === 1
) {
await new Promise<void>((resolve) =>
setTimeout(resolve, BUFFER_LIMIT_POLLING_INTERVAL),
);
}

if (websocket.readyState !== 1) {
return;
}

if (!batchingEnabled) {
websocket.send(JSON.stringify(msg));
} else {
outgoingMessages.push(msg);
}
}

return {
sendMessage,
setBatchingEnabled(enabled: boolean) {
batchingEnabled = enabled;
},
close() {
closed = true;
outgoingMessages.close();
},
};
}

function createClosedEventEmitter(callback = () => {}) {
let disconnected = false;

Expand All @@ -137,17 +65,11 @@ export function createWebSocketPeer({
onSuccess,
onClose,
}: CreateWebSocketPeerOpts): Peer {
const incoming = new cojsonInternals.Channel<
SyncMessage | DisconnectedError | PingTimeoutError
>();
const incoming = new ConnectedPeerChannel();
const emitClosedEvent = createClosedEventEmitter(onClose);

function handleClose() {
incoming
.push("Disconnected")
.catch((e) =>
logger.error("Error while pushing disconnect msg", { err: e }),
);
incoming.push("Disconnected");
emitClosedEvent();
}

Expand All @@ -166,18 +88,19 @@ export function createWebSocketPeer({
expectPings,
pingTimeout,
() => {
incoming
.push("PingTimeout")
.catch((e) =>
logger.error("Error while pushing ping timeout", { err: e }),
);
incoming.push("Disconnected");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the PingTimeout event, so now we only have "Disconnected", which is more generic and works in every situation.

This change simplified the Peer code in general, as we now only need to check for a single exception.

logger.error("Ping timeout from peer", {
peerId: id,
peerRole: role,
});
emitClosedEvent();
},
);

const outgoingMessages = createOutgoingMessagesManager(
const outgoing = new BatchedOutgoingMessages(
websocket,
batchingByDefault,
role,
);
let isFirstMessage = true;

Expand Down Expand Up @@ -206,50 +129,42 @@ export function createWebSocketPeer({

if (messages.length > 1) {
// If more than one message is received, the other peer supports batching
outgoingMessages.setBatchingEnabled(true);
outgoing.setBatching(true);
}

for (const msg of messages) {
if (msg && "action" in msg) {
incoming
.push(msg)
.catch((e) =>
logger.error("Error while pushing incoming msg", { err: e }),
);
incoming.push(msg);
}
}
}

websocket.addEventListener("message", handleIncomingMsg);

outgoing.onClose(() => {
websocket.removeEventListener("message", handleIncomingMsg);
websocket.removeEventListener("close", handleClose);
pingTimeoutListener.clear();
emitClosedEvent();

if (websocket.readyState === 0) {
websocket.addEventListener(
"open",
function handleClose() {
websocket.close();
},
{ once: true },
);
} else if (websocket.readyState === 1) {
websocket.close();
}
});

return {
id,
incoming,
outgoing: {
push: outgoingMessages.sendMessage,
close() {
outgoingMessages.close();

websocket.removeEventListener("message", handleIncomingMsg);
websocket.removeEventListener("close", handleClose);
pingTimeoutListener.clear();
emitClosedEvent();

if (websocket.readyState === 0) {
websocket.addEventListener(
"open",
function handleClose() {
websocket.close();
},
{ once: true },
);
} else if (websocket.readyState === 1) {
websocket.close();
}
},
},
outgoing,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BatchedOutgoingMessages implements the OutgoingPeerChannel interface and can be used directly as the outgoing channel.

role,
crashOnClose: false,
deletePeerStateOnClose,
};
}
Loading
Loading