|
18 | 18 | #include <memory> |
19 | 19 | #include <vector> |
20 | 20 |
|
| 21 | +#include "../quic/streams.h" |
| 22 | + |
21 | 23 | namespace node { |
22 | 24 |
|
23 | 25 | using v8::ArrayBufferView; |
@@ -1062,9 +1064,81 @@ class FdEntry final : public EntryImpl { |
1062 | 1064 | friend class ReaderImpl; |
1063 | 1065 | }; |
1064 | 1066 |
|
| 1067 | +} // namespace |
| 1068 | +// ============================================================================ |
| 1069 | + |
| 1070 | +class FeederEntry final : public EntryImpl { |
| 1071 | + public: |
| 1072 | + FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) { |
| 1073 | + } |
| 1074 | + |
| 1075 | + static std::unique_ptr<FeederEntry> Create(DataQueueFeeder* feeder) { |
| 1076 | + return std::make_unique<FeederEntry>(feeder); |
| 1077 | + } |
| 1078 | + |
| 1079 | + std::shared_ptr<DataQueue::Reader> get_reader() override { |
| 1080 | + return ReaderImpl::Create(this); |
| 1081 | + } |
| 1082 | + |
| 1083 | + std::unique_ptr<Entry> slice( |
| 1084 | + uint64_t start, std::optional<uint64_t> end = std::nullopt) override { |
| 1085 | + // we are not idempotent |
| 1086 | + return std::unique_ptr<Entry>(nullptr); |
| 1087 | + } |
| 1088 | + |
| 1089 | + std::optional<uint64_t> size() const override { |
| 1090 | + return std::optional<uint64_t>(); |
| 1091 | + } |
| 1092 | + |
| 1093 | + bool is_idempotent() const override { return false; } |
| 1094 | + |
| 1095 | + SET_NO_MEMORY_INFO() |
| 1096 | + SET_MEMORY_INFO_NAME(FeederEntry) |
| 1097 | + SET_SELF_SIZE(FeederEntry) |
| 1098 | + |
| 1099 | + private: |
| 1100 | + DataQueueFeeder* feeder_; |
| 1101 | + |
| 1102 | + class ReaderImpl final : public DataQueue::Reader, |
| 1103 | + public std::enable_shared_from_this<ReaderImpl> { |
| 1104 | + public: |
| 1105 | + static std::shared_ptr<ReaderImpl> Create(FeederEntry* entry) { |
| 1106 | + return std::make_shared<ReaderImpl>(entry); |
| 1107 | + }; |
| 1108 | + |
| 1109 | + explicit ReaderImpl(FeederEntry* entry) : entry_(entry) { |
| 1110 | + } |
| 1111 | + |
| 1112 | + ~ReaderImpl() { |
| 1113 | + entry_->feeder_->DrainAndClose(); |
| 1114 | + } |
| 1115 | + |
| 1116 | + int Pull(Next next, |
| 1117 | + int options, |
| 1118 | + DataQueue::Vec* data, |
| 1119 | + size_t count, |
| 1120 | + size_t max_count_hint = bob::kMaxCountHint) override { |
| 1121 | + if (entry_->feeder_->Done()) { |
| 1122 | + std::move(next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {}); |
| 1123 | + return bob::STATUS_EOS; |
| 1124 | + } |
| 1125 | + entry_->feeder_->addPendingPull( |
| 1126 | + DataQueueFeeder::PendingPull(std::move(next))); |
| 1127 | + entry_->feeder_->tryWakePulls(); |
| 1128 | + return bob::STATUS_WAIT; |
| 1129 | + } |
| 1130 | + |
| 1131 | + SET_NO_MEMORY_INFO() |
| 1132 | + SET_MEMORY_INFO_NAME(FeederEntry::Reader) |
| 1133 | + SET_SELF_SIZE(ReaderImpl) |
| 1134 | + |
| 1135 | + private: |
| 1136 | + FeederEntry* entry_; |
| 1137 | + }; |
| 1138 | +}; |
| 1139 | + |
1065 | 1140 | // ============================================================================ |
1066 | 1141 |
|
1067 | | -} // namespace |
1068 | 1142 |
|
1069 | 1143 | std::shared_ptr<DataQueue> DataQueue::CreateIdempotent( |
1070 | 1144 | std::vector<std::unique_ptr<Entry>> list) { |
@@ -1138,6 +1212,11 @@ std::unique_ptr<DataQueue::Entry> DataQueue::CreateFdEntry(Environment* env, |
1138 | 1212 | return FdEntry::Create(env, path); |
1139 | 1213 | } |
1140 | 1214 |
|
| 1215 | +std::unique_ptr<DataQueue::Entry> DataQueue::CreateFeederEntry( |
| 1216 | + DataQueueFeeder* feeder) { |
| 1217 | + return FeederEntry::Create(feeder); |
| 1218 | +} |
| 1219 | + |
1141 | 1220 | void DataQueue::Initialize(Environment* env, v8::Local<v8::Object> target) { |
1142 | 1221 | // Nothing to do here currently. |
1143 | 1222 | } |
|
0 commit comments