Skip to content

Commit db6bfab

Browse files
authored
feat: Add persistent EphemeralStore adaptors (#18)
* docs: document persisted ephemeral store type * Allow persisted store backfill without peers * Handle SimpleServer start errors * feat: add rust support * fix: exportSnapshot
1 parent 1690aac commit db6bfab

File tree

16 files changed

+775
-105
lines changed

16 files changed

+775
-105
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ The Rust workspace contains a minimal async WebSocket server (`loro-websocket-se
157157

158158
## Protocol Highlights
159159

160-
- Magic bytes per CRDT: "%LOR" (Loro doc), "%EPH" (Loro ephemeral), "%YJS", "%YAW", …
160+
- Magic bytes per CRDT: "%LOR" (Loro doc), "%EPH" (Loro ephemeral), "%EPS" (persisted Loro ephemeral – tells the server to keep the latest state so new peers can load it immediately), "%YJS", "%YAW", …
161161
- Messages: JoinRequest/JoinResponseOk/JoinError, DocUpdate, DocUpdateFragmentHeader/Fragment, UpdateError, Leave
162162
- Limits: 256 KiB per message; large updates must be fragmented; default reassembly timeout 10s
163163
- Multi‑room: room ID is part of every message; one connection can join multiple rooms

packages/loro-adaptors/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The websocket client (`loro-websocket`) speaks the binary wire protocol. These a
1414

1515
- `LoroAdaptor`: wraps a `LoroDoc` and streams local updates to the connection; applies remote updates on receipt
1616
- `LoroEphemeralAdaptor`: wraps an `EphemeralStore` for transient presence/cursor data
17+
- `LoroPersistentStoreAdaptor`: wraps an `EphemeralStore` but marks updates as persisted so the server stores them for new peers
1718
- `EloLoroAdaptor`: wraps a `LoroDoc` and packages updates into %ELO containers with AES‑GCM; decrypts inbound containers and imports plaintext.
1819

1920
## Usage
@@ -23,6 +24,7 @@ import { LoroWebsocketClient } from "loro-websocket";
2324
import {
2425
LoroAdaptor,
2526
LoroEphemeralAdaptor,
27+
LoroPersistentStoreAdaptor,
2628
EloLoroAdaptor,
2729
} from "loro-adaptors";
2830
import { LoroDoc, EphemeralStore } from "loro-crdt";
@@ -41,6 +43,14 @@ const eph = new EphemeralStore(30_000);
4143
const ephAdaptor = new LoroEphemeralAdaptor(eph);
4244
const roomEph = await client.join({ roomId: "demo", crdtAdaptor: ephAdaptor });
4345

46+
// Persisted presence that should be available to late joiners
47+
const persistedStore = new EphemeralStore(30_000);
48+
const persistedAdaptor = new LoroPersistentStoreAdaptor(persistedStore);
49+
const roomPersisted = await client.join({
50+
roomId: "demo-persisted",
51+
crdtAdaptor: persistedAdaptor,
52+
});
53+
4454
// %ELO (end‑to‑end encrypted Loro)
4555
const key = new Uint8Array(32);
4656
const elo = new EloLoroAdaptor({
@@ -55,13 +65,15 @@ doc.commit();
5565
// Cleanup
5666
await roomEph.destroy();
5767
await roomDoc.destroy();
68+
await roomPersisted.destroy();
5869
await secure.destroy();
5970
```
6071

6172
## API
6273

6374
- `new LoroAdaptor(doc?: LoroDoc, config?: { onImportError?, onUpdateError? })`
6475
- `new LoroEphemeralAdaptor(store?: EphemeralStore)`
76+
- `new LoroPersistentStoreAdaptor(store?: EphemeralStore)`
6577
- `new EloLoroAdaptor(docOrConfig: LoroDoc | { getPrivateKey, ivFactory?, onDecryptError?, onUpdateError? })`
6678
- `getPrivateKey: (keyId?) => Promise<{ keyId: string, key: CryptoKey | Uint8Array }>`
6779
- Optional `ivFactory()` for testing (12‑byte IV)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from "./loro-adaptor";
22
export * from "./loro-ephemeral-adaptor";
3+
export * from "./loro-persistent-store-adaptor";
34
export * from "./elo-adaptor";
45
export * from "./flock-adaptor";
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { EphemeralStore } from "loro-crdt";
2+
import { CrdtType, JoinResponseOk } from "loro-protocol";
3+
import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";
4+
5+
export class LoroPersistentStoreAdaptor implements CrdtDocAdaptor {
6+
readonly crdtType = CrdtType.LoroEphemeralStorePersisted;
7+
8+
private store: EphemeralStore;
9+
private ctx?: CrdtAdaptorContext;
10+
private localUpdateUnsubscribe?: () => void;
11+
private destroyed = false;
12+
13+
constructor(store?: EphemeralStore) {
14+
this.store = store || new EphemeralStore();
15+
}
16+
17+
waitForReachingServerVersion(): Promise<void> {
18+
return Promise.resolve();
19+
}
20+
21+
cmpVersion(_v: Uint8Array) {
22+
return 0 as const;
23+
}
24+
25+
handleJoinErr?: undefined;
26+
27+
getStore(): EphemeralStore {
28+
return this.store;
29+
}
30+
31+
setCtx(ctx: CrdtAdaptorContext): void {
32+
this.ctx = ctx;
33+
this.localUpdateUnsubscribe = this.store.subscribeLocalUpdates(updates => {
34+
if (!this.destroyed && this.ctx) {
35+
this.ctx.send([updates]);
36+
}
37+
});
38+
}
39+
40+
getVersion(): Uint8Array {
41+
return new Uint8Array();
42+
}
43+
44+
getAlternativeVersion(_currentVersion: Uint8Array): Uint8Array | undefined {
45+
return undefined;
46+
}
47+
48+
async handleJoinOk(_res: JoinResponseOk): Promise<void> {
49+
if (this.destroyed) return;
50+
if (this.ctx) {
51+
const allState = this.store.encodeAll();
52+
if (allState.length > 0) {
53+
this.ctx.send([allState]);
54+
}
55+
}
56+
}
57+
58+
applyUpdate(updates: Uint8Array[]): void {
59+
if (this.destroyed || !updates?.length) return;
60+
for (const update of updates) {
61+
if (update?.length > 0) {
62+
try {
63+
this.store.apply(update);
64+
} catch (error) {
65+
this.ctx?.onImportError(
66+
error instanceof Error ? error : new Error(String(error)),
67+
[update]
68+
);
69+
}
70+
}
71+
}
72+
}
73+
74+
destroy(): void {
75+
if (this.destroyed) return;
76+
this.destroyed = true;
77+
78+
if (this.localUpdateUnsubscribe) {
79+
this.localUpdateUnsubscribe();
80+
this.localUpdateUnsubscribe = undefined;
81+
}
82+
83+
this.store.destroy();
84+
this.ctx = undefined;
85+
}
86+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from "./server-registry";
22
export * from "./server-loro-adaptor";
33
export * from "./server-loro-ephemeral-adaptor";
4+
export * from "./server-loro-persistent-store-adaptor";
45
export * from "./server-yjs-awareness-adaptor";
56
export * from "./server-flock-adaptor";
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import { EphemeralStore } from "loro-crdt";
2+
import {
3+
CrdtType,
4+
Permission,
5+
MessageType,
6+
JoinResponseOk,
7+
UpdateError,
8+
UpdateErrorCode,
9+
} from "loro-protocol";
10+
import type { CrdtServerAdaptor } from "../types";
11+
12+
export interface LoroPersistentStoreServerAdaptorConfig {
13+
timeout?: number;
14+
}
15+
16+
export class LoroPersistentStoreServerAdaptor implements CrdtServerAdaptor {
17+
readonly crdtType = CrdtType.LoroEphemeralStorePersisted;
18+
private readonly timeout: number;
19+
20+
constructor(config: LoroPersistentStoreServerAdaptorConfig = {}) {
21+
this.timeout = config.timeout ?? 10_000;
22+
}
23+
24+
createEmpty(): Uint8Array {
25+
const store = new EphemeralStore(this.timeout);
26+
try {
27+
return store.encodeAll();
28+
} finally {
29+
store.inner.free();
30+
}
31+
}
32+
33+
handleJoinRequest(
34+
documentData: Uint8Array,
35+
_clientVersion: Uint8Array,
36+
permission: Permission
37+
): {
38+
response: JoinResponseOk;
39+
updates?: Uint8Array[];
40+
} {
41+
const response: JoinResponseOk = {
42+
type: MessageType.JoinResponseOk,
43+
crdt: this.crdtType,
44+
roomId: "",
45+
permission,
46+
version: new Uint8Array(),
47+
};
48+
49+
const updates = documentData.length > 0 ? [documentData] : undefined;
50+
return { response, updates };
51+
}
52+
53+
applyUpdates(
54+
documentData: Uint8Array,
55+
updates: Uint8Array[],
56+
permission: Permission
57+
): {
58+
success: boolean;
59+
newDocumentData?: Uint8Array;
60+
error?: UpdateError;
61+
broadcastUpdates?: Uint8Array[];
62+
} {
63+
if (permission === "read") {
64+
return {
65+
success: false,
66+
error: {
67+
type: MessageType.UpdateError,
68+
crdt: this.crdtType,
69+
roomId: "",
70+
code: UpdateErrorCode.PermissionDenied,
71+
message: "Read-only permission, cannot apply updates",
72+
},
73+
};
74+
}
75+
76+
const store = new EphemeralStore(this.timeout);
77+
const broadcastUpdates: Uint8Array[] = [];
78+
79+
try {
80+
if (documentData.length > 0) {
81+
store.apply(documentData);
82+
}
83+
for (const update of updates) {
84+
if (update.length > 0) {
85+
store.apply(update);
86+
broadcastUpdates.push(update);
87+
}
88+
}
89+
90+
const newDocumentData = store.encodeAll();
91+
92+
return {
93+
success: true,
94+
newDocumentData,
95+
broadcastUpdates:
96+
broadcastUpdates.length > 0 ? broadcastUpdates : undefined,
97+
};
98+
} catch (error) {
99+
return {
100+
success: false,
101+
error: {
102+
type: MessageType.UpdateError,
103+
crdt: this.crdtType,
104+
roomId: "",
105+
code: UpdateErrorCode.InvalidUpdate,
106+
message: error instanceof Error ? error.message : "Invalid update",
107+
},
108+
};
109+
} finally {
110+
store.destroy();
111+
store.inner.free();
112+
}
113+
}
114+
115+
getVersion(_documentData: Uint8Array): Uint8Array {
116+
return new Uint8Array();
117+
}
118+
119+
merge(documents: Uint8Array[]): Uint8Array {
120+
const store = new EphemeralStore(this.timeout);
121+
for (const data of documents) {
122+
if (data.length > 0) {
123+
store.apply(data);
124+
}
125+
}
126+
try {
127+
return store.encodeAll();
128+
} finally {
129+
store.destroy();
130+
store.inner.free();
131+
}
132+
}
133+
}
134+
135+
export const loroPersistentStoreServerAdaptor =
136+
new LoroPersistentStoreServerAdaptor();

packages/loro-adaptors/tests/adaptors.test.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { describe, it, expect, beforeEach, vi } from "vitest";
22
import { LoroDoc, EphemeralStore, VersionVector } from "loro-crdt";
3-
import { LoroAdaptor, LoroEphemeralAdaptor } from "../src/adaptors";
3+
import {
4+
LoroAdaptor,
5+
LoroEphemeralAdaptor,
6+
LoroPersistentStoreAdaptor,
7+
} from "../src/adaptors";
48
import { CrdtType, JoinResponseOk, MessageType } from "loro-protocol";
59

610
describe("LoroAdaptor", () => {
@@ -125,6 +129,46 @@ describe("LoroEphemeralAdaptor", () => {
125129
});
126130
});
127131

132+
describe("LoroPersistentStoreAdaptor", () => {
133+
let adaptor: LoroPersistentStoreAdaptor;
134+
let store: EphemeralStore;
135+
136+
beforeEach(() => {
137+
store = new EphemeralStore();
138+
adaptor = new LoroPersistentStoreAdaptor(store);
139+
});
140+
141+
it("should have correct CRDT type", () => {
142+
expect(adaptor.crdtType).toBe(CrdtType.LoroEphemeralStorePersisted);
143+
});
144+
145+
it("should return the underlying EphemeralStore", () => {
146+
expect(adaptor.getStore()).toBe(store);
147+
});
148+
149+
it("should send full state on join", async () => {
150+
const mockSend = vi.fn();
151+
adaptor.setCtx({
152+
send: mockSend,
153+
onJoinFailed: vi.fn(),
154+
onImportError: vi.fn(),
155+
});
156+
157+
store.set("cursor", { x: 1 });
158+
const joinOk: JoinResponseOk = {
159+
type: MessageType.JoinResponseOk,
160+
crdt: CrdtType.LoroEphemeralStorePersisted,
161+
roomId: "room",
162+
permission: "write",
163+
version: new Uint8Array(),
164+
};
165+
166+
await adaptor.handleJoinOk(joinOk);
167+
168+
expect(mockSend).toHaveBeenCalledWith([expect.any(Uint8Array)]);
169+
});
170+
});
171+
128172
describe("Utility functions", () => {
129173
it("should create LoroAdaptor with new LoroDoc", () => {
130174
const adaptor = new LoroAdaptor();

packages/loro-protocol/src/encoding.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
const validCrdtTypes = [
2323
CrdtType.Loro,
2424
CrdtType.LoroEphemeralStore,
25+
CrdtType.LoroEphemeralStorePersisted,
2526
CrdtType.Yjs,
2627
CrdtType.YjsAwareness,
2728
CrdtType.Elo,
@@ -35,6 +36,7 @@ function isCrdtType(x: unknown): x is CrdtType {
3536
typeof x === "string" &&
3637
(x === CrdtType.Loro ||
3738
x === CrdtType.LoroEphemeralStore ||
39+
x === CrdtType.LoroEphemeralStorePersisted ||
3840
x === CrdtType.Yjs ||
3941
x === CrdtType.YjsAwareness ||
4042
x === CrdtType.Flock ||

packages/loro-protocol/src/protocol.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export const MAX_MESSAGE_SIZE = 256 * 1024;
55
export const CrdtType = {
66
Loro: "%LOR",
77
LoroEphemeralStore: "%EPH",
8+
LoroEphemeralStorePersisted: "%EPS",
89
Yjs: "%YJS",
910
YjsAwareness: "%YAW",
1011
Elo: "%ELO",
@@ -13,6 +14,7 @@ export const CrdtType = {
1314
export const CrdtTypeId = {
1415
Loro: "loro",
1516
LoroEphemeralStore: "eph",
17+
LoroEphemeralStorePersisted: "eps",
1618
Yjs: "yjs",
1719
YjsAwareness: "yaw",
1820
Elo: "elo",
@@ -23,6 +25,7 @@ export type CrdtId = (typeof CrdtTypeId)[keyof typeof CrdtTypeId];
2325
export const MagicBytesToCrdtId = {
2426
"%LOR": "loro",
2527
"%EPH": "eph",
28+
"%EPS": "eps",
2629
"%YJS": "yjs",
2730
"%YAW": "yaw",
2831
"%ELO": "elo",

0 commit comments

Comments
 (0)