Skip to content

Commit 4cb733f

Browse files
Extract events to async stream logic into helper function
1 parent dd31f8d commit 4cb733f

File tree

1 file changed

+29
-22
lines changed

1 file changed

+29
-22
lines changed

libraries/common/io/requests.effekt

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,33 @@ namespace js {
139139
def wait[T](em: EventEmitter, ev: Event[T]): T =
140140
em.unsafeWait(ev.name)
141141

142+
/// Returns the data emitted via dataEv as an async stream until endEv
143+
def asyncStreamOf[D, E](em: EventEmitter, dataEv: Event[D], endEv: Event[E]): (EffektAsyncIterator[D], Promise[E]) = {
144+
val nextResolve = ref(promise::make())
145+
val done = ref(false)
146+
val endPromise = promise::make()
147+
em.on(dataEv, box { chunk =>
148+
val waitingResolve = nextResolve.get().await()
149+
nextResolve.set(promise::make())
150+
waitingResolve.resolve(Some(chunk))
151+
})
152+
em.on(endEv, box { e =>
153+
val waitingResolve = nextResolve.get().await()
154+
waitingResolve.resolve(None())
155+
done.set(true)
156+
endPromise.resolve(e)
157+
})
158+
(EffektAsyncIterator(box {
159+
val resPromise = promise::make()
160+
if(done.get()) {
161+
resPromise.resolve(None())
162+
} else {
163+
nextResolve.get().resolve(resPromise)
164+
}
165+
resPromise
166+
}), endPromise)
167+
}
168+
142169
// Dict-like JS objects
143170
// --------------------
144171
extern type JsObj
@@ -181,28 +208,8 @@ namespace jsNode {
181208
extern pure def events(r: NativeResponse): js::EventEmitter = jsNode "${r}"
182209

183210
def getBody(r: NativeResponse): EffektAsyncIterator[js::NativeBytes] = {
184-
val nextResolve = ref(promise::make())
185-
val done = ref(false)
186-
r.events.js::on(js::ev::data(), box { chunk =>
187-
val waitingResolve = nextResolve.get().await()
188-
nextResolve.set(promise::make())
189-
waitingResolve.resolve(Some(chunk))
190-
})
191-
r.events.js::on(js::ev::end(), box { _ =>
192-
val waitingResolve = nextResolve.get().await()
193-
//nextResolve.set(promise::make())
194-
waitingResolve.resolve(None())
195-
done.set(true)
196-
})
197-
EffektAsyncIterator(box {
198-
val resPromise = promise::make()
199-
if (done.get()) {
200-
resPromise.resolve(None())
201-
} else {
202-
nextResolve.get().resolve(resPromise)
203-
}
204-
resPromise
205-
})
211+
val (res, _) = js::asyncStreamOf(r.events, js::ev::data(), js::ev::end())
212+
res
206213
}
207214

208215
extern io def isError(r: NativeResponse): Bool =

0 commit comments

Comments
 (0)