@@ -36,6 +36,7 @@ class NonIdempotentDataQueueReader;
3636class EntryImpl : public DataQueue ::Entry {
3737 public:
3838 virtual std::shared_ptr<DataQueue::Reader> get_reader () = 0;
39+ virtual void clearPendingNext () = 0;
3940};
4041
4142class DataQueueImpl final : public DataQueue,
@@ -159,6 +160,12 @@ class DataQueueImpl final : public DataQueue,
159160 return std::nullopt ;
160161 }
161162
163+ void clearPendingNext () override {
164+ for (const auto & entry : entries_) {
165+ entry->clearPendingNext ();
166+ }
167+ }
168+
162169 void MemoryInfo (node::MemoryTracker* tracker) const override {
163170 tracker->TrackField (
164171 " entries" , entries_, " std::vector<std::unique_ptr<Entry>>" );
@@ -590,6 +597,10 @@ class EmptyEntry final : public EntryImpl {
590597 return std::make_unique<EmptyEntry>();
591598 }
592599
600+ void clearPendingNext () override {
601+ // this is a noop for an empty object
602+ }
603+
593604 std::optional<uint64_t > size () const override { return 0 ; }
594605
595606 bool is_idempotent () const override { return true ; }
@@ -702,6 +713,10 @@ class InMemoryEntry final : public EntryImpl {
702713 return makeEntry (start, byte_length_ - start);
703714 }
704715
716+ void clearPendingNext () override {
717+ // this is a noop for an object, that immediately calls next.
718+ }
719+
705720 std::optional<uint64_t > size () const override { return byte_length_; }
706721
707722 bool is_idempotent () const override { return true ; }
@@ -750,6 +765,11 @@ class DataQueueEntry : public EntryImpl {
750765 return std::make_unique<DataQueueEntry>(std::move (sliced));
751766 }
752767
768+ void clearPendingNext () override {
769+ // this is just calls clearPendingNext of the queue
770+ data_queue_->clearPendingNext ();
771+ }
772+
753773 // Returns the number of bytes represented by this Entry if it is
754774 // known. Certain types of entries, such as those backed by streams
755775 // might not know the size in advance and therefore cannot provide
@@ -868,6 +888,10 @@ class FdEntry final : public EntryImpl {
868888 return std::make_unique<FdEntry>(env_, path_, stat_, new_start, new_end);
869889 }
870890
891+ void clearPendingNext () override {
892+ clearPendingNextImpl ();
893+ }
894+
871895 std::optional<uint64_t > size () const override { return end_ - start_; }
872896
873897 bool is_idempotent () const override { return true ; }
@@ -885,6 +909,16 @@ class FdEntry final : public EntryImpl {
885909 uint64_t start_ = 0 ;
886910 uint64_t end_ = 0 ;
887911
912+ class ReaderImpl ;
913+ std::unordered_set<ReaderImpl*> readers_;
914+
915+ void clearPendingNextImpl () {
916+ // this should clear all pending pulls from the FD reader
917+ for (ReaderImpl* p : readers_) {
918+ p->ClearAllPendingPulls ();
919+ }
920+ }
921+
888922 bool is_modified (const uv_stat_t & other) {
889923 return other.st_size != stat_.st_size ||
890924 other.st_mtim .tv_nsec != stat_.st_mtim .tv_nsec ;
@@ -931,12 +965,14 @@ class FdEntry final : public EntryImpl {
931965 : env_(handle->env ()), handle_(std::move(handle)), entry_(entry) {
932966 handle_->PushStreamListener (this );
933967 handle_->env ()->AddCleanupHook (cleanup, this );
968+ entry_->readers_ .insert (this );
934969 }
935970
936971 ~ReaderImpl () override {
937972 handle_->env ()->RemoveCleanupHook (cleanup, this );
938973 DrainAndClose ();
939974 handle_->RemoveStreamListener (this );
975+ entry_->readers_ .erase (this );
940976 }
941977
942978 uv_buf_t OnStreamAlloc (size_t suggested_size) override {
@@ -1061,6 +1097,10 @@ class FdEntry final : public EntryImpl {
10611097 return std::move (pending_pulls_.front ());
10621098 }
10631099
1100+ void ClearAllPendingPulls () {
1101+ pending_pulls_.clear ();
1102+ }
1103+
10641104 friend class FdEntry ;
10651105 };
10661106
@@ -1093,6 +1133,10 @@ class FeederEntry final : public EntryImpl {
10931133 }
10941134
10951135 bool is_idempotent () const override { return false ; }
1136+
1137+ void clearPendingNext () override {
1138+ if (feeder_) feeder_->clearPendingNext ();
1139+ }
10961140
10971141 SET_NO_MEMORY_INFO ()
10981142 SET_MEMORY_INFO_NAME (FeederEntry)
0 commit comments