Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async-trait = "0.1.88"
[dev-dependencies]
tempfile = "3.11.0"
# TODO: Remove this when OnSuccess sink support is added in numaflow-rs SDK
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "61d0a40057fd39fcc25440afbf31fafe626919c2" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "45aff2913bdc7fb75bdf2b49e535fceb1544a81a" }
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] }
hyper = "1.6.0"
hyper-rustls = "0.27.5"
Expand Down
1 change: 1 addition & 0 deletions rust/numaflow-core/src/monovertex/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ mod tests {
},
keys: vec![],
headers: Default::default(),
user_metadata: None,
})
.await
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ mod tests {
},
keys: vec![],
headers: Default::default(),
user_metadata: None,
})
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions rust/numaflow-core/src/sinker/sink/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub mod tests {
},
keys: vec![],
headers: Default::default(),
user_metadata: None,
})
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ mod tests {
},
keys: vec![],
headers: Default::default(),
user_metadata: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/numaflow-core/src/source/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ mod tests {
},
keys: vec![],
headers: Default::default(),
user_metadata: None,
}
}
}
Expand Down
21 changes: 21 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,27 @@ func (s *FunctionalSuite) TestExponentialBackoffRetryStrategyForPipeline() {
w.Expect().VertexPodLogNotContains(vertexName, thirdRetryLog, PodLogCheckOptionWithContainer("numa"))
}

func (s *FunctionalSuite) TestPipelineUserMetadataPropagation() {
w := s.Given().Pipeline("@testdata/metadata-pipeline.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()

w.Expect().VertexPodsRunning()

w.Expect().
VertexPodLogContains("map", "Groups at mapper:", PodLogCheckOptionWithContainer("udf")).
VertexPodLogContains("map", "event-time-group", PodLogCheckOptionWithContainer("udf")).
VertexPodLogContains("map", "simple-source", PodLogCheckOptionWithContainer("udf"))

w.Expect().
VertexPodLogContains("sink", "User Metadata:", PodLogCheckOptionWithContainer("udsink")).
VertexPodLogContains("sink", "event-time-group", PodLogCheckOptionWithContainer("udsink")).
VertexPodLogContains("sink", "simple-source", PodLogCheckOptionWithContainer("udsink")).
VertexPodLogContains("sink", "map-group", PodLogCheckOptionWithContainer("udsink")).
VertexPodLogContains("sink", "txn-id", PodLogCheckOptionWithContainer("udsink"))
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
33 changes: 33 additions & 0 deletions test/e2e/testdata/metadata-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: metadata-pipeline
spec:
vertices:
- name: in
source:
udsource:
container:
image: quay.io/numaio/numaflow-go/source-metadata:stable #Source creates user metadata with group "simple-source" and adds txn-id
imagePullPolicy: IfNotPresent
transformer:
container:
image: quay.io/numaio/numaflow-go/transformer-metadata:stable # Transformer adds event-time to group "event-time-group"
imagePullPolicy: IfNotPresent
- name: map
udf:
container:
image: quay.io/numaio/numaflow-go/map-metadata:stable # Map logs both groups: "Groups at mapper: [event-time-group simple-source]"/ and creates additional group "map-group" with map-key
imagePullPolicy: IfNotPresent
- name: sink
sink:
udsink:
container:
image: quay.io/numaio/numaflow-go/sink-metadata:stable # Sink logs all user metadata received
imagePullPolicy: IfNotPresent
edges:
- from: in
to: map
- from: map
to: sink

21 changes: 21 additions & 0 deletions test/monovertex-e2e/monovertex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ func (s *MonoVertexSuite) TestMonoVertexRateLimitWithRedisStore() {
w.Expect().MonoVertexPodLogContains("\"processed\":\"50", PodLogCheckOptionWithContainer("numa"), PodLogCheckOptionWithCount(20))
}

func (s *MonoVertexSuite) TestMonoVertexUserMetadataPropagation() {
w := s.Given().MonoVertex("@testdata/metadata-monovertex.yaml").
When().
CreateMonoVertexAndWait()
defer w.DeleteMonoVertexAndWait()

w.Expect().MonoVertexPodsRunning()

w.Expect().
MonoVertexPodLogContains("Groups at mapper:", PodLogCheckOptionWithContainer("udf")).
MonoVertexPodLogContains("event-time-group", PodLogCheckOptionWithContainer("udf")).
MonoVertexPodLogContains("simple-source", PodLogCheckOptionWithContainer("udf"))

w.Expect().
MonoVertexPodLogContains("User Metadata:", PodLogCheckOptionWithContainer("udsink")).
MonoVertexPodLogContains("event-time-group", PodLogCheckOptionWithContainer("udsink")).
MonoVertexPodLogContains("simple-source", PodLogCheckOptionWithContainer("udsink")).
MonoVertexPodLogContains("map-group", PodLogCheckOptionWithContainer("udsink")).
MonoVertexPodLogContains("txn-id", PodLogCheckOptionWithContainer("udsink"))
}

func TestMonoVertexSuite(t *testing.T) {
suite.Run(t, new(MonoVertexSuite))
}
26 changes: 26 additions & 0 deletions test/monovertex-e2e/testdata/metadata-monovertex.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
name: metadata-monovertex
spec:
scale:
min: 1
source:
udsource:
container:
image: quay.io/numaio/numaflow-go/source-metadata:stable #Source creates user metadata with group "simple-source" and adds txn-id
imagePullPolicy: IfNotPresent
transformer:
container:
image: quay.io/numaio/numaflow-go/transformer-metadata:stable # Transformer adds event-time to group "event-time-group"
imagePullPolicy: IfNotPresent
udf:
container:
image: quay.io/numaio/numaflow-go/map-metadata:stable # Map logs both groups: "Groups at mapper: [event-time-group simple-source]"/ and creates additional group "map-group" with map-key
imagePullPolicy: IfNotPresent
sink:
udsink:
container:
image: quay.io/numaio/numaflow-go/sink-metadata:stable # Sink logs all user metadata received
imagePullPolicy: IfNotPresent

Loading