Skip to content

Commit f7115c2

Browse files
committed
feat: watch and fromWatch functions
1 parent 37ec893 commit f7115c2

File tree

12 files changed

+403
-15
lines changed

12 files changed

+403
-15
lines changed

src/index.spec.ts

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { BehaviorSubject, from } from 'rxjs';
55
import { writable as svelteWritable } from 'svelte/store';
66
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from 'vitest';
77
import type {
8+
InteropWatcher,
9+
InteropWatcherFactory,
810
OnUseArgument,
911
Readable,
1012
ReadableSignal,
@@ -24,10 +26,12 @@ import {
2426
computed,
2527
derived,
2628
equal,
29+
fromWatch,
2730
get,
2831
readable,
2932
symbolObservable,
3033
untrack,
34+
watch,
3135
writable,
3236
} from './index';
3337
import { rawStoreSymbol } from './internal/exposeRawStores';
@@ -3565,4 +3569,198 @@ describe('stores', () => {
35653569
expectCorrectlyCleanedUp(doubleDoubleDoubleStore);
35663570
});
35673571
});
3572+
3573+
describe('watch', () => {
3574+
it('should work', () => {
3575+
const onUseCalls: { onUnused: number }[] = [];
3576+
const store = writable(0, {
3577+
onUse: () => {
3578+
const call = { onUnused: 0 };
3579+
onUseCalls.push(call);
3580+
return () => {
3581+
call.onUnused++;
3582+
};
3583+
},
3584+
});
3585+
const notify = vi.fn();
3586+
const watcher = watch(store, notify);
3587+
expect(watcher.isUpToDate()).toBe(false);
3588+
expect(onUseCalls.length).toBe(0);
3589+
expect(watcher.update()).toBe(true);
3590+
expect(onUseCalls.length).toBe(1);
3591+
expect(onUseCalls[0].onUnused).toBe(0);
3592+
expect(watcher.isUpToDate()).toBe(true);
3593+
expect(watcher.get()).toBe(0);
3594+
expect(notify).not.toHaveBeenCalled();
3595+
store.set(1);
3596+
expect(notify).toHaveBeenCalledOnce();
3597+
notify.mockClear();
3598+
expect(watcher.isUpToDate()).toBe(false);
3599+
store.set(2);
3600+
expect(notify).not.toHaveBeenCalled();
3601+
expect(watcher.update()).toBe(true);
3602+
expect(watcher.isUpToDate()).toBe(true);
3603+
expect(watcher.get()).toBe(2);
3604+
expect(notify).not.toHaveBeenCalled();
3605+
store.set(3);
3606+
expect(notify).toHaveBeenCalledOnce();
3607+
expect(watcher.isUpToDate()).toBe(false);
3608+
notify.mockClear();
3609+
store.set(4);
3610+
store.set(2);
3611+
expect(notify).not.toHaveBeenCalled();
3612+
expect(watcher.update()).toBe(false);
3613+
expect(watcher.isUpToDate()).toBe(true);
3614+
expect(watcher.update()).toBe(false);
3615+
expect(watcher.get()).toBe(2);
3616+
watcher.destroy();
3617+
expect(() => watcher.get()).toThrowError('invalid watcher state');
3618+
});
3619+
});
3620+
3621+
describe('fromWatch', () => {
3622+
it('should work', () => {
3623+
let notify: () => void;
3624+
let isUpdated = true;
3625+
let value = 0;
3626+
const watcher = {
3627+
update: vi.fn(() => isUpdated),
3628+
get: vi.fn(() => value),
3629+
destroy: vi.fn(),
3630+
} satisfies InteropWatcher<number>;
3631+
const watchFn = vi.fn((notifyFn: () => void) => {
3632+
notify = notifyFn;
3633+
return watcher;
3634+
}) satisfies InteropWatcherFactory<number>;
3635+
const clearMocks = () => {
3636+
watchFn.mockClear();
3637+
watcher.update.mockClear();
3638+
watcher.get.mockClear();
3639+
watcher.destroy.mockClear();
3640+
};
3641+
const store = fromWatch(watchFn);
3642+
expect(watchFn).not.toHaveBeenCalled();
3643+
expect(store.get()).toBe(0);
3644+
expect(watchFn).toHaveBeenCalledOnce();
3645+
expect(watcher.update).toHaveBeenCalledOnce();
3646+
expect(watcher.get).toHaveBeenCalledOnce();
3647+
expect(watcher.destroy).toHaveBeenCalledOnce();
3648+
clearMocks();
3649+
const values: number[] = [];
3650+
const unsubscribe = store.subscribe((value) => {
3651+
values.push(value);
3652+
});
3653+
expect(values).toEqual([0]);
3654+
expect(watchFn).toHaveBeenCalledOnce();
3655+
expect(watcher.update).toHaveBeenCalledOnce();
3656+
expect(watcher.get).toHaveBeenCalledOnce();
3657+
expect(watcher.destroy).not.toHaveBeenCalled();
3658+
clearMocks();
3659+
isUpdated = false;
3660+
batch(() => {
3661+
notify!();
3662+
});
3663+
expect(watcher.update).toHaveBeenCalledOnce();
3664+
expect(watchFn).not.toHaveBeenCalled();
3665+
expect(watcher.get).not.toHaveBeenCalled();
3666+
expect(watcher.destroy).not.toHaveBeenCalled();
3667+
clearMocks();
3668+
expect(values).toEqual([0]);
3669+
isUpdated = true;
3670+
value = 2;
3671+
batch(() => {
3672+
notify!();
3673+
});
3674+
expect(watcher.update).toHaveBeenCalledOnce();
3675+
expect(watcher.get).toHaveBeenCalledOnce();
3676+
expect(watcher.destroy).not.toHaveBeenCalled();
3677+
expect(watchFn).not.toHaveBeenCalled();
3678+
clearMocks();
3679+
expect(values).toEqual([0, 2]);
3680+
isUpdated = true;
3681+
watcher.get.mockImplementation(() => {
3682+
throw new Error('myerror');
3683+
});
3684+
expect(() => {
3685+
batch(() => {
3686+
notify!();
3687+
});
3688+
}).toThrowError('myerror');
3689+
expect(watcher.update).toHaveBeenCalledOnce();
3690+
expect(watcher.get).toHaveBeenCalledOnce();
3691+
expect(watcher.destroy).not.toHaveBeenCalled();
3692+
expect(watchFn).not.toHaveBeenCalled();
3693+
clearMocks();
3694+
expect(() => {
3695+
store.get();
3696+
}).toThrowError('myerror');
3697+
expect(values).toEqual([0, 2]);
3698+
expect(watcher.update).not.toHaveBeenCalled();
3699+
expect(watcher.get).not.toHaveBeenCalled();
3700+
expect(watcher.destroy).not.toHaveBeenCalled();
3701+
expect(watchFn).not.toHaveBeenCalled();
3702+
unsubscribe();
3703+
expect(watcher.destroy).toHaveBeenCalledOnce();
3704+
expect(watcher.update).not.toHaveBeenCalled();
3705+
expect(watcher.get).not.toHaveBeenCalled();
3706+
expect(watchFn).not.toHaveBeenCalled();
3707+
});
3708+
});
3709+
3710+
describe('watch / fromWatch', () => {
3711+
it('should work to convert back and forth a basic writable', () => {
3712+
const store = writable(0);
3713+
const otherStore = fromWatch((notify) => watch(store, notify));
3714+
expect(otherStore()).toBe(0);
3715+
store.set(1);
3716+
expect(otherStore()).toBe(1);
3717+
store.set(2);
3718+
expect(otherStore()).toBe(2);
3719+
3720+
const values: number[] = [];
3721+
const unsubscribe = otherStore.subscribe((value) => {
3722+
values.push(value);
3723+
});
3724+
expect(values).toEqual([2]);
3725+
store.set(3);
3726+
expect(values).toEqual([2, 3]);
3727+
expect(otherStore()).toBe(3);
3728+
batch(() => {
3729+
store.set(4);
3730+
expect(otherStore()).toBe(4);
3731+
store.set(5);
3732+
store.set(3);
3733+
});
3734+
expect(values).toEqual([2, 3]);
3735+
unsubscribe();
3736+
});
3737+
3738+
it('should work to convert back and forth a computed', () => {
3739+
const store = writable(0);
3740+
const doubleStore = computed(() => store() * 2);
3741+
const otherStore = fromWatch((notify) => watch(doubleStore, notify));
3742+
expect(otherStore()).toBe(0);
3743+
store.set(1);
3744+
expect(otherStore()).toBe(2);
3745+
store.set(2);
3746+
expect(otherStore()).toBe(4);
3747+
3748+
const values: number[] = [];
3749+
const unsubscribe = otherStore.subscribe((value) => {
3750+
values.push(value);
3751+
});
3752+
expect(values).toEqual([4]);
3753+
store.set(3);
3754+
expect(values).toEqual([4, 6]);
3755+
expect(otherStore()).toBe(6);
3756+
batch(() => {
3757+
store.set(4);
3758+
expect(otherStore()).toBe(8);
3759+
store.set(5);
3760+
store.set(3);
3761+
});
3762+
expect(values).toEqual([4, 6]);
3763+
unsubscribe();
3764+
});
3765+
});
35683766
});

