diff --git a/changelog.d/24281_blackhole_sink_acknowledgements.fix.md b/changelog.d/24281_blackhole_sink_acknowledgements.fix.md new file mode 100644 index 0000000000000..d6b5c58b9cf1c --- /dev/null +++ b/changelog.d/24281_blackhole_sink_acknowledgements.fix.md @@ -0,0 +1,3 @@ +Fixed the blackhole sink to properly implement end-to-end acknowledgements. Previously, the sink consumed events without updating finalizer status, causing sources that depend on acknowledgements (like `aws_s3` with SQS) to never delete processed messages from the queue. + +authors: sanjams2 diff --git a/src/sinks/blackhole/mod.rs b/src/sinks/blackhole/mod.rs index 9d9046e208fde..d82a1c14bd89b 100644 --- a/src/sinks/blackhole/mod.rs +++ b/src/sinks/blackhole/mod.rs @@ -14,7 +14,8 @@ mod tests { blackhole::{config::BlackholeConfig, sink::BlackholeSink}, }, test_util::{ - components::run_and_assert_nonsending_sink_compliance, random_events_with_stream, + components::{SINK_TAGS, run_and_assert_sink_compliance}, + random_events_with_stream, }, }; @@ -29,6 +30,6 @@ mod tests { let sink = VectorSink::Stream(Box::new(sink)); let (_input_lines, events) = random_events_with_stream(100, 10, None); - run_and_assert_nonsending_sink_compliance(sink, events, &[]).await; + run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; } } diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index 8c934101545eb..ff0744913cc8b 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -21,7 +21,7 @@ use vector_lib::{ }; use crate::{ - event::{EventArray, EventContainer}, + event::{EventArray, EventContainer, EventStatus, Finalizable}, sinks::{blackhole::config::BlackholeConfig, util::StreamSink}, }; @@ -82,7 +82,7 @@ impl StreamSink for BlackholeSink { }); } - while let Some(events) = input.next().await { + while let Some(mut events) = input.next().await { if let Some(rate) = self.config.rate { let factor: f32 = 1.0 / rate as f32; let secs: f32 = factor * (events.len() as f32); @@ -98,6 +98,9 @@ impl StreamSink for BlackholeSink { .total_raw_bytes .fetch_add(message_len.get(), Ordering::AcqRel); + let finalizers = events.take_finalizers(); + finalizers.update_status(EventStatus::Delivered); + events_sent.emit(CountByteSize(events.len(), message_len)); bytes_sent.emit(ByteSize(message_len.get())); }