1+ use crate :: core:: origin:: CollabOrigin ;
12use crate :: database:: rows:: { Cell , ROW_CELLS , ROW_HEIGHT , ROW_VISIBILITY , Row } ;
23use crate :: entity:: uuid_validation:: RowId ;
34
@@ -18,33 +19,47 @@ pub enum RowChange {
1819 DidUpdateVisibility {
1920 row_id : RowId ,
2021 value : bool ,
22+ is_local_change : bool ,
2123 } ,
2224 DidUpdateHeight {
2325 row_id : RowId ,
2426 value : i32 ,
27+ is_local_change : bool ,
2528 } ,
2629 DidUpdateCell {
2730 row_id : RowId ,
2831 field_id : String ,
2932 value : Cell ,
33+ is_local_change : bool ,
3034 } ,
3135 DidUpdateRowComment {
3236 row : Row ,
37+ is_local_change : bool ,
3338 } ,
3439}
3540
3641pub ( crate ) fn subscribe_row_data_change (
42+ origin : CollabOrigin ,
3743 row_id : RowId ,
3844 row_data_map : & MapRef ,
3945 change_tx : RowChangeSender ,
4046) {
4147 row_data_map. observe_deep_with ( "change" , move |txn, events| {
48+ let txn_origin = CollabOrigin :: from ( txn) ;
49+ let is_local_change = txn_origin == origin;
4250 for event in events. iter ( ) {
4351 match event {
4452 Event :: Text ( _) => { } ,
4553 Event :: Array ( _) => { } ,
4654 Event :: Map ( map_event) => {
47- handle_map_event ( & row_id, & change_tx, txn, event, map_event) ;
55+ handle_map_event (
56+ & row_id,
57+ & change_tx,
58+ is_local_change,
59+ txn,
60+ event,
61+ map_event,
62+ ) ;
4863 } ,
4964 Event :: XmlFragment ( _) => { } ,
5065 Event :: XmlText ( _) => { } ,
@@ -58,6 +73,7 @@ pub(crate) fn subscribe_row_data_change(
5873fn handle_map_event (
5974 row_id : & RowId ,
6075 change_tx : & RowChangeSender ,
76+ is_local_change : bool ,
6177 txn : & TransactionMut ,
6278 event : & Event ,
6379 map_event : & MapEvent ,
@@ -80,6 +96,7 @@ fn handle_map_event(
8096 let _ = change_tx. send ( RowChange :: DidUpdateHeight {
8197 row_id : * row_id,
8298 value : value as i32 ,
99+ is_local_change,
83100 } ) ;
84101 }
85102 } ,
@@ -88,6 +105,7 @@ fn handle_map_event(
88105 let _ = change_tx. send ( RowChange :: DidUpdateVisibility {
89106 row_id : * row_id,
90107 value,
108+ is_local_change,
91109 } ) ;
92110 }
93111 } ,
@@ -109,6 +127,7 @@ fn handle_map_event(
109127 row_id : * row_id,
110128 field_id,
111129 value : cell,
130+ is_local_change,
112131 } ) ;
113132 }
114133 } ,
@@ -127,6 +146,7 @@ fn handle_map_event(
127146 row_id : * row_id,
128147 field_id,
129148 value : cell,
149+ is_local_change,
130150 } ) ;
131151 }
132152 }
@@ -139,6 +159,7 @@ fn handle_map_event(
139159 row_id : * row_id,
140160 field_id,
141161 value : Cell :: default ( ) ,
162+ is_local_change,
142163 } ) ;
143164 }
144165 } ,
@@ -192,6 +213,7 @@ impl From<&str> for RowChangeValue {
192213#[ cfg( test) ]
193214mod tests {
194215 use super :: * ;
216+ use crate :: core:: origin:: { CollabClient , CollabOrigin } ;
195217 use crate :: database:: rows:: { CellBuilder , CellsUpdate } ;
196218 use crate :: preclude:: { Any , Doc , Map , MapExt , Transact } ;
197219 use std:: collections:: HashMap ;
@@ -220,14 +242,21 @@ mod tests {
220242 }
221243 }
222244
245+ fn local_and_remote_origins ( ) -> ( CollabOrigin , CollabOrigin ) {
246+ let local = CollabOrigin :: Client ( CollabClient :: new ( 0xdeadbeef , "local-device" ) ) ;
247+ let remote = CollabOrigin :: Client ( CollabClient :: new ( 0xfeedface , "remote-device" ) ) ;
248+ ( local, remote)
249+ }
250+
223251 #[ tokio:: test]
224252 async fn row_observer_emits_cell_height_and_visibility_changes ( ) {
225253 let doc = Doc :: new ( ) ;
226254 let row_data_map: MapRef = doc. get_or_insert_map ( "row_data" ) ;
227255 let row_id = Uuid :: new_v4 ( ) ;
256+ let ( origin, _) = local_and_remote_origins ( ) ;
228257
229258 let ( change_tx, mut change_rx) = broadcast:: channel ( 256 ) ;
230- subscribe_row_data_change ( row_id, & row_data_map, change_tx) ;
259+ subscribe_row_data_change ( origin . clone ( ) , row_id, & row_data_map, change_tx) ;
231260
232261 let field_id = Uuid :: new_v4 ( ) . to_string ( ) ;
233262 let initial_cell: CellBuilder = HashMap :: from ( [
@@ -236,12 +265,12 @@ mod tests {
236265 ] ) ;
237266
238267 {
239- let mut txn = doc. transact_mut ( ) ;
268+ let mut txn = doc. transact_mut_with ( origin . clone ( ) ) ;
240269 let _cells_map: MapRef = row_data_map. get_or_init ( & mut txn, ROW_CELLS ) ;
241270 }
242271
243272 {
244- let mut txn = doc. transact_mut ( ) ;
273+ let mut txn = doc. transact_mut_with ( origin . clone ( ) ) ;
245274 let cells_map: MapRef = row_data_map
246275 . get_with_txn ( & txn, ROW_CELLS )
247276 . expect ( "missing cells map" ) ;
@@ -259,9 +288,11 @@ mod tests {
259288 row_id : changed_row_id,
260289 field_id : changed_field_id,
261290 value,
291+ is_local_change,
262292 } => {
263293 assert_eq ! ( changed_row_id, row_id) ;
264294 assert_eq ! ( changed_field_id, field_id) ;
295+ assert ! ( is_local_change) ;
265296 assert_eq ! (
266297 value
267298 . get( "content" )
@@ -274,7 +305,7 @@ mod tests {
274305
275306 drain ( & mut change_rx) ;
276307 {
277- let mut txn = doc. transact_mut ( ) ;
308+ let mut txn = doc. transact_mut_with ( origin . clone ( ) ) ;
278309 let cells_map: MapRef = row_data_map
279310 . get_with_txn ( & txn, ROW_CELLS )
280311 . expect ( "missing cells map" ) ;
@@ -295,9 +326,11 @@ mod tests {
295326 row_id : changed_row_id,
296327 field_id : changed_field_id,
297328 value,
329+ is_local_change,
298330 } => {
299331 assert_eq ! ( changed_row_id, row_id) ;
300332 assert_eq ! ( changed_field_id, field_id) ;
333+ assert ! ( is_local_change) ;
301334 assert_eq ! (
302335 value
303336 . get( "content" )
@@ -310,7 +343,7 @@ mod tests {
310343
311344 drain ( & mut change_rx) ;
312345 {
313- let mut txn = doc. transact_mut ( ) ;
346+ let mut txn = doc. transact_mut_with ( origin . clone ( ) ) ;
314347 let cells_map: MapRef = row_data_map
315348 . get_with_txn ( & txn, ROW_CELLS )
316349 . expect ( "missing cells map" ) ;
@@ -328,24 +361,26 @@ mod tests {
328361 row_id : changed_row_id,
329362 field_id : changed_field_id,
330363 value,
364+ is_local_change,
331365 } => {
332366 assert_eq ! ( changed_row_id, row_id) ;
333367 assert_eq ! ( changed_field_id, field_id) ;
368+ assert ! ( is_local_change) ;
334369 assert ! ( value. is_empty( ) ) ;
335370 } ,
336371 other => panic ! ( "unexpected row change: {:?}" , other) ,
337372 }
338373
339374 drain ( & mut change_rx) ;
340375 {
341- let mut txn = doc. transact_mut ( ) ;
376+ let mut txn = doc. transact_mut_with ( origin . clone ( ) ) ;
342377 row_data_map. insert ( & mut txn, ROW_HEIGHT , Any :: BigInt ( 60 ) ) ;
343378 row_data_map. insert ( & mut txn, ROW_VISIBILITY , true ) ;
344379 }
345380
346381 drain ( & mut change_rx) ;
347382 {
348- let mut txn = doc. transact_mut ( ) ;
383+ let mut txn = doc. transact_mut_with ( origin . clone ( ) ) ;
349384 row_data_map. insert ( & mut txn, ROW_HEIGHT , Any :: BigInt ( 120 ) ) ;
350385 }
351386
@@ -359,16 +394,18 @@ mod tests {
359394 RowChange :: DidUpdateHeight {
360395 row_id : changed_row_id,
361396 value,
397+ is_local_change,
362398 } => {
363399 assert_eq ! ( changed_row_id, row_id) ;
364400 assert_eq ! ( value, 120 ) ;
401+ assert ! ( is_local_change) ;
365402 } ,
366403 other => panic ! ( "unexpected row change: {:?}" , other) ,
367404 }
368405
369406 drain ( & mut change_rx) ;
370407 {
371- let mut txn = doc. transact_mut ( ) ;
408+ let mut txn = doc. transact_mut_with ( origin . clone ( ) ) ;
372409 row_data_map. insert ( & mut txn, ROW_VISIBILITY , false ) ;
373410 }
374411
@@ -382,9 +419,67 @@ mod tests {
382419 RowChange :: DidUpdateVisibility {
383420 row_id : changed_row_id,
384421 value,
422+ is_local_change,
385423 } => {
386424 assert_eq ! ( changed_row_id, row_id) ;
387425 assert ! ( !value) ;
426+ assert ! ( is_local_change) ;
427+ } ,
428+ other => panic ! ( "unexpected row change: {:?}" , other) ,
429+ }
430+ }
431+
432+ #[ tokio:: test]
433+ async fn row_observer_marks_remote_changes_when_origin_differs ( ) {
434+ let doc = Doc :: new ( ) ;
435+ let row_data_map: MapRef = doc. get_or_insert_map ( "row_data" ) ;
436+ let row_id = Uuid :: new_v4 ( ) ;
437+ let ( origin, remote_origin) = local_and_remote_origins ( ) ;
438+
439+ let ( change_tx, mut change_rx) = broadcast:: channel ( 256 ) ;
440+ subscribe_row_data_change ( origin. clone ( ) , row_id, & row_data_map, change_tx) ;
441+
442+ let field_id = Uuid :: new_v4 ( ) . to_string ( ) ;
443+
444+ {
445+ let mut txn = doc. transact_mut_with ( origin. clone ( ) ) ;
446+ let _cells_map: MapRef = row_data_map. get_or_init ( & mut txn, ROW_CELLS ) ;
447+ }
448+
449+ drain ( & mut change_rx) ;
450+ {
451+ let mut txn = doc. transact_mut_with ( remote_origin) ;
452+ let cells_map: MapRef = row_data_map
453+ . get_with_txn ( & txn, ROW_CELLS )
454+ . expect ( "missing cells map" ) ;
455+ CellsUpdate :: new ( & mut txn, & cells_map) . insert (
456+ & field_id,
457+ HashMap :: from ( [ ( "content" . into ( ) , Any :: from ( "remote" ) ) ] ) ,
458+ ) ;
459+ }
460+
461+ let changed = loop {
462+ let change = recv_with_timeout ( & mut change_rx) . await ;
463+ if matches ! ( change, RowChange :: DidUpdateCell { .. } ) {
464+ break change;
465+ }
466+ } ;
467+ match changed {
468+ RowChange :: DidUpdateCell {
469+ row_id : changed_row_id,
470+ field_id : changed_field_id,
471+ value,
472+ is_local_change,
473+ } => {
474+ assert_eq ! ( changed_row_id, row_id) ;
475+ assert_eq ! ( changed_field_id, field_id) ;
476+ assert ! ( !is_local_change) ;
477+ assert_eq ! (
478+ value
479+ . get( "content" )
480+ . and_then( |v| v. clone( ) . cast:: <String >( ) . ok( ) ) ,
481+ Some ( "remote" . to_string( ) )
482+ ) ;
388483 } ,
389484 other => panic ! ( "unexpected row change: {:?}" , other) ,
390485 }
0 commit comments