Skip to content

Commit 52509bc

Browse files
committed
Fix firing listeners in DataStream.
1 parent ccb56e3 commit 52509bc

File tree

1 file changed

+18
-14
lines changed

1 file changed

+18
-14
lines changed

packages/common/src/utils/DataStream.ts

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,23 @@ export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataS
142142
});
143143
}
144144

145-
protected async processQueue() {
145+
protected processQueue() {
146146
if (this.processingPromise) {
147147
return;
148148
}
149149

150+
let promise = this._processQueue().finally(() => {
151+
this.processingPromise = null;
152+
});
153+
this.processingPromise = promise;
154+
return promise;
155+
}
156+
157+
protected hasDataReader() {
158+
return Array.from(this.listeners.values()).some((l) => !!l.data);
159+
}
160+
161+
protected async _processQueue() {
150162
/**
151163
* Allow listeners to mutate the queue before processing.
152164
* This allows for operations such as dropping or compressing data
@@ -156,16 +168,7 @@ export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataS
156168
await this.iterateAsyncErrored(async (l) => l.highWater?.());
157169
}
158170

159-
return (this.processingPromise = this._processQueue());
160-
}
161-
162-
protected hasDataReader() {
163-
return Array.from(this.listeners.values()).some((l) => !!l.data);
164-
}
165-
166-
protected async _processQueue() {
167171
if (this.isClosed || !this.hasDataReader()) {
168-
Promise.resolve().then(() => (this.processingPromise = null));
169172
return;
170173
}
171174

@@ -179,16 +182,17 @@ export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataS
179182
await this.iterateAsyncErrored(async (l) => l.lowWater?.());
180183
}
181184

182-
this.processingPromise = null;
183-
184-
if (this.dataQueue.length) {
185+
if (this.dataQueue.length > 0) {
185186
// Next tick
186187
setTimeout(() => this.processQueue());
187188
}
188189
}
189190

190191
protected async iterateAsyncErrored(cb: (l: Partial<DataStreamListener<ParsedData>>) => Promise<void>) {
191-
for (let i of this.listeners.values()) {
192+
// Important: We need to copy the listeners, as calling a listener could result in adding another
193+
// listener, resulting in infinite loops.
194+
const listeners = Array.from(this.listeners.values());
195+
for (let i of listeners) {
192196
try {
193197
await cb(i);
194198
} catch (ex) {

0 commit comments

Comments
 (0)