Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/map-bypass-cat/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
13 changes: 13 additions & 0 deletions examples/map-bypass-cat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "map-bypass-cat"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true

[dependencies]
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }

[dev-dependencies]
chrono.workspace = true
20 changes: 20 additions & 0 deletions examples/map-bypass-cat/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y

WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/map-bypass-cat

# build for release
RUN cargo build --release

# our final base
FROM debian:bullseye AS map-bypass-cat

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/target/release/map-bypass-cat .

# set the startup command to run your binary
CMD ["./map-bypass-cat"]
20 changes: 20 additions & 0 deletions examples/map-bypass-cat/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/map-bypass-cat:${TAG}
DOCKER_FILE_PATH = examples/map-bypass-cat/Dockerfile

.PHONY: update
update:
cargo check
cargo update

.PHONY: image
image: update
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

.PHONY: clean
clean:
-rm -rf target
36 changes: 36 additions & 0 deletions examples/map-bypass-cat/manifests/map-bypass-cat.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
name: map-bypass-cat
spec:
bypass:
sink:
tags:
operator: or
values:
- sink
fallback:
tags:
operator: or
values:
- fallback
onSuccess:
tags:
operator: or
values:
- onSuccess
source:
http: {}
udf:
container:
image: quay.io/numaio/numaflow-rs/map-bypass-cat:stable
imagePullPolicy: IfNotPresent
sink:
udsink:
container:
image: quay.io/numaio/numaflow-rs/sink-log:stable
imagePullPolicy: IfNotPresent
fallback:
log: { }
onSuccess:
log: { }
294 changes: 294 additions & 0 deletions examples/map-bypass-cat/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
//! This example is added to allow creation of a reproducible image to be used for e2e testing
//! of MonoVertex's bypass feature.
//!
//! Based on the message content, tags will be added to messages which will allow the bypass router
//! to route them to the specific sink:
//! * Add "fallback" tag to all the messages which have the word "fallback" in their value.
//! * Add "onSuccess" tag to all the messages which have the word "onSuccess" in their value.
//! * Add "sink" tag to all the messages which have the word "primary" in their value.
//!
//! This example will be used along with the following bypass spec:
//! ```yaml
//! bypass:
//! sink:
//! tags:
//! operator: or
//! values:
//! - sink
//! fallback:
//! tags:
//! operator: or
//! values:
//! - fallback
//! onSuccess:
//! tags:
//! operator: or
//! values:
//! - onSuccess
//! ```

use numaflow::map;
use numaflow::shared::grpc_server::ServerExtras;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
map::Server::new(BypassCat)
.with_max_message_size(10240)
.start()
.await
}

struct BypassCat;

#[tonic::async_trait]
impl map::Mapper for BypassCat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
let input_str = String::from_utf8(input.value.clone()).expect("Invalid UTF-8");
let mut tags = vec![];
if input_str.contains("fallback") || input_str.contains("Fallback") {
tags.push("fallback".to_string());
}
if input_str.contains("onSuccess")
|| input_str.contains("OnSuccess")
|| input_str.contains("on_success")
|| input_str.contains("on-success")
{
tags.push("onSuccess".to_string());
}
if input_str.contains("primary") {
tags.push("sink".to_string());
}
Comment on lines +58 to +60
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the docs in main.rs

Based on the content of the message, we're adding tags to the message so that the bypass router can forward these messages to the respective sinks.

vec![
map::Message::new(input.value)
.with_keys(input.keys.clone())
.with_tags(tags),
]
}
}

#[cfg(test)]
mod tests {
use super::*;
use numaflow::map::{MapRequest, Mapper, SystemMetadata, UserMetadata};
use std::collections::HashMap;

fn create_request(keys: Vec<String>, value: Vec<u8>) -> MapRequest {
MapRequest {
keys,
value,
watermark: chrono::Utc::now(),
eventtime: chrono::Utc::now(),
headers: HashMap::new(),
user_metadata: UserMetadata::new(),
system_metadata: SystemMetadata::new(),
}
}

#[tokio::test]
async fn test_map_cat_no_tags() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"regular message".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(messages[0].tags.as_ref().unwrap().is_empty());
}

#[tokio::test]
async fn test_map_cat_fallback_lowercase() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"this is fallback data".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"fallback".to_string())
);
}

#[tokio::test]
async fn test_map_cat_fallback_capitalized() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"this is Fallback data".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"fallback".to_string())
);
}

#[tokio::test]
async fn test_map_cat_on_success_camel_case() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"onSuccess event".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"onSuccess".to_string())
);
}

#[tokio::test]
async fn test_map_cat_on_success_pascal_case() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"OnSuccess event".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"onSuccess".to_string())
);
}

#[tokio::test]
async fn test_map_cat_on_success_snake_case() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"on_success event".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"onSuccess".to_string())
);
}

#[tokio::test]
async fn test_map_cat_on_success_kebab_case() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"on-success event".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"onSuccess".to_string())
);
}

#[tokio::test]
async fn test_map_cat_primary_adds_sink_tag() {
let cat = BypassCat;
let request = create_request(vec!["key1".to_string()], b"primary destination".to_vec());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"sink".to_string())
);
}

#[tokio::test]
async fn test_map_cat_multiple_tags() {
let cat = BypassCat;
let request = create_request(
vec!["key1".to_string()],
b"fallback and onSuccess and primary".to_vec(),
);

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
let tags = messages[0].tags.as_ref().unwrap();
assert_eq!(tags.len(), 3);
assert!(tags.contains(&"fallback".to_string()));
assert!(tags.contains(&"onSuccess".to_string()));
assert!(tags.contains(&"sink".to_string()));
}

#[tokio::test]
async fn test_map_cat_fallback_and_on_success() {
let cat = BypassCat;
let request = create_request(
vec!["key1".to_string()],
b"Fallback with on_success".to_vec(),
);

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
let tags = messages[0].tags.as_ref().unwrap();
assert_eq!(tags.len(), 2);
assert!(tags.contains(&"fallback".to_string()));
assert!(tags.contains(&"onSuccess".to_string()));
}

#[tokio::test]
async fn test_map_cat_preserves_value_with_tags() {
let cat = BypassCat;
let value = b"fallback message content".to_vec();
let request = create_request(vec!["key1".to_string()], value.clone());

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert_eq!(messages[0].value, value);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"fallback".to_string())
);
}

#[tokio::test]
async fn test_map_cat_preserves_keys_with_tags() {
let cat = BypassCat;
let request = create_request(
vec!["key1".to_string(), "key2".to_string()],
b"primary data".to_vec(),
);

let messages = cat.map(request).await;

assert_eq!(messages.len(), 1);
assert_eq!(
messages[0].keys,
Some(vec!["key1".to_string(), "key2".to_string()])
);
assert!(
messages[0]
.tags
.as_ref()
.unwrap()
.contains(&"sink".to_string())
);
}
}
1 change: 1 addition & 0 deletions examples/source-transformer-bypass/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
Loading
Loading