Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/batchmap-cat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ rust-version.workspace = true
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }

# Test-only dependency: the unit tests in src/main.rs call chrono::Utc::now()
# to fill in the watermark and event_time fields of the Datum values they build.
[dev-dependencies]
chrono.workspace = true
118 changes: 118 additions & 0 deletions examples/batchmap-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,121 @@ impl batchmap::BatchMapper for Cat {
responses
}
}

#[cfg(test)]
mod tests {
    //! Unit tests for the `Cat` batch mapper.
    //!
    //! Each test feeds `Datum` values through an in-memory `tokio` mpsc channel,
    //! drops the sender, and then inspects the responses returned by `batchmap`.

    use super::*;
    use numaflow::batchmap::BatchMapper;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    /// Builds a `Datum` with the supplied id, keys and payload.
    ///
    /// The watermark and event time are both set to the current UTC time and
    /// the headers map is left empty.
    fn create_datum(id: &str, keys: Vec<String>, value: Vec<u8>) -> Datum {
        let watermark = chrono::Utc::now();
        let event_time = chrono::Utc::now();
        Datum {
            id: String::from(id),
            keys,
            value,
            watermark,
            event_time,
            headers: HashMap::new(),
        }
    }

    /// One input datum must produce one response holding a single message
    /// with the same id, payload and keys.
    #[tokio::test]
    async fn test_batchmap_single_message() {
        let mapper = Cat;
        let (sender, receiver) = mpsc::channel(10);

        sender
            .send(create_datum(
                "msg-1",
                vec![String::from("key1")],
                b"Hello, Numaflow!".to_vec(),
            ))
            .await
            .unwrap();
        // Drop the only sender so the channel is closed before the mapper runs.
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 1);
        let reply = &responses[0];
        assert_eq!(reply.id, "msg-1");
        assert_eq!(reply.message.len(), 1);
        let echoed = &reply.message[0];
        assert_eq!(echoed.value, b"Hello, Numaflow!");
        assert_eq!(echoed.keys, Some(vec![String::from("key1")]));
    }

    /// Five input datums must produce five responses, in input order, each
    /// with exactly one message carrying the matching payload.
    #[tokio::test]
    async fn test_batchmap_multiple_messages() {
        let mapper = Cat;
        let (sender, receiver) = mpsc::channel(10);

        for n in 0..5 {
            let id = format!("msg-{}", n);
            let keys = vec![format!("key-{}", n)];
            let payload = format!("Message {}", n).into_bytes();
            sender.send(create_datum(&id, keys, payload)).await.unwrap();
        }
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 5);
        for (n, reply) in (0..).zip(&responses) {
            let expected_id = format!("msg-{}", n);
            let expected_payload = format!("Message {}", n).into_bytes();
            assert_eq!(reply.id, expected_id);
            assert_eq!(reply.message.len(), 1);
            assert_eq!(reply.message[0].value, expected_payload);
        }
    }

    /// A channel that is closed without any datum must yield no responses.
    #[tokio::test]
    async fn test_batchmap_empty_input() {
        let mapper = Cat;
        let (sender, receiver) = mpsc::channel::<Datum>(10);
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 0, "No responses for empty input");
    }

    /// A datum with several keys must come back with all keys, in order.
    #[tokio::test]
    async fn test_batchmap_preserves_keys() {
        let mapper = Cat;
        let (sender, receiver) = mpsc::channel(10);

        let input_keys = vec![String::from("key1"), String::from("key2")];
        sender
            .send(create_datum("msg-1", input_keys, b"test value".to_vec()))
            .await
            .unwrap();
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 1);
        let expected_keys = Some(vec![String::from("key1"), String::from("key2")]);
        assert_eq!(responses[0].message[0].keys, expected_keys);
    }

    /// A datum with an empty payload must still produce a response, and the
    /// message in it must carry an empty payload.
    #[tokio::test]
    async fn test_batchmap_empty_value() {
        let mapper = Cat;
        let (sender, receiver) = mpsc::channel(10);

        let no_payload: Vec<u8> = Vec::new();
        sender
            .send(create_datum(
                "msg-empty",
                vec![String::from("key1")],
                no_payload,
            ))
            .await
            .unwrap();
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 1);
        let reply = &responses[0];
        assert_eq!(reply.id, "msg-empty");
        assert_eq!(reply.message[0].value, Vec::<u8>::new());
    }
}
3 changes: 3 additions & 0 deletions examples/batchmap-flatmap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ rust-version.workspace = true
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }

# Test-only dependency: the unit tests in src/main.rs call chrono::Utc::now()
# to fill in the watermark and event_time fields of the Datum values they build.
[dev-dependencies]
chrono.workspace = true
152 changes: 152 additions & 0 deletions examples/batchmap-flatmap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,155 @@ impl batchmap::BatchMapper for Flatmap {
responses
}
}

#[cfg(test)]
mod tests {
    //! Unit tests for the `Flatmap` batch mapper.
    //!
    //! Each test feeds `Datum` values through an in-memory `tokio` mpsc channel,
    //! drops the sender, and then inspects the responses returned by `batchmap`.

