diff --git a/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs index 9249c06776..54154ca0c0 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/sink_forwarder.rs @@ -342,6 +342,530 @@ where Ok((task, isb_reader)) } +#[cfg(test)] +mod simple_buffer_tests { + use super::*; + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + use bytes::Bytes; + use chrono::Utc; + use numaflow::sink; + use numaflow_testing::simplebuffer::SimpleBuffer; + use tokio::sync::mpsc::Receiver; + use tokio_util::sync::CancellationToken; + + use crate::config::pipeline::isb::{BufferReaderConfig, Stream}; + use crate::message::{IntOffset, Message, MessageID, Offset}; + use crate::pipeline::isb::reader::{ISBReaderComponents, ISBReaderOrchestrator}; + use crate::pipeline::isb::simplebuffer::{SimpleBufferAdapter, WithSimpleBuffer}; + use crate::sinker::test_utils::{SinkTestHandle, SinkType as TestSinkType}; + use crate::tracker::Tracker; + + const TIMEOUT: u64 = 2; + + /// A UD sink that writes received messages into a SimpleBuffer for verification. + /// Uses a shared atomic counter to generate globally unique IDs for each write, + /// avoiding dedup collisions when multiple partitions send messages with similar IDs. + /// (Since we only have single simple buffer to write all the data to) + struct SimpleBufferSink { + output_buffer: SimpleBuffer, + counter: Arc, + } + + #[tonic::async_trait] + impl sink::Sinker for SimpleBufferSink { + async fn sink(&self, mut input: Receiver) -> Vec { + let mut responses = vec![]; + let writer = self.output_buffer.writer(); + while let Some(datum) = input.recv().await { + // Generate a unique ID so that we can avoid id collisions + // during multi partition tests. (Since our sink is backed by a single simple buffer) + let unique_id = format!( + "{}-{}", + datum.id, + self.counter.fetch_add(1, Ordering::Relaxed) + ); + let _ = writer + .write(unique_id, Bytes::from(datum.value), HashMap::new()) + .await; + responses.push(sink::Response::ok(datum.id)); + } + responses + } + } + + /// Helper to create and write test messages into a SimpleBufferAdapter via its ISBWriter. + async fn write_test_messages(adapter: &SimpleBufferAdapter, count: usize) { + use crate::pipeline::isb::ISBWriter; + let writer = adapter.writer(); + for i in 0..count { + let msg = Message { + typ: Default::default(), + keys: Arc::from(vec![format!("key-{}", i)]), + tags: None, + value: Bytes::from(format!("payload-{}", i)), + offset: Offset::Int(IntOffset::new(i as i64, 0)), + event_time: Utc::now(), + watermark: None, + id: MessageID { + vertex_name: "test-in".into(), + index: i as i32, + offset: format!("{}", i).into(), + }, + headers: Arc::new(HashMap::new()), + metadata: None, + is_late: false, + ack_handle: None, + }; + writer.write(msg).await.expect("write should succeed"); + } + } + + // End-to-end test for sink forwarder using SimpleBuffer. + // Reads from a SimpleBuffer-backed ISB, writes to a UD sink that stores + // messages in a different SimpleBuffer for verification. + #[tokio::test] + async fn test_sink_forwarder_with_single_stream() { + const MESSAGE_COUNT: usize = 10; + + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let batch_size = 500; + + // Input buffer + let input_adapter = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "sink-input")); + + // Write all the messages to the input buffer + write_test_messages(&input_adapter, MESSAGE_COUNT).await; + + // Output buffer for verification (the UD sink writes here) + let output_buffer = SimpleBuffer::new(1000, 0, "sink-output"); + + // Create the UD sink backed by the output buffer + let sink_svc = SimpleBufferSink { + output_buffer: output_buffer.clone(), + counter: Arc::new(AtomicUsize::new(0)), + }; + + let SinkTestHandle { + sink_writer, + ud_sink_server_handle: _server_handle, + .. + } = SinkTestHandle::create_sink( + TestSinkType::UserDefined(sink_svc), + None, + None, + batch_size, + ) + .await; + + // ISB Reader Orchestrator + let input_stream = Stream::new("sink-input-buffer", "test-in", 0); + let buf_reader_config = BufferReaderConfig { + streams: vec![input_stream.clone()], + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + let reader_components = ISBReaderComponents { + vertex_type: "Sink".to_string(), + stream: input_stream, + config: buf_reader_config, + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = + ISBReaderOrchestrator::new(reader_components, input_adapter.reader(), None) + .await + .unwrap(); + + // Create and start the SinkForwarder + let forwarder = SinkForwarder::::new(isb_reader, sink_writer).await; + + let forwarder_cln = cln_token.clone(); + let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + + // Wait until all messages appear in the output buffer + let result = tokio::time::timeout(Duration::from_secs(TIMEOUT), async { + loop { + if output_buffer.pending_count() >= MESSAGE_COUNT { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await; + + assert!( + result.is_ok(), + "Timed out waiting for messages in output buffer. Got {} of {}", + output_buffer.pending_count(), + MESSAGE_COUNT, + ); + + assert_eq!( + output_buffer.pending_count(), + MESSAGE_COUNT, + "All messages should be forwarded to the sink output buffer" + ); + + // Shutdown + cln_token.cancel(); + let forwarder_result = + tokio::time::timeout(Duration::from_secs(TIMEOUT), forwarder_handle).await; + assert!( + forwarder_result.is_ok(), + "Forwarder task should complete gracefully" + ); + } + + // End-to-end test for sink forwarder with multiple input streams. + // Reads from multiple SimpleBuffer-backed ISB partitions and writes to + // separate UD sinks (one per stream) that all + // store messages in the same SimpleBuffer for verification. + #[tokio::test] + async fn test_sink_forwarder_with_multi_streams() { + const MESSAGE_COUNT: usize = 100; + const NUM_PARTITIONS: usize = 5; + + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let batch_size = 500; + + // Shared output buffer for verification (all UD sinks write here) + let output_buffer = SimpleBuffer::new(10000, 0, "sink-multi-output"); + // Shared counter to ensure globally unique IDs across all sink instances + let shared_counter = Arc::new(AtomicUsize::new(0)); + + // Create input buffers and streams + let input_adapters: Vec = (0..NUM_PARTITIONS) + .map(|i| { + let name: &'static str = + Box::leak(format!("sink-multi-input-{}", i).into_boxed_str()); + SimpleBufferAdapter::new(SimpleBuffer::new(10000, i as u16, name)) + }) + .collect(); + + let input_streams: Vec = (0..NUM_PARTITIONS) + .map(|i| { + let name: &'static str = + Box::leak(format!("default-test-sink-forwarder-in-{}", i).into_boxed_str()); + Stream::new(name, "test", i as u16) + }) + .collect(); + + // Write messages to all input buffers + for adapter in &input_adapters { + write_test_messages(adapter, MESSAGE_COUNT).await; + } + + let buf_reader_config = BufferReaderConfig { + streams: input_streams.clone(), + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + // Create one SinkWriter + SinkForwarder per stream + let mut forwarder_handles = vec![]; + let mut _server_handles = vec![]; + for (i, input_stream) in input_streams.iter().enumerate() { + // Each stream gets its own UD sink server writing to the shared output buffer + let sink_svc = SimpleBufferSink { + output_buffer: output_buffer.clone(), + counter: Arc::clone(&shared_counter), + }; + + let SinkTestHandle { + sink_writer, + ud_sink_server_handle, + .. + } = SinkTestHandle::create_sink( + TestSinkType::UserDefined(sink_svc), + None, + None, + batch_size, + ) + .await; + _server_handles.push(ud_sink_server_handle); + + let reader_components = ISBReaderComponents { + vertex_type: "Sink".to_string(), + stream: input_stream.clone(), + config: buf_reader_config.clone(), + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = ISBReaderOrchestrator::new( + reader_components, + input_adapters.get(i).unwrap().reader(), + None, + ) + .await + .unwrap(); + + let forwarder = SinkForwarder::::new(isb_reader, sink_writer).await; + + let forwarder_cln = cln_token.clone(); + let handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + forwarder_handles.push(handle); + } + + let total_expected = MESSAGE_COUNT * NUM_PARTITIONS; + + // Wait until all messages exit the input buffers + let input_result = tokio::time::timeout(Duration::from_secs(TIMEOUT), async { + for adapter in &input_adapters { + loop { + if adapter.pending_count() == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + } + }) + .await; + + assert!( + input_result.is_ok(), + "Timed out waiting for messages to exit input buffers." + ); + + // Ensure all messages have left the input buffers + for adapter in &input_adapters { + assert_eq!( + adapter.pending_count(), + 0, + "All messages should be consumed from input buffer" + ); + } + + // Wait until all messages appear in the output buffer + let output_result = tokio::time::timeout(Duration::from_secs(TIMEOUT), async { + loop { + if output_buffer.pending_count() >= total_expected { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await; + + assert!( + output_result.is_ok(), + "Timed out waiting for messages in output buffer. Got {} of {}", + output_buffer.pending_count(), + total_expected, + ); + + assert_eq!( + output_buffer.pending_count(), + total_expected, + "All messages from all partitions should be forwarded to the sink" + ); + + // Shutdown + cln_token.cancel(); + for handle in forwarder_handles { + let forwarder_result = tokio::time::timeout(Duration::from_secs(TIMEOUT), handle).await; + assert!( + forwarder_result.is_ok(), + "Forwarder task should complete gracefully" + ); + } + } + + // Test sink forwarder with a blackhole (builtin) sink using SimpleBuffer input. + // This verifies the forwarder completes without errors even when using a + // builtin sink that doesn't produce output. + #[tokio::test] + async fn test_sink_forwarder_with_blackhole_sink() { + const MESSAGE_COUNT: usize = 10; + + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let batch_size = 500; + + // Input buffer + let input_adapter = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "bh-sink-input")); + + // Write all the messages to the input buffer + write_test_messages(&input_adapter, MESSAGE_COUNT).await; + + // Create a blackhole sink writer + use crate::sinker::sink::SinkClientType; + let SinkTestHandle { + sink_writer, + ud_sink_server_handle: _, + .. + } = SinkTestHandle::create_sink::( + TestSinkType::BuiltIn(SinkClientType::Blackhole), + None, + None, + batch_size, + ) + .await; + + // ISB Reader Orchestrator + let input_stream = Stream::new("bh-sink-input-buffer", "test-in", 0); + let buf_reader_config = BufferReaderConfig { + streams: vec![input_stream.clone()], + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + let reader_components = ISBReaderComponents { + vertex_type: "Sink".to_string(), + stream: input_stream, + config: buf_reader_config, + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = + ISBReaderOrchestrator::new(reader_components, input_adapter.reader(), None) + .await + .unwrap(); + + // Create and start the SinkForwarder + let forwarder = SinkForwarder::::new(isb_reader, sink_writer).await; + + let forwarder_cln = cln_token.clone(); + let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + + // Wait until all messages are consumed from the input buffer + let result = tokio::time::timeout(Duration::from_secs(TIMEOUT), async { + loop { + if input_adapter.pending_count() == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await; + + assert!( + result.is_ok(), + "Timed out waiting for messages to be consumed. Remaining: {}", + input_adapter.pending_count(), + ); + + assert_eq!( + input_adapter.pending_count(), + 0, + "All messages should be consumed from the input buffer" + ); + + // Shutdown + cln_token.cancel(); + let forwarder_result = + tokio::time::timeout(Duration::from_secs(TIMEOUT), forwarder_handle).await; + assert!( + forwarder_result.is_ok(), + "Forwarder task should complete gracefully" + ); + } + + struct PanickingSink; + + #[tonic::async_trait] + impl sink::Sinker for PanickingSink { + async fn sink(&self, _input: Receiver) -> Vec { + panic!("Panic in sink"); + } + } + + // Test sink forwarder with a panic in the sink writer. + #[tokio::test] + async fn test_sink_forwarder_with_panic() { + const MESSAGE_COUNT: usize = 10; + + let cln_token = CancellationToken::new(); + let tracker = Tracker::new(None, cln_token.clone()); + let batch_size = 500; + + // Input buffer + let input_adapter = SimpleBufferAdapter::new(SimpleBuffer::new(1000, 0, "sink-input")); + + // Write all the messages to the input buffer + write_test_messages(&input_adapter, MESSAGE_COUNT).await; + + let SinkTestHandle { + sink_writer, + ud_sink_server_handle: _server_handle, + .. + } = SinkTestHandle::create_sink( + TestSinkType::UserDefined(PanickingSink), + None, + None, + batch_size, + ) + .await; + + // ISB Reader Orchestrator + let input_stream = Stream::new("sink-input-buffer", "test-in", 0); + let buf_reader_config = BufferReaderConfig { + streams: vec![input_stream.clone()], + wip_ack_interval: Duration::from_millis(10), + ..Default::default() + }; + + let reader_components = ISBReaderComponents { + vertex_type: "Sink".to_string(), + stream: input_stream, + config: buf_reader_config, + tracker: tracker.clone(), + batch_size, + read_timeout: Duration::from_millis(500), + watermark_handle: None, + isb_config: None, + cln_token: cln_token.clone(), + }; + + let isb_reader: ISBReaderOrchestrator = + ISBReaderOrchestrator::new(reader_components, input_adapter.reader(), None) + .await + .unwrap(); + + // Create and start the SinkForwarder + let forwarder = SinkForwarder::::new(isb_reader, sink_writer).await; + + let forwarder_cln = cln_token.clone(); + let forwarder_handle = tokio::spawn(async move { forwarder.start(forwarder_cln).await }); + + assert_eq!( + input_adapter.pending_count(), + MESSAGE_COUNT, + "All messages should stay in the input buffer" + ); + + // Shutdown + cln_token.cancel(); + let forwarder_result = + tokio::time::timeout(Duration::from_secs(TIMEOUT), forwarder_handle).await; + assert!( + forwarder_result.is_ok(), + "Forwarder task should complete gracefully" + ); + } +} + #[cfg(test)] mod tests { use std::sync::Arc;