@@ -139,6 +139,33 @@ namespace js {
139
139
def wait[T](em: EventEmitter, ev: Event[T]): T =
140
140
em.unsafeWait(ev.name)
141
141
142
+ /// Returns the data emitted via dataEv as an async stream until endEv
143
+ def asyncStreamOf[D, E](em: EventEmitter, dataEv: Event[D], endEv: Event[E]): (EffektAsyncIterator[D], Promise[E]) = {
144
+ val nextResolve = ref(promise::make())
145
+ val done = ref(false)
146
+ val endPromise = promise::make()
147
+ em.on(dataEv, box { chunk =>
148
+ val waitingResolve = nextResolve.get().await()
149
+ nextResolve.set(promise::make())
150
+ waitingResolve.resolve(Some(chunk))
151
+ })
152
+ em.on(endEv, box { e =>
153
+ val waitingResolve = nextResolve.get().await()
154
+ waitingResolve.resolve(None())
155
+ done.set(true)
156
+ endPromise.resolve(e)
157
+ })
158
+ (EffektAsyncIterator(box {
159
+ val resPromise = promise::make()
160
+ if(done.get()) {
161
+ resPromise.resolve(None())
162
+ } else {
163
+ nextResolve.get().resolve(resPromise)
164
+ }
165
+ resPromise
166
+ }), endPromise)
167
+ }
168
+
142
169
// Dict-like JS objects
143
170
// --------------------
144
171
extern type JsObj
@@ -181,28 +208,8 @@ namespace jsNode {
181
208
extern pure def events(r: NativeResponse): js::EventEmitter = jsNode "${r}"
182
209
183
210
def getBody(r: NativeResponse): EffektAsyncIterator[js::NativeBytes] = {
184
- val nextResolve = ref(promise::make())
185
- val done = ref(false)
186
- r.events.js::on(js::ev::data(), box { chunk =>
187
- val waitingResolve = nextResolve.get().await()
188
- nextResolve.set(promise::make())
189
- waitingResolve.resolve(Some(chunk))
190
- })
191
- r.events.js::on(js::ev::end(), box { _ =>
192
- val waitingResolve = nextResolve.get().await()
193
- //nextResolve.set(promise::make())
194
- waitingResolve.resolve(None())
195
- done.set(true)
196
- })
197
- EffektAsyncIterator(box {
198
- val resPromise = promise::make()
199
- if (done.get()) {
200
- resPromise.resolve(None())
201
- } else {
202
- nextResolve.get().resolve(resPromise)
203
- }
204
- resPromise
205
- })
211
+ val (res, _) = js::asyncStreamOf(r.events, js::ev::data(), js::ev::end())
212
+ res
206
213
}
207
214
208
215
extern io def isError(r: NativeResponse): Bool =
0 commit comments