Skip to content

Commit b2daf95

Browse files
authored
test: Unit Tests For Examples (#162)
Signed-off-by: godfather2327 <harshithsai94@gmail.com>
1 parent 0a496b1 commit b2daf95

File tree

21 files changed

+1746
-134
lines changed

21 files changed

+1746
-134
lines changed

examples/batchmap-cat/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@ rust-version.workspace = true
88
tonic.workspace = true
99
tokio.workspace = true
1010
numaflow = { path = "../../numaflow" }
11+
12+
[dev-dependencies]
13+
chrono.workspace = true

examples/batchmap-cat/src/main.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,121 @@ impl batchmap::BatchMapper for Cat {
2020
responses
2121
}
2222
}
23+
24+
#[cfg(test)]
25+
mod tests {
26+
use super::*;
27+
use numaflow::batchmap::BatchMapper;
28+
use std::collections::HashMap;
29+
use tokio::sync::mpsc;
30+
31+
fn create_datum(id: &str, keys: Vec<String>, value: Vec<u8>) -> Datum {
32+
Datum {
33+
id: id.to_string(),
34+
keys,
35+
value,
36+
watermark: chrono::Utc::now(),
37+
event_time: chrono::Utc::now(),
38+
headers: HashMap::new(),
39+
}
40+
}
41+
42+
#[tokio::test]
43+
async fn test_batchmap_single_message() {
44+
let cat = Cat;
45+
let (tx, rx) = mpsc::channel(10);
46+
47+
let datum = create_datum(
48+
"msg-1",
49+
vec!["key1".to_string()],
50+
b"Hello, Numaflow!".to_vec(),
51+
);
52+
tx.send(datum).await.unwrap();
53+
drop(tx);
54+
55+
let responses = cat.batchmap(rx).await;
56+
57+
assert_eq!(responses.len(), 1);
58+
assert_eq!(responses[0].id, "msg-1");
59+
assert_eq!(responses[0].message.len(), 1);
60+
assert_eq!(responses[0].message[0].value, b"Hello, Numaflow!");
61+
assert_eq!(responses[0].message[0].keys, Some(vec!["key1".to_string()]));
62+
}
63+
64+
#[tokio::test]
65+
async fn test_batchmap_multiple_messages() {
66+
let cat = Cat;
67+
let (tx, rx) = mpsc::channel(10);
68+
69+
for i in 0..5 {
70+
let datum = create_datum(
71+
&format!("msg-{}", i),
72+
vec![format!("key-{}", i)],
73+
format!("Message {}", i).into_bytes(),
74+
);
75+
tx.send(datum).await.unwrap();
76+
}
77+
drop(tx);
78+
79+
let responses = cat.batchmap(rx).await;
80+
81+
assert_eq!(responses.len(), 5);
82+
for (i, response) in responses.iter().enumerate() {
83+
assert_eq!(response.id, format!("msg-{}", i));
84+
assert_eq!(response.message.len(), 1);
85+
assert_eq!(
86+
response.message[0].value,
87+
format!("Message {}", i).into_bytes()
88+
);
89+
}
90+
}
91+
92+
#[tokio::test]
93+
async fn test_batchmap_empty_input() {
94+
let cat = Cat;
95+
let (tx, rx) = mpsc::channel(10);
96+
drop(tx);
97+
98+
let responses = cat.batchmap(rx).await;
99+
100+
assert_eq!(responses.len(), 0, "No responses for empty input");
101+
}
102+
103+
#[tokio::test]
104+
async fn test_batchmap_preserves_keys() {
105+
let cat = Cat;
106+
let (tx, rx) = mpsc::channel(10);
107+
108+
let datum = create_datum(
109+
"msg-1",
110+
vec!["key1".to_string(), "key2".to_string()],
111+
b"test value".to_vec(),
112+
);
113+
tx.send(datum).await.unwrap();
114+
drop(tx);
115+
116+
let responses = cat.batchmap(rx).await;
117+
118+
assert_eq!(responses.len(), 1);
119+
assert_eq!(
120+
responses[0].message[0].keys,
121+
Some(vec!["key1".to_string(), "key2".to_string()])
122+
);
123+
}
124+
125+
#[tokio::test]
126+
async fn test_batchmap_empty_value() {
127+
let cat = Cat;
128+
let (tx, rx) = mpsc::channel(10);
129+
130+
let datum = create_datum("msg-empty", vec!["key1".to_string()], vec![]);
131+
tx.send(datum).await.unwrap();
132+
drop(tx);
133+
134+
let responses = cat.batchmap(rx).await;
135+
136+
assert_eq!(responses.len(), 1);
137+
assert_eq!(responses[0].id, "msg-empty");
138+
assert_eq!(responses[0].message[0].value, Vec::<u8>::new());
139+
}
140+
}

examples/batchmap-flatmap/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@ rust-version.workspace = true
88
tonic.workspace = true
99
tokio.workspace = true
1010
numaflow = { path = "../../numaflow" }
11+
12+
[dev-dependencies]
13+
chrono.workspace = true

