File tree Expand file tree Collapse file tree 2 files changed +5
-3
lines changed
Expand file tree Collapse file tree 2 files changed +5
-3
lines changed Original file line number Diff line number Diff line change @@ -450,9 +450,11 @@ class NonIdempotentDataQueueReader final
450450 status == bob::Status::STATUS_EOS,
451451 vecs == nullptr && count == 0 );
452452 if (status == bob::Status::STATUS_EOS) {
453+ current_reader_ = nullptr ; // must be done before erasing the entries
454+ // as FdEntry's and FeederEntry's reader hold a pointer to the entry!
455+ // and FeederEntry invoke it in destructor
453456 data_queue_->entries_ .erase (data_queue_->entries_ .begin ());
454457 ended_ = data_queue_->entries_ .empty ();
455- current_reader_ = nullptr ;
456458 if (!ended_) status = bob::Status::STATUS_CONTINUE;
457459 std::move (next)(status, nullptr , 0 , [](uint64_t ) {});
458460 return ;
Original file line number Diff line number Diff line change @@ -225,7 +225,7 @@ Maybe<std::shared_ptr<DataQueue>> Stream::GetDataQueueFromSource(
225225 ASSIGN_OR_RETURN_UNWRAP (
226226 &dataQueueFeeder, value, Nothing<std::shared_ptr<DataQueue>>());
227227 std::shared_ptr<DataQueue> dataQueue = DataQueue::Create ();
228- dataQueue->append (std::move ( DataQueue::CreateFeederEntry (dataQueueFeeder) ));
228+ dataQueue->append (DataQueue::CreateFeederEntry (dataQueueFeeder));
229229 return Just (dataQueue);
230230 }
231231
@@ -1349,6 +1349,7 @@ void DataQueueFeeder::tryWakePulls() {
13491349
13501350void DataQueueFeeder::DrainAndClose () {
13511351 if (done) return ;
1352+ done = true ; // do not do this several time, and note, it may be called several times.
13521353 while (!pendingPulls_.empty ()) {
13531354 auto & pending = pendingPulls_.front ();
13541355 auto pop = OnScopeLeave ([this ] { pendingPulls_.pop_front (); });
@@ -1359,7 +1360,6 @@ void DataQueueFeeder::DrainAndClose() {
13591360 (void )resolver->Resolve (env ()->context (), v8::False (env ()->isolate ()));
13601361 readFinish_.Reset ();
13611362 }
1362- done = true ;
13631363}
13641364
13651365JS_METHOD_IMPL (DataQueueFeeder::New) {
You can’t perform that action at this time.
0 commit comments