@@ -153,7 +153,7 @@ class QueueImpl final {
153153 using Entry = typename Self::Entry;
154154 using State = typename Self::State;
155155
156- explicit QueueImpl (size_t highWaterMark): highWaterMark(highWaterMark) {}
156+ explicit QueueImpl (double highWaterMark): highWaterMark(highWaterMark) {}
157157
158158 QueueImpl (QueueImpl&&) = default ;
159159 QueueImpl& operator =(QueueImpl&&) = default ;
@@ -176,7 +176,7 @@ class QueueImpl final {
176176 // The value can be zero or negative, in which case backpressure is
177177 // signaled on the queue.
178178 // If the queue is already closed or errored, return 0.
179- inline ssize_t desiredSize () const {
179+ inline double desiredSize () const {
180180 return state.template is <Ready>() ? highWaterMark - size () : 0 ;
181181 }
182182
@@ -231,7 +231,7 @@ class QueueImpl final {
231231 }
232232
233233 // The current size of consumer with the most stored data.
234- size_t size () const {
234+ double size () const {
235235 return totalQueueSize;
236236 }
237237
@@ -293,8 +293,8 @@ class QueueImpl final {
293293 SmallSet<ConsumerImpl*> consumers;
294294 };
295295
296- size_t highWaterMark;
297- size_t totalQueueSize = 0 ;
296+ double highWaterMark;
297+ double totalQueueSize = 0 ;
298298 kj::OneOf<Ready, Closed, Errored> state = Ready();
299299
300300 void addConsumer (ConsumerImpl* consumer) {
@@ -432,7 +432,7 @@ class ConsumerImpl final {
432432 }
433433
434434 // The current total calculated size of the consumer's internal buffer.
435- size_t size () const {
435+ double size () const {
436436 KJ_SWITCH_ONEOF (state) {
437437 KJ_CASE_ONEOF (e, Errored) {
438438 return 0 ;
@@ -550,7 +550,7 @@ class ConsumerImpl final {
550550 struct Ready {
551551 workerd::RingBuffer<kj::OneOf<QueueEntry, Close>, 16 > buffer;
552552 std::list<ReadRequest> readRequests;
553- size_t queueTotalSize = 0 ;
553+ double queueTotalSize = 0 ;
554554
555555 inline kj::StringPtr jsgGetMemoryName () const ;
556556 inline size_t jsgGetMemorySelfSize () const ;
@@ -712,7 +712,7 @@ class ValueQueue final {
712712
713713 void reset ();
714714
715- size_t size ();
715+ double size ();
716716
717717 kj::Own<Consumer> clone (
718718 jsg::Lock& js, kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);
@@ -732,19 +732,19 @@ class ValueQueue final {
732732 friend class ValueQueue ;
733733 };
734734
735- explicit ValueQueue (size_t highWaterMark);
735+ explicit ValueQueue (double highWaterMark);
736736
737737 void close (jsg::Lock& js);
738738
739- ssize_t desiredSize () const ;
739+ double desiredSize () const ;
740740
741741 void error (jsg::Lock& js, jsg::Value reason);
742742
743743 void maybeUpdateBackpressure ();
744744
745745 void push (jsg::Lock& js, kj::Rc<Entry> entry);
746746
747- size_t size () const ;
747+ double size () const ;
748748
749749 size_t getConsumerCount ();
750750
@@ -932,7 +932,7 @@ class ByteQueue final {
932932
933933 void reset ();
934934
935- size_t size () const ;
935+ double size () const ;
936936
937937 kj::Own<Consumer> clone (
938938 jsg::Lock& js, kj::Maybe<ConsumerImpl::StateListener&> stateListener = kj::none);
@@ -949,19 +949,19 @@ class ByteQueue final {
949949 ConsumerImpl impl;
950950 };
951951
952- explicit ByteQueue (size_t highWaterMark);
952+ explicit ByteQueue (double highWaterMark);
953953
954954 void close (jsg::Lock& js);
955955
956- ssize_t desiredSize () const ;
956+ double desiredSize () const ;
957957
958958 void error (jsg::Lock& js, jsg::Value reason);
959959
960960 void maybeUpdateBackpressure ();
961961
962962 void push (jsg::Lock& js, kj::Rc<Entry> entry);
963963
964- size_t size () const ;
964+ double size () const ;
965965
966966 size_t getConsumerCount ();
967967
0 commit comments