@@ -36,6 +36,7 @@ class NonIdempotentDataQueueReader;
3636class EntryImpl : public DataQueue ::Entry {
3737 public:
3838 virtual std::shared_ptr<DataQueue::Reader> get_reader () = 0;
39+ virtual void clearPendingNext () = 0;
3940};
4041
4142class DataQueueImpl final : public DataQueue,
@@ -159,6 +160,12 @@ class DataQueueImpl final : public DataQueue,
159160 return std::nullopt ;
160161 }
161162
163+ void clearPendingNext () override {
164+ for (const auto & entry : entries_) {
165+ entry->clearPendingNext ();
166+ }
167+ }
168+
162169 void MemoryInfo (node::MemoryTracker* tracker) const override {
163170 tracker->TrackField (
164171 " entries" , entries_, " std::vector<std::unique_ptr<Entry>>" );
@@ -590,6 +597,10 @@ class EmptyEntry final : public EntryImpl {
590597 return std::make_unique<EmptyEntry>();
591598 }
592599
600+ void clearPendingNext () override {
601+ // this is a noop for an empty object
602+ }
603+
593604 std::optional<uint64_t > size () const override { return 0 ; }
594605
595606 bool is_idempotent () const override { return true ; }
@@ -702,6 +713,10 @@ class InMemoryEntry final : public EntryImpl {
702713 return makeEntry (start, byte_length_ - start);
703714 }
704715
716+ void clearPendingNext () override {
717+ // this is a noop for an object, that immediately calls next.
718+ }
719+
705720 std::optional<uint64_t > size () const override { return byte_length_; }
706721
707722 bool is_idempotent () const override { return true ; }
@@ -750,6 +765,11 @@ class DataQueueEntry : public EntryImpl {
750765 return std::make_unique<DataQueueEntry>(std::move (sliced));
751766 }
752767
768+ void clearPendingNext () override {
769+ // this is just calls clearPendingNext of the queue
770+ data_queue_->clearPendingNext ();
771+ }
772+
753773 // Returns the number of bytes represented by this Entry if it is
754774 // known. Certain types of entries, such as those backed by streams
755775 // might not know the size in advance and therefore cannot provide
@@ -868,6 +888,10 @@ class FdEntry final : public EntryImpl {
868888 return std::make_unique<FdEntry>(env_, path_, stat_, new_start, new_end);
869889 }
870890
891+ void clearPendingNext () override {
892+ clearPendingNextImpl ();
893+ }
894+
871895 std::optional<uint64_t > size () const override { return end_ - start_; }
872896
873897 bool is_idempotent () const override { return true ; }
@@ -885,6 +909,16 @@ class FdEntry final : public EntryImpl {
885909 uint64_t start_ = 0 ;
886910 uint64_t end_ = 0 ;
887911
912+ class ReaderImpl ;
913+ std::unordered_set<ReaderImpl*> readers_;
914+
915+ void clearPendingNextImpl () {
916+ // this should clear all pending pulls from the FD reader
917+ for (ReaderImpl* p : readers_) {
918+ p->ClearAllPendingPulls ();
919+ }
920+ }
921+
888922 bool is_modified (const uv_stat_t & other) {
889923 return other.st_size != stat_.st_size ||
890924 other.st_mtim .tv_nsec != stat_.st_mtim .tv_nsec ;
@@ -932,12 +966,14 @@ class FdEntry final : public EntryImpl {
932966 : env_(handle->env ()), handle_(std::move(handle)), entry_(entry) {
933967 handle_->PushStreamListener (this );
934968 handle_->env ()->AddCleanupHook (cleanup, this );
969+ entry_->readers_ .insert (this );
935970 }
936971
937972 ~ReaderImpl () override {
938973 handle_->env ()->RemoveCleanupHook (cleanup, this );
939974 DrainAndClose ();
940975 handle_->RemoveStreamListener (this );
976+ entry_->readers_ .erase (this );
941977 }
942978
943979 uv_buf_t OnStreamAlloc (size_t suggested_size) override {
@@ -1062,6 +1098,10 @@ class FdEntry final : public EntryImpl {
10621098 return std::move (pending_pulls_.front ());
10631099 }
10641100
1101+ void ClearAllPendingPulls () {
1102+ pending_pulls_.clear ();
1103+ }
1104+
10651105 friend class FdEntry ;
10661106 };
10671107
@@ -1094,6 +1134,10 @@ class FeederEntry final : public EntryImpl {
10941134 }
10951135
10961136 bool is_idempotent () const override { return false ; }
1137+
1138+ void clearPendingNext () override {
1139+ if (feeder_) feeder_->clearPendingNext ();
1140+ }
10971141
10981142 SET_NO_MEMORY_INFO ()
10991143 SET_MEMORY_INFO_NAME (FeederEntry)
0 commit comments