Skip to content

Commit 9237e14

Browse files
test: Examples for bypass map and bypass source transformer for e2e tests (#164)
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
1 parent e160036 commit 9237e14

File tree

12 files changed

+904
-0
lines changed

12 files changed

+904
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
target/

examples/map-bypass-cat/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "map-bypass-cat"
3+
version = "0.1.0"
4+
edition.workspace = true
5+
rust-version.workspace = true
6+
7+
[dependencies]
8+
tonic.workspace = true
9+
tokio.workspace = true
10+
numaflow = { path = "../../numaflow" }
11+
12+
[dev-dependencies]
13+
chrono.workspace = true

examples/map-bypass-cat/Dockerfile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
FROM rust:1.85-bullseye AS build
2+
3+
RUN apt-get update
4+
RUN apt-get install protobuf-compiler -y
5+
6+
WORKDIR /numaflow-rs
7+
COPY ./ ./
8+
WORKDIR /numaflow-rs/examples/map-bypass-cat
9+
10+
# build for release
11+
RUN cargo build --release
12+
13+
# our final base
14+
FROM debian:bullseye AS map-bypass-cat
15+
16+
# copy the build artifact from the build stage
17+
COPY --from=build /numaflow-rs/target/release/map-bypass-cat .
18+
19+
# set the startup command to run your binary
20+
CMD ["./map-bypass-cat"]

examples/map-bypass-cat/Makefile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/map-bypass-cat:${TAG}
4+
DOCKER_FILE_PATH = examples/map-bypass-cat/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
cargo check
9+
cargo update
10+
11+
.PHONY: image
12+
image: update
13+
cd ../../ && docker build \
14+
-f ${DOCKER_FILE_PATH} \
15+
-t ${IMAGE_REGISTRY} .
16+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
17+
18+
.PHONY: clean
19+
clean:
20+
-rm -rf target
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
apiVersion: numaflow.numaproj.io/v1alpha1
2+
kind: MonoVertex
3+
metadata:
4+
name: map-bypass-cat
5+
spec:
6+
bypass:
7+
sink:
8+
tags:
9+
operator: or
10+
values:
11+
- sink
12+
fallback:
13+
tags:
14+
operator: or
15+
values:
16+
- fallback
17+
onSuccess:
18+
tags:
19+
operator: or
20+
values:
21+
- onSuccess
22+
source:
23+
http: {}
24+
udf:
25+
container:
26+
image: quay.io/numaio/numaflow-rs/map-bypass-cat:stable
27+
imagePullPolicy: IfNotPresent
28+
sink:
29+
udsink:
30+
container:
31+
image: quay.io/numaio/numaflow-rs/sink-log:stable
32+
imagePullPolicy: IfNotPresent
33+
fallback:
34+
log: { }
35+
onSuccess:
36+
log: { }
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
//! This example is added to allow creation of a reproducible image to be used for e2e testing
2+
//! of MonoVertex's bypass feature.
3+
//!
4+
//! Based on the message content, tags will be added to messages which will allow the bypass router
5+
//! to route them to the specific sink:
6+
//! * Add "fallback" tag to all the messages which have the word "fallback" in their value.
7+
//! * Add "onSuccess" tag to all the messages which have the word "onSuccess" in their value.
8+
//! * Add "sink" tag to all the messages which have the word "primary" in their value.
9+
//!
10+
//! This example will be used along with the following bypass spec:
11+
//! ```yaml
12+
//! bypass:
13+
//! sink:
14+
//! tags:
15+
//! operator: or
16+
//! values:
17+
//! - sink
18+
//! fallback:
19+
//! tags:
20+
//! operator: or
21+
//! values:
22+
//! - fallback
23+
//! onSuccess:
24+
//! tags:
25+
//! operator: or
26+
//! values:
27+
//! - onSuccess
28+
//! ```
29+
30+
use numaflow::map;
31+
use numaflow::shared::grpc_server::ServerExtras;
32+
33+
#[tokio::main]
34+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
35+
map::Server::new(BypassCat)
36+
.with_max_message_size(10240)
37+
.start()
38+
.await
39+
}
40+
41+
struct BypassCat;
42+
43+
#[tonic::async_trait]
44+
impl map::Mapper for BypassCat {
45+
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
46+
let input_str = String::from_utf8(input.value.clone()).expect("Invalid UTF-8");
47+
let mut tags = vec![];
48+
if input_str.contains("fallback") || input_str.contains("Fallback") {
49+
tags.push("fallback".to_string());
50+
}
51+
if input_str.contains("onSuccess")
52+
|| input_str.contains("OnSuccess")
53+
|| input_str.contains("on_success")
54+
|| input_str.contains("on-success")
55+
{
56+
tags.push("onSuccess".to_string());
57+
}
58+
if input_str.contains("primary") {
59+
tags.push("sink".to_string());
60+
}
61+
vec![
62+
map::Message::new(input.value)
63+
.with_keys(input.keys.clone())
64+
.with_tags(tags),
65+
]
66+
}
67+
}
68+
69+
#[cfg(test)]
70+
mod tests {
71+
use super::*;
72+
use numaflow::map::{MapRequest, Mapper, SystemMetadata, UserMetadata};
73+
use std::collections::HashMap;
74+
75+
fn create_request(keys: Vec<String>, value: Vec<u8>) -> MapRequest {
76+
MapRequest {
77+
keys,
78+
value,
79+
watermark: chrono::Utc::now(),
80+
eventtime: chrono::Utc::now(),
81+
headers: HashMap::new(),
82+
user_metadata: UserMetadata::new(),
83+
system_metadata: SystemMetadata::new(),
84+
}
85+
}
86+
87+
#[tokio::test]
88+
async fn test_map_cat_no_tags() {
89+
let cat = BypassCat;
90+
let request = create_request(vec!["key1".to_string()], b"regular message".to_vec());
91+
92+
let messages = cat.map(request).await;
93+
94+
assert_eq!(messages.len(), 1);
95+
assert!(messages[0].tags.as_ref().unwrap().is_empty());
96+
}
97+
98+
#[tokio::test]
99+
async fn test_map_cat_fallback_lowercase() {
100+
let cat = BypassCat;
101+
let request = create_request(vec!["key1".to_string()], b"this is fallback data".to_vec());
102+
103+
let messages = cat.map(request).await;
104+
105+
assert_eq!(messages.len(), 1);
106+
assert!(
107+
messages[0]
108+
.tags
109+
.as_ref()
110+
.unwrap()
111+
.contains(&"fallback".to_string())
112+
);
113+
}
114+
115+
#[tokio::test]
116+
async fn test_map_cat_fallback_capitalized() {
117+
let cat = BypassCat;
118+
let request = create_request(vec!["key1".to_string()], b"this is Fallback data".to_vec());
119+
120+
let messages = cat.map(request).await;
121+
122+
assert_eq!(messages.len(), 1);
123+
assert!(
124+
messages[0]
125+
.tags
126+
.as_ref()
127+
.unwrap()
128+
.contains(&"fallback".to_string())
129+
);
130+
}
131+
132+
#[tokio::test]
133+
async fn test_map_cat_on_success_camel_case() {
134+
let cat = BypassCat;
135+
let request = create_request(vec!["key1".to_string()], b"onSuccess event".to_vec());
136+
137+
let messages = cat.map(request).await;
138+
139+
assert_eq!(messages.len(), 1);
140+
assert!(
141+
messages[0]
142+
.tags
143+
.as_ref()
144+
.unwrap()
145+
.contains(&"onSuccess".to_string())
146+
);
147+
}
148+
149+
#[tokio::test]
150+
async fn test_map_cat_on_success_pascal_case() {
151+
let cat = BypassCat;
152+
let request = create_request(vec!["key1".to_string()], b"OnSuccess event".to_vec());
153+
154+
let messages = cat.map(request).await;
155+
156+
assert_eq!(messages.len(), 1);
157+
assert!(
158+
messages[0]
159+
.tags
160+
.as_ref()
161+
.unwrap()
162+
.contains(&"onSuccess".to_string())
163+
);
164+
}
165+
166+
#[tokio::test]
167+
async fn test_map_cat_on_success_snake_case() {
168+
let cat = BypassCat;
169+
let request = create_request(vec!["key1".to_string()], b"on_success event".to_vec());
170+
171+
let messages = cat.map(request).await;
172+
173+
assert_eq!(messages.len(), 1);
174+
assert!(
175+
messages[0]
176+
.tags
177+
.as_ref()
178+
.unwrap()
179+
.contains(&"onSuccess".to_string())
180+
);
181+
}
182+
183+
#[tokio::test]
184+
async fn test_map_cat_on_success_kebab_case() {
185+
let cat = BypassCat;
186+
let request = create_request(vec!["key1".to_string()], b"on-success event".to_vec());
187+
188+
let messages = cat.map(request).await;
189+
190+
assert_eq!(messages.len(), 1);
191+
assert!(
192+
messages[0]
193+
.tags
194+
.as_ref()
195+
.unwrap()
196+
.contains(&"onSuccess".to_string())
197+
);
198+
}
199+
200+
#[tokio::test]
201+
async fn test_map_cat_primary_adds_sink_tag() {
202+
let cat = BypassCat;
203+
let request = create_request(vec!["key1".to_string()], b"primary destination".to_vec());
204+
205+
let messages = cat.map(request).await;
206+
207+
assert_eq!(messages.len(), 1);
208+
assert!(
209+
messages[0]
210+
.tags
211+
.as_ref()
212+
.unwrap()
213+
.contains(&"sink".to_string())
214+
);
215+
}
216+
217+
#[tokio::test]
218+
async fn test_map_cat_multiple_tags() {
219+
let cat = BypassCat;
220+
let request = create_request(
221+
vec!["key1".to_string()],
222+
b"fallback and onSuccess and primary".to_vec(),
223+
);
224+
225+
let messages = cat.map(request).await;
226+
227+
assert_eq!(messages.len(), 1);
228+
let tags = messages[0].tags.as_ref().unwrap();
229+
assert_eq!(tags.len(), 3);
230+
assert!(tags.contains(&"fallback".to_string()));
231+
assert!(tags.contains(&"onSuccess".to_string()));
232+
assert!(tags.contains(&"sink".to_string()));
233+
}
234+
235+
#[tokio::test]
236+
async fn test_map_cat_fallback_and_on_success() {
237+
let cat = BypassCat;
238+
let request = create_request(
239+
vec!["key1".to_string()],
240+
b"Fallback with on_success".to_vec(),
241+
);
242+
243+
let messages = cat.map(request).await;
244+
245+
assert_eq!(messages.len(), 1);
246+
let tags = messages[0].tags.as_ref().unwrap();
247+
assert_eq!(tags.len(), 2);
248+
assert!(tags.contains(&"fallback".to_string()));
249+
assert!(tags.contains(&"onSuccess".to_string()));
250+
}
251+
252+
#[tokio::test]
253+
async fn test_map_cat_preserves_value_with_tags() {
254+
let cat = BypassCat;
255+
let value = b"fallback message content".to_vec();
256+
let request = create_request(vec!["key1".to_string()], value.clone());
257+
258+
let messages = cat.map(request).await;
259+
260+
assert_eq!(messages.len(), 1);
261+
assert_eq!(messages[0].value, value);
262+
assert!(
263+
messages[0]
264+
.tags
265+
.as_ref()
266+
.unwrap()
267+
.contains(&"fallback".to_string())
268+
);
269+
}
270+
271+
#[tokio::test]
272+
async fn test_map_cat_preserves_keys_with_tags() {
273+
let cat = BypassCat;
274+
let request = create_request(
275+
vec!["key1".to_string(), "key2".to_string()],
276+
b"primary data".to_vec(),
277+
);
278+
279+
let messages = cat.map(request).await;
280+
281+
assert_eq!(messages.len(), 1);
282+
assert_eq!(
283+
messages[0].keys,
284+
Some(vec!["key1".to_string(), "key2".to_string()])
285+
);
286+
assert!(
287+
messages[0]
288+
.tags
289+
.as_ref()
290+
.unwrap()
291+
.contains(&"sink".to_string())
292+
);
293+
}
294+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
target/

0 commit comments

Comments
 (0)