@@ -12,6 +12,13 @@ import {
1212
1313export type Snapshot = JSONObject | JSONArray ;
1414
15+ /**
16+ * Applies Yjs events to a base object.
17+ * IMPORTANT: `base` must be a Mutative draft object. Direct mutations
18+ * are safe only within a Mutative draft context.
19+ * @param base The draft object to mutate (from Mutative's create)
20+ * @param event The Yjs event describing the change
21+ */
1522function applyYEvent < T extends JSONValue > ( base : T , event : Y . YEvent < any > ) {
1623 if ( event instanceof Y . YMapEvent && isJSONObject ( base ) ) {
1724 const source = event . target as Y . Map < any > ;
@@ -74,7 +81,7 @@ function defaultApplyPatch(target: Y.Map<any> | Y.Array<any>, patch: Patch) {
7481
7582 if ( ! path . length ) {
7683 if ( op !== PATCH_REPLACE ) {
77- notImplemented ( ) ;
84+ notImplemented ( `Cannot apply ${ op } operation to root level` ) ;
7885 }
7986
8087 if ( target instanceof Y . Map && isJSONObject ( value ) ) {
@@ -84,9 +91,11 @@ function defaultApplyPatch(target: Y.Map<any> | Y.Array<any>, patch: Patch) {
8491 }
8592 } else if ( target instanceof Y . Array && isJSONArray ( value ) ) {
8693 target . delete ( 0 , target . length ) ;
87- target . push ( value . map ( toYDataType ) ) ;
94+ target . push ( value . map ( ( v ) => toYDataType ( v ) ) ) ;
8895 } else {
89- notImplemented ( ) ;
96+ notImplemented (
97+ `Cannot replace root of type ${ target . constructor . name } with value type ${ typeof value } `
98+ ) ;
9099 }
91100
92101 return ;
@@ -116,20 +125,39 @@ function defaultApplyPatch(target: Y.Map<any> | Y.Array<any>, patch: Patch) {
116125 base . insert ( property , [ toYDataType ( value ) ] ) ;
117126 break ;
118127 case PATCH_REPLACE :
119- base . delete ( property ) ;
120- base . insert ( property , [ toYDataType ( value ) ] ) ;
128+ // If both old and new values are objects, try incremental update
129+ // to preserve other collaborators' changes
130+ const oldValue = base . get ( property ) ;
131+ if ( oldValue instanceof Y . Map && isJSONObject ( value ) ) {
132+ // Incremental update: update properties instead of replacing
133+ oldValue . clear ( ) ;
134+ Object . entries ( value ) . forEach ( ( [ k , v ] ) => {
135+ oldValue . set ( k , toYDataType ( v ) ) ;
136+ } ) ;
137+ } else {
138+ // For primitives or type changes, do full replacement
139+ base . delete ( property , 1 ) ;
140+ base . insert ( property , [ toYDataType ( value ) ] ) ;
141+ }
121142 break ;
122143 case PATCH_REMOVE :
123- base . delete ( property ) ;
144+ base . delete ( property , 1 ) ;
124145 break ;
125146 }
126147 } else if ( base instanceof Y . Array && property === 'length' ) {
127148 if ( value < base . length ) {
149+ // Shrink array
128150 const diff = base . length - value ;
129151 base . delete ( value , diff ) ;
152+ } else if ( value > base . length ) {
153+ // Expand array with null values
154+ const toAdd = new Array ( value - base . length ) . fill ( null ) ;
155+ base . push ( toAdd ) ;
130156 }
131157 } else {
132- notImplemented ( ) ;
158+ notImplemented (
159+ `Unsupported patch operation: ${ op } on ${ base ?. constructor ?. name ?? 'unknown' } .${ String ( property ) } `
160+ ) ;
133161 }
134162}
135163
@@ -148,18 +176,27 @@ function applyUpdate<S extends Snapshot>(
148176 fn : UpdateFn < S > ,
149177 applyPatch : typeof defaultApplyPatch ,
150178 patchesOptions : PatchesOptions
151- ) {
152- const [ , patches ] = create ( snapshot , fn , {
179+ ) : S {
180+ const [ nextState , patches ] = create ( snapshot , fn , {
153181 enablePatches : patchesOptions ,
154182 } ) ;
155183 for ( const patch of patches ) {
156184 applyPatch ( source , patch ) ;
157185 }
186+ return nextState ;
158187}
159188
160189export type ListenerFn < S extends Snapshot > = ( snapshot : S ) => void ;
161190export type UnsubscribeFn = ( ) => void ;
162191
192+ export type SubscribeOptions = {
193+ /**
194+ * If true, the listener will be called immediately with the current snapshot.
195+ * @default false
196+ */
197+ immediate ?: boolean ;
198+ } ;
199+
163200export type Binder < S extends Snapshot > = {
164201 /**
165202 * Release the binder.
@@ -181,8 +218,10 @@ export type Binder<S extends Snapshot> = {
181218 * Subscribe to snapshot update, fired when:
182219 * 1. User called update(fn).
183220 * 2. y.js source.observeDeep() fired.
221+ * @param fn Listener function that receives the new snapshot
222+ * @param options Optional configuration for subscription behavior
184223 */
185- subscribe : ( fn : ListenerFn < S > ) => UnsubscribeFn ;
224+ subscribe : ( fn : ListenerFn < S > , options ?: SubscribeOptions ) => UnsubscribeFn ;
186225} ;
187226
188227export type Options < S extends Snapshot > = {
@@ -210,6 +249,8 @@ export type Options<S extends Snapshot> = {
210249 * @param source The y.js data type to bind.
211250 * @param options Change default behavior, can be omitted.
212251 */
252+ const MUTATIVE_YJS_ORIGIN = Symbol ( 'mutative-yjs' ) ;
253+
213254export function bind < S extends Snapshot > (
214255 source : Y . Map < any > | Y . Array < any > ,
215256 options ?: Options < S >
@@ -220,12 +261,18 @@ export function bind<S extends Snapshot>(
220261
221262 const subscription = new Set < ListenerFn < S > > ( ) ;
222263
223- const subscribe = ( fn : ListenerFn < S > ) => {
264+ const subscribe = ( fn : ListenerFn < S > , options ?: SubscribeOptions ) => {
224265 subscription . add ( fn ) ;
266+ if ( options ?. immediate ) {
267+ fn ( get ( ) ) ;
268+ }
225269 return ( ) => void subscription . delete ( fn ) ;
226270 } ;
227271
228- const observer = ( events : Y . YEvent < any > [ ] ) => {
272+ const observer = ( events : Y . YEvent < any > [ ] , transaction : Y . Transaction ) => {
273+ // Skip events originated from this binder to prevent circular updates
274+ if ( transaction . origin === MUTATIVE_YJS_ORIGIN ) return ;
275+
229276 snapshot = applyYEvents ( get ( ) , events ) ;
230277 subscription . forEach ( ( fn ) => fn ( get ( ) ) ) ;
231278 } ;
@@ -256,13 +303,17 @@ export function bind<S extends Snapshot>(
256303 }
257304
258305 const doApplyUpdate = ( ) => {
259- applyUpdate ( source , get ( ) , fn , applyPatch , patchesOptionsInOption ) ;
306+ snapshot = applyUpdate ( source , get ( ) , fn , applyPatch , patchesOptionsInOption ) ;
260307 } ;
261308
262309 if ( doc ) {
263- Y . transact ( doc , doApplyUpdate ) ;
310+ Y . transact ( doc , doApplyUpdate , MUTATIVE_YJS_ORIGIN ) ;
311+ // Notify subscribers after transaction since observer skips our origin
312+ subscription . forEach ( ( fn ) => fn ( get ( ) ) ) ;
264313 } else {
314+ // Without doc, manually update snapshot and notify subscribers
265315 doApplyUpdate ( ) ;
316+ subscription . forEach ( ( fn ) => fn ( get ( ) ) ) ;
266317 }
267318 } ;
268319
0 commit comments