@@ -98,6 +98,10 @@ ABSL_FLAG(uint32_t, max_busy_read_usec, 100,
           "Maximum time we read and parse from "
           "a socket without yielding. In microseconds.");
 
+ABSL_FLAG(size_t, squashed_reply_size_limit, 0,
+          "Max bytes allowed for squashing_current_reply_size. If this limit is reached, "
+          "connections dispatching pipelines won't squash them.");
+
 using namespace util;
 using namespace std;
 using absl::GetFlag;
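The default of 0 disables the limit; the IsReplySizeOverLimit() check added further down treats 0 as "no limit". A minimal sketch of declaring and reading such an Abseil flag, with a hypothetical my_limit standing in for the real one:

#include <cstddef>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"

// Hypothetical flag for illustration; the patch declares
// squashed_reply_size_limit the same way. A default of 0 means "disabled".
ABSL_FLAG(size_t, my_limit, 0, "Max bytes; 0 disables the limit.");

int main(int argc, char** argv) {
  absl::ParseCommandLine(argc, argv);
  const size_t limit = absl::GetFlag(FLAGS_my_limit);  // thread-safe read
  return limit > 0 ? 0 : 1;
}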
@@ -180,6 +184,8 @@ bool TrafficLogger::Write(iovec* blobs, size_t len) {
 thread_local TrafficLogger tl_traffic_logger{};
 thread_local base::Histogram* io_req_size_hist = nullptr;
 
+thread_local const size_t reply_size_limit = absl::GetFlag(FLAGS_squashed_reply_size_limit);
+
 void OpenTrafficLogger(string_view base_path) {
   unique_lock lk{tl_traffic_logger.mutex};
   if (tl_traffic_logger.log_file)
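The flag is cached in a thread_local rather than read on every check. A dynamic thread_local is initialized lazily on first use in each thread, so the synchronized absl::GetFlag() read happens once per thread; the flip side is that a later absl::SetFlag() update is invisible to threads that have already initialized the cache. A sketch of the pattern, with a hypothetical sample_limit flag:

#include <cstddef>

#include "absl/flags/flag.h"

ABSL_FLAG(size_t, sample_limit, 0, "Hypothetical flag for illustration.");

// The GetFlag cost is paid once per thread, on the first call.
size_t CachedLimit() {
  thread_local const size_t limit = absl::GetFlag(FLAGS_sample_limit);
  return limit;
}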
@@ -1158,7 +1164,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
     last_interaction_ = time(nullptr);
 
     // We might have blocked the dispatch queue from processing, wake it up.
-    if (dispatch_q_.size() > 0)
+    if (!dispatch_q_.empty())
       cnd_.notify_one();
   }
 }
@@ -1632,7 +1638,8 @@ void Connection::AsyncFiber() {
     bool squashing_enabled = squashing_threshold > 0;
     bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
     bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
-    if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
+    if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_ &&
+        !IsReplySizeOverLimit()) {
       SquashPipeline();
     } else {
       MessageHandle msg = std::move(dispatch_q_.front());
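With the extra condition, a connection whose aggregated replies exceed the budget falls back to dispatching pipeline commands one by one until the outstanding reply bytes drain. A condensed, hypothetical model of the gate (illustrative names, not the connection's actual members):

#include <cstddef>

struct PipelineGate {
  size_t pending_cmds = 0;       // queued pipeline commands
  size_t squash_threshold = 32;  // 0 disables squashing
  size_t reply_bytes = 0;        // aggregated reply size in flight
  size_t reply_limit = 1 << 20;  // 0 disables the limit

  // Squash only while every precondition holds; once the reply budget is
  // exceeded, commands are dispatched one at a time so the budget can drain.
  bool ShouldSquash() const {
    const bool enabled = squash_threshold > 0;
    const bool threshold_reached = pending_cmds > squash_threshold;
    const bool over_limit = reply_limit != 0 && reply_bytes > reply_limit;
    return enabled && threshold_reached && !over_limit;
  }
};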
@@ -2059,6 +2066,16 @@ void Connection::DecrNumConns() {
     --stats_->num_conns_other;
 }
 
+bool Connection::IsReplySizeOverLimit() const {
+  std::atomic<size_t>& reply_sz = tl_facade_stats->reply_stats.squashing_current_reply_size;
+  size_t current = reply_sz.load(std::memory_order_acquire);
+  const bool over_limit = reply_size_limit != 0 && current > 0 && current > reply_size_limit;
+  // Log at most every 10 seconds. Otherwise, it can be too noisy on certain workloads in
+  // production instances.
+  LOG_EVERY_T(INFO, 10) << "MultiCommandSquasher overlimit: " << current << "/" << reply_size_limit;
+  return over_limit;
+}
+
 void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
   thread_queue_backpressure[tid].pipeline_queue_max_len = val;
   thread_queue_backpressure[tid].pipeline_cnd.notify_all();
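The acquire load suggests the counter is updated from other threads while the connection fiber reads it, presumably as squashed commands produce reply bytes; treat that as an assumption about the surrounding code. A self-contained sketch of that producer/consumer counter pattern:

#include <atomic>
#include <cstddef>

std::atomic<size_t> current_reply_size{0};

// Producer side: whichever thread builds a squashed reply adds its bytes.
void OnReplyBytes(size_t n) {
  current_reply_size.fetch_add(n, std::memory_order_release);
}

// Consumer side: the connection fiber checks the budget before squashing
// the next batch; a limit of 0 means the check is disabled.
bool OverLimit(size_t limit) {
  const size_t cur = current_reply_size.load(std::memory_order_acquire);
  return limit != 0 && cur > limit;
}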
@@ -2089,7 +2106,7 @@ void Connection::EnsureMemoryBudget(unsigned tid) {
 
 Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
                              uint32_t client_id)
-    : ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
+    : ptr_{std::move(ptr)}, thread_id_{thread_id}, client_id_{client_id} {
 }
 
 unsigned Connection::WeakRef::Thread() const {
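Taking the shared_ptr by value and moving it into the member saves an atomic reference-count round trip whenever the caller passes an rvalue. A minimal sketch of the idiom:

#include <memory>
#include <utility>

struct Holder {
  // By-value parameter + std::move: an rvalue argument is moved all the way
  // into the member, so the reference count is never touched. With ptr_{ptr}
  // the initializer copies (atomic increment) and then the parameter's
  // destructor decrements again.
  explicit Holder(std::shared_ptr<int> ptr) : ptr_{std::move(ptr)} {}

  std::shared_ptr<int> ptr_;
};

// Usage: Holder h{std::make_shared<int>(42)};  // no refcount churn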
@@ -2115,7 +2132,7 @@ uint32_t Connection::WeakRef::GetClientId() const {
   return client_id_;
 }
 
-bool Connection::WeakRef::operator<(const WeakRef& other) {
+bool Connection::WeakRef::operator<(const WeakRef& other) const {
   return client_id_ < other.client_id_;
 }
 
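Const-qualifying operator< is what allows WeakRef objects to be compared through const references, e.g. as keys of an ordered container, since std::less invokes the operator on const operands. A minimal sketch:

#include <cstdint>
#include <set>

struct Ref {
  uint32_t id;

  // Must be const-qualified: std::less<Ref> compares const references.
  bool operator<(const Ref& other) const {
    return id < other.id;
  }
};

std::set<Ref> refs;  // compiles only because operator< is const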