Skip to content

Commit e5bd337

Browse files
Arguably more understandable asyncStreamOf with a queue of waiting nexts
1 parent 633dbcb commit e5bd337

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

libraries/common/io/requests.effekt

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,28 +141,37 @@ namespace js {
141141

142142
/// Returns the data emitted via dataEv as an async stream until endEv
143143
def asyncStreamOf[D, E](em: EventEmitter, dataEv: Event[D], endEv: Event[E]): (EffektAsyncIterator[D], Promise[E]) = {
144-
val nextResolve = ref(promise::make())
144+
val waitingNexts = ref(Nil[Promise[Option[D]]]())
145+
val nextWasCalled = ref(promise::make())
145146
val done = ref(false)
146147
val endPromise = promise::make()
147148
em.on(dataEv, box { chunk =>
148-
val waitingResolve = nextResolve.get().await()
149-
nextResolve.set(promise::make())
150-
waitingResolve.resolve(Some(chunk))
149+
loop { {l} =>
150+
nextWasCalled.get().await()
151+
nextWasCalled.set(promise::make())
152+
waitingNexts.get() match {
153+
case Cons(hd, tl) =>
154+
waitingNexts.set(tl)
155+
hd.resolve(Some(chunk))
156+
l.break()
157+
case Nil() => ()
158+
}
159+
}
151160
})
152161
em.on(endEv, box { e =>
153-
val waitingResolve = nextResolve.get().await()
154-
waitingResolve.resolve(None())
155162
done.set(true)
163+
waitingNexts.get().foreach{ p => p.resolve(None()) }
156164
endPromise.resolve(e)
157165
})
158166
(EffektAsyncIterator(box {
159-
val resPromise = promise::make()
160-
if(done.get()) {
161-
resPromise.resolve(None())
167+
val res = promise::make()
168+
if (done.get()) {
169+
res.resolve(None())
162170
} else {
163-
nextResolve.get().resolve(resPromise)
171+
waitingNexts.set(waitingNexts.get().append([res]))
172+
nextWasCalled.get().resolve(())
164173
}
165-
resPromise
174+
res
166175
}), endPromise)
167176
}
168177

0 commit comments

Comments
 (0)