examples/batchmap-flatmap/src/main.rs

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,155 @@ impl batchmap::BatchMapper for Flatmap {
3030
responses
3131
}
3232
}
33+
34+
#[cfg(test)]
35+
mod tests {
36+
use super::*;
37+
use numaflow::batchmap::BatchMapper;
38+
use std::collections::HashMap;
39+
use tokio::sync::mpsc;
40+
41+
fn create_datum(id: &str, keys: Vec<String>, value: Vec<u8>) -> Datum {
42+
Datum {
43+
id: id.to_string(),
44+
keys,
45+
value,
46+
watermark: chrono::Utc::now(),
47+
event_time: chrono::Utc::now(),
48+
headers: HashMap::new(),
49+
}
50+
}
51+
52+
#[tokio::test]
53+
async fn test_flatmap_splits_comma_separated_values() {
54+
let flatmap = Flatmap;
55+
let (tx, rx) = mpsc::channel(10);
56+
57+
let datum = create_datum("msg-1", vec!["key1".to_string()], b"a,b,c".to_vec());
58+
tx.send(datum).await.unwrap();
59+
drop(tx);
60+
61+
let responses = flatmap.batchmap(rx).await;
62+
63+
assert_eq!(responses.len(), 1);
64+
assert_eq!(responses[0].id, "msg-1");
65+
assert_eq!(responses[0].message.len(), 3);
66+
assert_eq!(responses[0].message[0].value, b"a");
67+
assert_eq!(responses[0].message[1].value, b"b");
68+
assert_eq!(responses[0].message[2].value, b"c");
69+
}
70+
71+
#[tokio::test]
72+
async fn test_flatmap_single_value_no_comma() {
73+
let flatmap = Flatmap;
74+
let (tx, rx) = mpsc::channel(10);
75+
76+
let datum = create_datum("msg-1", vec!["key1".to_string()], b"single_value".to_vec());
77+
tx.send(datum).await.unwrap();
78+
drop(tx);
79+
80+
let responses = flatmap.batchmap(rx).await;
81+
82+
assert_eq!(responses.len(), 1);
83+
assert_eq!(responses[0].message.len(), 1);
84+
assert_eq!(responses[0].message[0].value, b"single_value");
85+
}
86+
87+
#[tokio::test]
88+
async fn test_flatmap_preserves_keys_for_all_parts() {
89+
let flatmap = Flatmap;
90+
let (tx, rx) = mpsc::channel(10);
91+
92+
let datum = create_datum(
93+
"msg-1",
94+
vec!["key1".to_string(), "key2".to_string()],
95+
b"x,y".to_vec(),
96+
);
97+
tx.send(datum).await.unwrap();
98+
drop(tx);
99+
100+
let responses = flatmap.batchmap(rx).await;
101+
102+
assert_eq!(responses.len(), 1);
103+
assert_eq!(responses[0].message.len(), 2);
104+
for msg in &responses[0].message {
105+
assert_eq!(msg.keys, Some(vec!["key1".to_string(), "key2".to_string()]));
106+
}
107+
}
108+
109+
#[tokio::test]
110+
async fn test_flatmap_multiple_messages() {
111+
let flatmap = Flatmap;
112+
let (tx, rx) = mpsc::channel(10);
113+
114+
tx.send(create_datum(
115+
"msg-1",
116+
vec!["k1".to_string()],
117+
b"a,b".to_vec(),
118+
))
119+
.await
120+
.unwrap();
121+
tx.send(create_datum(
122+
"msg-2",
123+
vec!["k2".to_string()],
124+
b"x,y,z".to_vec(),
125+
))
126+
.await
127+
.unwrap();
128+
drop(tx);
129+
130+
let responses = flatmap.batchmap(rx).await;
131+
132+
assert_eq!(responses.len(), 2);
133+
assert_eq!(responses[0].id, "msg-1");
134+
assert_eq!(responses[0].message.len(), 2);
135+
assert_eq!(responses[1].id, "msg-2");
136+
assert_eq!(responses[1].message.len(), 3);
137+
}
138+
139+
#[tokio::test]
140+
async fn test_flatmap_empty_input() {
141+
let flatmap = Flatmap;
142+
let (tx, rx) = mpsc::channel(10);
143+
drop(tx);
144+
145+
let responses = flatmap.batchmap(rx).await;
146+
147+
assert_eq!(responses.len(), 0);
148+
}
149+
150+
#[tokio::test]
151+
async fn test_flatmap_empty_value() {
152+
let flatmap = Flatmap;
153+
let (tx, rx) = mpsc::channel(10);
154+
155+
let datum = create_datum("msg-1", vec!["key1".to_string()], vec![]);
156+
tx.send(datum).await.unwrap();
157+
drop(tx);
158+
159+
let responses = flatmap.batchmap(rx).await;
160+
161+
assert_eq!(responses.len(), 1);
162+
// Empty string split by comma gives one empty part
163+
assert_eq!(responses[0].message.len(), 1);
164+
assert_eq!(responses[0].message[0].value, b"");
165+
}
166+
167+
#[tokio::test]
168+
async fn test_flatmap_handles_invalid_utf8() {
169+
let flatmap = Flatmap;
170+
let (tx, rx) = mpsc::channel(10);
171+
172+
// Invalid UTF-8 bytes - from_utf8_lossy should handle gracefully
173+
let invalid_utf8 = vec![0xff, 0xfe, b',', b'a'];
174+
let datum = create_datum("msg-1", vec!["key1".to_string()], invalid_utf8);
175+
tx.send(datum).await.unwrap();
176+
drop(tx);
177+
178+
let responses = flatmap.batchmap(rx).await;
179+
180+
// Should still process and split by comma
181+
assert_eq!(responses.len(), 1);
182+
assert_eq!(responses[0].message.len(), 2);
183+
}
184+
}

examples/flatmap-stream/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@ rust-version.workspace = true
88
tonic.workspace = true
99
tokio.workspace = true
1010
numaflow = { path = "../../numaflow" }
11+
12+
[dev-dependencies]
13+
chrono.workspace = true

0 commit comments

Comments
 (0)