Skip to content

Commit 6570a2e

Browse files
authored
feat!: protocol v1 (#34)
* docs: v1 protocol * feat: basic impl (buggy) * fix: client only rejoin room for join suggested * fix: stop emitting fake ack on ELO packaging failure * docs: clarify ack direction and report client failures * test: accept ws RawData buffer arrays in e2e helper * lint: satisfy oxlint in websocket tests and fragment batching * chore: drop unused adaptor hooks * feat: surface update errors * test: add onUpdateError e2e * chore: fix lint in onUpdateError e2e * fix ack error handling and clear batch cache on disconnect * test: fix test issues * test: remove unused skip logic in E2E tests * refactor: remove legacy update error hook * refactor: update error handling to use optional chaining
1 parent ec4cf17 commit 6570a2e

38 files changed

+1838
-796
lines changed

AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pnpm clean
4848
### Protocol Overview
4949

5050
- **CRDT magic bytes**: `%LOR` (Loro), `%EPH` (Ephemeral), `%YJS`, `%YAW`, `%ELO` (E2EE Loro).
51-
- **Messages**: JoinRequest/JoinResponseOk/JoinError, DocUpdate, DocUpdateFragmentHeader/Fragment, UpdateError, Leave.
51+
- **Messages**: JoinRequest/JoinResponseOk/JoinError, DocUpdate (with batchId), DocUpdateFragmentHeader/Fragment, Ack, RoomError, Leave.
5252
- **Limits**: 256 KiB max per message; large payloads are fragmented and reassembled.
5353
- **Keepalive**: Text frames `"ping"`/`"pong"` are connection‑scoped and bypass the envelope.
5454
- **%ELO**: DocUpdate payload is a container of encrypted records (DeltaSpan/Snapshot). Each record has a plaintext header (peer/version metadata, `keyId`, 12‑byte IV) and AES‑GCM ciphertext (`ct||tag`). Servers route/broadcast without decrypting.

CLAUDE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pnpm -r clean
3232
Source of truth: `/protocol.md`.
3333

3434
- Envelope: 4‑byte CRDT magic, varBytes roomId (≤128B), 1‑byte type, payload
35-
- Types: JoinRequest/JoinResponseOk/JoinError, DocUpdate, DocUpdateFragmentHeader/Fragment, UpdateError, Leave
35+
- Types: JoinRequest/JoinResponseOk/JoinError, DocUpdate (with batchId), DocUpdateFragmentHeader/Fragment, Ack, RoomError, Leave
3636
- Limits: 256 KiB per message; fragment large updates
3737
- Keepalive: connection‑scoped text frames "ping"/"pong"
3838

@@ -45,7 +45,7 @@ Key TS files: `packages/loro-protocol/src/{bytes,encoding,protocol}.ts`.
4545

4646
## Notes for Implementers
4747

48-
- Fragmentation: reassemble fragments by batch header + index; timeout triggers UpdateError.FragmentTimeout
48+
- Fragmentation: reassemble fragments by batch header + index; timeout triggers Ack.FragmentTimeout
4949
- Rooms: a WS connection can join multiple CRDT+room pairs
5050
- Auth/persistence hooks exposed by `SimpleServer` and Rust server
5151

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
loro-protocol is a small, transport-agnostic syncing protocol for collaborative CRDT documents. This repo hosts the protocol implementation, a WebSocket client, and minimal servers for local testing or self‑hosting.
44

5-
- Protocol: multiplex multiple rooms on one connection, 256 KiB max per message, large update fragmentation supported
5+
- Protocol: multiplex multiple rooms on one connection, 256 KiB max per message, large update fragmentation supported, positive/negative delivery via `Ack`, room eviction via `RoomError`
66
- CRDTs: Loro document, Loro ephemeral store; extensible (e.g., Yjs, Yjs Awareness)
77
- Transports: WebSocket or any integrity-preserving transport (e.g., WebRTC)
88

@@ -158,7 +158,7 @@ The Rust workspace contains a minimal async WebSocket server (`loro-websocket-se
158158
## Protocol Highlights
159159

160160
- Magic bytes per CRDT: "%LOR" (Loro doc), "%EPH" (Loro ephemeral), "%EPS" (persisted Loro ephemeral – tells the server to keep the latest state so new peers can load it immediately), "%YJS", "%YAW", …
161-
- Messages: JoinRequest/JoinResponseOk/JoinError, DocUpdate, DocUpdateFragmentHeader/Fragment, UpdateError, Leave
161+
- Messages: JoinRequest/JoinResponseOk/JoinError, DocUpdate (with batchId), DocUpdateFragmentHeader/Fragment, Ack, RoomError, Leave
162162
- Limits: 256 KiB per message; large updates must be fragmented; default reassembly timeout 10s
163163
- Multi‑room: room ID is part of every message; one connection can join multiple rooms
164164

conductor.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"scripts": {
3+
"setup": "pnpm install && pnpm build",
4+
"run": "pnpm test"
5+
}
6+
}

llms.md

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ end-to-end encrypted extension, and the full surface of the
3535
| `0x00` | `JoinRequest` | `varBytes joinPayload` (app-defined metadata, e.g., auth/session info), `varBytes version`. |
3636
| `0x01` | `JoinResponseOk` | `varString permission ("read"/"write")`, `varBytes version`, `varBytes extraMetadata`. |
3737
| `0x02` | `JoinError` | `u8 code`, `varString message`, optional `varBytes receiverVersion` when `code=version_unknown`. |
38-
| `0x03` | `DocUpdate` | `varUint N` updates followed by `N` `varBytes` chunks. |
38+
| `0x03` | `DocUpdate` | `varUint N` updates followed by `N` `varBytes` chunks, then 8-byte `batchId`. |
3939
| `0x04` | `DocUpdateFragmentHeader` | `8-byte batchId`, `varUint fragmentCount`, `varUint totalSizeBytes`. |
4040
| `0x05` | `DocUpdateFragment` | `8-byte batchId`, `varUint index`, `varBytes fragment`. |
41-
| `0x06` | `UpdateError` | `u8 code`, `varString message`, optional batch ID when `code=fragment_timeout`. |
41+
| `0x06` | `RoomError` | `u8 code`, `varString message`; receipt means the peer is evicted from the room until rejoin. |
4242
| `0x07` | `Leave` | No additional payload. |
43+
| `0x08` | `Ack` | `8-byte refId` (batch or fragment ID) + `u8 status` (see §1.4). |
4344

4445
### 1.2 Sync Lifecycle
4546

@@ -50,12 +51,14 @@ end-to-end encrypted extension, and the full surface of the
5051
- or `JoinError` when the join payload is rejected (e.g., auth failure) or
5152
for unknown version. On `version_unknown`, the server includes its
5253
version for reseeding.
53-
3. Clients broadcast local edits using `DocUpdate`. Payloads exceeding the
54-
size limit are sliced into fragments: send header first, then numbered
55-
fragments. Recipients reassemble by `batchId`.
56-
4. Clients send `Leave` when unsubscribing from a room.
57-
5. Servers may emit `UpdateError` when denying updates; clients SHOULD surface
58-
these via adaptor callbacks.
54+
3. Clients broadcast local edits using `DocUpdate` (with an 8-byte `batchId`).
55+
Oversize payloads are sliced into fragments: send header first, then
56+
numbered fragments. Recipients reassemble by `batchId`.
57+
4. Server replies with `Ack` for every `DocUpdate`/fragment batch: `status=0`
58+
on success, non-zero mirrors legacy `UpdateError` codes.
59+
5. Clients send `Leave` when unsubscribing from a room.
60+
6. Servers send `RoomError` to evict a peer from a room; the peer must rejoin
61+
to resume traffic.
5962

6063
### 1.3 Fragmentation Rules
6164

@@ -64,24 +67,27 @@ end-to-end encrypted extension, and the full surface of the
6467
overhead (~240 KiB per fragment in the current implementation).
6568
- Receivers store fragments per `batchId`, expect all `fragmentCount` entries,
6669
and enforce a default 10-second reassembly timeout.
67-
- On timeout (`UpdateError.fragment_timeout`), senders SHOULD resend the full
68-
batch (header + fragments).
70+
- On timeout, receivers send `Ack` with `status=fragment_timeout`; senders
71+
SHOULD resend the full batch (header + fragments).
6972

70-
### 1.4 Error Codes
73+
### 1.4 Status / Error Codes
7174

7275
- **`JoinError` codes:**
7376
- `0x00 unknown`
7477
- `0x01 version_unknown` (`receiverVersion` included)
7578
- `0x02 auth_failed`
7679
- `0x7F app_error` (`varString app_code`)
77-
- **`UpdateError` codes:**
78-
- `0x00 unknown`
80+
- **`Ack.status` codes (mirrors legacy `UpdateError`):**
81+
- `0x00 ok`
82+
- `0x01 unknown`
7983
- `0x03 permission_denied`
8084
- `0x04 invalid_update`
8185
- `0x05 payload_too_large`
8286
- `0x06 rate_limited`
83-
- `0x07 fragment_timeout` (`8-byte batchId`)
84-
- `0x7F app_error` (`varString app_code`)
87+
- `0x07 fragment_timeout`
88+
- `0x7F app_error`
89+
- **`RoomError` codes:**
90+
- `0x01 unknown` (eviction; peer must rejoin)
8591
- Protocol violations MAY be raised via host callbacks; implementations often
8692
close the connection on unrecoverable errors.
8793

@@ -156,7 +162,7 @@ The server parses headers for routing/deduplication but never decrypts `ct`.
156162
- Message size remains bounded by the base protocol (use fragments as needed).
157163
- Receivers SHOULD deduplicate spans via `peerId`/`start`/`end` metadata.
158164
- Unknown `keyId` SHOULD trigger key resolution and a retry; persistent
159-
failure is surfaced locally instead of emitting `UpdateError`.
165+
failure is surfaced locally; no Ack is emitted because encryption/auth is end-to-end.
160166

161167
---
162168

@@ -272,7 +278,7 @@ The resolved room implements:
272278
to stay within limits.
273279
- On receiving `DocUpdate`/fragments, the client reassembles updates and passes
274280
them to `crdtAdaptor.applyUpdate`.
275-
- `UpdateError` messages invoke `crdtAdaptor.handleUpdateError` if provided.
281+
- `Ack` messages with non‑zero status trigger `crdtAdaptor.onUpdateError(updates, status)` using the original sent batch; missing batches are still logged.
276282

277283
### 3.8 Ping/Pong Integration
278284

packages/loro-adaptors/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,4 @@ pnpm typecheck
126126

127127
## License
128128

129-
MIT
129+
MIT

packages/loro-adaptors/src/elo-adaptor.ts

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ import { LoroDoc, VersionVector, decodeImportBlobMeta } from "loro-crdt";
22
import {
33
CrdtType,
44
JoinResponseOk,
5-
UpdateError,
6-
MessageType,
7-
UpdateErrorCode,
85
} from "loro-protocol";
96
import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";
107
import {
@@ -27,7 +24,11 @@ export interface EloAdaptorConfig {
2724
err: Error,
2825
meta: { kind: "delta" | "snapshot"; keyId: string }
2926
) => void;
30-
onUpdateError?: (error: UpdateError) => void;
27+
onUpdateError?: (
28+
updates: Uint8Array[],
29+
errorCode: number,
30+
reason?: string
31+
) => void;
3132
}
3233

3334
export class EloAdaptor implements CrdtDocAdaptor {
@@ -146,13 +147,8 @@ export class EloAdaptor implements CrdtDocAdaptor {
146147
await this.sendSnapshot();
147148
}
148149
} catch (err) {
149-
this.config.onUpdateError?.({
150-
type: MessageType.UpdateError,
151-
crdt: this.crdtType,
152-
roomId: "",
153-
code: UpdateErrorCode.Unknown,
154-
message: err instanceof Error ? err.message : String(err),
155-
});
150+
// Surface failure to host.
151+
console.error("ELO adaptor failed to package/send update", err);
156152
}
157153
})();
158154
});
@@ -166,6 +162,14 @@ export class EloAdaptor implements CrdtDocAdaptor {
166162
return undefined;
167163
}
168164

165+
onUpdateError(
166+
updates: Uint8Array[],
167+
errorCode: number,
168+
reason?: string
169+
): void {
170+
this.config.onUpdateError?.(updates, errorCode, reason);
171+
}
172+
169173
async handleJoinOk(res: JoinResponseOk): Promise<void> {
170174
if (this.destroyed) return;
171175
try {
@@ -235,10 +239,6 @@ export class EloAdaptor implements CrdtDocAdaptor {
235239
}
236240
}
237241

238-
handleUpdateError(error: UpdateError): void {
239-
this.config.onUpdateError?.(error);
240-
}
241-
242242
destroy(): void {
243243
if (this.destroyed) return;
244244
this.destroyed = true;

packages/loro-adaptors/src/flock-adaptor.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import { Flock } from "@loro-dev/flock";
22
import {
33
CrdtType,
4-
JoinError,
54
JoinResponseOk,
6-
UpdateError,
75
} from "loro-protocol";
86
import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";
97

@@ -97,7 +95,11 @@ function deserializeBundle(bytes: Uint8Array): FlockExportBundle {
9795

9896
export interface FlockAdaptorConfig {
9997
onImportError?: (error: Error, data: Uint8Array[]) => void;
100-
onUpdateError?: (error: UpdateError) => void;
98+
onUpdateError?: (
99+
updates: Uint8Array[],
100+
errorCode: number,
101+
reason?: string
102+
) => void;
101103
}
102104

103105
/**
@@ -147,8 +149,6 @@ export class FlockAdaptor implements CrdtDocAdaptor {
147149
return compareVersions(this.flock.version(), remote);
148150
}
149151

150-
handleJoinErr?: (err: JoinError) => Promise<void>;
151-
152152
setCtx(ctx: CrdtAdaptorContext): void {
153153
this.ctx = ctx;
154154
if (this.unsubscribe) {
@@ -174,6 +174,14 @@ export class FlockAdaptor implements CrdtDocAdaptor {
174174
return undefined;
175175
}
176176

177+
onUpdateError(
178+
updates: Uint8Array[],
179+
errorCode: number,
180+
reason?: string
181+
): void {
182+
this.config.onUpdateError?.(updates, errorCode, reason);
183+
}
184+
177185
async handleJoinOk(res: JoinResponseOk): Promise<void> {
178186
if (this.destroyed) return;
179187
try {
@@ -228,10 +236,6 @@ export class FlockAdaptor implements CrdtDocAdaptor {
228236
}
229237
}
230238

231-
handleUpdateError(error: UpdateError): void {
232-
this.config.onUpdateError?.(error);
233-
}
234-
235239
destroy(): void {
236240
if (this.destroyed) return;
237241
this.destroyed = true;

packages/loro-adaptors/src/loro-adaptor.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
import { LoroDoc, VersionVector } from "loro-crdt";
22
import {
33
CrdtType,
4-
JoinError,
54
JoinResponseOk,
6-
UpdateError,
75
} from "loro-protocol";
86
import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";
97

108
export interface LoroAdaptorConfig {
119
onImportError?: (error: Error, data: Uint8Array[]) => void;
12-
onUpdateError?: (error: UpdateError) => void;
10+
onUpdateError?: (
11+
updates: Uint8Array[],
12+
errorCode: number,
13+
reason?: string
14+
) => void;
1315
}
1416

1517
export class LoroAdaptor implements CrdtDocAdaptor {
@@ -55,8 +57,6 @@ export class LoroAdaptor implements CrdtDocAdaptor {
5557
return this.doc.version().compare(vv) as 0 | 1 | -1 | undefined;
5658
}
5759

58-
handleJoinErr?: ((err: JoinError) => Promise<void>) | undefined;
59-
6060
getDoc(): LoroDoc {
6161
return this.doc;
6262
}
@@ -79,6 +79,14 @@ export class LoroAdaptor implements CrdtDocAdaptor {
7979
return undefined;
8080
}
8181

82+
onUpdateError(
83+
updates: Uint8Array[],
84+
errorCode: number,
85+
reason?: string
86+
): void {
87+
this.config.onUpdateError?.(updates, errorCode, reason);
88+
}
89+
8290
async handleJoinOk(res: JoinResponseOk): Promise<void> {
8391
if (this.destroyed) return;
8492

@@ -110,7 +118,7 @@ export class LoroAdaptor implements CrdtDocAdaptor {
110118
this.ctx?.send([updates]);
111119
}
112120
} catch (error) {
113-
this.ctx!.onJoinFailed(
121+
this.ctx?.onJoinFailed(
114122
error instanceof Error ? error.message : String(error)
115123
);
116124
throw error;
@@ -126,7 +134,11 @@ export class LoroAdaptor implements CrdtDocAdaptor {
126134
// Pending updates may occur when concurrent changes happen
127135
}
128136
} catch (error) {
129-
this.ctx!.onImportError(
137+
this.config.onImportError?.(
138+
error instanceof Error ? error : new Error(String(error)),
139+
updates
140+
);
141+
this.ctx?.onImportError(
130142
error instanceof Error ? error : new Error(String(error)),
131143
updates
132144
);
@@ -142,12 +154,6 @@ export class LoroAdaptor implements CrdtDocAdaptor {
142154
}
143155
}
144156

145-
handleUpdateError(error: UpdateError): void {
146-
if (this.config.onUpdateError) {
147-
this.config.onUpdateError(error);
148-
}
149-
}
150-
151157
destroy(): void {
152158
if (this.destroyed) return;
153159
this.destroyed = true;

packages/loro-adaptors/src/loro-ephemeral-adaptor.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ export class LoroEphemeralAdaptor implements CrdtDocAdaptor {
2222
return 0 as const;
2323
}
2424

25-
handleJoinErr?: undefined;
26-
2725
getStore(): EphemeralStore {
2826
return this.store;
2927
}

0 commit comments

Comments
 (0)