Skip to content

Commit 0a496b1

Browse files
test: Implement redis sink in Rust for on-success sink e2e test (#157)
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
1 parent 0b85673 commit 0a496b1

File tree

6 files changed

+203
-0
lines changed

6 files changed

+203
-0
lines changed

examples/redis-sink/.dockerignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
target/

examples/redis-sink/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "redis-sink"
3+
version = "0.1.0"
4+
edition.workspace = true
5+
rust-version.workspace = true
6+
7+
[dependencies]
8+
tonic.workspace = true
9+
tokio.workspace = true
10+
numaflow = { path = "../../numaflow" }
11+
redis = { version = "1.0.0", features = ["tokio-comp", "aio"] }

examples/redis-sink/Dockerfile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
FROM rust:1.85-bullseye AS build
2+
3+
RUN apt-get update
4+
RUN apt-get install protobuf-compiler -y
5+
6+
WORKDIR /numaflow-rs
7+
COPY ./ ./
8+
WORKDIR /numaflow-rs/examples/redis-sink
9+
10+
# build for release
11+
RUN cargo build --release
12+
13+
# our final base
14+
FROM debian:bullseye AS redis-sink
15+
16+
# copy the build artifact from the build stage
17+
COPY --from=build /numaflow-rs/target/release/redis-sink .
18+
19+
# set the startup command to run your binary
20+
CMD ["./redis-sink"]

examples/redis-sink/Makefile

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/redis-sink:${TAG}
4+
DOCKER_FILE_PATH = examples/redis-sink/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
cargo check
9+
cargo update
10+
11+
.PHONY: image
12+
image: update
13+
cd ../../ && docker build \
14+
-f ${DOCKER_FILE_PATH} \
15+
-t ${IMAGE_REGISTRY} . --load
16+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

examples/redis-sink/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Redis E2E Test Sink
2+
A User Defined Sink using redis hashes to store messages.
3+
The hash key is set by an environment variable `SINK_HASH_KEY` under the sink container spec.
4+
5+
For each message received, the sink will store the message in the hash with the key being the payload of the message
6+
and the value being the no. of occurrences of that payload so far.
7+
8+
The environment variable `CHECK_ORDER` is used to determine whether to check the order of the messages based one event time.
9+
The environment variable `MESSAGE_COUNT` is used to determine how many subsequent number of messages at a time to check the order of.
10+
11+
This sink is used by Numaflow E2E testing.

examples/redis-sink/src/main.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use numaflow::sink::{self, Response, SinkRequest, Sinker};
2+
use redis::AsyncCommands;
3+
use std::env;
4+
use tokio::sync::Mutex;
5+
6+
/// RedisTestSink is a sink that writes messages to Redis hashes.
7+
/// Created for numaflow e2e tests.
8+
struct RedisTestSink {
9+
/// Redis hash key to store messages. This is set by an environment variable `SINK_HASH_KEY`
10+
/// under the sink container spec.
11+
hash_key: String,
12+
/// Used to determine how many subsequent number of messages at a time to check the order of.
13+
/// This is set by an environment variable `MESSAGE_COUNT`
14+
message_count: usize,
15+
/// Used to collect `message_count` number of messages, check whether they all arrived in order
16+
/// and increment the count for the order result in Redis
17+
inflight_messages: Mutex<Vec<SinkRequest>>,
18+
client: redis::Client,
19+
/// If true, checks the order of messages based on event time.
20+
/// This is set by an environment variable `CHECK_ORDER`
21+
check_order: bool,
22+
}
23+
24+
impl RedisTestSink {
25+
/// Creates a new instance of RedisTestSink with a Redis client.
26+
fn new() -> Self {
27+
let client =
28+
redis::Client::open("redis://redis:6379").expect("Failed to create Redis client");
29+
30+
let hash_key =
31+
env::var("SINK_HASH_KEY").expect("SINK_HASH_KEY environment variable is not set");
32+
33+
let message_count: usize = env::var("MESSAGE_COUNT")
34+
.ok()
35+
.and_then(|s| s.parse().ok())
36+
.unwrap_or(0);
37+
38+
let check_order: bool = env::var("CHECK_ORDER")
39+
.ok()
40+
.and_then(|s| s.parse().ok())
41+
.unwrap_or(false);
42+
43+
RedisTestSink {
44+
client,
45+
hash_key,
46+
message_count,
47+
inflight_messages: Mutex::new(Vec::with_capacity(message_count)),
48+
check_order,
49+
}
50+
}
51+
}
52+
53+
#[tonic::async_trait]
54+
impl Sinker for RedisTestSink {
55+
/// This redis UDSink is created for numaflow e2e tests. This handle function assumes that
56+
/// a redis instance listening on address redis:6379 has already been up and running.
57+
async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
58+
let mut results: Vec<Response> = Vec::new();
59+
60+
// Get async connection to Redis
61+
let mut con = self
62+
.client
63+
.get_multiplexed_async_connection()
64+
.await
65+
.expect("Failed to get Redis connection");
66+
67+
while let Some(datum) = input.recv().await {
68+
let id = datum.id.clone();
69+
let value = datum.value.clone();
70+
71+
if self.check_order {
72+
let mut inflight = self.inflight_messages.lock().await;
73+
inflight.push(datum);
74+
75+
if inflight.len() == self.message_count {
76+
// Check if messages are ordered by event time
77+
let ordered = inflight
78+
.windows(2)
79+
.all(|w| w[0].event_time <= w[1].event_time);
80+
81+
let result_message = if ordered { "ordered" } else { "not ordered" };
82+
83+
// Increment the count for the order result in Redis
84+
let result: Result<(), redis::RedisError> =
85+
con.hincr(&self.hash_key, result_message, 1).await;
86+
87+
match result {
88+
Ok(_) => {
89+
println!(
90+
"Incremented by 1 the no. of occurrences of {} under hash key {}",
91+
result_message, self.hash_key
92+
);
93+
}
94+
Err(e) => {
95+
eprintln!("Set Error - {:?}", e);
96+
}
97+
}
98+
99+
// Reset the inflight messages
100+
inflight.clear();
101+
}
102+
}
103+
104+
// Watermark and event time of the message can be accessed
105+
let _ = datum.event_time;
106+
let _ = datum.watermark;
107+
108+
// We use redis hashes to store messages.
109+
// Each field of a hash is the content of a message and
110+
// value of the field is the no. of occurrences of the message.
111+
let value_str = String::from_utf8(value).unwrap_or_else(|_| "".to_string());
112+
113+
let result: Result<(), redis::RedisError> =
114+
con.hincr(&self.hash_key, &value_str, 1).await;
115+
116+
match result {
117+
Ok(_) => {
118+
println!(
119+
"Incremented by 1 the no. of occurrences of {} under hash key {}",
120+
value_str, self.hash_key
121+
);
122+
}
123+
Err(e) => {
124+
eprintln!("Set Error - {:?}", e);
125+
}
126+
}
127+
128+
results.push(Response::ok(id));
129+
}
130+
131+
results
132+
}
133+
}
134+
135+
#[tokio::main]
136+
async fn main() {
137+
let sink = RedisTestSink::new();
138+
139+
let server = sink::Server::new(sink);
140+
141+
if let Err(e) = server.start().await {
142+
panic!("Failed to start sink function server: {:?}", e);
143+
}
144+
}

0 commit comments

Comments
 (0)