Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pnpm clean
### Protocol Overview

- **CRDT magic bytes**: `%LOR` (Loro), `%EPH` (Ephemeral), `%YJS`, `%YAW`, `%ELO` (E2EE Loro).
- **Messages**: JoinRequest/JoinResponseOk/JoinError, DocUpdate, DocUpdateFragmentHeader/Fragment, UpdateError, Leave.
- **Messages**: JoinRequest/JoinResponseOk/JoinError, DocUpdate (with batchId), DocUpdateFragmentHeader/Fragment, Ack, RoomError, Leave.
- **Limits**: 256 KiB max per message; large payloads are fragmented and reassembled.
- **Keepalive**: Text frames `"ping"`/`"pong"` are connection‑scoped and bypass the envelope.
- **%ELO**: DocUpdate payload is a container of encrypted records (DeltaSpan/Snapshot). Each record has a plaintext header (peer/version metadata, `keyId`, 12‑byte IV) and AES‑GCM ciphertext (`ct||tag`). Servers route/broadcast without decrypting.
Expand Down
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pnpm -r clean
Source of truth: `/protocol.md`.

- Envelope: 4‑byte CRDT magic, varBytes roomId (≤128B), 1‑byte type, payload
- Types: JoinRequest/JoinResponseOk/JoinError, DocUpdate, DocUpdateFragmentHeader/Fragment, UpdateError, Leave
- Types: JoinRequest/JoinResponseOk/JoinError, DocUpdate (with batchId), DocUpdateFragmentHeader/Fragment, Ack, RoomError, Leave
- Limits: 256 KiB per message; fragment large updates
- Keepalive: connection‑scoped text frames "ping"/"pong"

Expand All @@ -45,7 +45,7 @@ Key TS files: `packages/loro-protocol/src/{bytes,encoding,protocol}.ts`.

## Notes for Implementers

- Fragmentation: reassemble fragments by batch header + index; timeout triggers UpdateError.FragmentTimeout
- Fragmentation: reassemble fragments by batch header + index; timeout triggers Ack.FragmentTimeout
- Rooms: a WS connection can join multiple CRDT+room pairs
- Auth/persistence hooks exposed by `SimpleServer` and Rust server

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

loro-protocol is a small, transport-agnostic syncing protocol for collaborative CRDT documents. This repo hosts the protocol implementation, a WebSocket client, and minimal servers for local testing or self‑hosting.

- Protocol: multiplex multiple rooms on one connection, 256 KiB max per message, large update fragmentation supported
- Protocol: multiplex multiple rooms on one connection, 256 KiB max per message, large update fragmentation supported, positive/negative delivery via `Ack`, room eviction via `RoomError`
- CRDTs: Loro document, Loro ephemeral store; extensible (e.g., Yjs, Yjs Awareness)
- Transports: WebSocket or any integrity-preserving transport (e.g., WebRTC)

