Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/vector-buffers/src/buffer_usage_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,10 @@ impl BufferUsage {
/// The `buffer_id` should be a unique name -- ideally the `component_id` of the sink using this buffer -- but is
/// not used for anything other than reporting, and so has no _requirement_ to be unique.
pub fn install(self, buffer_id: &str) {
let buffer_id = buffer_id.to_string();
let span = self.span;
let stages = self.stages;
let task_name = format!("buffer usage reporter ({buffer_id})");

let task = async move {
let mut interval = interval(Duration::from_secs(2));
Expand All @@ -264,6 +266,7 @@ impl BufferUsage {
let received = stage.received.consume();
if received.has_updates() {
emit(BufferEventsReceived {
buffer_id: buffer_id.clone(),
idx: stage.idx,
count: received.event_count,
byte_size: received.event_byte_size,
Expand All @@ -273,6 +276,7 @@ impl BufferUsage {
let sent = stage.sent.consume();
if sent.has_updates() {
emit(BufferEventsSent {
buffer_id: buffer_id.clone(),
idx: stage.idx,
count: sent.event_count,
byte_size: sent.event_byte_size,
Expand All @@ -282,6 +286,7 @@ impl BufferUsage {
let dropped = stage.dropped.consume();
if dropped.has_updates() {
emit(BufferEventsDropped {
buffer_id: buffer_id.clone(),
idx: stage.idx,
intentional: false,
reason: "corrupted_events",
Expand All @@ -293,6 +298,7 @@ impl BufferUsage {
let dropped_intentional = stage.dropped_intentional.consume();
if dropped_intentional.has_updates() {
emit(BufferEventsDropped {
buffer_id: buffer_id.clone(),
idx: stage.idx,
intentional: true,
reason: "drop_newest",
Expand All @@ -304,7 +310,6 @@ impl BufferUsage {
}
};

let task_name = format!("buffer usage reporter ({buffer_id})");
spawn_named(task.instrument(span.or_current()), task_name.as_str());
}
}
Loading
Loading