diff --git a/changelog.d/23872_buffer_counter_underflowed.fix.md b/changelog.d/23872_buffer_counter_underflowed.fix.md new file mode 100644 index 0000000000000..cf0a3a4449b16 --- /dev/null +++ b/changelog.d/23872_buffer_counter_underflowed.fix.md @@ -0,0 +1,3 @@ +Fix buffer counter underflowed, caused by the counter has not been updated(increase) timely when new event is coming. + +authors: sialais diff --git a/lib/vector-buffers/src/topology/channel/sender.rs b/lib/vector-buffers/src/topology/channel/sender.rs index cdfc41912f05b..a4f948dcd7641 100644 --- a/lib/vector-buffers/src/topology/channel/sender.rs +++ b/lib/vector-buffers/src/topology/channel/sender.rs @@ -202,8 +202,16 @@ impl BufferSender { .as_ref() .map(|_| (item.event_count(), item.size_of())); - let mut sent_to_base = true; let mut was_dropped = false; + + if let Some(instrumentation) = self.instrumentation.as_ref() + && let Some((item_count, item_size)) = item_sizing + { + instrumentation.increment_received_event_count_and_byte_size( + item_count as u64, + item_size as u64, + ); + } match self.when_full { WhenFull::Block => self.base.send(item).await?, WhenFull::DropNewest => { @@ -213,7 +221,7 @@ impl BufferSender { } WhenFull::Overflow => { if let Some(item) = self.base.try_send(item).await? { - sent_to_base = false; + was_dropped = true; self.overflow .as_mut() .unwrap_or_else(|| unreachable!("overflow must exist")) @@ -223,23 +231,9 @@ impl BufferSender { } } - if (sent_to_base || was_dropped) - && let (Some(send_duration), Some(send_reference)) = - (self.send_duration.as_ref(), send_reference) - { - send_duration.emit(send_reference.elapsed()); - } - if let Some(instrumentation) = self.instrumentation.as_ref() && let Some((item_count, item_size)) = item_sizing { - if sent_to_base { - instrumentation.increment_received_event_count_and_byte_size( - item_count as u64, - item_size as u64, - ); - } - if was_dropped { instrumentation.increment_dropped_event_count_and_byte_size( item_count as u64, @@ -248,6 +242,9 @@ impl BufferSender { ); } } + if let (Some(send_duration), Some(send_reference)) = (self.send_duration.as_ref(), send_reference) { + send_duration.emit(send_reference.elapsed()); + } Ok(()) }