@@ -846,8 +846,9 @@ pair<string, string> Connection::GetClientInfoBeforeAfterTid() const {
846846 }
847847 string after;
848848 absl::StrAppend (&after, " irqmatch=" , int (cpu == my_cpu_id));
849- if (dispatch_q_.size ()) {
850- absl::StrAppend (&after, " pipeline=" , dispatch_q_.size ());
849+ size_t total_pending = dispatch_q_.size () + pending_pipeline_cmd_cnt_;
850+ if (total_pending > 0 ) {
851+ absl::StrAppend (&after, " pipeline=" , total_pending);
851852 absl::StrAppend (&after, " pbuf=" , pending_pipeline_bytes_);
852853 }
853854 absl::StrAppend (&after, " age=" , now - creation_time_, " idle=" , now - last_interaction_);
@@ -1389,13 +1390,12 @@ bool Connection::ShouldEndAsyncFiber(const MessageHandle& msg) {
13891390 return false ;
13901391 }
13911392
1392- if (dispatch_q_.empty ()) {
1393+ if (dispatch_q_.empty () && (parsed_head_ == nullptr ) && (parsed_to_execute_ == nullptr ) ) {
13931394 // A migration request means we should terminate this function (and allow the fiber to
13941395 // join), so that we can re-launch the fiber in the new thread.
13951396 // We intentionally return and not break in order to keep the connection open.
13961397 return true ;
13971398 }
1398-
13991399 // There shouldn't be any other migration requests in the queue, but it's worth checking
14001400 // as otherwise it would lead to an endless loop.
14011401 bool has_migration_req =
@@ -1607,8 +1607,15 @@ void Connection::AsyncFiber() {
16071607 bool subscriber_over_limit =
16081608 stats_->dispatch_queue_subscriber_bytes >= qbp.publish_buffer_limit ;
16091609
1610+ // Detect if we are blocked by a migration request while having pending data.
1611+ // If so, we must skip the admin queue temporarily to drain the data pipeline.
1612+ bool blocked_migration =
1613+ !dispatch_q_.empty () &&
1614+ std::holds_alternative<MigrationRequestMessage>(dispatch_q_.front ().handle ) &&
1615+ (parsed_head_ != nullptr );
1616+
16101617 // 1. Prioritize Control/Admin messages (dispatch_q_) over the data pipeline.
1611- if (!dispatch_q_.empty ()) {
1618+ if (!dispatch_q_.empty () && !blocked_migration ) {
16121619 MessageHandle msg = std::move (dispatch_q_.front ());
16131620 dispatch_q_.pop_front ();
16141621
@@ -1637,7 +1644,7 @@ void Connection::AsyncFiber() {
16371644 }
16381645
16391646 // 2. Handle Data Pipeline (Intrusive List) - only runs when dispatch_q_ is empty.
1640- DCHECK (dispatch_q_.empty ());
1647+ DCHECK (dispatch_q_.empty () || blocked_migration );
16411648 if (parsed_head_) {
16421649 // Ensure pointers are synchronized before deciding how to execute.
16431650 DCHECK_EQ (parsed_head_, parsed_to_execute_) << " Pointers diverged before dispatch decision." ;
@@ -1651,16 +1658,9 @@ void Connection::AsyncFiber() {
16511658 if (reply_builder_->GetError ())
16521659 break ;
16531660 } else {
1654- auto replies_recorded_before{reply_builder_-> RepliesRecorded ()};
1661+ // Execute the batch. (This puts data in the buffer but does NOT flush).
16551662 if (!ExecuteRedisBatch ())
16561663 break ; // Fatal error (e.g. socket closed or builder error)
1657-
1658- // If we drained the pipeline but produced no new output in this batch
1659- // (e.g. filtered PubSub messages), the ReplyBuilder might still be holding buffered
1660- // data from previous commands. We must flush now to ensure that data is sent.
1661- if (!parsed_head_ && (replies_recorded_before == reply_builder_->RepliesRecorded ())) {
1662- reply_builder_->Flush ();
1663- }
16641664 }
16651665 }
16661666
@@ -1679,26 +1679,36 @@ void Connection::AsyncFiber() {
16791679}
16801680
16811681bool Connection::ExecuteRedisBatch () {
1682+ size_t processed_in_this_loop{};
1683+ static constexpr size_t kYieldInterval {32 };
1684+ uint64_t start_cycles{CycleClock::Now ()};
1685+
16821686 while (parsed_to_execute_) {
16831687 // During the loop, new admin commands may arrive in the dispatch_q_. They should be
16841688 // prioritized.
16851689 if (!dispatch_q_.empty ()) {
16861690 return true ;
16871691 }
16881692
1689- // Yield if we have been running too long to prevent starvation.
1690- if (ThisFiber::GetRunningTimeCycles () > max_busy_read_cycles_cached) {
1693+ // Optimization: Check the yield budget only every kYieldInterval commands.
1694+ // We use && to short-circuit and avoid calling the expensive CycleClock::Now()
1695+ // on every iteration.
1696+ if ((++processed_in_this_loop > kYieldInterval ) &&
1697+ (CycleClock::Now () - start_cycles > max_busy_read_cycles_cached)) {
16911698 ThisFiber::Yield ();
1699+ processed_in_this_loop = 0 ;
1700+ start_cycles = CycleClock::Now ();
16921701 }
16931702
16941703 ++local_stats_.cmds ;
16951704 last_interaction_ = time (nullptr );
16961705
16971706 auto * cmd{parsed_to_execute_};
1698- service_->DispatchCommand (ParsedArgs{*cmd}, cmd, facade::AsyncPreference::ONLY_ASYNC);
16991707 parsed_to_execute_ = cmd->next ;
17001708 DCHECK_EQ (cmd, parsed_head_);
17011709 parsed_head_ = parsed_to_execute_;
1710+
1711+ service_->DispatchCommand (ParsedArgs{*cmd}, cmd, facade::AsyncPreference::ONLY_ASYNC);
17021712 ReleaseParsedCommand (cmd, true );
17031713
17041714 if (reply_builder_->GetError ()) {
@@ -1709,8 +1719,10 @@ bool Connection::ExecuteRedisBatch() {
17091719 // pipeline queue is empty - flush manually since we are not sure if more commands are coming.
17101720 if (parsed_head_ == nullptr ) {
17111721 parsed_tail_ = nullptr ;
1712- // NOTE: we do not flush here to allow for more commands to batch and let the
1713- // caller decide when to flush.
1722+ parsed_to_execute_ = nullptr ;
1723+ // We finished the pipeline, so we MUST send whatever data is pending
1724+ // to prevent clients from hanging.
1725+ reply_builder_->Flush ();
17141726 }
17151727
17161728 DCHECK_EQ (parsed_head_, parsed_to_execute_) << " ExecuteRedisBatch: Pointers diverged." ;
@@ -1753,7 +1765,17 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) {
17531765 CHECK (!cc_->async_dispatch );
17541766 CHECK_EQ (cc_->subscriptions , 0 ); // are bound to thread local caches
17551767 CHECK_EQ (self_.use_count (), 1u ); // references cache our thread and backpressure
1756- //
1768+ //
1769+ // Migration is only safe when both admin queue and pipeline are empty.
1770+ const bool has_pending_cmds = !dispatch_q_.empty () || (parsed_head_ != nullptr );
1771+ // IoLoop v1 uses the async fiber; v2 runs inline. In both cases we must be idle.
1772+ DCHECK (dispatch_q_.empty ()) << DebugInfo ();
1773+ DCHECK (parsed_head_ == nullptr );
1774+ if (has_pending_cmds) {
1775+ LOG (ERROR) << " Migration skipped (" << (ioloop_v2_ ? " io_v2" : " io_v1" )
1776+ << " ): pending work remains " << DebugInfo ();
1777+ return false ;
1778+ }
17571779 if (ioloop_v2_ && socket_ && socket_->IsOpen ()) {
17581780 socket_->ResetOnRecvHook ();
17591781 }