@@ -329,7 +329,10 @@ class IdempotentDataQueueReader final
329329 int status = current_reader->Pull (
330330 [this , next = std::move (next)](
331331 int status, const DataQueue::Vec* vecs, uint64_t count, Done done) {
332- pull_pending_ = false ;
332+ if (status != bob::Status::STATUS_WAIT) {
333+ // do not set for an async call
334+ pull_pending_ = false ;
335+ }
333336 // In each of these cases, we do not expect that the source will
334337 // actually have provided any actual data.
335338 CHECK_IMPLIES (status == bob::Status::STATUS_BLOCK ||
@@ -365,8 +368,11 @@ class IdempotentDataQueueReader final
365368
366369 // The pull was handled synchronously. If we're not ended, we want to
367370 // make sure status returned is CONTINUE.
371+ // But if the source blocks, we should let the caller know,
372+ // otherwise, we may provoke a busy loop
368373 if (!pull_pending_) {
369- if (!ended_) return bob::Status::STATUS_CONTINUE;
374+ if (!ended_ && status != bob::Status::STATUS_BLOCK)
375+ return bob::Status::STATUS_CONTINUE;
370376 // For all other status, we just fall through and return it straightaway.
371377 }
372378
@@ -1148,6 +1154,13 @@ class FdEntry final : public EntryImpl {
11481154 std::move (next)(UV_EINVAL, nullptr , 0 , [](uint64_t ) {});
11491155 return UV_EINVAL;
11501156 }
1157+ // Note: we do not support sync reading, so if it is requested
1158+ // we need to bail out and say we are blocked
1159+ if ((options & bob::OPTIONS_SYNC)) {
1160+ std::move (next)(
1161+ bob::Status::STATUS_BLOCK, nullptr , 0 , [](uint64_t ) {});
1162+ return bob::STATUS_BLOCK;
1163+ }
11511164
11521165 pending_pulls_.emplace_back (std::move (next), shared_from_this ());
11531166 if (!reading_) {
0 commit comments