Skip to content

Commit dbbae80

Browse files
dubzzzbenlesh
andauthored
perf(Subject): subscription, unsubscription, and nexting improved ~4x (#7240)
* fix(subject): Faster subscriptions management in Subject The current implementation of the Subject was backing itself on an array of observers. When one observer unsubscribed itself, it was performing a linear scan on the array of observers and dropping the item from the array of observers. The algorithm complexity of this operation was O(n) with n being the number of observers connected to the Subject. It caused code like: ```js const allSubscriptions = []; const source = new Subject(); for (let index = 0 ; index !== 1_000_000 ; ++index) { allSubscriptions.push(source.subscribe()); // rather quick } for (const subscription of allSubscriptions) { subscription.unsubscribe(); // taking very long } ``` The proposed approach consists into changing our backing collection (an array) into a Map. But we lose somehow the set capability on subject.observers (might possibly be patched in some way). Following that change the command `cross-env TS_NODE_PROJECT=tsconfig.mocha.json mocha --config spec/support/.mocharc.js "spec/**/Subje*-spec.ts"` passed to 452ms from 10s. * refactor(Subject): some perf optimizations + Loops over indices of the observers array rather than creating a new iterator and using for-of for each emission + Removes unnecessary dirty boolean in favor of clearing the snapshot, which frees up memory a little + Centralizes observers list teardown and ensures it's called during `unsubscribe()`. --------- Co-authored-by: Ben Lesh <ben@benlesh.com>
1 parent a292214 commit dbbae80

File tree

2 files changed

+83
-22
lines changed

2 files changed

+83
-22
lines changed

spec/Subject-spec.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from 'chai';
2-
import { Subject, ObjectUnsubscribedError, Observable, AsyncSubject, Observer, of, config, throwError, concat } from 'rxjs';
2+
import { Subject, ObjectUnsubscribedError, Observable, AsyncSubject, Observer, of, config, throwError, concat, Subscription } from 'rxjs';
33
import { AnonymousSubject } from 'rxjs/internal/Subject';
44
import { catchError, delay, map, mergeMap } from 'rxjs/operators';
55
import { TestScheduler } from 'rxjs/testing';
@@ -683,6 +683,47 @@ describe('Subject', () => {
683683
expect(true).to.be.true;
684684
});
685685
});
686+
687+
describe('many subscribers', () => {
688+
it('should be able to subscribe and unsubscribe huge amounts of subscribers', () => {
689+
let numResultsReceived = 0;
690+
const allSubscriptions: Subscription[] = [];
691+
const source = new Subject<number>();
692+
const numSubscribers = 100000;
693+
for (let index = 0 ; index !== numSubscribers ; ++index) {
694+
allSubscriptions.push(source.subscribe(() => {
695+
++numResultsReceived;
696+
}));
697+
}
698+
expect(numResultsReceived).to.eq(0);
699+
expect(source.observed).to.be.true;
700+
source.next(42);
701+
expect(numResultsReceived).to.eq(numSubscribers);
702+
expect(source.observed).to.be.true;
703+
for (const subscription of allSubscriptions) {
704+
subscription.unsubscribe();
705+
}
706+
expect(numResultsReceived).to.eq(numSubscribers);
707+
expect(source.observed).to.be.false;
708+
});
709+
});
710+
711+
describe('re-rentrant subscribers', () => {
712+
it('should handle re-entrant subscribers', () => {
713+
const seenValues: number[] = [];
714+
const source = new Subject<number>();
715+
source.subscribe((value) => {
716+
seenValues.push(value);
717+
source.subscribe(nestedValue => {
718+
seenValues.push(nestedValue);
719+
});
720+
});
721+
source.next(1);
722+
source.next(2);
723+
source.next(3);
724+
expect(seenValues).to.deep.eq([1, 2, 2, 3, 3, 3]);
725+
});
726+
});
686727
});
687728

