Skip to content

Commit 04fcc6b

Browse files
committed
feat: add doc context args to WS server hooks and harden handshake response
1 parent 4c1d6c3 commit 04fcc6b

File tree

15 files changed

+574
-169
lines changed

15 files changed

+574
-169
lines changed

rust/loro-protocol/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ edition = "2021"
55
description = "Rust implementation of the Loro Syncing Protocol encoder/decoder"
66
license = "MIT"
77
repository = "https://example.com/"
8-
readme = false
8+
readme = "README.md"
99

1010
[lib]
1111
name = "loro_protocol"

rust/loro-protocol/README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# loro-protocol (Rust)
2+
3+
Rust implementation of the Loro syncing protocol encoder/decoder. Mirrors the TypeScript package in `packages/loro-protocol` and follows the wire format described in `protocol.md` and the end-to-end encrypted flow in `protocol-e2ee.md`.
4+
5+
## Features
6+
- Encode/decode Join, DocUpdate, FragmentHeader/Fragment, UpdateError, and Leave messages
7+
- 256 KiB message size guard to match the wire spec
8+
- Bytes utilities (`BytesWriter`, `BytesReader`) for varint/varbytes/varstring
9+
10+
## Usage
11+
12+
Add the crate to your workspace (published crate name matches the package):
13+
14+
```bash
15+
cargo add loro-protocol
16+
```
17+
18+
Encode and decode messages:
19+
20+
```rust
21+
use loro_protocol::{encode, decode, ProtocolMessage, CrdtType};
22+
23+
let msg = ProtocolMessage::JoinRequest {
24+
crdt: CrdtType::Loro,
25+
room_id: "room-123".to_string(),
26+
auth: vec![],
27+
version: vec![],
28+
};
29+
30+
let bytes = encode(&msg)?;
31+
let roundtrip = decode(&bytes)?;
32+
assert_eq!(roundtrip, msg);
33+
```
34+
35+
Streaming-friendly decode:
36+
37+
```rust
38+
use loro_protocol::try_decode;
39+
40+
let buf = /* bytes from the wire */;
41+
if let Some(msg) = try_decode(&buf) {
42+
// valid message
43+
} else {
44+
// malformed or incomplete buffer
45+
}
46+
```
47+
48+
## Tests
49+
50+
```bash
51+
cargo test -p loro-protocol
52+
```
53+
54+
## Spec References
55+
56+
- `protocol.md` for the wire format and message semantics
57+
- `protocol-e2ee.md` for %ELO encryption details

rust/loro-protocol/src/lib.rs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
1-
//! Loro Syncing Protocol (Rust)
1+
//! # Loro Syncing Protocol (Rust)
22
//!
3-
//! This crate provides a pure-Rust implementation of the message
4-
//! encoding/decoding used by the Loro syncing protocol. It mirrors the
5-
//! TypeScript package in `packages/loro-protocol` and follows the wire format
6-
//! specified in `/protocol.md`.
3+
//! Rust encoder/decoder for the Loro wire format described in `/protocol.md`
4+
//! It mirrors the TypeScript implementation in `packages/loro-protocol` and
5+
//! is shared by the Rust WebSocket server/client crates.
76
//!
8-
//! Features:
9-
//! - Compact, allocation-friendly binary encoding
10-
//! - Support for Loro, Yjs, Loro Ephemeral Store, and Yjs Awareness CRDT kinds
11-
//! - Strict validation for message type, room id length, and maximum message size (256KB)
12-
//! - Helpers for variable-length integers/bytes/strings
7+
//! ## Guarantees
8+
//! - Enforces the 256 KiB per-message limit during encoding
9+
//! - Validates CRDT magic bytes and room id length (<= 128 bytes)
10+
//! - Uses compact, allocation-friendly varint encoding for strings/bytes
1311
//!
14-
//! Quick start
12+
//! ## Crate layout
13+
//! - `protocol`: message enums/constants and the `ProtocolMessage` enum
14+
//! - `encoding`: `encode`, `decode`, and `try_decode` helpers
15+
//! - `bytes`: `BytesWriter`/`BytesReader` for varint-friendly buffers
16+
//!
17+
//! ## Quick start
1518
//!
1619
//! ```
1720
//! use loro_protocol::{encode, decode, ProtocolMessage, CrdtType};
@@ -30,14 +33,30 @@
3033
//! assert_eq!(roundtrip, msg);
3134
//! ```
3235
//!
33-
//! For the protocol details and field ordering, see `protocol.md`.
36+
//! ## Streaming decode
37+
//!
38+
//! ```
39+
//! use loro_protocol::{encode, try_decode, ProtocolMessage, CrdtType};
40+
//!
41+
//! // Prepare any valid message buffer
42+
//! let buf = encode(&ProtocolMessage::Leave {
43+
//! crdt: CrdtType::Loro,
44+
//! room_id: "room-123".into(),
45+
//! }).unwrap();
46+
//!
47+
//! let maybe = try_decode(&buf);
48+
//! assert!(maybe.is_some());
49+
//! ```
50+
//!
51+
//! See `protocol.md` and `protocol-e2ee.md` for the full field ordering and
52+
//! validation rules.
3453
3554
pub mod bytes;
36-
pub mod protocol;
37-
pub mod encoding;
3855
pub mod elo;
56+
pub mod encoding;
57+
pub mod protocol;
3958

