diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d83ed8d84e..f63e1b1a02 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2491,7 +2491,7 @@ dependencies = [ [[package]] name = "numaflow" version = "0.4.0" -source = "git+https://github.com/numaproj/numaflow-rs.git?rev=61d0a40057fd39fcc25440afbf31fafe626919c2#61d0a40057fd39fcc25440afbf31fafe626919c2" +source = "git+https://github.com/numaproj/numaflow-rs.git?rev=45aff2913bdc7fb75bdf2b49e535fceb1544a81a#45aff2913bdc7fb75bdf2b49e535fceb1544a81a" dependencies = [ "chrono", "hyper-util", diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index 25f6aa0461..8ec8d28d67 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -64,7 +64,7 @@ async-trait = "0.1.88" [dev-dependencies] tempfile = "3.11.0" # TODO: Remove this when OnSuccess sink support is added in numaflow-rs SDK -numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "61d0a40057fd39fcc25440afbf31fafe626919c2" } +numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "45aff2913bdc7fb75bdf2b49e535fceb1544a81a" } pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] } hyper = "1.6.0" hyper-rustls = "0.27.5" diff --git a/rust/numaflow-core/src/monovertex/forwarder.rs b/rust/numaflow-core/src/monovertex/forwarder.rs index 64a19444f2..e2d0924fb0 100644 --- a/rust/numaflow-core/src/monovertex/forwarder.rs +++ b/rust/numaflow-core/src/monovertex/forwarder.rs @@ -175,6 +175,7 @@ mod tests { }, keys: vec![], headers: Default::default(), + user_metadata: None, }) .await .unwrap(); diff --git a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs index e894f216c6..54f2efc0c8 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs @@ -292,6 +292,7 @@ mod tests { }, keys: vec![], headers: Default::default(), + user_metadata: None, }) .await .unwrap(); diff --git a/rust/numaflow-core/src/sinker/sink/sqs.rs b/rust/numaflow-core/src/sinker/sink/sqs.rs index 4a33141f06..9468abf874 100644 --- a/rust/numaflow-core/src/sinker/sink/sqs.rs +++ b/rust/numaflow-core/src/sinker/sink/sqs.rs @@ -142,6 +142,7 @@ pub mod tests { }, keys: vec![], headers: Default::default(), + user_metadata: None, }) .await .unwrap(); diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index 61c648e25a..8a318cda63 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -842,6 +842,7 @@ mod tests { }, keys: vec![], headers: Default::default(), + user_metadata: None, } } } diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index b9766c2e57..ef5bca6909 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -444,6 +444,7 @@ mod tests { }, keys: vec![], headers: Default::default(), + user_metadata: None, } } } diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index f0049b3e0d..859263ade7 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -342,6 +342,27 @@ func (s *FunctionalSuite) TestExponentialBackoffRetryStrategyForPipeline() { w.Expect().VertexPodLogNotContains(vertexName, thirdRetryLog, PodLogCheckOptionWithContainer("numa")) } +func (s *FunctionalSuite) TestPipelineUserMetadataPropagation() { + w := s.Given().Pipeline("@testdata/metadata-pipeline.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + + w.Expect().VertexPodsRunning() + + w.Expect(). + VertexPodLogContains("map", "Groups at mapper:", PodLogCheckOptionWithContainer("udf")). + VertexPodLogContains("map", "event-time-group", PodLogCheckOptionWithContainer("udf")). + VertexPodLogContains("map", "simple-source", PodLogCheckOptionWithContainer("udf")) + + w.Expect(). + VertexPodLogContains("sink", "User Metadata:", PodLogCheckOptionWithContainer("udsink")). + VertexPodLogContains("sink", "event-time-group", PodLogCheckOptionWithContainer("udsink")). + VertexPodLogContains("sink", "simple-source", PodLogCheckOptionWithContainer("udsink")). + VertexPodLogContains("sink", "map-group", PodLogCheckOptionWithContainer("udsink")). + VertexPodLogContains("sink", "txn-id", PodLogCheckOptionWithContainer("udsink")) +} + func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) } diff --git a/test/e2e/testdata/metadata-pipeline.yaml b/test/e2e/testdata/metadata-pipeline.yaml new file mode 100644 index 0000000000..7fc394dc8b --- /dev/null +++ b/test/e2e/testdata/metadata-pipeline.yaml @@ -0,0 +1,33 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: metadata-pipeline +spec: + vertices: + - name: in + source: + udsource: + container: + image: quay.io/numaio/numaflow-go/source-metadata:stable #Source creates user metadata with group "simple-source" and adds txn-id + imagePullPolicy: IfNotPresent + transformer: + container: + image: quay.io/numaio/numaflow-go/transformer-metadata:stable # Transformer adds event-time to group "event-time-group" + imagePullPolicy: IfNotPresent + - name: map + udf: + container: + image: quay.io/numaio/numaflow-go/map-metadata:stable # Map logs both groups: "Groups at mapper: [event-time-group simple-source]"/ and creates additional group "map-group" with map-key + imagePullPolicy: IfNotPresent + - name: sink + sink: + udsink: + container: + image: quay.io/numaio/numaflow-go/sink-metadata:stable # Sink logs all user metadata received + imagePullPolicy: IfNotPresent + edges: + - from: in + to: map + - from: map + to: sink + diff --git a/test/monovertex-e2e/monovertex_test.go b/test/monovertex-e2e/monovertex_test.go index c4d9b6628f..d0ac6ab294 100644 --- a/test/monovertex-e2e/monovertex_test.go +++ b/test/monovertex-e2e/monovertex_test.go @@ -105,6 +105,27 @@ func (s *MonoVertexSuite) TestMonoVertexRateLimitWithRedisStore() { w.Expect().MonoVertexPodLogContains("\"processed\":\"50", PodLogCheckOptionWithContainer("numa"), PodLogCheckOptionWithCount(20)) } +func (s *MonoVertexSuite) TestMonoVertexUserMetadataPropagation() { + w := s.Given().MonoVertex("@testdata/metadata-monovertex.yaml"). + When(). + CreateMonoVertexAndWait() + defer w.DeleteMonoVertexAndWait() + + w.Expect().MonoVertexPodsRunning() + + w.Expect(). + MonoVertexPodLogContains("Groups at mapper:", PodLogCheckOptionWithContainer("udf")). + MonoVertexPodLogContains("event-time-group", PodLogCheckOptionWithContainer("udf")). + MonoVertexPodLogContains("simple-source", PodLogCheckOptionWithContainer("udf")) + + w.Expect(). + MonoVertexPodLogContains("User Metadata:", PodLogCheckOptionWithContainer("udsink")). + MonoVertexPodLogContains("event-time-group", PodLogCheckOptionWithContainer("udsink")). + MonoVertexPodLogContains("simple-source", PodLogCheckOptionWithContainer("udsink")). + MonoVertexPodLogContains("map-group", PodLogCheckOptionWithContainer("udsink")). + MonoVertexPodLogContains("txn-id", PodLogCheckOptionWithContainer("udsink")) +} + func TestMonoVertexSuite(t *testing.T) { suite.Run(t, new(MonoVertexSuite)) } diff --git a/test/monovertex-e2e/testdata/metadata-monovertex.yaml b/test/monovertex-e2e/testdata/metadata-monovertex.yaml new file mode 100644 index 0000000000..2bec21bfbc --- /dev/null +++ b/test/monovertex-e2e/testdata/metadata-monovertex.yaml @@ -0,0 +1,26 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: MonoVertex +metadata: + name: metadata-monovertex +spec: + scale: + min: 1 + source: + udsource: + container: + image: quay.io/numaio/numaflow-go/source-metadata:stable #Source creates user metadata with group "simple-source" and adds txn-id + imagePullPolicy: IfNotPresent + transformer: + container: + image: quay.io/numaio/numaflow-go/transformer-metadata:stable # Transformer adds event-time to group "event-time-group" + imagePullPolicy: IfNotPresent + udf: + container: + image: quay.io/numaio/numaflow-go/map-metadata:stable # Map logs both groups: "Groups at mapper: [event-time-group simple-source]"/ and creates additional group "map-group" with map-key + imagePullPolicy: IfNotPresent + sink: + udsink: + container: + image: quay.io/numaio/numaflow-go/sink-metadata:stable # Sink logs all user metadata received + imagePullPolicy: IfNotPresent +