Skip to content

Commit 2200c8a

Browse files
authored
Merge pull request #3482 from garden-co/perf/storage-reconciliation-backpressure
feat(storage-reconciliation): wait for coValues' sync before ack-ing batch
2 parents 31e4f7d + cd7b4fa commit 2200c8a

File tree

8 files changed

+382
-31
lines changed

8 files changed

+382
-31
lines changed

.changeset/light-rooms-play.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"cojson": patch
3+
---
4+
5+
Wait for coValue sync before acknowledging a storage reconciliation batch
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Delayed Reconcile ACK for Storage Reconciliation
2+
3+
## Overview
4+
5+
Today, the server handles a `reconcile` batch by immediately replying with:
6+
7+
1. one `load` per out-of-sync CoValue, then
8+
2. `reconcile-ack` for the batch.
9+
10+
This allows the client to send the next batch before the current batch has actually finished syncing content, which can create too much concurrent load on servers.
11+
12+
This design changes the behavior so the server sends `reconcile-ack` **only after all out-of-sync CoValues in that batch are fully synced**.
13+
14+
## Goal
15+
16+
Apply backpressure across reconciliation batches:
17+
18+
- The client should only send the next batch after the previous batch is actually synced.
19+
- The server should not acknowledge a batch until all reconcile-triggered sync work for that batch is complete.
20+
21+
## Completion Semantics
22+
23+
A CoValue in a reconcile batch is considered complete when the server observes completion of the corresponding load request:
24+
25+
- `known` response: complete immediately.
26+
- `content` response: complete when streaming finishes (existing `trackLoadRequestComplete` behavior).
27+
28+
This avoids relying on a strict known-state equality checkpoint, which can be unstable while writes continue.
29+
30+
## Scope
31+
32+
- **In scope**: server-side batching/ack behavior for `reconcile` in `packages/cojson/src/sync.ts`.
33+
- **In scope**: tests validating delayed ACK behavior and batch backpressure.
34+
- **Out of scope**: changing reconcile message format, lock scheduling strategy, or client-side batch ordering logic.
35+
36+
## Proposed Changes
37+
38+
### 1. Add server-side reconcile batch state tracking
39+
40+
**File:** `packages/cojson/src/sync.ts`
41+
42+
Track per-peer per-batch pending CoValues:
43+
44+
- `reconcileBatchState[peerId][batchId] = { pending: Set<RawCoID>, createdAt }`
45+
- Reverse index for fast completion:
46+
- `reconcileBatchesByCoValue[peerId][coValueId] -> Set<batchId>`
47+
48+
Requirements:
49+
50+
- O(1)-ish completion update by CoValue ID.
51+
- Safe cleanup on ack/disconnect.
52+
- No global coupling between peers.
53+
54+
### 2. Update `handleReconcile` to defer ACK
55+
56+
**File:** `packages/cojson/src/sync.ts`
57+
58+
For each `[coValueId, sessionsHash]`:
59+
60+
- If in sync: do nothing.
61+
- If out of sync: send `load` and register `coValueId` in batch `pending`.
62+
63+
After iteration:
64+
65+
- If `pending.size === 0`, send `reconcile-ack` immediately.
66+
- Else, persist batch tracker and **do not** ack yet.
67+
68+
### 3. Mark reconcile items complete from existing receive paths
69+
70+
**File:** `packages/cojson/src/sync.ts`
71+
72+
After existing load-complete logic in:
73+
74+
- `handleKnownState`
75+
- `handleNewContent`
76+
77+
call:
78+
79+
- `markReconcileItemComplete(peer.id, msg.id)`
80+
81+
`markReconcileItemComplete` behavior:
82+
83+
- Remove the CoValue from all pending batches for that peer.
84+
- If a batch becomes empty, send `reconcile-ack` for that batch and clear tracker state.
85+
86+
### 4. Add lifecycle cleanup and safety guards
87+
88+
**File:** `packages/cojson/src/sync.ts`
89+
90+
- On peer close, clear outstanding reconcile batch trackers for that peer.
91+
- Add optional stale-batch timeout logging and cleanup to avoid memory leaks in pathological cases.
92+
93+
### 5. Keep client-side behavior unchanged
94+
95+
`startStorageReconciliation` already waits for `reconcile-ack` before continuing to the next storage batch via `StorageReconciliationAckTracker.waitForAck`.
96+
97+
No protocol or client flow changes are required to gain backpressure once server ACK timing changes.
98+
99+
## Testing Plan
100+
101+
### Unit/Integration tests
102+
103+
**Likely files:** `packages/cojson/src/tests/*sync*.test.ts`
104+
105+
Add cases:
106+
107+
1. Server does not send `reconcile-ack` immediately after receiving a reconcile batch with mismatches.
108+
2. Server sends `reconcile-ack` only after all mismatched CoValues complete (`known` or final `content`).
109+
3. Delayed completion of one CoValue delays the batch ack.
110+
4. Batch with only in-sync CoValues is acked immediately.
111+
5. Peer disconnect mid-batch clears tracker state and does not emit stale ack after reconnect.
112+
6. Client-side next-batch progression remains gated by ack (effective backpressure).
113+
114+
## Rollout Notes
115+
116+
- This is a server behavior change but wire-compatible (no message schema changes).
117+
- Expected result: lower peak server load by reducing overlap between reconciliation batches.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { RawCoID } from "./ids.js";
2+
import { PeerID, ReconcileBatchID } from "./sync.js";
3+
4+
/**
5+
* Tracks ongoing storage reconciliation batches in a server.
6+
*/
7+
export class OngoingStorageReconciliationTracker {
8+
private reconcileBatches: Map<PeerID, Map<ReconcileBatchID, Set<RawCoID>>> =
9+
new Map();
10+
private reconcileBatchesByCoValue: Map<
11+
PeerID,
12+
Map<RawCoID, Set<ReconcileBatchID>>
13+
> = new Map();
14+
15+
trackBatch(
16+
peerId: PeerID,
17+
batchId: ReconcileBatchID,
18+
pendingCoValues: Set<RawCoID>,
19+
): void {
20+
if (pendingCoValues.size === 0) {
21+
return;
22+
}
23+
24+
let batchesForPeer = this.reconcileBatches.get(peerId);
25+
if (!batchesForPeer) {
26+
batchesForPeer = new Map();
27+
this.reconcileBatches.set(peerId, batchesForPeer);
28+
}
29+
30+
batchesForPeer.set(batchId, pendingCoValues);
31+
32+
let coValuesToBatches = this.reconcileBatchesByCoValue.get(peerId);
33+
if (!coValuesToBatches) {
34+
coValuesToBatches = new Map();
35+
this.reconcileBatchesByCoValue.set(peerId, coValuesToBatches);
36+
}
37+
38+
for (const coValueId of pendingCoValues) {
39+
let batchIds = coValuesToBatches.get(coValueId);
40+
if (!batchIds) {
41+
batchIds = new Set();
42+
coValuesToBatches.set(coValueId, batchIds);
43+
}
44+
batchIds.add(batchId);
45+
}
46+
}
47+
48+
/**
49+
* Marks a coValue as reconciled and returns the batch IDs that completed syncing.
50+
*/
51+
markItemComplete(peerId: PeerID, coValueId: RawCoID): ReconcileBatchID[] {
52+
const coValuesToBatches = this.reconcileBatchesByCoValue.get(peerId);
53+
const batchesForPeer = this.reconcileBatches.get(peerId);
54+
55+
if (!coValuesToBatches || !batchesForPeer) {
56+
return [];
57+
}
58+
59+
const batchIdsForCoValue = coValuesToBatches.get(coValueId);
60+
if (!batchIdsForCoValue || batchIdsForCoValue.size === 0) {
61+
return [];
62+
}
63+
64+
const completedBatchIds: ReconcileBatchID[] = [];
65+
66+
for (const batchId of batchIdsForCoValue) {
67+
const batch = batchesForPeer.get(batchId);
68+
if (!batch) {
69+
continue;
70+
}
71+
batch.delete(coValueId);
72+
if (batch.size === 0) {
73+
completedBatchIds.push(batchId);
74+
}
75+
}
76+
77+
// This coValue was just completed, so it is no longer pending in any batch.
78+
coValuesToBatches.delete(coValueId);
79+
for (const batchId of completedBatchIds) {
80+
batchesForPeer.delete(batchId);
81+
}
82+
83+
if (coValuesToBatches.size === 0) {
84+
this.reconcileBatchesByCoValue.delete(peerId);
85+
}
86+
if (batchesForPeer.size === 0) {
87+
this.reconcileBatches.delete(peerId);
88+
}
89+
90+
return completedBatchIds;
91+
}
92+
93+
clearPeer(peerId: PeerID): void {
94+
this.reconcileBatches.delete(peerId);
95+
this.reconcileBatchesByCoValue.delete(peerId);
96+
}
97+
}

packages/cojson/src/StorageReconciliationAckTracker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { PeerState } from "./PeerState.js";
22

3-
export class StorageReconciliationAckTracker {
3+
export class StorageReconciliationServerAckTracker {
44
/**
5-
* Tracks pending reconcile acks: "batchId#peerId->offset".
5+
* Tracks pending reconcile acks from the server: "batchId#peerId->offset".
66
* Cleared in handleAck.
77
*/
88
pendingReconciliationAck: Map<string, number> = new Map();

0 commit comments

Comments
 (0)