@@ -202,12 +202,12 @@ class QueueImpl final {
202202 // If we are already closed or errored, set totalQueueSize to zero.
203203 void maybeUpdateBackpressure () {
204204 totalQueueSize = 0 ;
205- KJ_IF_SOME (ready, state.tryGetActiveUnsafe () ) {
205+ state.whenActive ([ this ](Ready& ready ) {
206206 auto consumers = ready.consumers .snapshot ();
207207 for (auto consumer: consumers) {
208208 totalQueueSize = kj::max (totalQueueSize, consumer->size ());
209209 }
210- }
210+ });
211211 }
212212
213213 // Forwards the entry to all consumers (except skipConsumer if given).
@@ -483,12 +483,12 @@ class ConsumerImpl final {
483483
484484 void cancelPendingReads (jsg::Lock& js, jsg::JsValue reason) {
485485 // Already closed or errored - nothing to do.
486- KJ_IF_SOME (ready, state.tryGetActiveUnsafe () ) {
486+ state.whenActive ([&](Ready& ready ) {
487487 for (auto & request: ready.readRequests ) {
488488 request->resolver .reject (js, reason);
489489 }
490490 ready.readRequests .clear ();
491- }
491+ });
492492 }
493493
494494 void visitForGc (jsg::GcVisitor& visitor) {
@@ -552,13 +552,9 @@ class ConsumerImpl final {
552552 bool isClosing () {
553553 // Closing state is determined by whether there is a Close sentinel that has been
554554 // pushed into the end of Ready state buffer.
555- KJ_IF_SOME (ready, state.tryGetActiveUnsafe ()) {
556- if (ready.buffer .empty ()) {
557- return false ;
558- }
559- return ready.buffer .back ().template is <Close>();
560- }
561- return false ;
555+ return state.whenActiveOr ([](Ready& ready) {
556+ return !ready.buffer .empty () && ready.buffer .back ().template is <Close>();
557+ }, false );
562558 }
563559
564560 void maybeDrainAndSetState (jsg::Lock& js, kj::Maybe<jsg::Value> maybeReason = kj::none) {
0 commit comments