src/index.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ import { RawStoreWithOnUse } from './internal/storeWithOnUse';
2424
import { RawStoreWritable } from './internal/storeWritable';
2525
import { noop } from './internal/subscribeConsumer';
2626
import { untrack } from './internal/untrack';
27+
import { WatcherConsumer } from './internal/watch';
2728
import type {
2829
AsyncDeriveFn,
2930
AsyncDeriveOptions,
31+
InteropWatcherFactory,
3032
OnUseFn,
3133
Readable,
3234
ReadableSignal,
@@ -41,15 +43,18 @@ import type {
4143
UnsubscribeObject,
4244
Unsubscriber,
4345
Updater,
46+
Watcher,
4447
Writable,
4548
WritableSignal,
4649
} from './types';
4750

48-
export { batch } from './internal/batch';
51+
import { batch } from './internal/batch';
52+
import { RawStoreFromWatch } from './internal/storeFromWatch';
4953
export { equal } from './internal/equal';
5054
export { symbolObservable } from './internal/exposeRawStores';
5155
export { untrack } from './internal/untrack';
5256
export type * from './types';
57+
export { batch };
5358

5459
/**
5560
* Returns a wrapper (for the given store) which only exposes the {@link ReadableSignal} interface.
@@ -509,3 +514,49 @@ export function computed<T>(
509514
): ReadableSignal<T> {
510515
return exposeRawStore(applyStoreOptions(new RawStoreComputed(fn), options));
511516
}
517+
518+
/**
519+
* Creates a watcher on a store and returns it.
520+
*
521+
* @remarks
522+
*
523+
* A watcher calls synchronously its notify function when any store in the set of transitive dependencies of the watched store is changing.
524+
*
525+
* Note that the notify function must not read or write any store. It should not do any heavy task, it usually only schedules some work to be done later.
526+
* It is called even inside {@link batch}.
527+
*
528+
* A watcher is initially created in the dirty state.
529+
*
530+
* When a watcher is in the dirty state, the notify function is not called until the {@link Watcher.update|update} method is called.
531+
*
532+
* The {@link Watcher.update|update} method clears the dirty state and updates the watched store, allowing the notify function to be called the next
533+
* time any store in the set of transitive dependencies of the watched store changes.
534+
*
535+
* When a watcher is no longer needed, it should be destroyed by calling its {@link Watcher.destroy|destroy} method.
536+
*
537+
* @param store - store to watch
538+
* @param notify - function that will be called synchronously when any store in the set of transitive dependencies of the watched store is changing
539+
* @returns watcher object
540+
*/
541+
export function watch<T>(store: StoreInput<T>, notify: () => void): Watcher<T> {
542+
const watcherConsumer = new WatcherConsumer(getRawStore(store), notify);
543+
return {
544+
isUpToDate: watcherConsumer.isUpToDate.bind(watcherConsumer),
545+
update: watcherConsumer.update.bind(watcherConsumer),
546+
get: watcherConsumer.get.bind(watcherConsumer),
547+
destroy: watcherConsumer.destroy.bind(watcherConsumer),
548+
};
549+
}
550+
551+
/**
552+
* Creates a tansu store from a watcher factory.
553+
* @param watch - watcher factory
554+
* @param options - store option. Allows to define the {@link StoreOptions.equal|equal} function, if needed
555+
* @returns store containing the value returned by the computation function
556+
*/
557+
export function fromWatch<T>(
558+
watch: InteropWatcherFactory<T>,
559+
options?: Omit<StoreOptions<T>, 'onUse'>
560+
): ReadableSignal<T> {
561+
return exposeRawStore(applyStoreOptions(new RawStoreFromWatch(watch), options));
562+
}

src/internal/batch.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import type { SubscribeConsumer } from './subscribeConsumer';
2-
3-
export const subscribersQueue: SubscribeConsumer<any, any>[] = [];
1+
export const subscribersQueue: { process(): void }[] = [];
42
let willProcessQueue = false;
53

64
/**
@@ -56,7 +54,7 @@ export const batch = <T>(fn: () => T): T => {
5654
while (subscribersQueue.length > 0) {
5755
const consumer = subscribersQueue.shift()!;
5856
try {
59-
consumer.notify();
57+
consumer.process();
6058
} catch (e) {
6159
// an error in one consumer should not impact others
6260
if (success) {

src/internal/store.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ export interface RawStore<T, Link extends BaseLink<T> = BaseLink<T>>
2929
unregisterConsumer(link: Link): void;
3030
updateValue(): void;
3131
isLinkUpToDate(link: Link): boolean;
32-
updateLink(link: Link): T;
32+
updateLink(link: Link): void;
33+
readValue(): T;
3334
}
3435

3536
export const updateLinkProducerValue = <T>(link: BaseLink<T>): void => {

src/internal/storeComputed.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ export class RawStoreComputed<T>
6262
if (producer.flags & RawStoreFlags.HAS_VISIBLE_ONUSE) {
6363
this.flags |= RawStoreFlags.HAS_VISIBLE_ONUSE;
6464
}
65-
return producer.updateLink(link);
65+
producer.updateLink(link);
66+
return producer.readValue();
6667
}
6768

6869
override startUse(): void {

src/internal/storeConst.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ export class RawStoreConst<T> implements RawStore<T, BaseLink<T>> {
2121
isLinkUpToDate(_link: BaseLink<T>): boolean {
2222
return true;
2323
}
24-
updateLink(_link: BaseLink<T>): T {
24+
updateLink(_link: BaseLink<T>): void {}
25+
readValue(): T {
2526
return this.value;
2627
}
2728
get(): T {

src/internal/storeDerived.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ abstract class RawStoreDerived<T, S extends StoresInput>
8282
override recompute(): void {
8383
try {
8484
this.callCleanUpFn();
85-
const values = this.producerLinks!.map((link) => link.producer.updateLink(link));
85+
const values = this.producerLinks!.map((link) => {
86+
const producer = link.producer;
87+
producer.updateLink(link);
88+
return producer.readValue();
89+
});
8690
this.cleanUpFn = normalizeUnsubscribe(this.derive(this.arrayMode ? values : values[0]));
8791
} catch (error) {
8892
this.error = error;

0 commit comments

Comments
 (0)