Expand Down Expand Up @@ -158,7 +158,7 @@ The Rust workspace contains a minimal async WebSocket server (`loro-websocket-se
## Protocol Highlights

- Magic bytes per CRDT: "%LOR" (Loro doc), "%EPH" (Loro ephemeral), "%EPS" (persisted Loro ephemeral – tells the server to keep the latest state so new peers can load it immediately), "%YJS", "%YAW", …
- Messages: JoinRequest/JoinResponseOk/JoinError, DocUpdate, DocUpdateFragmentHeader/Fragment, UpdateError, Leave
- Messages: JoinRequest/JoinResponseOk/JoinError, DocUpdate (with batchId), DocUpdateFragmentHeader/Fragment, Ack, RoomError, Leave
- Limits: 256 KiB per message; large updates must be fragmented; default reassembly timeout 10s
- Multi‑room: room ID is part of every message; one connection can join multiple rooms

Expand Down
6 changes: 6 additions & 0 deletions conductor.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"scripts": {
"setup": "pnpm install && pnpm build",
"run": "pnpm test"
}
}
40 changes: 23 additions & 17 deletions llms.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ end-to-end encrypted extension, and the full surface of the
| `0x00` | `JoinRequest` | `varBytes joinPayload` (app-defined metadata, e.g., auth/session info), `varBytes version`. |
| `0x01` | `JoinResponseOk` | `varString permission ("read"/"write")`, `varBytes version`, `varBytes extraMetadata`. |
| `0x02` | `JoinError` | `u8 code`, `varString message`, optional `varBytes receiverVersion` when `code=version_unknown`. |
| `0x03` | `DocUpdate` | `varUint N` updates followed by `N` `varBytes` chunks. |
| `0x03` | `DocUpdate` | `varUint N` updates followed by `N` `varBytes` chunks, then 8-byte `batchId`. |
| `0x04` | `DocUpdateFragmentHeader` | `8-byte batchId`, `varUint fragmentCount`, `varUint totalSizeBytes`. |
| `0x05` | `DocUpdateFragment` | `8-byte batchId`, `varUint index`, `varBytes fragment`. |
| `0x06` | `UpdateError` | `u8 code`, `varString message`, optional batch ID when `code=fragment_timeout`. |
| `0x06` | `RoomError` | `u8 code`, `varString message`; receipt means the peer is evicted from the room until rejoin. |
| `0x07` | `Leave` | No additional payload. |
| `0x08` | `Ack` | `8-byte refId` (batch or fragment ID) + `u8 status` (see §1.4). |

### 1.2 Sync Lifecycle

Expand All @@ -50,12 +51,14 @@ end-to-end encrypted extension, and the full surface of the
- or `JoinError` when the join payload is rejected (e.g., auth failure) or
for unknown version. On `version_unknown`, the server includes its
version for reseeding.
3. Clients broadcast local edits using `DocUpdate`. Payloads exceeding the
size limit are sliced into fragments: send header first, then numbered
fragments. Recipients reassemble by `batchId`.
4. Clients send `Leave` when unsubscribing from a room.
5. Servers may emit `UpdateError` when denying updates; clients SHOULD surface
these via adaptor callbacks.
3. Clients broadcast local edits using `DocUpdate` (with an 8-byte `batchId`).
Oversize payloads are sliced into fragments: send header first, then
numbered fragments. Recipients reassemble by `batchId`.
4. Server replies with `Ack` for every `DocUpdate`/fragment batch: `status=0`
on success, non-zero mirrors legacy `UpdateError` codes.
5. Clients send `Leave` when unsubscribing from a room.
6. Servers send `RoomError` to evict a peer from a room; the peer must rejoin
to resume traffic.

### 1.3 Fragmentation Rules

Expand All @@ -64,24 +67,27 @@ end-to-end encrypted extension, and the full surface of the
overhead (~240 KiB per fragment in the current implementation).
- Receivers store fragments per `batchId`, expect all `fragmentCount` entries,
and enforce a default 10-second reassembly timeout.
- On timeout (`UpdateError.fragment_timeout`), senders SHOULD resend the full
batch (header + fragments).
- On timeout, receivers send `Ack` with `status=fragment_timeout`; senders
SHOULD resend the full batch (header + fragments).

### 1.4 Error Codes
### 1.4 Status / Error Codes

- **`JoinError` codes:**
- `0x00 unknown`
- `0x01 version_unknown` (`receiverVersion` included)
- `0x02 auth_failed`
- `0x7F app_error` (`varString app_code`)
- **`UpdateError` codes:**
- `0x00 unknown`
- **`Ack.status` codes (mirrors legacy `UpdateError`):**
- `0x00 ok`
- `0x01 unknown`
- `0x03 permission_denied`
- `0x04 invalid_update`
- `0x05 payload_too_large`
- `0x06 rate_limited`
- `0x07 fragment_timeout` (`8-byte batchId`)
- `0x7F app_error` (`varString app_code`)
- `0x07 fragment_timeout`
- `0x7F app_error`
- **`RoomError` codes:**
- `0x01 unknown` (eviction; peer must rejoin)
- Protocol violations MAY be raised via host callbacks; implementations often
close the connection on unrecoverable errors.

Expand Down Expand Up @@ -156,7 +162,7 @@ The server parses headers for routing/deduplication but never decrypts `ct`.
- Message size remains bounded by the base protocol (use fragments as needed).
- Receivers SHOULD deduplicate spans via `peerId`/`start`/`end` metadata.
- Unknown `keyId` SHOULD trigger key resolution and a retry; persistent
failure is surfaced locally instead of emitting `UpdateError`.
failure is surfaced locally; no Ack is emitted because encryption/auth is end-to-end.

---

Expand Down Expand Up @@ -272,7 +278,7 @@ The resolved room implements:
to stay within limits.
- On receiving `DocUpdate`/fragments, the client reassembles updates and passes
them to `crdtAdaptor.applyUpdate`.
- `UpdateError` messages invoke `crdtAdaptor.handleUpdateError` if provided.
- `Ack` messages with non‑zero status trigger `crdtAdaptor.onUpdateError(updates, status)` using the original sent batch; missing batches are still logged.

### 3.8 Ping/Pong Integration

Expand Down
2 changes: 1 addition & 1 deletion packages/loro-adaptors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,4 @@ pnpm typecheck

## License

MIT
MIT
30 changes: 15 additions & 15 deletions packages/loro-adaptors/src/elo-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ import { LoroDoc, VersionVector, decodeImportBlobMeta } from "loro-crdt";
import {
CrdtType,
JoinResponseOk,
UpdateError,
MessageType,
UpdateErrorCode,
} from "loro-protocol";
import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";
import {
Expand All @@ -27,7 +24,11 @@ export interface EloAdaptorConfig {
err: Error,
meta: { kind: "delta" | "snapshot"; keyId: string }
) => void;
onUpdateError?: (error: UpdateError) => void;
onUpdateError?: (
updates: Uint8Array[],
errorCode: number,
reason?: string
) => void;
}

export class EloAdaptor implements CrdtDocAdaptor {
Expand Down Expand Up @@ -146,13 +147,8 @@ export class EloAdaptor implements CrdtDocAdaptor {
await this.sendSnapshot();
}
} catch (err) {
this.config.onUpdateError?.({
type: MessageType.UpdateError,
crdt: this.crdtType,
roomId: "",
code: UpdateErrorCode.Unknown,
message: err instanceof Error ? err.message : String(err),
});
// Surface failure to host.
console.error("ELO adaptor failed to package/send update", err);
}
})();
});
Expand All @@ -166,6 +162,14 @@ export class EloAdaptor implements CrdtDocAdaptor {
return undefined;
}

onUpdateError(
updates: Uint8Array[],
errorCode: number,
reason?: string
): void {
this.config.onUpdateError?.(updates, errorCode, reason);
}

async handleJoinOk(res: JoinResponseOk): Promise<void> {
if (this.destroyed) return;
try {
Expand Down Expand Up @@ -235,10 +239,6 @@ export class EloAdaptor implements CrdtDocAdaptor {
}
}

handleUpdateError(error: UpdateError): void {
this.config.onUpdateError?.(error);
}

destroy(): void {
if (this.destroyed) return;
this.destroyed = true;
Expand Down
22 changes: 13 additions & 9 deletions packages/loro-adaptors/src/flock-adaptor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { Flock } from "@loro-dev/flock";
import {
CrdtType,
JoinError,
JoinResponseOk,
UpdateError,
} from "loro-protocol";
import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";

Expand Down Expand Up @@ -97,7 +95,11 @@ function deserializeBundle(bytes: Uint8Array): FlockExportBundle {

export interface FlockAdaptorConfig {
onImportError?: (error: Error, data: Uint8Array[]) => void;
onUpdateError?: (error: UpdateError) => void;
onUpdateError?: (
updates: Uint8Array[],
errorCode: number,
reason?: string
) => void;
}

/**
Expand Down Expand Up @@ -147,8 +149,6 @@ export class FlockAdaptor implements CrdtDocAdaptor {
return compareVersions(this.flock.version(), remote);
}

handleJoinErr?: (err: JoinError) => Promise<void>;

setCtx(ctx: CrdtAdaptorContext): void {
this.ctx = ctx;
if (this.unsubscribe) {
Expand All @@ -174,6 +174,14 @@ export class FlockAdaptor implements CrdtDocAdaptor {
return undefined;
}

onUpdateError(
updates: Uint8Array[],
errorCode: number,
reason?: string
): void {
this.config.onUpdateError?.(updates, errorCode, reason);
}

async handleJoinOk(res: JoinResponseOk): Promise<void> {
if (this.destroyed) return;
try {
Expand Down Expand Up @@ -228,10 +236,6 @@ export class FlockAdaptor implements CrdtDocAdaptor {
}
}

handleUpdateError(error: UpdateError): void {
this.config.onUpdateError?.(error);
}

destroy(): void {
if (this.destroyed) return;
this.destroyed = true;
Expand Down
32 changes: 19 additions & 13 deletions packages/loro-adaptors/src/loro-adaptor.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { LoroDoc, VersionVector } from "loro-crdt";
import {
CrdtType,
JoinError,
JoinResponseOk,
UpdateError,
} from "loro-protocol";
import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";

export interface LoroAdaptorConfig {
onImportError?: (error: Error, data: Uint8Array[]) => void;
onUpdateError?: (error: UpdateError) => void;
onUpdateError?: (
updates: Uint8Array[],
errorCode: number,
reason?: string
) => void;
}

export class LoroAdaptor implements CrdtDocAdaptor {
Expand Down Expand Up @@ -55,8 +57,6 @@ export class LoroAdaptor implements CrdtDocAdaptor {
return this.doc.version().compare(vv) as 0 | 1 | -1 | undefined;
}

handleJoinErr?: ((err: JoinError) => Promise<void>) | undefined;

getDoc(): LoroDoc {
return this.doc;
}
Expand All @@ -79,6 +79,14 @@ export class LoroAdaptor implements CrdtDocAdaptor {
return undefined;
}

onUpdateError(
updates: Uint8Array[],
errorCode: number,
reason?: string
): void {
this.config.onUpdateError?.(updates, errorCode, reason);
}

async handleJoinOk(res: JoinResponseOk): Promise<void> {
if (this.destroyed) return;

Expand Down Expand Up @@ -110,7 +118,7 @@ export class LoroAdaptor implements CrdtDocAdaptor {
this.ctx?.send([updates]);
}
} catch (error) {
this.ctx!.onJoinFailed(
this.ctx?.onJoinFailed(
error instanceof Error ? error.message : String(error)
);
throw error;
Expand All @@ -126,7 +134,11 @@ export class LoroAdaptor implements CrdtDocAdaptor {
// Pending updates may occur when concurrent changes happen
}
} catch (error) {
this.ctx!.onImportError(
this.config.onImportError?.(
error instanceof Error ? error : new Error(String(error)),
updates
);
this.ctx?.onImportError(
error instanceof Error ? error : new Error(String(error)),
updates
);
Expand All @@ -142,12 +154,6 @@ export class LoroAdaptor implements CrdtDocAdaptor {
}
}

handleUpdateError(error: UpdateError): void {
if (this.config.onUpdateError) {
this.config.onUpdateError(error);
}
}

destroy(): void {
if (this.destroyed) return;
this.destroyed = true;
Expand Down
2 changes: 0 additions & 2 deletions packages/loro-adaptors/src/loro-ephemeral-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ export class LoroEphemeralAdaptor implements CrdtDocAdaptor {
return 0 as const;
}

handleJoinErr?: undefined;

getStore(): EphemeralStore {
return this.store;
}
Expand Down
2 changes: 0 additions & 2 deletions packages/loro-adaptors/src/loro-persistent-store-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ export class LoroPersistentStoreAdaptor implements CrdtDocAdaptor {
return 0 as const;
}

handleJoinErr?: undefined;

getStore(): EphemeralStore {
return this.store;
}
Expand Down
Loading