    use super::*;
    use numaflow::batchmap::BatchMapper;
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    /// Builds a `Datum` with the supplied id, keys and payload.
    ///
    /// The watermark and event time are both set to the current UTC time and
    /// the headers map is left empty.
    fn create_datum(id: &str, keys: Vec<String>, value: Vec<u8>) -> Datum {
        let watermark = chrono::Utc::now();
        let event_time = chrono::Utc::now();
        Datum {
            id: String::from(id),
            keys,
            value,
            watermark,
            event_time,
            headers: HashMap::new(),
        }
    }

    /// The payload "a,b,c" must turn into three messages, one per part, in order.
    #[tokio::test]
    async fn test_flatmap_splits_comma_separated_values() {
        let mapper = Flatmap;
        let (sender, receiver) = mpsc::channel(10);

        sender
            .send(create_datum(
                "msg-1",
                vec![String::from("key1")],
                b"a,b,c".to_vec(),
            ))
            .await
            .unwrap();
        // Drop the only sender so the channel is closed before the mapper runs.
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 1);
        let reply = &responses[0];
        assert_eq!(reply.id, "msg-1");
        assert_eq!(reply.message.len(), 3);
        assert_eq!(reply.message[0].value, b"a");
        assert_eq!(reply.message[1].value, b"b");
        assert_eq!(reply.message[2].value, b"c");
    }

    /// A payload without a comma must come back as a single, unchanged message.
    #[tokio::test]
    async fn test_flatmap_single_value_no_comma() {
        let mapper = Flatmap;
        let (sender, receiver) = mpsc::channel(10);

        sender
            .send(create_datum(
                "msg-1",
                vec![String::from("key1")],
                b"single_value".to_vec(),
            ))
            .await
            .unwrap();
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 1);
        let parts = &responses[0].message;
        assert_eq!(parts.len(), 1);
        assert_eq!(parts[0].value, b"single_value");
    }

    /// Every message produced from one datum must carry that datum's keys.
    #[tokio::test]
    async fn test_flatmap_preserves_keys_for_all_parts() {
        let mapper = Flatmap;
        let (sender, receiver) = mpsc::channel(10);

        let input_keys = vec![String::from("key1"), String::from("key2")];
        sender
            .send(create_datum("msg-1", input_keys, b"x,y".to_vec()))
            .await
            .unwrap();
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 1);
        let parts = &responses[0].message;
        assert_eq!(parts.len(), 2);
        for part in parts.iter() {
            assert_eq!(
                part.keys,
                Some(vec![String::from("key1"), String::from("key2")])
            );
        }
    }

    /// Two datums must yield two responses in input order, each split on its
    /// own commas.
    #[tokio::test]
    async fn test_flatmap_multiple_messages() {
        let mapper = Flatmap;
        let (sender, receiver) = mpsc::channel(10);

        // (id, key, payload) for each datum, sent in this order.
        let fixtures = [("msg-1", "k1", "a,b"), ("msg-2", "k2", "x,y,z")];
        for (id, key, payload) in fixtures {
            sender
                .send(create_datum(
                    id,
                    vec![String::from(key)],
                    payload.as_bytes().to_vec(),
                ))
                .await
                .unwrap();
        }
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 2);
        let (first, second) = (&responses[0], &responses[1]);
        assert_eq!(first.id, "msg-1");
        assert_eq!(first.message.len(), 2);
        assert_eq!(second.id, "msg-2");
        assert_eq!(second.message.len(), 3);
    }

    /// A channel that is closed without any datum must yield no responses.
    #[tokio::test]
    async fn test_flatmap_empty_input() {
        let mapper = Flatmap;
        let (sender, receiver) = mpsc::channel::<Datum>(10);
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 0);
    }

    /// An empty payload must still produce one response with one empty message.
    #[tokio::test]
    async fn test_flatmap_empty_value() {
        let mapper = Flatmap;
        let (sender, receiver) = mpsc::channel(10);

        let no_payload: Vec<u8> = Vec::new();
        sender
            .send(create_datum(
                "msg-1",
                vec![String::from("key1")],
                no_payload,
            ))
            .await
            .unwrap();
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        assert_eq!(responses.len(), 1);
        // Splitting an empty string on commas yields exactly one empty part.
        let parts = &responses[0].message;
        assert_eq!(parts.len(), 1);
        assert_eq!(parts[0].value, b"");
    }

    /// A payload that is not valid UTF-8 must still be processed and split on
    /// its comma.
    #[tokio::test]
    async fn test_flatmap_handles_invalid_utf8() {
        let mapper = Flatmap;
        let (sender, receiver) = mpsc::channel(10);

        // 0xff 0xfe is not valid UTF-8; a comma and an ASCII 'a' follow.
        // NOTE(review): the original test expects the mapper to decode this
        // with from_utf8_lossy - confirm against the implementation.
        let malformed: Vec<u8> = vec![0xff, 0xfe, b',', b'a'];
        sender
            .send(create_datum(
                "msg-1",
                vec![String::from("key1")],
                malformed,
            ))
            .await
            .unwrap();
        drop(sender);

        let responses = mapper.batchmap(receiver).await;

        // One response, with one message on each side of the comma.
        assert_eq!(responses.len(), 1);
        assert_eq!(responses[0].message.len(), 2);
    }
}
3 changes: 3 additions & 0 deletions examples/flatmap-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ rust-version.workspace = true
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }

[dev-dependencies]
chrono.workspace = true
Loading
Loading