688729
describe('AnonymousSubject', () => {

src/internal/Subject.ts

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { Subscriber } from './Subscriber';
33
import { Subscription } from './Subscription';
44
import { Observer, SubscriptionLike, TeardownLogic } from './types';
55
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
6-
import { arrRemove } from './util/arrRemove';
76

87
/**
98
* A Subject is a special type of Observable that allows values to be
@@ -15,14 +14,26 @@ import { arrRemove } from './util/arrRemove';
1514
export class Subject<T> extends Observable<T> implements SubscriptionLike {
1615
closed = false;
1716

18-
private currentObservers: Observer<T>[] | null = null;
17+
private currentObservers = new Map<Subscription, Observer<T>>();
18+
19+
/**
20+
* This is used to track a known array of observers, so we don't have to
21+
* clone them while iterating to prevent reentrant behaviors.
22+
* (for example, what if the subject is subscribed to when nexting to an observer)
23+
*/
24+
private observerSnapshot: Observer<T>[] | undefined;
25+
26+
/** @internal */
27+
get observers(): Observer<T>[] {
28+
return (this.observerSnapshot ??= Array.from(this.currentObservers.values()));
29+
}
1930

20-
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
21-
observers: Observer<T>[] = [];
2231
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
2332
isStopped = false;
33+
2434
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
2535
hasError = false;
36+
2637
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
2738
thrownError: any = null;
2839

@@ -48,14 +59,18 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
4859
}
4960
}
5061

62+
protected _clearObservers() {
63+
this.currentObservers.clear();
64+
this.observerSnapshot = undefined;
65+
}
66+
5167
next(value: T) {
5268
this._throwIfClosed();
5369
if (!this.isStopped) {
54-
if (!this.currentObservers) {
55-
this.currentObservers = Array.from(this.observers);
56-
}
57-
for (const observer of this.currentObservers) {
58-
observer.next(value);
70+
const { observers } = this;
71+
const len = observers.length;
72+
for (let i = 0; i < len; i++) {
73+
observers[i].next(value);
5974
}
6075
}
6176
}
@@ -66,9 +81,11 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
6681
this.hasError = this.isStopped = true;
6782
this.thrownError = err;
6883
const { observers } = this;
69-
while (observers.length) {
70-
observers.shift()!.error(err);
84+
const len = observers.length;
85+
for (let i = 0; i < len; i++) {
86+
observers[i].error(err);
7187
}
88+
this._clearObservers();
7289
}
7390
}
7491

@@ -77,19 +94,21 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
7794
if (!this.isStopped) {
7895
this.isStopped = true;
7996
const { observers } = this;
80-
while (observers.length) {
81-
observers.shift()!.complete();
97+
const len = observers.length;
98+
for (let i = 0; i < len; i++) {
99+
observers[i].complete();
82100
}
101+
this._clearObservers();
83102
}
84103
}
85104

86105
unsubscribe() {
87106
this.isStopped = this.closed = true;
88-
this.observers = this.currentObservers = null!;
107+
this._clearObservers();
89108
}
90109

91110
get observed() {
92-
return this.observers?.length > 0;
111+
return this.currentObservers.size > 0;
93112
}
94113

95114
/** @internal */
@@ -107,16 +126,17 @@ export class Subject<T> extends Observable<T> implements SubscriptionLike {
107126

108127
/** @internal */
109128
protected _innerSubscribe(subscriber: Subscriber<any>) {
110-
const { hasError, isStopped, observers } = this;
129+
const { hasError, isStopped, currentObservers } = this;
111130
if (hasError || isStopped) {
112131
return Subscription.EMPTY;
113132
}
114-
this.currentObservers = null;
115-
observers.push(subscriber);
116-
return new Subscription(() => {
117-
this.currentObservers = null;
118-
arrRemove(observers, subscriber);
133+
const subscription = new Subscription(() => {
134+
currentObservers.delete(subscription);
135+
this.observerSnapshot = undefined;
119136
});
137+
currentObservers.set(subscription, subscriber);
138+
this.observerSnapshot = undefined;
139+
return subscription;
120140
}
121141

122142
/** @internal */

0 commit comments

Comments
 (0)