Skip to content

Commit 8bfc4b6

Browse files
authored
Merge pull request #198 from jedwards1211/patch-1
fix(pubsub-async-iterator.ts): don't subscribe until first call to next()
2 parents 7853817 + c54b18d commit 8bfc4b6

File tree

2 files changed

+15
-13
lines changed

2 files changed

+15
-13
lines changed

src/pubsub-async-iterator.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,20 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
3939
this.pushQueue = [];
4040
this.listening = true;
4141
this.eventsArray = typeof eventNames === 'string' ? [eventNames] : eventNames;
42-
this.allSubscribed = this.subscribeAll();
4342
}
4443

4544
public async next() {
46-
await this.allSubscribed;
45+
await this.subscribeAll();
4746
return this.listening ? this.pullValue() : this.return();
4847
}
4948

5049
public async return() {
51-
this.emptyQueue(await this.allSubscribed);
50+
await this.emptyQueue();
5251
return { value: undefined, done: true };
5352
}
5453

5554
public async throw(error) {
56-
this.emptyQueue(await this.allSubscribed);
55+
await this.emptyQueue();
5756
return Promise.reject(error);
5857
}
5958

@@ -64,13 +63,13 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
6463
private pullQueue: Function[];
6564
private pushQueue: any[];
6665
private eventsArray: string[];
67-
private allSubscribed: Promise<number[]>;
66+
private subscriptionIds: Promise<number[]> | undefined;
6867
private listening: boolean;
6968
private pubsub: PubSubEngine;
7069
private options: Object;
7170

7271
private async pushValue(event) {
73-
await this.allSubscribed;
72+
await this.subscribeAll();
7473
if (this.pullQueue.length !== 0) {
7574
this.pullQueue.shift()({ value: event, done: false });
7675
} else {
@@ -88,20 +87,23 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
8887
});
8988
}
9089

91-
private emptyQueue(subscriptionIds: number[]) {
90+
private async emptyQueue() {
9291
if (this.listening) {
9392
this.listening = false;
94-
this.unsubscribeAll(subscriptionIds);
93+
if (this.subscriptionIds) this.unsubscribeAll(await this.subscriptionIds);
9594
this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
9695
this.pullQueue.length = 0;
9796
this.pushQueue.length = 0;
9897
}
9998
}
10099

101100
private subscribeAll() {
102-
return Promise.all(this.eventsArray.map(
103-
eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), this.options),
104-
));
101+
if (!this.subscriptionIds) {
102+
this.subscriptionIds = Promise.all(this.eventsArray.map(
103+
eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), this.options),
104+
));
105+
}
106+
return this.subscriptionIds
105107
}
106108

107109
private unsubscribeAll(subscriptionIds: number[]) {

src/test/integration-tests.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ describe('PubSubAsyncIterator', function() {
8585
expect(isAsyncIterable(ai)).to.be.true;
8686

8787
const r = (ai as AsyncIterator<any>).next();
88-
pubsub.publish(FIRST_EVENT, {});
88+
setTimeout(() => pubsub.publish(FIRST_EVENT, {}), 50);
8989

9090
return r;
9191
})
@@ -100,7 +100,7 @@ describe('PubSubAsyncIterator', function() {
100100
expect(isAsyncIterable(ai)).to.be.true;
101101

102102
const r = (ai as AsyncIterator<any>).next();
103-
pubsub.publish(SECOND_EVENT, {});
103+
setTimeout(() => pubsub.publish(SECOND_EVENT, {}), 50);
104104

105105
return r;
106106
})

0 commit comments

Comments
 (0)