@@ -350,6 +350,9 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
350350 BaseObjectPtr<Blob::Reader> reader;
351351 Global<Function> callback;
352352 Environment* env;
353+ std::vector<DataQueue::Vec> vecs;
354+ std::list<bob::Done> dones;
355+ bool innext = false ;
353356 };
354357 // TODO(@jasnell): A unique_ptr is likely better here but making this a unique
355358 // pointer that is passed into the lambda causes the std::move(next) below to
@@ -375,22 +378,61 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
375378 }
376379
377380 if (count > 0 ) {
381+ impl->vecs .insert (
382+ impl->vecs .end (),
383+ vecs,
384+ vecs + count);
385+ impl->dones .push_back (std::move (doneCb));
386+ }
387+ if (impl->innext ) {
388+ CHECK (status != bob::STATUS_WAIT);
389+ return ;
390+ }
391+ if (status == bob::STATUS_CONTINUE) {
392+ // We pull some more,
393+ // but it must be sync as we
394+ // merge it together
395+ impl->innext = true ;
396+ while (status == bob::STATUS_CONTINUE) {
397+ auto snext = [impl](int status,
398+ const DataQueue::Vec* vecs,
399+ size_t count,
400+ bob::Done doneCb) {
401+ if (count > 0 ) {
402+ impl->vecs .insert (impl->vecs .end (), vecs, vecs + count);
403+ impl->dones .push_back (std::move (doneCb));
404+ }
405+ };
406+ status = impl->reader ->inner_ ->Pull (std::move (snext),
407+ node::bob::OPTIONS_SYNC,
408+ nullptr ,
409+ 0 );
410+ }
411+ }
412+ // otherwise we commit and call
413+ if (impl->vecs .size () > 0 ) {
378414 // Copy the returns vectors into a single ArrayBuffer.
379415 size_t total = 0 ;
380- for (size_t n = 0 ; n < count; n++) total += vecs[n].len ;
416+ const size_t vecs_size = impl->vecs .size ();
417+ const DataQueue::Vec* ivecs = &(*impl->vecs .begin ());
418+ for (size_t n = 0 ; n < vecs_size; n++) total += ivecs[n].len ;
381419
382420 std::shared_ptr<BackingStore> store = ArrayBuffer::NewBackingStore (
383421 env->isolate (),
384422 total,
385423 BackingStoreInitializationMode::kUninitialized );
386424 auto ptr = static_cast <uint8_t *>(store->Data ());
387- for (size_t n = 0 ; n < count ; n++) {
388- std::copy (vecs [n].base , vecs [n].base + vecs [n].len , ptr);
389- ptr += vecs [n].len ;
425+ for (size_t n = 0 ; n < vecs_size ; n++) {
426+ std::copy (ivecs [n].base , ivecs [n].base + ivecs [n].len , ptr);
427+ ptr += ivecs [n].len ;
390428 }
391429 // Since we copied the data buffers, signal that we're done with them.
392- std::move (doneCb)(0 );
393- Local<Value> argv[2 ] = {Uint32::New (env->isolate (), status),
430+ std::for_each (impl->dones .begin (),
431+ impl->dones .end (),
432+ [](bob::Done& done) {
433+ std::move (done)(0 );
434+ });
435+ Local<Value> argv[2 ] = {Uint32::New (env->isolate (), bob::STATUS_CONTINUE),
394436 ArrayBuffer::New (env->isolate (), store)};
395437 impl->reader ->MakeCallback (fn, arraysize (argv), argv);
396438 return ;
0 commit comments