Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/e2e-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func main() {
// initialize Redis handlers
redisController := NewRedisController()
http.HandleFunc("/redis/get-msg-count-contains", redisController.GetMsgCountContains)
http.HandleFunc("/redis/get-list", redisController.GetList)

// initialize http handlers
httpController := NewHttpController()
Expand Down
25 changes: 25 additions & 0 deletions test/e2e-api/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"encoding/json"
"log"
"net/http"
"net/url"
Expand Down Expand Up @@ -83,6 +84,30 @@ func (h *RedisController) GetMsgCountContains(w http.ResponseWriter, r *http.Req
_, _ = w.Write([]byte(count))
}

func (h *RedisController) GetList(w http.ResponseWriter, r *http.Request) {
redisClient := h.getRedisClient()

keyName := r.URL.Query().Get("keyName")

vals, err := redisClient.LRange(context.Background(), keyName, 0, -1).Result()
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

respBytes, err := json.Marshal(vals)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
_, _ = w.Write(respBytes)
}

// Close closes the Redis client.
func (h *RedisController) Close() {
h.mLock.Lock()
Expand Down
72 changes: 70 additions & 2 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -380,6 +378,76 @@ func (s *FunctionalSuite) TestPipelineUserMetadataPropagation() {
VertexPodLogContains("sink", "txn-id", PodLogCheckOptionWithContainer("udsink"))
}

func (s *FunctionalSuite) TestOrderedProcessing() {
w := s.Given().Pipeline("@testdata/ordered-processing.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "ordered-processing"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

// Send messages with different keys, each key getting values in a specific order.
// The x-numaflow-keys header sets the message key for partition routing.
keys := []string{"A", "M", "Z"}
values := []string{"create", "update", "delete"}

Comment on lines +390 to +395
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you send atleast 1000 messages and use two sources?

for _, val := range values {
for _, key := range keys {
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().
WithBody([]byte(val)).
WithHeader("x-numaflow-keys", key).
WithHeader("x-numaflow-id", fmt.Sprintf("%s-%s", key, val)))
}
}

// Verify that for each key, the Redis list contains exactly ["create", "update", "delete"] in order.
// The Rust redis-sink in ordered mode stores at key "{SINK_HASH_KEY}_{message_keys}" using RPUSH.
for _, key := range keys {
redisKey := fmt.Sprintf("ordered-processing-out_%s", key)
w.Expect().RedisSinkListEquals(redisKey, values)
}
}

func (s *FunctionalSuite) TestOrderedProcessingMultiSource() {
w := s.Given().Pipeline("@testdata/ordered-processing-multi-source.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "ordered-multi-source"

w.Expect().VertexPodsRunning()

// 10 numeric keys, 100 messages per key = 1000 total messages.
// Messages are sent round-robin across two HTTP sources (in-1, in-2).
numKeys := 10
msgsPerKey := 100
sources := []string{"in-1", "in-2"}

for seq := 0; seq < msgsPerKey; seq++ {
for key := 0; key < numKeys; key++ {
src := sources[(key+seq)%len(sources)]
keyStr := fmt.Sprintf("%d", key)
val := fmt.Sprintf("%d", seq)
w.SendMessageTo(pipelineName, src, NewHttpPostRequest().
WithBody([]byte(val)).
WithHeader("x-numaflow-keys", keyStr).
WithHeader("x-numaflow-id", fmt.Sprintf("%s-%s", keyStr, val)))
}
}

// Verify that for each key, the Redis list contains values 0..99 in order.
expectedValues := make([]string, msgsPerKey)
for i := 0; i < msgsPerKey; i++ {
expectedValues[i] = fmt.Sprintf("%d", i)
}
for key := 0; key < numKeys; key++ {
redisKey := fmt.Sprintf("ordered-multi-source-out_%d", key)
w.Expect().RedisSinkListEquals(redisKey, expectedValues)
}
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
41 changes: 41 additions & 0 deletions test/e2e/testdata/ordered-processing-multi-source.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: ordered-multi-source
spec:
limits:
readBatchSize: 1
ordered:
enabled: true
vertices:
- name: in-1
source:
http: {}
- name: in-2
source:
http: {}
- name: cat
partitions: 3
udf:
container:
image: quay.io/numaio/numaflow-rs/map-cat:stable
imagePullPolicy: IfNotPresent
- name: out
partitions: 3
sink:
udsink:
container:
image: quay.io/numaio/numaflow-rs/redis-sink:stable
imagePullPolicy: IfNotPresent
env:
- name: SINK_HASH_KEY
value: "ordered-multi-source-out"
- name: MODE
value: "ordered"
edges:
- from: in-1
to: cat
- from: in-2
to: cat
- from: cat
to: out
36 changes: 36 additions & 0 deletions test/e2e/testdata/ordered-processing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: ordered-processing
spec:
limits:
readBatchSize: 1
ordered:
enabled: true
vertices:
- name: in
source:
http: {}
- name: cat
partitions: 3
udf:
container:
image: quay.io/numaio/numaflow-rs/map-cat:stable
imagePullPolicy: IfNotPresent
- name: out
partitions: 3
sink:
udsink:
container:
image: quay.io/numaio/numaflow-rs/redis-sink:stable
imagePullPolicy: IfNotPresent
env:
- name: SINK_HASH_KEY
value: "ordered-processing-out"
- name: MODE
value: "ordered"
edges:
- from: in
to: cat
- from: cat
to: out
12 changes: 12 additions & 0 deletions test/fixtures/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func (t *Expect) RedisSinkNotContains(hashKey string, targetStr string, opts ...
return t
}

// RedisSinkListEquals checks that the Redis list at keyName contains exactly the expectedValues in order.
// This is used for ordered processing verification where the Rust redis-sink stores values as lists using RPUSH.
func (t *Expect) RedisSinkListEquals(keyName string, expectedValues []string, opts ...SinkCheckOption) *Expect {
t.t.Helper()
ctx := context.Background()
if equals := redisListEquals(ctx, keyName, expectedValues, opts...); !equals {
actual := getRedisListValues(keyName)
t.t.Fatalf("Expected redis list at key %s to equal %v, but got %v", keyName, expectedValues, actual)
}
return t
}

func (t *Expect) ISBSvcDeleted(timeout time.Duration) *Expect {
t.t.Helper()
ctx := context.Background()
Expand Down
11 changes: 11 additions & 0 deletions test/fixtures/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package fixtures

import (
"encoding/json"
"fmt"
"net/url"
"strconv"
Expand All @@ -32,3 +33,13 @@ func getMsgCountContains(keyName, targetStr string) int {
}
return count
}

// getRedisListValues returns the full ordered list of values stored at keyName in Redis.
func getRedisListValues(keyName string) []string {
str := InvokeE2EAPI("/redis/get-list?keyName=%s", keyName)
var vals []string
if err := json.Unmarshal([]byte(str), &vals); err != nil {
panic(fmt.Sprintf("Can't parse string %s as JSON string array: %v", str, err))
}
return vals
}
24 changes: 24 additions & 0 deletions test/fixtures/redis_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,30 @@ func SinkCheckWithTimeout(t time.Duration) SinkCheckOption {
}
}

// redisListEquals verifies that the Redis list at the given key contains exactly the expected values in order.
func redisListEquals(ctx context.Context, keyName string, expectedValues []string, opts ...SinkCheckOption) bool {
o := defaultRedisCheckOptions()
for _, opt := range opts {
if opt != nil {
opt(o)
}
}
ctx, cancel := context.WithTimeout(ctx, o.timeout)
defer cancel()
return runChecks(ctx, func() bool {
actual := getRedisListValues(keyName)
if len(actual) != len(expectedValues) {
return false
}
for i, v := range expectedValues {
if actual[i] != v {
return false
}
}
return true
})
}

type CheckFunc func() bool

// runChecks executes a performChecks function with retry strategy (retryInterval with timeout).
Expand Down
Loading