Skip to content

Commit 99d99da

Browse files
committed
Using a linked list instead of an array for subscribers queue
1 parent 9dca683 commit 99d99da

File tree

4 files changed

+111
-21
lines changed

4 files changed

+111
-21
lines changed

src/index.spec.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2848,6 +2848,28 @@ describe('stores', () => {
28482848
a.next(4);
28492849
expect(values).toEqual([0, 6]);
28502850
});
2851+
2852+
it('should work to unsubscribe inside batch', () => {
2853+
const valuesA: number[] = [];
2854+
const valuesB: number[] = [];
2855+
const a = writable(0);
2856+
const b = computed(() => a() + 1);
2857+
const unsubscribeA = a.subscribe((v) => {
2858+
valuesA.push(v);
2859+
});
2860+
const unsubscribeB = b.subscribe((v) => {
2861+
valuesB.push(v);
2862+
});
2863+
expect(valuesA).toEqual([0]);
2864+
expect(valuesB).toEqual([1]);
2865+
batch(() => {
2866+
a.set(5);
2867+
unsubscribeB();
2868+
});
2869+
expect(valuesA).toEqual([0, 5]);
2870+
expect(valuesB).toEqual([1]);
2871+
unsubscribeA();
2872+
});
28512873
});
28522874

28532875
describe('Listeners and batch timing', () => {

src/internal/batch.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import { createQueue } from './linkedQueue';
12
import type { SubscribeConsumer } from './subscribeConsumer';
23

3-
export const subscribersQueue: SubscribeConsumer<any, any>[] = [];
44
let willProcessQueue = false;
5+
const { add, remove, shift } = createQueue<SubscribeConsumer<any, any>>();
6+
7+
export { add as addToQueue, remove as removeFromQueue };
58

69
/**
710
* Batches multiple changes to stores while calling the provided function,
@@ -53,8 +56,8 @@ export const batch = <T>(fn: () => T): T => {
5356
res = fn();
5457
} finally {
5558
if (needsProcessQueue) {
56-
while (subscribersQueue.length > 0) {
57-
const consumer = subscribersQueue.shift()!;
59+
let consumer = shift();
60+
while (consumer) {
5861
try {
5962
consumer.notify();
6063
} catch (e) {
@@ -65,6 +68,7 @@ export const batch = <T>(fn: () => T): T => {
6568
error = e;
6669
}
6770
}
71+
consumer = shift();
6872
}
6973
willProcessQueue = false;
7074
}

src/internal/linkedQueue.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
export interface QueueItem<T> {
2+
next: T | null;
3+
prev: T | null;
4+
}
5+
6+
export const createQueue = <T extends QueueItem<T>>(): {
7+
add: (item: T) => boolean;
8+
remove: (item: T) => boolean;
9+
shift: () => T | null;
10+
} => {
11+
let first: T | null = null;
12+
let last: T | null = null;
13+
14+
const remove = (item: T): boolean => {
15+
const { prev, next } = item;
16+
if (prev || next || first === item) {
17+
item.prev = null;
18+
item.next = null;
19+
if (prev) {
20+
prev.next = next;
21+
} else {
22+
first = next;
23+
}
24+
if (next) {
25+
next.prev = prev;
26+
} else {
27+
last = prev;
28+
}
29+
return true;
30+
}
31+
return false;
32+
};
33+
34+
const add = (item: T): boolean => {
35+
if (item === last) {
36+
// already the last in the queue, nothing to do !
37+
return false;
38+
}
39+
const existing = remove(item);
40+
item.prev = last;
41+
if (last) {
42+
last.next = item;
43+
} else {
44+
first = item;
45+
}
46+
last = item;
47+
return !existing;
48+
};
49+
50+
const shift = (): T | null => {
51+
const item = first;
52+
if (item) {
53+
remove(item);
54+
}
55+
return item;
56+
};
57+
58+
return {
59+
add,
60+
remove,
61+
shift,
62+
};
63+
};

src/internal/subscribeConsumer.ts

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Subscriber, SubscriberObject } from '../types';
2-
import { subscribersQueue } from './batch';
2+
import { addToQueue, removeFromQueue } from './batch';
3+
import type { QueueItem } from './linkedQueue';
34
import { updateLinkProducerValue, type BaseLink, type Consumer, type RawStore } from './store';
45

56
export const noop = (): void => {};
@@ -21,10 +22,14 @@ const toSubscriberObject = <T>(subscriber: Subscriber<T>): SubscriberObject<T> =
2122
resume: bind(subscriber, 'resume'),
2223
});
2324

24-
export class SubscribeConsumer<T, Link extends BaseLink<T>> implements Consumer {
25+
export class SubscribeConsumer<T, Link extends BaseLink<T>>
26+
implements Consumer, QueueItem<SubscribeConsumer<any, any>>
27+
{
2528
private readonly link: Link;
2629
private subscriber: SubscriberObject<T>;
27-
dirtyCount = 1;
30+
prev: SubscribeConsumer<any, any> | null = null;
31+
next: SubscribeConsumer<any, any> | null = null;
32+
2833
constructor(producer: RawStore<T, Link>, subscriber: Subscriber<T>) {
2934
this.subscriber = toSubscriberObject(subscriber);
3035
this.link = producer.registerConsumer(producer.newLink(this));
@@ -34,31 +39,27 @@ export class SubscribeConsumer<T, Link extends BaseLink<T>> implements Consumer
3439
unsubscribe(): void {
3540
if (this.subscriber !== noopSubscriber) {
3641
this.subscriber = noopSubscriber;
42+
removeFromQueue(this);
3743
this.link.producer.unregisterConsumer(this.link);
3844
}
3945
}
4046

4147
markDirty(): void {
42-
this.dirtyCount++;
43-
subscribersQueue.push(this);
44-
if (this.dirtyCount === 1) {
48+
if (addToQueue(this)) {
4549
this.subscriber.pause();
4650
}
4751
}
4852

4953
notify(first = false): void {
50-
this.dirtyCount--;
51-
if (this.dirtyCount === 0 && this.subscriber !== noopSubscriber) {
52-
const link = this.link;
53-
const producer = link.producer;
54-
updateLinkProducerValue(link);
55-
if (producer.isLinkUpToDate(link) && !first) {
56-
this.subscriber.resume();
57-
} else {
58-
// note that the following line can throw
59-
const value = producer.updateLink(link);
60-
this.subscriber.next(value);
61-
}
54+
const link = this.link;
55+
const producer = link.producer;
56+
updateLinkProducerValue(link);
57+
if (producer.isLinkUpToDate(link) && !first) {
58+
this.subscriber.resume();
59+
} else {
60+
// note that the following line can throw
61+
const value = producer.updateLink(link);
62+
this.subscriber.next(value);
6263
}
6364
}
6465
}

0 commit comments

Comments
 (0)