Skip to content

Commit 477fd8a

Browse files
committed
feat: simple backpressure for sync storage
1 parent e67c583 commit 477fd8a

File tree

2 files changed

+111
-2
lines changed

2 files changed

+111
-2
lines changed

packages/cojson/src/PriorityBasedMessageQueue.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Counter, ValueType, metrics } from "@opentelemetry/api";
2+
import { PeerState } from "./PeerState.js";
23
import { CO_VALUE_PRIORITY, type CoValuePriority } from "./priority.js";
34
import type { SyncMessage } from "./sync.js";
45

@@ -65,6 +66,10 @@ export class LinkedList<T> {
6566
this.meter?.pull();
6667
return value;
6768
}
69+
70+
isEmpty() {
71+
return this.head === undefined;
72+
}
6873
}
6974

7075
class QueueMeter {
@@ -152,3 +157,79 @@ export class PriorityBasedMessageQueue {
152157
return this.queues[priority]?.shift();
153158
}
154159
}
160+
161+
export class IncomingMessagesQueue {
162+
private queues: [LinkedList<SyncMessage>, PeerState][];
163+
private peerToQueue: WeakMap<PeerState, LinkedList<SyncMessage>>;
164+
currentQueue = 0;
165+
166+
constructor() {
167+
this.queues = [];
168+
this.peerToQueue = new WeakMap();
169+
}
170+
171+
public push(msg: SyncMessage, peer: PeerState) {
172+
const queue = this.peerToQueue.get(peer);
173+
174+
if (!queue) {
175+
const newQueue = new LinkedList<SyncMessage>();
176+
this.peerToQueue.set(peer, newQueue);
177+
this.queues.push([newQueue, peer]);
178+
newQueue.push(msg);
179+
} else {
180+
queue.push(msg);
181+
}
182+
}
183+
184+
public pull() {
185+
const entry = this.queues[this.currentQueue];
186+
187+
if (!entry) {
188+
return undefined;
189+
}
190+
191+
const [queue, peer] = entry;
192+
const msg = queue.shift();
193+
194+
if (queue.isEmpty()) {
195+
this.queues.splice(this.currentQueue, 1);
196+
this.peerToQueue.delete(peer);
197+
} else {
198+
this.currentQueue++;
199+
}
200+
201+
if (this.currentQueue >= this.queues.length) {
202+
this.currentQueue = 0;
203+
}
204+
205+
if (msg) {
206+
return { msg, peer };
207+
}
208+
209+
return undefined;
210+
}
211+
212+
processing = false;
213+
214+
processQueue(
215+
callback: (msg: SyncMessage, peer: PeerState, stop: () => void) => void,
216+
) {
217+
this.processing = true;
218+
219+
let entry: { msg: SyncMessage; peer: PeerState } | undefined;
220+
221+
while ((entry = this.pull())) {
222+
const { msg, peer } = entry;
223+
224+
callback(msg, peer, () => {
225+
this.processing = false;
226+
});
227+
228+
if (!this.processing) {
229+
break;
230+
}
231+
}
232+
233+
this.processing = false;
234+
}
235+
}

packages/cojson/src/sync.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { Histogram, ValueType, metrics } from "@opentelemetry/api";
22
import { PeerState } from "./PeerState.js";
3+
import {
4+
IncomingMessagesQueue,
5+
PriorityBasedMessageQueue,
6+
} from "./PriorityBasedMessageQueue.js";
37
import { SyncStateManager } from "./SyncStateManager.js";
48
import { CoValueCore } from "./coValueCore/coValueCore.js";
59
import { getDependedOnCoValuesFromRawData } from "./coValueCore/utils.js";
@@ -8,7 +12,7 @@ import { Signature } from "./crypto/crypto.js";
812
import { RawCoID, SessionID } from "./ids.js";
913
import { LocalNode } from "./localNode.js";
1014
import { logger } from "./logger.js";
11-
import { CoValuePriority } from "./priority.js";
15+
import { CO_VALUE_PRIORITY, CoValuePriority } from "./priority.js";
1216
import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js";
1317
import { isAccountID } from "./typeUtils/isAccountID.js";
1418

@@ -303,6 +307,30 @@ export class SyncManager {
303307
}
304308
}
305309

310+
messagesQueue = new IncomingMessagesQueue();
311+
async pushMessage(incoming: SyncMessage, peer: PeerState) {
312+
this.messagesQueue.push(incoming, peer);
313+
314+
if (this.messagesQueue.processing) {
315+
return;
316+
}
317+
318+
let lastTimer = performance.now();
319+
let paused = false;
320+
this.messagesQueue.processQueue((msg, peer, stop) => {
321+
this.handleSyncMessage(msg, peer);
322+
323+
if (performance.now() - lastTimer > 50) {
324+
paused = true;
325+
stop();
326+
}
327+
});
328+
329+
if (paused) {
330+
await new Promise((resolve) => setTimeout(resolve, 0));
331+
}
332+
}
333+
306334
addPeer(peer: Peer) {
307335
const prevPeer = this.peers[peer.id];
308336

@@ -327,7 +355,7 @@ export class SyncManager {
327355

328356
peerState
329357
.processIncomingMessages((msg) => {
330-
this.handleSyncMessage(msg, peerState);
358+
this.pushMessage(msg, peerState);
331359
})
332360
.then(() => {
333361
if (peer.crashOnClose) {

0 commit comments

Comments
 (0)