4059
pub use bytes::{BytesReader, BytesWriter};
60+
pub use elo::*;
4161
pub use encoding::{decode, encode, try_decode};
4262
pub use protocol::*;
43-
pub use elo::*;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# loro-websocket-client (Rust)
2+
3+
Async WebSocket client for the Loro protocol. Exposes:
4+
- Low-level `Client` to send/receive raw `loro_protocol::ProtocolMessage`.
5+
- High-level `LoroWebsocketClient` that joins rooms and mirrors updates into a `loro::LoroDoc`, matching the TypeScript client behavior.
6+
7+
## Quick start
8+
9+
```rust
10+
use std::sync::Arc;
11+
use loro::{LoroDoc};
12+
use loro_websocket_client::LoroWebsocketClient;
13+
14+
# #[tokio::main(flavor = "current_thread")]
15+
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
16+
let client = LoroWebsocketClient::connect("ws://127.0.0.1:9000/ws1?token=secret").await?;
17+
let doc = Arc::new(tokio::sync::Mutex::new(LoroDoc::new()));
18+
let _room = client.join_loro("room1", doc.clone()).await?;
19+
// mutate doc then commit; the client auto-sends updates
20+
{ let mut d = doc.lock().await; d.get_text("text").insert(0, "hello")?; d.commit(); }
21+
# Ok(()) }
22+
```
23+
24+
## Features
25+
- Handles protocol keepalive (`"ping"/"pong"`) and filters control frames.
26+
- Automatic fragmentation/reassembly thresholds aligned with the server.
27+
- %ELO adaptor helpers to encrypt/decrypt containers alongside Loro.
28+
29+
## Tests
30+
31+
```bash
32+
cargo test -p loro-websocket-client
33+
```
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# loro-websocket-server (Rust)
2+
3+
Minimal async WebSocket server for the Loro protocol. Broadcasts DocUpdates between clients and provides hooks for auth and persistence. It mirrors the TypeScript server in `packages/loro-websocket`.
4+
5+
## Features
6+
- Supports `%LOR`, `%EPH`, `%ELO` and related CRDT types with fragment reassembly (≤256 KiB per message).
7+
- Connection keepalive handling (`"ping"/"pong"` text frames).
8+
- Workspace isolation via URL path (`/{workspace}`) and optional handshake auth.
9+
- Load/save hooks with optional per-document metadata context to assist persistence.
10+
11+
## Quick start
12+
13+
Run the bundled SQLite-backed example:
14+
15+
```bash
16+
cargo run -p loro-websocket-server --example simple-server -- --addr 127.0.0.1:9000 --db loro.db
17+
```
18+
19+
Then connect clients to `ws://127.0.0.1:9000/ws1`.
20+
21+
Integrate your own storage by wiring `ServerConfig.on_load_document` and `on_save_document`.
22+
23+
## Tests
24+
25+
```bash
26+
cargo test -p loro-websocket-server
27+
```

