@@ -77,6 +77,12 @@ class DataQueueImpl final : public DataQueue,
7777 DataQueueImpl& operator =(const DataQueueImpl&) = delete ;
7878 DataQueueImpl& operator =(DataQueueImpl&&) = delete ;
7979
80+ ~DataQueueImpl () {
81+ for (auto cur = readers_.begin (); cur != readers_.end (); cur++) {
82+ (*cur)->clearPendingReads ();
83+ }
84+ }
85+
8086 std::shared_ptr<DataQueue> slice (
8187 uint64_t start,
8288 std::optional<uint64_t > maybeEnd = std::nullopt ) override {
@@ -226,6 +232,7 @@ class DataQueueImpl final : public DataQueue,
226232 class NotifyReader {
227233 public:
228234 virtual void newDataOrEnd () = 0;
235+ virtual void clearPendingReads () = 0;
229236 };
230237
231238 void reader_destructed (NotifyReader* reader) { readers_.erase (reader); }
@@ -281,6 +288,10 @@ class IdempotentDataQueueReader final
281288 // Currently nothing, but it may change
282289 }
283290
291+ void clearPendingReads () override {
292+ // Currently nothing, but it may change
293+ }
294+
284295 int Pull (Next next,
285296 int options,
286297 DataQueue::Vec* data,
@@ -435,10 +446,7 @@ class NonIdempotentDataQueueReader final
435446 }
436447
437448 ~NonIdempotentDataQueueReader () {
438- if (waited_next_) {
439- std::move (waited_next_)(
440- bob::Status::STATUS_EOS, nullptr , 0 , [](uint64_t ) {});
441- }
449+ clearPendingReads ();
442450 data_queue_->reader_destructed (this );
443451 }
444452
@@ -462,6 +470,13 @@ class NonIdempotentDataQueueReader final
462470 }
463471 }
464472
473+ void clearPendingReads () override {
474+ if (waited_next_) {
475+ std::move (waited_next_)(
476+ bob::Status::STATUS_EOS, nullptr , 0 , [](uint64_t ) {});
477+ }
478+ }
479+
465480 int Pull (Next next,
466481 int options,
467482 DataQueue::Vec* data,
0 commit comments