Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -798,20 +798,19 @@ impl DefaultMQPushConsumerImpl {
let cached_message_size_in_mib = pull_request.process_queue.msg_size() / _1MB;
if cached_message_count > self.consumer_config.pull_threshold_for_queue as u64 {
if self.queue_flow_control_times % 1000 == 0 {
let msg_tree_map = pull_request.process_queue.msg_tree_map.read().await;
let first_key_value = msg_tree_map.first_key_value().unwrap();
let last_key_value = msg_tree_map.last_key_value().unwrap();
warn!(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, \
maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
self.consumer_config.pull_threshold_for_queue,
first_key_value.0,
last_key_value.0,
cached_message_count,
cached_message_size_in_mib,
pull_request.to_string(),
self.queue_flow_control_times
);
if let Some((min_offset, max_offset)) = pull_request.process_queue.get_offset_span().await {
warn!(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, \
maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
self.consumer_config.pull_threshold_for_queue,
min_offset,
max_offset,
cached_message_count,
cached_message_size_in_mib,
pull_request.to_string(),
self.queue_flow_control_times
);
}
}
self.execute_pull_request_later(pull_request, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);

Expand All @@ -821,20 +820,19 @@ impl DefaultMQPushConsumerImpl {

if cached_message_size_in_mib > self.consumer_config.pull_threshold_size_for_queue as u64 {
if self.queue_flow_control_times % 1000 == 0 {
let msg_tree_map = pull_request.process_queue.msg_tree_map.read().await;
let first_key_value = msg_tree_map.first_key_value().unwrap();
let last_key_value = msg_tree_map.last_key_value().unwrap();
warn!(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, \
maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
self.consumer_config.pull_threshold_size_for_queue,
first_key_value.0,
last_key_value.0,
cached_message_count,
cached_message_size_in_mib,
pull_request.to_string(),
self.queue_flow_control_times
);
if let Some((min_offset, max_offset)) = pull_request.process_queue.get_offset_span().await {
warn!(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, \
maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
self.consumer_config.pull_threshold_size_for_queue,
min_offset,
max_offset,
cached_message_count,
cached_message_size_in_mib,
pull_request.to_string(),
self.queue_flow_control_times
);
}
}
self.execute_pull_request_later(pull_request, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
self.queue_flow_control_times += 1;
Expand All @@ -845,18 +843,17 @@ impl DefaultMQPushConsumerImpl {
let max_span = pull_request.process_queue.get_max_span().await;
if max_span > self.consumer_config.consume_concurrently_max_span as u64 {
if self.queue_max_span_flow_control_times % 1000 == 0 {
let msg_tree_map = pull_request.process_queue.msg_tree_map.read().await;
let first_key_value = msg_tree_map.first_key_value().unwrap();
let last_key_value = msg_tree_map.last_key_value().unwrap();
warn!(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, \
maxSpan={}, pullRequest={}, flowControlTimes={}",
first_key_value.0,
last_key_value.0,
max_span,
pull_request.to_string(),
self.queue_max_span_flow_control_times
);
if let Some((min_offset, max_offset)) = pull_request.process_queue.get_offset_span().await {
warn!(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, \
maxSpan={}, pullRequest={}, flowControlTimes={}",
min_offset,
max_offset,
max_span,
pull_request.to_string(),
self.queue_max_span_flow_control_times
);
}
}
self.queue_max_span_flow_control_times += 1;
self.execute_pull_request_later(pull_request, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
Expand Down
Loading
Loading