@@ -77,12 +77,6 @@ class DataQueueImpl final : public DataQueue,
7777 DataQueueImpl& operator =(const DataQueueImpl&) = delete ;
7878 DataQueueImpl& operator =(DataQueueImpl&&) = delete ;
7979
80- ~DataQueueImpl () {
81- for (auto cur = readers_.begin (); cur != readers_.end (); cur++) {
82- (*cur)->clearPendingReads ();
83- }
84- }
85-
8680 std::shared_ptr<DataQueue> slice (
8781 uint64_t start,
8882 std::optional<uint64_t > maybeEnd = std::nullopt ) override {
@@ -232,7 +226,6 @@ class DataQueueImpl final : public DataQueue,
232226 class NotifyReader {
233227 public:
234228 virtual void newDataOrEnd () = 0;
235- virtual void clearPendingReads () = 0;
236229 };
237230
238231 void reader_destructed (NotifyReader* reader) { readers_.erase (reader); }
@@ -288,10 +281,6 @@ class IdempotentDataQueueReader final
288281 // Currently nothing, but it may change
289282 }
290283
291- void clearPendingReads () override {
292- // Currently nothing, but it may change
293- }
294-
295284 int Pull (Next next,
296285 int options,
297286 DataQueue::Vec* data,
@@ -446,7 +435,10 @@ class NonIdempotentDataQueueReader final
446435 }
447436
448437 ~NonIdempotentDataQueueReader () {
449- clearPendingReads ();
438+ if (waited_next_) {
439+ std::move (waited_next_)(
440+ bob::Status::STATUS_EOS, nullptr , 0 , [](uint64_t ) {});
441+ }
450442 data_queue_->reader_destructed (this );
451443 }
452444
@@ -470,13 +462,6 @@ class NonIdempotentDataQueueReader final
470462 }
471463 }
472464
473- void clearPendingReads () override {
474- if (waited_next_) {
475- std::move (waited_next_)(
476- bob::Status::STATUS_EOS, nullptr , 0 , [](uint64_t ) {});
477- }
478- }
479-
480465 int Pull (Next next,
481466 int options,
482467 DataQueue::Vec* data,
0 commit comments