diff --git a/src/index.spec.ts b/src/index.spec.ts index 829a696..9183d5c 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -37,7 +37,8 @@ import type { RawStoreWritable } from './internal/storeWritable'; const expectCorrectlyCleanedUp = (store: StoreInput) => { const rawStore = (store as any)[rawStoreSymbol] as RawStoreWritable; - expect(rawStore.consumerLinks.length).toBe(0); + expect(rawStore.consumerFirst).toBe(null); + expect(rawStore.consumerLast).toBe(null); expect(rawStore.flags & RawStoreFlags.START_USE_CALLED).toBeFalsy(); }; @@ -2847,6 +2848,28 @@ describe('stores', () => { a.next(4); expect(values).toEqual([0, 6]); }); + + it('should work to unsubscribe inside batch', () => { + const valuesA: number[] = []; + const valuesB: number[] = []; + const a = writable(0); + const b = computed(() => a() + 1); + const unsubscribeA = a.subscribe((v) => { + valuesA.push(v); + }); + const unsubscribeB = b.subscribe((v) => { + valuesB.push(v); + }); + expect(valuesA).toEqual([0]); + expect(valuesB).toEqual([1]); + batch(() => { + a.set(5); + unsubscribeB(); + }); + expect(valuesA).toEqual([0, 5]); + expect(valuesB).toEqual([1]); + unsubscribeA(); + }); }); describe('Listeners and batch timing', () => { diff --git a/src/internal/batch.ts b/src/internal/batch.ts index 8469a39..5896bda 100644 --- a/src/internal/batch.ts +++ b/src/internal/batch.ts @@ -1,7 +1,10 @@ +import { createQueue } from './linkedQueue'; import type { SubscribeConsumer } from './subscribeConsumer'; -export const subscribersQueue: SubscribeConsumer[] = []; let willProcessQueue = false; +const { add, remove, shift } = createQueue>(); + +export { add as addToQueue, remove as removeFromQueue }; /** * Batches multiple changes to stores while calling the provided function, @@ -53,8 +56,8 @@ export const batch = (fn: () => T): T => { res = fn(); } finally { if (needsProcessQueue) { - while (subscribersQueue.length > 0) { - const consumer = subscribersQueue.shift()!; + let consumer = shift(); + while (consumer) { try { consumer.notify(); } catch (e) { @@ -65,6 +68,7 @@ export const batch = (fn: () => T): T => { error = e; } } + consumer = shift(); } willProcessQueue = false; } diff --git a/src/internal/linkedQueue.ts b/src/internal/linkedQueue.ts new file mode 100644 index 0000000..d5c36a5 --- /dev/null +++ b/src/internal/linkedQueue.ts @@ -0,0 +1,63 @@ +export interface QueueItem { + next: T | null; + prev: T | null; +} + +export const createQueue = >(): { + add: (item: T) => boolean; + remove: (item: T) => boolean; + shift: () => T | null; +} => { + let first: T | null = null; + let last: T | null = null; + + const remove = (item: T): boolean => { + const { prev, next } = item; + if (prev || next || first === item) { + item.prev = null; + item.next = null; + if (prev) { + prev.next = next; + } else { + first = next; + } + if (next) { + next.prev = prev; + } else { + last = prev; + } + return true; + } + return false; + }; + + const add = (item: T): boolean => { + if (item === last) { + // already the last in the queue, nothing to do ! + return false; + } + const existing = remove(item); + item.prev = last; + if (last) { + last.next = item; + } else { + first = item; + } + last = item; + return !existing; + }; + + const shift = (): T | null => { + const item = first; + if (item) { + remove(item); + } + return item; + }; + + return { + add, + remove, + shift, + }; +}; diff --git a/src/internal/store.ts b/src/internal/store.ts index 1d65ddc..1b1fbf4 100644 --- a/src/internal/store.ts +++ b/src/internal/store.ts @@ -17,6 +17,7 @@ export const enum RawStoreFlags { export interface BaseLink { producer: RawStore>; + nextInConsumer: BaseLink | null; skipMarkDirty?: boolean; } diff --git a/src/internal/storeComputed.ts b/src/internal/storeComputed.ts index 1bc94f9..1da3e33 100644 --- a/src/internal/storeComputed.ts +++ b/src/internal/storeComputed.ts @@ -12,8 +12,8 @@ export class RawStoreComputed extends RawStoreComputedOrDerived implements Consumer, ActiveConsumer { - private producerIndex = 0; - private producerLinks: BaseLink[] = []; + private producerFirst: BaseLink | null = null; + private producerLast: BaseLink | null = null; private epoch = -1; constructor(private readonly computeFn: () => T) { @@ -47,17 +47,21 @@ export class RawStoreComputed } addProducer>(producer: RawStore): U { - const producerLinks = this.producerLinks; - const producerIndex = this.producerIndex; - let link = producerLinks[producerIndex] as L | undefined; - if (link?.producer !== producer) { - if (link) { - producerLinks.push(link); // push the existing link at the end (to be removed later) - } + const nextLink = this.producerLast ? this.producerLast.nextInConsumer : this.producerFirst; + let link: L; + if (nextLink?.producer !== producer) { + // existing link cannot be reused link = producer.registerConsumer(producer.newLink(this)); + link.nextInConsumer = nextLink; + if (this.producerLast) { + this.producerLast.nextInConsumer = link; + } else { + this.producerFirst = link; + } + } else { + link = nextLink as L; } - producerLinks[producerIndex] = link; - this.producerIndex = producerIndex + 1; + this.producerLast = link; updateLinkProducerValue(link); if (producer.flags & RawStoreFlags.HAS_VISIBLE_ONUSE) { this.flags |= RawStoreFlags.HAS_VISIBLE_ONUSE; @@ -66,19 +70,19 @@ export class RawStoreComputed } override startUse(): void { - const producerLinks = this.producerLinks; - for (let i = 0, l = producerLinks.length; i < l; i++) { - const link = producerLinks[i]; + let link = this.producerFirst; + while (link) { link.producer.registerConsumer(link); + link = link.nextInConsumer; } this.flags |= RawStoreFlags.DIRTY; } override endUse(): void { - const producerLinks = this.producerLinks; - for (let i = 0, l = producerLinks.length; i < l; i++) { - const link = producerLinks[i]; + let link = this.producerFirst; + while (link) { link.producer.unregisterConsumer(link); + link = link.nextInConsumer; } } @@ -86,14 +90,14 @@ export class RawStoreComputed if (this.value === COMPUTED_UNSET) { return false; } - const producerLinks = this.producerLinks; - for (let i = 0, l = producerLinks.length; i < l; i++) { - const link = producerLinks[i]; + let link = this.producerFirst; + while (link) { const producer = link.producer; updateLinkProducerValue(link); if (!producer.isLinkUpToDate(link)) { return false; } + link = link.nextInConsumer; } return true; } @@ -102,7 +106,7 @@ export class RawStoreComputed let value: T; const prevActiveConsumer = setActiveConsumer(this); try { - this.producerIndex = 0; + this.producerLast = null; this.flags &= ~RawStoreFlags.HAS_VISIBLE_ONUSE; const computeFn = this.computeFn; value = computeFn(); @@ -114,13 +118,19 @@ export class RawStoreComputed setActiveConsumer(prevActiveConsumer); } // Remove unused producers: - const producerLinks = this.producerLinks; - const producerIndex = this.producerIndex; - if (producerIndex < producerLinks.length) { - for (let i = 0, l = producerLinks.length - producerIndex; i < l; i++) { - const link = producerLinks.pop()!; - link.producer.unregisterConsumer(link); - } + let link: BaseLink | null; + if (this.producerLast) { + link = this.producerLast.nextInConsumer; + this.producerLast.nextInConsumer = null; + } else { + link = this.producerFirst; + this.producerFirst = null; + } + while (link) { + const next = link.nextInConsumer; + link.producer.unregisterConsumer(link); + link.nextInConsumer = null; + link = next; } this.set(value); } diff --git a/src/internal/storeConst.ts b/src/internal/storeConst.ts index a1148db..ffb76d6 100644 --- a/src/internal/storeConst.ts +++ b/src/internal/storeConst.ts @@ -11,6 +11,7 @@ export class RawStoreConst implements RawStore> { newLink(_consumer: Consumer): BaseLink { return { producer: this, + nextInConsumer: null, }; } registerConsumer(link: BaseLink): BaseLink { diff --git a/src/internal/storeTrackingUsage.ts b/src/internal/storeTrackingUsage.ts index 6495145..b99e10e 100644 --- a/src/internal/storeTrackingUsage.ts +++ b/src/internal/storeTrackingUsage.ts @@ -1,9 +1,11 @@ +import { createQueue, type QueueItem } from './linkedQueue'; import { RawStoreFlags } from './store'; import { checkNotInNotificationPhase, RawStoreWritable } from './storeWritable'; import { activeConsumer, untrack } from './untrack'; -let flushUnusedQueue: RawStoreTrackingUsage[] | null = null; +const { shift, add, remove } = createQueue>(); let inFlushUnused = false; +let plannedFlushUnused = false; export const flushUnused = (): void => { // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) @@ -12,24 +14,27 @@ export const flushUnused = (): void => { if (inFlushUnused) { throw new Error('assert failed: recursive flushUnused call'); } + plannedFlushUnused = false; inFlushUnused = true; try { - const queue = flushUnusedQueue; - if (queue) { - flushUnusedQueue = null; - for (let i = 0, l = queue.length; i < l; i++) { - const producer = queue[i]; - producer.flags &= ~RawStoreFlags.FLUSH_PLANNED; - producer.checkUnused(); - } + let producer = shift(); + while (producer) { + producer.flags &= ~RawStoreFlags.FLUSH_PLANNED; + producer.checkUnused(); + producer = shift(); } } finally { inFlushUnused = false; } }; -export abstract class RawStoreTrackingUsage extends RawStoreWritable { +export abstract class RawStoreTrackingUsage + extends RawStoreWritable + implements QueueItem> +{ private extraUsages = 0; + next: RawStoreTrackingUsage | null = null; + prev: RawStoreTrackingUsage | null = null; abstract startUse(): void; abstract endUse(): void; @@ -39,9 +44,13 @@ export abstract class RawStoreTrackingUsage extends RawStoreWritable { // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) // there should be no way to trigger this error. /* v8 ignore next 3 */ - if (!this.extraUsages && !this.consumerLinks.length) { + if (!this.extraUsages && !this.consumerFirst) { throw new Error('assert failed: untracked producer usage'); } + if (flags & RawStoreFlags.FLUSH_PLANNED) { + remove(this); + this.flags &= ~RawStoreFlags.FLUSH_PLANNED; + } this.flags |= RawStoreFlags.START_USE_CALLED; untrack(() => this.startUse()); } @@ -49,17 +58,17 @@ export abstract class RawStoreTrackingUsage extends RawStoreWritable { override checkUnused(): void { const flags = this.flags; - if (flags & RawStoreFlags.START_USE_CALLED && !this.extraUsages && !this.consumerLinks.length) { + if (flags & RawStoreFlags.START_USE_CALLED && !this.extraUsages && !this.consumerFirst) { if (inFlushUnused || flags & RawStoreFlags.HAS_VISIBLE_ONUSE) { this.flags &= ~RawStoreFlags.START_USE_CALLED; untrack(() => this.endUse()); } else if (!(flags & RawStoreFlags.FLUSH_PLANNED)) { this.flags |= RawStoreFlags.FLUSH_PLANNED; - if (!flushUnusedQueue) { - flushUnusedQueue = []; + if (!plannedFlushUnused) { + plannedFlushUnused = true; queueMicrotask(flushUnused); } - flushUnusedQueue.push(this); + add(this); } } } diff --git a/src/internal/storeWritable.ts b/src/internal/storeWritable.ts index 9b0644c..d86ce1e 100644 --- a/src/internal/storeWritable.ts +++ b/src/internal/storeWritable.ts @@ -1,7 +1,7 @@ import type { Subscriber, UnsubscribeFunction, UnsubscribeObject, Updater } from '../types'; import { batch } from './batch'; import { equal } from './equal'; -import type { Consumer, RawStore } from './store'; +import type { BaseLink, Consumer, RawStore } from './store'; import { RawStoreFlags } from './store'; import { SubscribeConsumer } from './subscribeConsumer'; import { activeConsumer } from './untrack'; @@ -16,11 +16,12 @@ export const checkNotInNotificationPhase = (): void => { export let epoch = 0; -export interface ProducerConsumerLink { +export interface ProducerConsumerLink extends BaseLink { value: T; version: number; producer: RawStore>; - indexInProducer: number; + prevInProducer: ProducerConsumerLink | null; + nextInProducer: ProducerConsumerLink | null; consumer: Consumer; skipMarkDirty: boolean; } @@ -31,15 +32,18 @@ export class RawStoreWritable implements RawStore> private version = 0; equalFn = equal; private equalCache: Record | null = null; - consumerLinks: ProducerConsumerLink[] = []; + consumerFirst: ProducerConsumerLink | null = null; + consumerLast: ProducerConsumerLink | null = null; newLink(consumer: Consumer): ProducerConsumerLink { return { version: -1, value: undefined as any, producer: this, - indexInProducer: 0, + nextInProducer: null, + prevInProducer: null, consumer, + nextInConsumer: null, skipMarkDirty: false, }; } @@ -71,30 +75,40 @@ export class RawStoreWritable implements RawStore> } registerConsumer(link: ProducerConsumerLink): ProducerConsumerLink { - const consumerLinks = this.consumerLinks; - const indexInProducer = consumerLinks.length; - link.indexInProducer = indexInProducer; - consumerLinks[indexInProducer] = link; + // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) + // there should be no way to trigger this error. + /* v8 ignore next 3 */ + if (link.nextInProducer || link.prevInProducer) { + throw new Error('assert failed: registerConsumer with already used link'); + } + link.prevInProducer = this.consumerLast; + const last = this.consumerLast; + if (last) { + last.nextInProducer = link; + } else { + this.consumerFirst = link; + } + this.consumerLast = link; return link; } unregisterConsumer(link: ProducerConsumerLink): void { - const consumerLinks = this.consumerLinks; - const index = link.indexInProducer; - // Ignoring coverage for the following lines because, unless there is a bug in tansu (which would have to be fixed!) - // there should be no way to trigger this error. - /* v8 ignore next 3 */ - if (consumerLinks[index] !== link) { - throw new Error('assert failed: invalid indexInProducer'); + const next = link.nextInProducer; + const prev = link.prevInProducer; + link.nextInProducer = null; + link.prevInProducer = null; + if (next) { + next.prevInProducer = prev; + } else { + this.consumerLast = prev; } - // swap with the last item to avoid shifting the array - const lastConsumerLink = consumerLinks.pop()!; - const isLast = link === lastConsumerLink; - if (!isLast) { - consumerLinks[index] = lastConsumerLink; - lastConsumerLink.indexInProducer = index; - } else if (index === 0) { - this.checkUnused(); + if (prev) { + prev.nextInProducer = next; + } else { + this.consumerFirst = next; + if (!next) { + this.checkUnused(); + } } } @@ -132,11 +146,12 @@ export class RawStoreWritable implements RawStore> const prevNotificationPhase = notificationPhase; notificationPhase = true; try { - const consumerLinks = this.consumerLinks; - for (let i = 0, l = consumerLinks.length; i < l; i++) { - const link = consumerLinks[i]; - if (link.skipMarkDirty) continue; - link.consumer.markDirty(); + let link = this.consumerFirst; + while (link) { + if (!link.skipMarkDirty) { + link.consumer.markDirty(); + } + link = link.nextInProducer; } } finally { notificationPhase = prevNotificationPhase; diff --git a/src/internal/subscribeConsumer.ts b/src/internal/subscribeConsumer.ts index 78c49a5..be39854 100644 --- a/src/internal/subscribeConsumer.ts +++ b/src/internal/subscribeConsumer.ts @@ -1,5 +1,6 @@ import type { Subscriber, SubscriberObject } from '../types'; -import { subscribersQueue } from './batch'; +import { addToQueue, removeFromQueue } from './batch'; +import type { QueueItem } from './linkedQueue'; import { updateLinkProducerValue, type BaseLink, type Consumer, type RawStore } from './store'; export const noop = (): void => {}; @@ -21,10 +22,14 @@ const toSubscriberObject = (subscriber: Subscriber): SubscriberObject = resume: bind(subscriber, 'resume'), }); -export class SubscribeConsumer> implements Consumer { +export class SubscribeConsumer> + implements Consumer, QueueItem> +{ private readonly link: Link; private subscriber: SubscriberObject; - dirtyCount = 1; + prev: SubscribeConsumer | null = null; + next: SubscribeConsumer | null = null; + constructor(producer: RawStore, subscriber: Subscriber) { this.subscriber = toSubscriberObject(subscriber); this.link = producer.registerConsumer(producer.newLink(this)); @@ -34,31 +39,27 @@ export class SubscribeConsumer> implements Consumer unsubscribe(): void { if (this.subscriber !== noopSubscriber) { this.subscriber = noopSubscriber; + removeFromQueue(this); this.link.producer.unregisterConsumer(this.link); } } markDirty(): void { - this.dirtyCount++; - subscribersQueue.push(this); - if (this.dirtyCount === 1) { + if (addToQueue(this)) { this.subscriber.pause(); } } notify(first = false): void { - this.dirtyCount--; - if (this.dirtyCount === 0 && this.subscriber !== noopSubscriber) { - const link = this.link; - const producer = link.producer; - updateLinkProducerValue(link); - if (producer.isLinkUpToDate(link) && !first) { - this.subscriber.resume(); - } else { - // note that the following line can throw - const value = producer.updateLink(link); - this.subscriber.next(value); - } + const link = this.link; + const producer = link.producer; + updateLinkProducerValue(link); + if (producer.isLinkUpToDate(link) && !first) { + this.subscriber.resume(); + } else { + // note that the following line can throw + const value = producer.updateLink(link); + this.subscriber.next(value); } } }