@@ -193,11 +193,146 @@ impl Client {
193193#[ cfg( test) ]
194194mod tests {
195195 use super :: * ;
196+
196197 #[ test]
197198 fn client_error_display ( ) {
198199 let e = ClientError :: Protocol ( "bad" . into ( ) ) ;
199200 assert ! ( format!( "{}" , e) . contains( "protocol error" ) ) ;
200201 }
202+
203+ #[ derive( Default ) ]
204+ struct RecordingAdaptor {
205+ updates : Arc < Mutex < Vec < Vec < u8 > > > > ,
206+ }
207+
208+ #[ async_trait:: async_trait]
209+ impl CrdtDocAdaptor for RecordingAdaptor {
210+ fn crdt_type ( & self ) -> CrdtType {
211+ CrdtType :: Loro
212+ }
213+
214+ async fn version ( & self ) -> Vec < u8 > {
215+ Vec :: new ( )
216+ }
217+
218+ async fn set_ctx ( & mut self , _ctx : CrdtAdaptorContext ) { }
219+
220+ async fn handle_join_ok (
221+ & mut self ,
222+ _permission : protocol:: Permission ,
223+ _version : Vec < u8 > ,
224+ ) {
225+ }
226+
227+ async fn apply_update ( & mut self , updates : Vec < Vec < u8 > > ) {
228+ self . updates . lock ( ) . await . extend ( updates) ;
229+ }
230+ }
231+
232+ #[ tokio:: test( flavor = "current_thread" ) ]
233+ async fn fragment_reassembly_delivers_updates_in_order ( ) {
234+ let ( tx, _rx) = mpsc:: unbounded_channel :: < Message > ( ) ;
235+ let rooms = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
236+ let pending = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
237+ let adaptors = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
238+ let pre_join_buf = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
239+ let frag_batches = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
240+ let config = Arc :: new ( ClientConfig :: default ( ) ) ;
241+
242+ let worker = ConnectionWorker :: new (
243+ tx,
244+ rooms,
245+ pending,
246+ adaptors. clone ( ) ,
247+ pre_join_buf,
248+ frag_batches,
249+ config,
250+ ) ;
251+
252+ let room_id = "room-frag" . to_string ( ) ;
253+ let key = RoomKey {
254+ crdt : CrdtType :: Loro ,
255+ room : room_id. clone ( ) ,
256+ } ;
257+ let collected = Arc :: new ( Mutex :: new ( Vec :: < Vec < u8 > > :: new ( ) ) ) ;
258+ adaptors. lock ( ) . await . insert (
259+ key. clone ( ) ,
260+ Box :: new ( RecordingAdaptor {
261+ updates : collected. clone ( ) ,
262+ } ) ,
263+ ) ;
264+
265+ let batch_id = protocol:: BatchId ( [ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ] ) ;
266+ worker
267+ . handle_message ( ProtocolMessage :: DocUpdateFragmentHeader {
268+ crdt : CrdtType :: Loro ,
269+ room_id : room_id. clone ( ) ,
270+ batch_id,
271+ fragment_count : 2 ,
272+ total_size_bytes : 10 ,
273+ } )
274+ . await ;
275+ // Send fragments out of order to ensure slot ordering is respected
276+ worker
277+ . handle_message ( ProtocolMessage :: DocUpdateFragment {
278+ crdt : CrdtType :: Loro ,
279+ room_id : room_id. clone ( ) ,
280+ batch_id,
281+ index : 1 ,
282+ fragment : b"world" . to_vec ( ) ,
283+ } )
284+ . await ;
285+ worker
286+ . handle_message ( ProtocolMessage :: DocUpdateFragment {
287+ crdt : CrdtType :: Loro ,
288+ room_id,
289+ batch_id,
290+ index : 0 ,
291+ fragment : b"hello" . to_vec ( ) ,
292+ } )
293+ . await ;
294+
295+ let updates = collected. lock ( ) . await ;
296+ assert_eq ! ( updates. as_slice( ) , & [ b"helloworld" . to_vec( ) ] ) ;
297+ }
298+
299+ #[ tokio:: test( flavor = "current_thread" ) ]
300+ async fn elo_snapshot_container_roundtrips_plaintext ( ) {
301+ let doc = Arc :: new ( Mutex :: new ( LoroDoc :: new ( ) ) ) ;
302+ let key = [ 7u8 ; 32 ] ;
303+ let adaptor = EloDocAdaptor :: new ( doc, "kid" , key)
304+ . with_iv_factory ( Arc :: new ( || [ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 ] ) ) ;
305+ let plaintext = b"hello-elo" . to_vec ( ) ;
306+
307+ let container = adaptor. encode_elo_snapshot_container ( & plaintext) ;
308+ let records =
309+ protocol:: elo:: decode_elo_container ( & container) . expect ( "container should decode" ) ;
310+ assert_eq ! ( records. len( ) , 1 ) ;
311+ let parsed =
312+ protocol:: elo:: parse_elo_record_header ( records[ 0 ] ) . expect ( "header should parse" ) ;
313+ match parsed. header {
314+ protocol:: elo:: EloHeader :: Snapshot ( hdr) => {
315+ assert_eq ! ( hdr. key_id, "kid" ) ;
316+ assert_eq ! ( hdr. iv, [ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 ] ) ;
317+ let cipher = aes_gcm:: Aes256Gcm :: new_from_slice ( & key) . unwrap ( ) ;
318+ let decrypted = cipher
319+ . decrypt (
320+ aes_gcm:: Nonce :: from_slice ( & hdr. iv ) ,
321+ aes_gcm:: aead:: Payload {
322+ msg : parsed. ct ,
323+ aad : parsed. aad ,
324+ } ,
325+ )
326+ . unwrap ( ) ;
327+ assert_eq ! ( decrypted, plaintext) ;
328+ }
329+ _ => panic ! ( "expected snapshot header" ) ,
330+ }
331+ assert ! ( matches!(
332+ parsed. kind,
333+ protocol:: elo:: EloRecordKind :: Snapshot
334+ ) ) ;
335+ }
201336}
202337
203338#[ derive( Clone ) ]
@@ -1017,6 +1152,8 @@ impl Drop for LoroDocAdaptor {
10171152}
10181153
10191154// --- EloDocAdaptor: E2EE Loro (minimal snapshot-only packaging) ---
1155+ /// Experimental %ELO adaptor. Snapshot-only packaging is implemented today;
1156+ /// delta packaging and API stability are WIP and may change.
10201157pub struct EloDocAdaptor {
10211158 doc : Arc < Mutex < LoroDoc > > ,
10221159 ctx : Option < CrdtAdaptorContext > ,
@@ -1146,7 +1283,7 @@ impl CrdtDocAdaptor for EloDocAdaptor {
11461283
11471284 async fn handle_join_ok ( & mut self , _permission : protocol:: Permission , _version : Vec < u8 > ) {
11481285 // On join, send a full encrypted snapshot to establish baseline.
1149- // TODO: REVIEW [elo-packaging]
1286+ // WIP: %ELO snapshot-only packaging; TODO: REVIEW [elo-packaging]
11501287 // This minimal implementation uses snapshot-only packaging and empty VV.
11511288 // It is correct but not optimal; consider delta packaging in a follow-up.
11521289 if let Ok ( snap) = self . doc . lock ( ) . await . export ( loro:: ExportMode :: Snapshot ) {
0 commit comments