55
66#include " facade/dragonfly_connection.h"
77
8+ #include < absl/cleanup/cleanup.h>
89#include < absl/container/flat_hash_map.h>
910#include < absl/strings/escaping.h>
1011#include < absl/strings/match.h>
@@ -1112,6 +1113,7 @@ void Connection::ConnectionFlow() {
11121113 uint32_t prev_timeout{socket_->timeout ()};
11131114 static constexpr uint32_t kSocketDrainTimeoutMs {100 };
11141115 socket_->set_timeout (kSocketDrainTimeoutMs );
1116+ auto restore_timeout = absl::MakeCleanup ([&] { socket_->set_timeout (prev_timeout); });
11151117 while (!ec2) {
11161118 // Discard any received data.
11171119 io_buf_.Clear ();
@@ -1120,7 +1122,6 @@ void Connection::ConnectionFlow() {
11201122 break ; // Peer closed connection or timeout.
11211123 }
11221124 }
1123- socket_->set_timeout (prev_timeout);
11241125 }
11251126
11261127 if (ec && !FiberSocketBase::IsConnClosed (ec)) {
@@ -1165,7 +1166,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
11651166 request_consumed_bytes_ = 0 ;
11661167 bool has_more = consumed < read_buffer.size ();
11671168
1168- if (tl_traffic_logger.log_file && IsMain ()) {
1169+ if (tl_traffic_logger.log_file && IsMain () /* log only on the main interface */ ) {
11691170 LogTraffic (id_, has_more, *parsed_cmd_, service_->GetContextInfo (cc_.get ()));
11701171 }
11711172
@@ -1469,9 +1470,9 @@ void Connection::SquashPipeline() {
14691470
14701471 local_stats_.cmds += result.processed ;
14711472 last_interaction_ = time (nullptr );
1472- uint32_t dispatched = result.processed ;
1473- uint64_t before_flush = CycleClock::Now ();
1474- //
1473+ uint32_t num_dispatched_cmds = result.processed ;
1474+ uint64_t flush_start_cycle_cnt = CycleClock::Now ();
1475+
14751476 // TODO: to investigate if always flushing will improve P99 latency because otherwise we
14761477 // wait for the next batch to finish before fully flushing the current response.
14771478 if (pending_pipeline_cmd_cnt_ == pipeline_count ||
@@ -1486,19 +1487,20 @@ void Connection::SquashPipeline() {
14861487
14871488 if (result.account_in_stats ) {
14881489 stats_->pipeline_dispatch_calls ++;
1489- stats_->pipeline_dispatch_commands += dispatched;
1490- stats_->pipeline_dispatch_flush_usec += CycleClock::ToUsec (CycleClock::Now () - before_flush);
1490+ stats_->pipeline_dispatch_commands += num_dispatched_cmds;
1491+ stats_->pipeline_dispatch_flush_usec +=
1492+ CycleClock::ToUsec (CycleClock::Now () - flush_start_cycle_cnt);
14911493 }
14921494
14931495 // update parsed_to_execute_ pointer
1494- for (uint32_t i = 0 ; i < dispatched ; ++i) {
1496+ for (uint32_t i = 0 ; i < num_dispatched_cmds ; ++i) {
14951497 if (parsed_to_execute_) {
14961498 parsed_to_execute_ = parsed_to_execute_->next ;
14971499 }
14981500 }
14991501
15001502 auto * current{parsed_head_};
1501- for (size_t i{}; i < dispatched ; ++i) {
1503+ for (size_t i{}; i < num_dispatched_cmds ; ++i) {
15021504 DCHECK (current);
15031505 auto * next{current->next };
15041506
@@ -1515,7 +1517,7 @@ void Connection::SquashPipeline() {
15151517 }
15161518
15171519 // If interrupted due to pause, fall back to regular dispatch
1518- skip_next_squashing_ = (dispatched != pipeline_count);
1520+ skip_next_squashing_ = (num_dispatched_cmds != pipeline_count);
15191521}
15201522
15211523void Connection::ClearPipelinedMessages () {
@@ -1528,7 +1530,7 @@ void Connection::ClearPipelinedMessages() {
15281530 FiberAtomicGuard guard; // don't suspend when concluding to avoid getting new messages
15291531 if (msg.IsControl ())
15301532 visit (async_op, msg.handle ); // to not miss checkpoints
1531- RecycleIntrusiveMessage (std::move (msg));
1533+ UpdateIntrusiveMessageStats (std::move (msg));
15321534 }
15331535 dispatch_q_.clear ();
15341536
@@ -1541,7 +1543,7 @@ void Connection::ClearPipelinedMessages() {
15411543 if (stats_) {
15421544 stats_->dispatch_queue_bytes -= curr->UsedMemory ();
15431545 }
1544- RecyclePipelineCommand (curr);
1546+ UpdatePipelineCommandStats (curr);
15451547 PipelineMessagePtr ptr (curr); // reclaim ownership and destroy
15461548 }
15471549 parsed_head_ = nullptr ;
@@ -1647,7 +1649,7 @@ void Connection::AsyncFiber() {
16471649 }
16481650
16491651 if (ShouldEndAsyncFiber (msg)) {
1650- RecycleIntrusiveMessage (std::move (msg));
1652+ UpdateIntrusiveMessageStats (std::move (msg));
16511653 CHECK (dispatch_q_.empty ()) << DebugInfo ();
16521654 qbp.pipeline_cnd .notify_all ();
16531655 return ; // don't set conn closing flag
@@ -1656,7 +1658,7 @@ void Connection::AsyncFiber() {
16561658 cc_->async_dispatch = true ;
16571659 std::visit (async_op, msg.handle );
16581660 cc_->async_dispatch = false ;
1659- RecycleIntrusiveMessage (std::move (msg));
1661+ UpdateIntrusiveMessageStats (std::move (msg));
16601662
16611663 // Ensure we check for more control messages before touching data pipeline messages.
16621664 continue ;
@@ -1665,12 +1667,17 @@ void Connection::AsyncFiber() {
16651667 // 2. Handle Data Pipeline (Intrusive List) - only runs when dispatch_q_ is empty.
16661668 DCHECK (dispatch_q_.empty ());
16671669 if (parsed_head_) {
1670+ // Ensure pointers are synchronized before deciding how to execute.
1671+ DCHECK_EQ (parsed_head_, parsed_to_execute_) << " Pointers diverged before dispatch decision." ;
1672+
16681673 bool squashing_enabled{squashing_threshold > 0 };
16691674 bool threshold_reached{pending_pipeline_cmd_cnt_ > squashing_threshold};
16701675
16711676 if (squashing_enabled && threshold_reached && !skip_next_squashing_ &&
16721677 !IsReplySizeOverLimit ()) {
16731678 SquashPipeline ();
1679+ if (reply_builder_->GetError ())
1680+ break ;
16741681 } else {
16751682 auto replies_recorded_before{reply_builder_->RepliesRecorded ()};
16761683 if (!ExecuteRedisBatch ())
@@ -1692,7 +1699,7 @@ void Connection::AsyncFiber() {
16921699
16931700 if (subscriber_over_limit && stats_->dispatch_queue_subscriber_bytes < qbp.publish_buffer_limit )
16941701 qbp.pubsub_ec .notify ();
1695- }
1702+ } // while
16961703
16971704 DCHECK (cc_->conn_closing || reply_builder_->GetError ());
16981705 cc_->conn_closing = true ;
@@ -1721,8 +1728,8 @@ bool Connection::ExecuteRedisBatch() {
17211728 if (cmd == parsed_head_) {
17221729 // We've just executed the head of the pipeline queue. Advance the head and recycle command.
17231730 parsed_head_ = parsed_to_execute_;
1724- ReleaseParsedCommand (cmd, true );
17251731 }
1732+ ReleaseParsedCommand (cmd, true );
17261733
17271734 if (reply_builder_->GetError ()) {
17281735 return false ;
@@ -1911,7 +1918,7 @@ void Connection::SendAsync(MessageHandle msg) {
19111918 }
19121919}
19131920
1914- void Connection::RecycleIntrusiveMessage (MessageHandle msg) {
1921+ void Connection::UpdateIntrusiveMessageStats (MessageHandle msg) {
19151922 size_t used_mem = msg.UsedMemory ();
19161923
19171924 stats_->dispatch_queue_bytes -= used_mem;
@@ -1923,9 +1930,11 @@ void Connection::RecycleIntrusiveMessage(MessageHandle msg) {
19231930 qbp.subscriber_bytes .fetch_sub (used_mem, memory_order_relaxed);
19241931 stats_->dispatch_queue_subscriber_bytes -= used_mem;
19251932 }
1933+
1934+ DCHECK_GE (stats_->dispatch_queue_bytes , stats_->dispatch_queue_subscriber_bytes );
19261935}
19271936
1928- void Connection::RecyclePipelineCommand (facade::ParsedCommand* cmd) {
1937+ void Connection::UpdatePipelineCommandStats (facade::ParsedCommand* cmd) {
19291938 DCHECK (cmd);
19301939
19311940 if (cmd->parsed_cycle ) {
0 commit comments