Skip to content

Commit 3ec2cd1

Browse files
committed
Apply rustfmt formatting
1 parent 58bac0d commit 3ec2cd1

File tree

11 files changed

+637
-573
lines changed

11 files changed

+637
-573
lines changed

examples/batchmap-cat/src/main.rs

Lines changed: 140 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -1,137 +1,140 @@
1-
use numaflow::batchmap;
2-
use numaflow::batchmap::{BatchResponse, Datum, Message};
3-
4-
#[tokio::main]
5-
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
6-
batchmap::Server::new(Cat).start().await
7-
}
8-
9-
struct Cat;
10-
11-
#[tonic::async_trait]
12-
impl batchmap::BatchMapper for Cat {
13-
async fn batchmap(&self, mut input: tokio::sync::mpsc::Receiver<Datum>) -> Vec<BatchResponse> {
14-
let mut responses: Vec<BatchResponse> = Vec::new();
15-
while let Some(datum) = input.recv().await {
16-
let mut response = BatchResponse::from_id(datum.id);
17-
response.append(Message::new(datum.value).with_keys(datum.keys.clone()));
18-
responses.push(response);
19-
}
20-
responses
21-
}
22-
}
23-
24-
#[cfg(test)]
25-
mod tests {
26-
use super::*;
27-
use numaflow::batchmap::BatchMapper;
28-
use std::collections::HashMap;
29-
use tokio::sync::mpsc;
30-
31-
fn create_datum(id: &str, keys: Vec<String>, value: Vec<u8>) -> Datum {
32-
Datum {
33-
id: id.to_string(),
34-
keys,
35-
value,
36-
watermark: chrono::Utc::now(),
37-
event_time: chrono::Utc::now(),
38-
headers: HashMap::new(),
39-
}
40-
}
41-
42-
#[tokio::test]
43-
async fn test_batchmap_single_message() {
44-
let cat = Cat;
45-
let (tx, rx) = mpsc::channel(10);
46-
47-
let datum = create_datum(
48-
"msg-1",
49-
vec!["key1".to_string()],
50-
b"Hello, Numaflow!".to_vec(),
51-
);
52-
tx.send(datum).await.unwrap();
53-
drop(tx);
54-
55-
let responses = cat.batchmap(rx).await;
56-
57-
assert_eq!(responses.len(), 1);
58-
assert_eq!(responses[0].id, "msg-1");
59-
assert_eq!(responses[0].message.len(), 1);
60-
assert_eq!(responses[0].message[0].value, b"Hello, Numaflow!");
61-
assert_eq!(responses[0].message[0].keys, Some(vec!["key1".to_string()]));
62-
}
63-
64-
#[tokio::test]
65-
async fn test_batchmap_multiple_messages() {
66-
let cat = Cat;
67-
let (tx, rx) = mpsc::channel(10);
68-
69-
for i in 0..5 {
70-
let datum = create_datum(
71-
&format!("msg-{}", i),
72-
vec![format!("key-{}", i)],
73-
format!("Message {}", i).into_bytes(),
74-
);
75-
tx.send(datum).await.unwrap();
76-
}
77-
drop(tx);
78-
79-
let responses = cat.batchmap(rx).await;
80-
81-
assert_eq!(responses.len(), 5);
82-
for (i, response) in responses.iter().enumerate() {
83-
assert_eq!(response.id, format!("msg-{}", i));
84-
assert_eq!(response.message.len(), 1);
85-
assert_eq!(response.message[0].value, format!("Message {}", i).into_bytes());
86-
}
87-
}
88-
89-
#[tokio::test]
90-
async fn test_batchmap_empty_input() {
91-
let cat = Cat;
92-
let (tx, rx) = mpsc::channel(10);
93-
drop(tx);
94-
95-
let responses = cat.batchmap(rx).await;
96-
97-
assert_eq!(responses.len(), 0, "No responses for empty input");
98-
}
99-
100-
#[tokio::test]
101-
async fn test_batchmap_preserves_keys() {
102-
let cat = Cat;
103-
let (tx, rx) = mpsc::channel(10);
104-
105-
let datum = create_datum(
106-
"msg-1",
107-
vec!["key1".to_string(), "key2".to_string()],
108-
b"test value".to_vec(),
109-
);
110-
tx.send(datum).await.unwrap();
111-
drop(tx);
112-
113-
let responses = cat.batchmap(rx).await;
114-
115-
assert_eq!(responses.len(), 1);
116-
assert_eq!(
117-
responses[0].message[0].keys,
118-
Some(vec!["key1".to_string(), "key2".to_string()])
119-
);
120-
}
121-
122-
#[tokio::test]
123-
async fn test_batchmap_empty_value() {
124-
let cat = Cat;
125-
let (tx, rx) = mpsc::channel(10);
126-
127-
let datum = create_datum("msg-empty", vec!["key1".to_string()], vec![]);
128-
tx.send(datum).await.unwrap();
129-
drop(tx);
130-
131-
let responses = cat.batchmap(rx).await;
132-
133-
assert_eq!(responses.len(), 1);
134-
assert_eq!(responses[0].id, "msg-empty");
135-
assert_eq!(responses[0].message[0].value, Vec::<u8>::new());
136-
}
137-
}
1+
use numaflow::batchmap;
2+
use numaflow::batchmap::{BatchResponse, Datum, Message};
3+
4+
#[tokio::main]
5+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
6+
batchmap::Server::new(Cat).start().await
7+
}
8+
9+
struct Cat;
10+
11+
#[tonic::async_trait]
12+
impl batchmap::BatchMapper for Cat {
13+
async fn batchmap(&self, mut input: tokio::sync::mpsc::Receiver<Datum>) -> Vec<BatchResponse> {
14+
let mut responses: Vec<BatchResponse> = Vec::new();
15+
while let Some(datum) = input.recv().await {
16+
let mut response = BatchResponse::from_id(datum.id);
17+
response.append(Message::new(datum.value).with_keys(datum.keys.clone()));
18+
responses.push(response);
19+
}
20+
responses
21+
}
22+
}
23+
24+
#[cfg(test)]
25+
mod tests {
26+
use super::*;
27+
use numaflow::batchmap::BatchMapper;
28+
use std::collections::HashMap;
29+
use tokio::sync::mpsc;
30+
31+
fn create_datum(id: &str, keys: Vec<String>, value: Vec<u8>) -> Datum {
32+
Datum {
33+
id: id.to_string(),
34+
keys,
35+
value,
36+
watermark: chrono::Utc::now(),
37+
event_time: chrono::Utc::now(),
38+
headers: HashMap::new(),
39+
}
40+
}
41+
42+
#[tokio::test]
43+
async fn test_batchmap_single_message() {
44+
let cat = Cat;
45+
let (tx, rx) = mpsc::channel(10);
46+
47+
let datum = create_datum(
48+
"msg-1",
49+
vec!["key1".to_string()],
50+
b"Hello, Numaflow!".to_vec(),
51+
);
52+
tx.send(datum).await.unwrap();
53+
drop(tx);
54+
55+
let responses = cat.batchmap(rx).await;
56+
57+
assert_eq!(responses.len(), 1);
58+
assert_eq!(responses[0].id, "msg-1");
59+
assert_eq!(responses[0].message.len(), 1);
60+
assert_eq!(responses[0].message[0].value, b"Hello, Numaflow!");
61+
assert_eq!(responses[0].message[0].keys, Some(vec!["key1".to_string()]));
62+
}
63+
64+
#[tokio::test]
65+
async fn test_batchmap_multiple_messages() {
66+
let cat = Cat;
67+
let (tx, rx) = mpsc::channel(10);
68+
69+
for i in 0..5 {
70+
let datum = create_datum(
71+
&format!("msg-{}", i),
72+
vec![format!("key-{}", i)],
73+
format!("Message {}", i).into_bytes(),
74+
);
75+
tx.send(datum).await.unwrap();
76+
}
77+
drop(tx);
78+
79+
let responses = cat.batchmap(rx).await;
80+
81+
assert_eq!(responses.len(), 5);
82+
for (i, response) in responses.iter().enumerate() {
83+
assert_eq!(response.id, format!("msg-{}", i));
84+
assert_eq!(response.message.len(), 1);
85+
assert_eq!(
86+
response.message[0].value,
87+
format!("Message {}", i).into_bytes()
88+
);
89+
}
90+
}
91+
92+
#[tokio::test]
93+
async fn test_batchmap_empty_input() {
94+
let cat = Cat;
95+
let (tx, rx) = mpsc::channel(10);
96+
drop(tx);
97+
98+
let responses = cat.batchmap(rx).await;
99+
100+
assert_eq!(responses.len(), 0, "No responses for empty input");
101+
}
102+
103+
#[tokio::test]
104+
async fn test_batchmap_preserves_keys() {
105+
let cat = Cat;
106+
let (tx, rx) = mpsc::channel(10);
107+
108+
let datum = create_datum(
109+
"msg-1",
110+
vec!["key1".to_string(), "key2".to_string()],
111+
b"test value".to_vec(),
112+
);
113+
tx.send(datum).await.unwrap();
114+
drop(tx);
115+
116+
let responses = cat.batchmap(rx).await;
117+
118+
assert_eq!(responses.len(), 1);
119+
assert_eq!(
120+
responses[0].message[0].keys,
121+
Some(vec!["key1".to_string(), "key2".to_string()])
122+
);
123+
}
124+
125+
#[tokio::test]
126+
async fn test_batchmap_empty_value() {
127+
let cat = Cat;
128+
let (tx, rx) = mpsc::channel(10);
129+
130+
let datum = create_datum("msg-empty", vec!["key1".to_string()], vec![]);
131+
tx.send(datum).await.unwrap();
132+
drop(tx);
133+
134+
let responses = cat.batchmap(rx).await;
135+
136+
assert_eq!(responses.len(), 1);
137+
assert_eq!(responses[0].id, "msg-empty");
138+
assert_eq!(responses[0].message[0].value, Vec::<u8>::new());
139+
}
140+
}

0 commit comments

Comments
 (0)