@@ -141,28 +141,37 @@ namespace js {
141
141
142
142
/// Returns the data emitted via dataEv as an async stream until endEv
143
143
def asyncStreamOf[D, E](em: EventEmitter, dataEv: Event[D], endEv: Event[E]): (EffektAsyncIterator[D], Promise[E]) = {
144
- val nextResolve = ref(promise::make())
144
+ val waitingNexts = ref(Nil[Promise[Option[D]]]())
145
+ val nextWasCalled = ref(promise::make())
145
146
val done = ref(false)
146
147
val endPromise = promise::make()
147
148
em.on(dataEv, box { chunk =>
148
- val waitingResolve = nextResolve.get().await()
149
- nextResolve.set(promise::make())
150
- waitingResolve.resolve(Some(chunk))
149
+ loop { {l} =>
150
+ nextWasCalled.get().await()
151
+ nextWasCalled.set(promise::make())
152
+ waitingNexts.get() match {
153
+ case Cons(hd, tl) =>
154
+ waitingNexts.set(tl)
155
+ hd.resolve(Some(chunk))
156
+ l.break()
157
+ case Nil() => ()
158
+ }
159
+ }
151
160
})
152
161
em.on(endEv, box { e =>
153
- val waitingResolve = nextResolve.get().await()
154
- waitingResolve.resolve(None())
155
162
done.set(true)
163
+ waitingNexts.get().foreach{ p => p.resolve(None()) }
156
164
endPromise.resolve(e)
157
165
})
158
166
(EffektAsyncIterator(box {
159
- val resPromise = promise::make()
160
- if(done.get()) {
161
- resPromise .resolve(None())
167
+ val res = promise::make()
168
+ if (done.get()) {
169
+ res .resolve(None())
162
170
} else {
163
- nextResolve.get().resolve(resPromise)
171
+ waitingNexts.set(waitingNexts.get().append([res]))
172
+ nextWasCalled.get().resolve(())
164
173
}
165
- resPromise
174
+ res
166
175
}), endPromise)
167
176
}
168
177
0 commit comments