@@ -377,6 +377,7 @@ class IdempotentDataQueueReader final
377377 // pull but it is not available yet. The
378378 // caller should not keep calling pull for
379379 // now but may check again later.
380+ // Though we may call a notifier to signal.
380381 // bob::Status::STATUS_WAIT - means that the entry has more data to
381382 // pull but it won't be provided
382383 // synchronously, instead the next() callback
@@ -438,8 +439,8 @@ class NonIdempotentDataQueueReader final
438439 delete ;
439440
440441 void newDataOrEnd () override {
441- if (blocked_next_ ) {
442- auto next = std::move (blocked_next_ );
442+ if (waited_next_ ) {
443+ auto next = std::move (waited_next_ );
443444 data_queue_->env ()->SetImmediate (
444445 [next, dropme = shared_from_this ()](Environment* env) {
445446 std::move (next)(
@@ -455,7 +456,7 @@ class NonIdempotentDataQueueReader final
455456 size_t count,
456457 size_t max_count_hint = bob::kMaxCountHint ) override {
457458 std::shared_ptr<DataQueue::Reader> self = shared_from_this ();
458- assert (!blocked_next_ ); // Do not call us with blocked next
459+ assert (!waited_next_ ); // Do not call us with blocked next
459460
460461 // If ended is true, this reader has already reached the end and cannot
461462 // provide any more data.
@@ -474,10 +475,18 @@ class NonIdempotentDataQueueReader final
474475 // status.
475476 if (!data_queue_->is_capped ()) {
476477 // Do we have to call next?
477- assert (!blocked_next_);
478- next (bob::Status::STATUS_BLOCK, nullptr , 0 , [](uint64_t ) {});
479- blocked_next_ = std::move (next);
480- return bob::STATUS_BLOCK;
478+ if (!(options & bob::OPTIONS_SYNC)) {
479+ assert (!waited_next_);
480+ next (bob::Status::STATUS_WAIT, nullptr , 0 , [](uint64_t ) {});
481+ waited_next_ = std::move (next);
482+ return bob::STATUS_WAIT;
483+ } else {
484+ std::move (next)(bob::Status::STATUS_BLOCK,
485+ nullptr ,
486+ 0 ,
487+ [](uint64_t ) {});
488+ return bob::STATUS_BLOCK;
489+ }
481490 }
482491
483492 // However, if we are capped, the status will depend on whether the size
@@ -488,9 +497,18 @@ class NonIdempotentDataQueueReader final
488497 // still might get more data. We just don't know exactly when that'll
489498 // come, so let's return a blocked status.
490499 if (data_queue_->size ().value () < data_queue_->capped_size_ .value ()) {
491- next (bob::Status::STATUS_BLOCK, nullptr , 0 , [](uint64_t ) {});
492- blocked_next_ = std::move (next);
493- return bob::STATUS_BLOCK;
500+ if (!(options & bob::OPTIONS_SYNC)) {
501+ assert (!waited_next_);
502+ next (bob::Status::STATUS_WAIT, nullptr , 0 , [](uint64_t ) {});
503+ waited_next_ = std::move (next);
504+ return bob::STATUS_WAIT;
505+ } else {
506+ std::move (next)(bob::Status::STATUS_BLOCK,
507+ nullptr ,
508+ 0 ,
509+ [](uint64_t ) {});
510+ return bob::STATUS_BLOCK;
511+ }
494512 }
495513
496514 // Otherwise, if size is equal to or greater than capped, we are done.
@@ -516,7 +534,7 @@ class NonIdempotentDataQueueReader final
516534 CHECK (!pull_pending_);
517535 pull_pending_ = true ;
518536 int status = current_reader->Pull (
519- [this , next = std::move (next)](
537+ [this , next = std::move (next), options ](
520538 int status, const DataQueue::Vec* vecs, uint64_t count, Done done) {
521539 pull_pending_ = false ;
522540
@@ -539,10 +557,21 @@ class NonIdempotentDataQueueReader final
539557 if (data_queue_->is_capped ()) {
540558 ended_ = true ;
541559 } else {
542- assert (!blocked_next_);
543- next (bob::Status::STATUS_BLOCK, nullptr , 0 , [](uint64_t ) {});
544- blocked_next_ = std::move (next);
545- return ; // we should not call with move
560+ if (!(options & bob::OPTIONS_SYNC)) {
561+ assert (!waited_next_);
562+ next (bob::Status::STATUS_WAIT,
563+ nullptr ,
564+ 0 ,
565+ [](uint64_t ) {});
566+ waited_next_ = std::move (next);
567+ return ;
568+ } else {
569+ std::move (next)(bob::Status::STATUS_BLOCK,
570+ nullptr ,
571+ 0 ,
572+ [](uint64_t ) {});
573+ return ;
574+ }
546575 }
547576 } else {
548577 status = bob::Status::STATUS_CONTINUE;
@@ -586,6 +615,7 @@ class NonIdempotentDataQueueReader final
586615 // pull but it is not available yet. The
587616 // caller should not keep calling pull for
588617 // now but may check again later.
618+ // Though we may call a notifier to signal.
589619 // bob::Status::STATUS_WAIT - means that the entry has more data to
590620 // pull but it won't be provided
591621 // synchronously, instead the next() callback
@@ -616,7 +646,7 @@ class NonIdempotentDataQueueReader final
616646 std::shared_ptr<DataQueue::Reader> current_reader_ = nullptr ;
617647 bool ended_ = false ;
618648 bool pull_pending_ = false ;
619- Next blocked_next_ ;
649+ Next waited_next_ ;
620650};
621651
622652std::shared_ptr<DataQueue::Reader> DataQueueImpl::get_reader () {
0 commit comments