rust/loro-websocket-server/examples/simple-server.rs

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
1111
use clap::Parser;
1212
use std::{error::Error, path::PathBuf};
13-
use tracing::{debug, error, info, warn};
13+
use tracing::info;
1414
use tracing_subscriber::EnvFilter;
1515

1616
use loro_websocket_server::protocol::CrdtType;
17-
use loro_websocket_server::{serve_incoming_with_config, ServerConfig};
17+
use loro_websocket_server::{
18+
serve_incoming_with_config, LoadDocArgs, LoadedDoc, SaveDocArgs, ServerConfig,
19+
};
1820
use tokio::net::TcpListener;
1921

2022
#[derive(Parser, Debug)]
@@ -72,37 +74,53 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
7274

7375
// Wire persistence hooks
7476
let db_for_load = db_path.clone();
75-
type LoadFut = std::pin::Pin<
76-
Box<dyn std::future::Future<Output = Result<Option<Vec<u8>>, String>> + Send>,
77-
>;
77+
type LoadFut =
78+
std::pin::Pin<Box<dyn std::future::Future<Output = Result<LoadedDoc<()>, String>> + Send>>;
7879
type SaveFut = std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;
7980

80-
let on_load = std::sync::Arc::new(move |workspace: String, room: String, crdt: CrdtType| {
81+
let on_load = std::sync::Arc::new(move |args: LoadDocArgs| {
8182
let db_path = db_for_load.clone();
8283
let fut = async move {
83-
tokio::task::spawn_blocking(move || load_snapshot(&db_path, &workspace, &room, crdt))
84-
.await
85-
.map_err(|e| format!("task join error: {e}"))?
84+
let LoadDocArgs {
85+
workspace,
86+
room,
87+
crdt,
88+
} = args;
89+
let snapshot = tokio::task::spawn_blocking(move || {
90+
load_snapshot(&db_path, &workspace, &room, crdt)
91+
})
92+
.await
93+
.map_err(|e| format!("task join error: {e}"))?;
94+
let snapshot = snapshot?;
95+
Ok(LoadedDoc {
96+
snapshot,
97+
ctx: None,
98+
})
8699
};
87100
let fut: LoadFut = Box::pin(fut);
88101
fut
89102
});
90103

91104
let db_for_save = db_path.clone();
92-
let on_save = std::sync::Arc::new(
93-
move |workspace: String, room: String, crdt: CrdtType, data: Vec<u8>| {
94-
let db_path = db_for_save.clone();
95-
let fut = async move {
96-
tokio::task::spawn_blocking(move || {
97-
save_snapshot(&db_path, &workspace, &room, crdt, &data)
98-
})
99-
.await
100-
.map_err(|e| format!("task join error: {e}"))?
101-
};
102-
let fut: SaveFut = Box::pin(fut);
103-
fut
104-
},
105-
);
105+
let on_save = std::sync::Arc::new(move |args: SaveDocArgs<()>| {
106+
let db_path = db_for_save.clone();
107+
let fut = async move {
108+
let SaveDocArgs {
109+
workspace,
110+
room,
111+
crdt,
112+
data,
113+
..
114+
} = args;
115+
tokio::task::spawn_blocking(move || {
116+
save_snapshot(&db_path, &workspace, &room, crdt, &data)
117+
})
118+
.await
119+
.map_err(|e| format!("task join error: {e}"))?
120+
};
121+
let fut: SaveFut = Box::pin(fut);
122+
fut
123+
});
106124

107125
let cfg = ServerConfig {
108126
on_load_document: Some(on_load),

0 commit comments

Comments
 (0)