Skip to content

Commit 6ca6210

Browse files
committed
feat: experimental interoperability implementation
1 parent 128a877 commit 6ca6210

14 files changed

+519
-83
lines changed

src/internal/batch.ts

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
import type { SubscribeConsumer } from './subscribeConsumer';
2-
3-
export const subscribersQueue: SubscribeConsumer<any, any>[] = [];
4-
let willProcessQueue = false;
1+
import { beginBatch } from '../interop';
52

63
/**
74
* Batches multiple changes to stores while calling the provided function,
@@ -44,33 +41,16 @@ let willProcessQueue = false;
4441
* ```
4542
*/
4643
export const batch = <T>(fn: () => T): T => {
47-
const needsProcessQueue = !willProcessQueue;
48-
willProcessQueue = true;
49-
let success = true;
5044
let res;
51-
let error;
45+
let queueError;
46+
const endBatch = beginBatch();
5247
try {
5348
res = fn();
5449
} finally {
55-
if (needsProcessQueue) {
56-
while (subscribersQueue.length > 0) {
57-
const consumer = subscribersQueue.shift()!;
58-
try {
59-
consumer.notify();
60-
} catch (e) {
61-
// an error in one consumer should not impact others
62-
if (success) {
63-
// will throw the first error
64-
success = false;
65-
error = e;
66-
}
67-
}
68-
}
69-
willProcessQueue = false;
70-
}
50+
queueError = endBatch();
7151
}
72-
if (success) {
73-
return res;
52+
if (queueError) {
53+
throw queueError.error;
7454
}
75-
throw error;
55+
return res;
7656
};

src/internal/store.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import type { SignalStore, SubscribableStore } from '../types';
2+
import type { Consumer as InteropConsumer } from '../interop';
3+
4+
export interface TansuInteropConsumer extends InteropConsumer {
5+
addTansuProducer?<T>(producer: RawStore<T>): void;
6+
}
27

38
export interface Consumer {
49
markDirty(): void;
@@ -13,6 +18,8 @@ export const enum RawStoreFlags {
1318
// the following flags are used in RawStoreComputedOrDerived and derived classes
1419
COMPUTING = 1 << 3,
1520
DIRTY = 1 << 4,
21+
// used in Watcher
22+
START_CALLED = 1 << 5,
1623
}
1724

1825
export interface BaseLink<T> {
@@ -29,7 +36,8 @@ export interface RawStore<T, Link extends BaseLink<T> = BaseLink<T>>
2936
unregisterConsumer(link: Link): void;
3037
updateValue(): void;
3138
isLinkUpToDate(link: Link): boolean;
32-
updateLink(link: Link): T;
39+
updateLink(link: Link): void;
40+
readValue(): T;
3341
}
3442

3543
export const updateLinkProducerValue = <T>(link: BaseLink<T>): void => {

src/internal/storeComputed.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1-
import type { BaseLink, Consumer, RawStore } from './store';
1+
import { getActiveConsumer, setActiveConsumer, type Signal } from '../interop';
2+
import type { BaseLink, Consumer, RawStore, TansuInteropConsumer } from './store';
23
import { RawStoreFlags, updateLinkProducerValue } from './store';
34
import {
45
COMPUTED_ERRORED,
56
COMPUTED_UNSET,
67
RawStoreComputedOrDerived,
78
} from './storeComputedOrDerived';
9+
import { fromInteropSignal } from './storeFromWatch';
810
import { epoch, notificationPhase } from './storeWritable';
9-
import { activeConsumer, setActiveConsumer, type ActiveConsumer } from './untrack';
1011

1112
export class RawStoreComputed<T>
1213
extends RawStoreComputedOrDerived<T>
13-
implements Consumer, ActiveConsumer
14+
implements Consumer, TansuInteropConsumer
1415
{
1516
private producerIndex = 0;
1617
private producerLinks: BaseLink<any>[] = [];
@@ -26,30 +27,41 @@ export class RawStoreComputed<T>
2627

2728
override updateValue(): void {
2829
const flags = this.flags;
29-
if (flags & RawStoreFlags.START_USE_CALLED && this.epoch === epoch) {
30+
if (
31+
flags & RawStoreFlags.START_USE_CALLED &&
32+
!(flags & RawStoreFlags.DIRTY) &&
33+
!(flags & RawStoreFlags.COMPUTING)
34+
) {
3035
return;
3136
}
3237
super.updateValue();
3338
this.epoch = epoch;
3439
}
3540

3641
override get(): T {
42+
// FIXME: better test all cases of this optimization:
43+
const flags = this.flags;
3744
if (
38-
!activeConsumer &&
45+
!getActiveConsumer() &&
3946
!notificationPhase &&
40-
this.epoch === epoch &&
41-
(!(this.flags & RawStoreFlags.HAS_VISIBLE_ONUSE) ||
42-
this.flags & RawStoreFlags.START_USE_CALLED)
47+
!(flags & RawStoreFlags.COMPUTING) &&
48+
!(flags & RawStoreFlags.DIRTY) &&
49+
(flags & RawStoreFlags.START_USE_CALLED ||
50+
(this.epoch === epoch && !(flags & RawStoreFlags.HAS_VISIBLE_ONUSE)))
4351
) {
4452
return this.readValue();
4553
}
4654
return super.get();
4755
}
4856

49-
addProducer<U, L extends BaseLink<U>>(producer: RawStore<U, L>): U {
57+
addProducer(signal: Signal): void {
58+
this.addTansuProducer(fromInteropSignal(signal));
59+
}
60+
61+
addTansuProducer<U>(producer: RawStore<U>): void {
5062
const producerLinks = this.producerLinks;
5163
const producerIndex = this.producerIndex;
52-
let link = producerLinks[producerIndex] as L | undefined;
64+
let link = producerLinks[producerIndex] as BaseLink<U> | undefined;
5365
if (link?.producer !== producer) {
5466
if (link) {
5567
producerLinks.push(link); // push the existing link at the end (to be removed later)
@@ -62,7 +74,7 @@ export class RawStoreComputed<T>
6274
if (producer.flags & RawStoreFlags.HAS_VISIBLE_ONUSE) {
6375
this.flags |= RawStoreFlags.HAS_VISIBLE_ONUSE;
6476
}
65-
return producer.updateLink(link);
77+
producer.updateLink(link);
6678
}
6779

6880
override startUse(): void {

src/internal/storeComputedOrDerived.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
import { setActiveConsumer } from '../interop';
12
import type { Consumer } from './store';
23
import { RawStoreFlags } from './store';
34
import { RawStoreTrackingUsage } from './storeTrackingUsage';
4-
import { setActiveConsumer } from './untrack';
55

66
const MAX_CHANGE_RECOMPUTES = 1000;
77

@@ -61,7 +61,7 @@ export abstract class RawStoreComputedOrDerived<T>
6161
do {
6262
iterations++;
6363
this.flags &= ~RawStoreFlags.DIRTY;
64-
if (this.areProducersUpToDate()) {
64+
if (this.areProducersUpToDate() && !(this.flags & RawStoreFlags.DIRTY)) {
6565
return;
6666
}
6767
} while (this.flags & RawStoreFlags.DIRTY && iterations < MAX_CHANGE_RECOMPUTES);

src/internal/storeConst.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ export class RawStoreConst<T> implements RawStore<T, BaseLink<T>> {
2121
isLinkUpToDate(_link: BaseLink<T>): boolean {
2222
return true;
2323
}
24-
updateLink(_link: BaseLink<T>): T {
24+
updateLink(_link: BaseLink<T>): void {}
25+
readValue(): T {
2526
return this.value;
2627
}
2728
get(): T {

src/internal/storeDerived.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ abstract class RawStoreDerived<T, S extends StoresInput>
8282
override recompute(): void {
8383
try {
8484
this.callCleanUpFn();
85-
const values = this.producerLinks!.map((link) => link.producer.updateLink(link));
85+
const values = this.producerLinks!.map((link) => {
86+
const producer = link.producer;
87+
producer.updateLink(link);
88+
return producer.readValue();
89+
});
8690
this.cleanUpFn = normalizeUnsubscribe(this.derive(this.arrayMode ? values : values[0]));
8791
} catch (error) {
8892
this.error = error;

src/internal/storeFromWatch.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import type { Signal, Watcher } from '../interop';
2+
import { type RawStore, RawStoreFlags } from './store';
3+
import { RawStoreComputedOrDerived } from './storeComputedOrDerived';
4+
import { RawStoreWritable } from './storeWritable';
5+
6+
export class RawStoreFromWatch extends RawStoreComputedOrDerived<undefined> {
7+
// FIXME: remove HAS_VISIBLE_ONUSE and add a new HAS_INTEROP_DEPENDENCIES ? (so that watcher.stop is called asynchronously)
8+
override flags = RawStoreFlags.HAS_VISIBLE_ONUSE | RawStoreFlags.DIRTY;
9+
private watcher: Watcher;
10+
11+
constructor(interopSignal: Signal) {
12+
super(undefined as any);
13+
this.watcher = interopSignal.watchSignal(this.markDirty.bind(this));
14+
}
15+
16+
override equal(): boolean {
17+
return false;
18+
}
19+
20+
override startUse(): void {
21+
this.watcher.start();
22+
this.flags |= RawStoreFlags.DIRTY;
23+
}
24+
25+
override areProducersUpToDate(): boolean {
26+
return !this.watcher!.update();
27+
}
28+
29+
override recompute(): void {
30+
this.set(undefined);
31+
}
32+
33+
override endUse(): void {
34+
this.watcher.stop();
35+
}
36+
37+
protected override increaseEpoch(): void {
38+
// do nothing
39+
}
40+
}
41+
42+
const interopSignals = new WeakMap<Signal, RawStoreFromWatch>();
43+
const tansuWatchSignal = RawStoreWritable.prototype.watchSignal;
44+
export const fromInteropSignal = (signal: Signal): RawStore<any> => {
45+
if (signal.watchSignal === tansuWatchSignal) {
46+
return signal as RawStoreWritable<any>;
47+
}
48+
let store = interopSignals.get(signal);
49+
if (!store) {
50+
store = new RawStoreFromWatch(signal);
51+
interopSignals.set(signal, store);
52+
}
53+
return store;
54+
};

src/internal/storeTrackingUsage.ts

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { getActiveConsumer } from '../interop';
12
import { type Flushable, inFlushUnused, planFlush } from './asyncFlush';
2-
import { RawStoreFlags } from './store';
3+
import { RawStoreFlags, type TansuInteropConsumer } from './store';
34
import { checkNotInNotificationPhase, RawStoreWritable } from './storeWritable';
4-
import { activeConsumer, untrack } from './untrack';
5+
import { untrack } from './untrack';
56

67
export abstract class RawStoreTrackingUsage<T> extends RawStoreWritable<T> implements Flushable {
78
private extraUsages = 0;
@@ -36,24 +37,30 @@ export abstract class RawStoreTrackingUsage<T> extends RawStoreWritable<T> imple
3637

3738
override get(): T {
3839
checkNotInNotificationPhase();
39-
if (activeConsumer) {
40-
return activeConsumer.addProducer(this);
41-
} else {
42-
this.extraUsages++;
43-
try {
40+
this.extraUsages++;
41+
try {
42+
const activeConsumer = getActiveConsumer();
43+
const addTansuProducer = (activeConsumer as TansuInteropConsumer)?.addTansuProducer;
44+
if (!addTansuProducer) {
45+
// tansu calls updateValue in addTansuProducer, so we don't need to call it here
4446
this.updateValue();
4547
// Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!)
4648
// there should be no way to trigger this error.
4749
/* v8 ignore next 3 */
4850
if (this.flags & RawStoreFlags.DIRTY) {
4951
throw new Error('assert failed: store still dirty after updating it');
5052
}
51-
return this.readValue();
52-
} finally {
53-
const extraUsages = --this.extraUsages;
54-
if (extraUsages === 0) {
55-
this.checkUnused();
56-
}
53+
}
54+
if (addTansuProducer) {
55+
addTansuProducer.call(activeConsumer, this);
56+
} else if (activeConsumer) {
57+
activeConsumer.addProducer(this);
58+
}
59+
return this.readValue();
60+
} finally {
61+
const extraUsages = --this.extraUsages;
62+
if (extraUsages === 0) {
63+
this.checkUnused();
5764
}
5865
}
5966
}

src/internal/storeWritable.ts

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
import { beginBatch, getActiveConsumer, type Signal, type Watcher } from '../interop';
12
import type { Subscriber, UnsubscribeFunction, UnsubscribeObject, Updater } from '../types';
2-
import { batch } from './batch';
33
import { equal } from './equal';
44
import type { Consumer, RawStore } from './store';
55
import { RawStoreFlags } from './store';
66
import { SubscribeConsumer } from './subscribeConsumer';
7-
import { activeConsumer } from './untrack';
7+
import { watchRawStore } from './watch';
88

99
export let notificationPhase = false;
1010

@@ -25,7 +25,7 @@ export interface ProducerConsumerLink<T> {
2525
skipMarkDirty: boolean;
2626
}
2727

28-
export class RawStoreWritable<T> implements RawStore<T, ProducerConsumerLink<T>> {
28+
export class RawStoreWritable<T> implements RawStore<T, ProducerConsumerLink<T>>, Signal {
2929
constructor(protected value: T) {}
3030
flags = RawStoreFlags.NONE;
3131
private version = 0;
@@ -64,10 +64,9 @@ export class RawStoreWritable<T> implements RawStore<T, ProducerConsumerLink<T>>
6464
return res;
6565
}
6666

67-
updateLink(link: ProducerConsumerLink<T>): T {
67+
updateLink(link: ProducerConsumerLink<T>): void {
6868
link.value = this.value;
6969
link.version = this.version;
70-
return this.readValue();
7170
}
7271

7372
registerConsumer(link: ProducerConsumerLink<T>): ProducerConsumerLink<T> {
@@ -115,12 +114,19 @@ export class RawStoreWritable<T> implements RawStore<T, ProducerConsumerLink<T>>
115114
checkNotInNotificationPhase();
116115
const same = this.equal(this.value, newValue);
117116
if (!same) {
118-
batch(() => {
117+
const endBatch = beginBatch();
118+
let queueError;
119+
try {
119120
this.value = newValue;
120121
this.version++;
121122
this.equalCache = null;
122123
this.increaseEpoch();
123-
});
124+
} finally {
125+
queueError = endBatch();
126+
}
127+
if (queueError) {
128+
throw queueError.error;
129+
}
124130
}
125131
}
126132

@@ -145,7 +151,8 @@ export class RawStoreWritable<T> implements RawStore<T, ProducerConsumerLink<T>>
145151

146152
get(): T {
147153
checkNotInNotificationPhase();
148-
return activeConsumer ? activeConsumer.addProducer(this) : this.readValue();
154+
getActiveConsumer()?.addProducer(this);
155+
return this.readValue();
149156
}
150157

151158
readValue(): T {
@@ -159,4 +166,8 @@ export class RawStoreWritable<T> implements RawStore<T, ProducerConsumerLink<T>>
159166
unsubscriber.unsubscribe = unsubscriber;
160167
return unsubscriber;
161168
}
169+
170+
watchSignal(notify: () => void): Watcher {
171+
return watchRawStore(this, notify);
172+
}
162173
}

0 commit comments

Comments
 (0)