diff --git a/test/eventinge2erekt/eventtransform_ksvc_test.go b/test/eventinge2erekt/eventtransform_ksvc_test.go new file mode 100644 index 0000000000..870e89c858 --- /dev/null +++ b/test/eventinge2erekt/eventtransform_ksvc_test.go @@ -0,0 +1,27 @@ +package eventinge2erekt + +import ( + "testing" + + "knative.dev/eventing/test/rekt/features/eventtransform" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestEventTransform(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, eventtransform.JsonataDirect()) + env.Test(ctx, t, eventtransform.JsonataSink()) + env.Test(ctx, t, eventtransform.JsonataSinkReplyTransform()) +} diff --git a/vendor/knative.dev/eventing/test/rekt/features/eventtransform/eventtransform.go b/vendor/knative.dev/eventing/test/rekt/features/eventtransform/eventtransform.go new file mode 100644 index 0000000000..2b93d3d64c --- /dev/null +++ b/vendor/knative.dev/eventing/test/rekt/features/eventtransform/eventtransform.go @@ -0,0 +1,444 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtransform + +import ( + "context" + "fmt" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" + "knative.dev/eventing/test/rekt/features/featureflags" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/eventtransform" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/network" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/resources/service" +) + +// JsonataDirect tests that the EventTransform replies with the transformed event back as HTTP response. +func JsonataDirect() *feature.Feature { + f := feature.NewFeature() + + transformName := feature.MakeRandomK8sName("event-transform") + source := feature.MakeRandomK8sName("source") + + const ( + reason = "test-reason" + message = "test-message" + ) + + event := cloudevents.NewEvent() + event.SetID(uuid.NewString()) + event.SetSource("my-source") + event.SetType("my-type") + event.SetTime(time.Now()) + _ = event.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "reason": reason, + "message": message, + }) + + f.Setup("Install event transform", eventtransform.Install(transformName, eventtransform.WithSpec( + eventtransform.WithJsonata(eventing.JsonataEventTransformationSpec{Expression: ` +{ + "specversion": "1.0", + "id": id, + "type": "transformed-event", + "source": source, + "reason": data.reason, + "message": data.message, + "kind": "input", + "data": $ +} +`}), + ))) + f.Setup("event transform is addressable", eventtransform.IsAddressable(transformName)) + f.Setup("event transform is ready", eventtransform.IsReady(transformName)) + + f.Requirement("install source", eventshub.Install(source, + eventshub.InputEvent(event), + eventshub.StartSenderToResource(eventtransform.GVR(), transformName)), + ) + + f.Assert("expected sent event", assert.OnStore(source). + MatchSentEvent( + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType(event.Type()), + cetest.DataContains(reason), + cetest.DataContains(message), + ).Exact(1), + ) + + f.Assert("expected transformed event as response", assert.OnStore(source). + Match(PositiveStatusCode). + MatchResponseEvent( + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType("transformed-event"), + cetest.HasExtension("reason", reason), + cetest.HasExtension("message", message), + cetest.DataContains(event.Type()), + cetest.DataContains(event.Source()), + cetest.DataContains(event.ID()), + cetest.DataContains(reason), + cetest.DataContains(message), + ).Exact(1), + ) + + return f +} + +// JsonataSink tests that the EventTransform forwards the transformed event to the sink. +func JsonataSink() *feature.Feature { + f := feature.NewFeature() + + transformName := feature.MakeRandomK8sName("event-transform") + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + + const ( + reason = "test-reason" + message = "test-message" + ) + + event := cloudevents.NewEvent() + event.SetID(uuid.NewString()) + event.SetSource("my-source") + event.SetType("my-type") + event.SetTime(time.Now()) + _ = event.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "reason": reason, + "message": message, + }) + + f.Setup("Install event transform", eventtransform.Install(transformName, eventtransform.WithSpec( + eventtransform.WithSink(service.AsDestinationRef(sink)), + eventtransform.WithJsonata(eventing.JsonataEventTransformationSpec{Expression: ` +{ + "specversion": "1.0", + "id": id, + "type": "transformed-event", + "source": source, + "reason": data.reason, + "message": data.message, + "data": $ +} +`}), + ))) + f.Setup("event transform is addressable", eventtransform.IsAddressable(transformName)) + f.Setup("event transform is ready", eventtransform.IsReady(transformName)) + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + f.Requirement("install source", eventshub.Install(source, + eventshub.InputEvent(event), + eventshub.StartSenderToResource(eventtransform.GVR(), transformName)), + ) + + f.Assert("expected transformed event", assert.OnStore(sink). + MatchReceivedEvent( + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType("transformed-event"), + cetest.HasExtension("reason", reason), + cetest.HasExtension("message", message), + cetest.DataContains(event.Type()), + cetest.DataContains(event.Source()), + cetest.DataContains(event.ID()), + cetest.DataContains(reason), + cetest.DataContains(message), + ).Exact(1), + ) + + f.Assert("source received a 2xx status code", assert.OnStore(source). + Match( + assert.MatchKind(eventshub.EventResponse), + PositiveStatusCode, + ). + Exact(1), + ) + + return f +} + +func JsonataSinkReplyTransform() *feature.Feature { + f := feature.NewFeature() + + transformName := feature.MakeRandomK8sName("event-transform") + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + + const ( + reason = "test-reason" + message = "test-message" + ) + + event := cloudevents.NewEvent() + event.SetID(uuid.NewString()) + event.SetSource("my-source") + event.SetType("my-type") + event.SetTime(time.Now()) + _ = event.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "reason": reason, + "message": message, + }) + + f.Setup("Install event transform", eventtransform.Install(transformName, eventtransform.WithSpec( + eventtransform.WithSink(service.AsDestinationRef(sink)), + eventtransform.WithJsonata(eventing.JsonataEventTransformationSpec{Expression: ` +{ + "specversion": "1.0", + "id": id, + "type": "transformed-event", + "source": source, + "reason": data.reason, + "message": data.message, + "kind": "input", + "data": $ +} +`}), + eventtransform.WithReplyJsonata(eventing.JsonataEventTransformationSpec{Expression: ` +{ + "specversion": "1.0", + "id": id, + "type": type, + "source": source, + "reason": reason, + "kind": "transformed", + "data": $ +} +`}), + ))) + f.Setup("event transform is addressable", eventtransform.IsAddressable(transformName)) + f.Setup("event transform is ready", eventtransform.IsReady(transformName)) + + const ( + replyEventType = "reply-event-type" + replyEventSource = "reply-event-source" + ) + + f.Setup("install sink", eventshub.Install(sink, + eventshub.ReplyWithTransformedEvent(replyEventType, replyEventSource, `{"reason": "reply-reason"}`), + eventshub.StartReceiver, + )) + + f.Requirement("install source", eventshub.Install(source, + eventshub.InputEvent(event), + eventshub.StartSenderToResource(eventtransform.GVR(), transformName)), + ) + + f.Assert("expected transformed event to the sink", assert.OnStore(sink). + MatchReceivedEvent( + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType("transformed-event"), + cetest.HasExtension("reason", reason), + cetest.HasExtension("message", message), + cetest.DataContains(event.Type()), + cetest.DataContains(event.Source()), + cetest.DataContains(event.ID()), + cetest.DataContains(reason), + cetest.DataContains(message), + ).Exact(1), + ) + + f.Assert("expected response transformed event to the source", assert.OnStore(source). + Match(PositiveStatusCode). + MatchResponseEvent( + cetest.HasSource(replyEventSource), + cetest.HasType(replyEventType), + cetest.HasExtension("reason", reason), + cetest.DataContains(replyEventType), + cetest.DataContains(replyEventSource), + ).Exact(1), + ) + + return f +} + +func JsonataDirectTLS() *feature.Feature { + f := feature.NewFeature() + + transformName := feature.MakeRandomK8sName("event-transform") + source := feature.MakeRandomK8sName("source") + + const ( + reason = "test-reason" + message = "test-message" + ) + + event := cloudevents.NewEvent() + event.SetID(uuid.NewString()) + event.SetSource("my-source") + event.SetType("my-type") + event.SetTime(time.Now()) + _ = event.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "reason": reason, + "message": message, + }) + + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("Install event transform", eventtransform.Install(transformName, eventtransform.WithSpec( + eventtransform.WithJsonata(eventing.JsonataEventTransformationSpec{Expression: ` +{ + "specversion": "1.0", + "id": id, + "type": "transformed-event", + "source": source, + "reason": data.reason, + "message": data.message, + "kind": "input", + "data": $ +} +`}), + ))) + f.Setup("event transform is addressable", eventtransform.IsAddressable(transformName)) + f.Setup("event transform is ready", eventtransform.IsReady(transformName)) + f.Setup("event transform has HTTPS address", eventtransform.ValidateAddress(transformName, addressable.AssertHTTPSAddress)) + + f.Requirement("install source", eventshub.Install(source, + eventshub.InputEvent(event), + eventshub.StartSenderToResourceTLS(eventtransform.GVR(), transformName, nil)), + ) + + f.Assert("expected sent event", assert.OnStore(source). + MatchSentEvent( + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType(event.Type()), + cetest.DataContains(reason), + cetest.DataContains(message), + ).Exact(1), + ) + + f.Assert("expected transformed event as response", assert.OnStore(source). + Match(PositiveStatusCode). + MatchResponseEvent( + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType("transformed-event"), + cetest.HasExtension("reason", reason), + cetest.HasExtension("message", message), + cetest.DataContains(event.Type()), + cetest.DataContains(event.Source()), + cetest.DataContains(event.ID()), + cetest.DataContains(reason), + cetest.DataContains(message), + ).Exact(1), + ) + + return f +} + +// JsonataSinkTLS tests that the EventTransform forwards the transformed event to the sink. +func JsonataSinkTLS() *feature.Feature { + f := feature.NewFeature() + + transformName := feature.MakeRandomK8sName("event-transform") + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + + const ( + reason = "test-reason" + message = "test-message" + ) + + event := cloudevents.NewEvent() + event.SetID(uuid.NewString()) + event.SetSource("my-source") + event.SetType("my-type") + event.SetTime(time.Now()) + _ = event.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "reason": reason, + "message": message, + }) + + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("Install event transform", func(ctx context.Context, t feature.T) { + + eventtransform.Install(transformName, eventtransform.WithSpec( + eventtransform.WithSink(&duckv1.Destination{URI: apis.HTTPS(fmt.Sprintf("%s.%s.svc.%s", sink, environment.FromContext(ctx).Namespace(), network.GetClusterDomainName()))}), + eventtransform.WithJsonata(eventing.JsonataEventTransformationSpec{Expression: ` +{ + "specversion": "1.0", + "id": id, + "type": "transformed-event", + "source": source, + "reason": data.reason, + "message": data.message, + "data": $ +} +`}), + ))(ctx, t) + }) + f.Setup("event transform is addressable", eventtransform.IsAddressable(transformName)) + f.Setup("event transform is ready", eventtransform.IsReady(transformName)) + f.Setup("install sink", eventshub.Install(sink, + eventshub.IssuerRef(eventingtlstesting.IssuerKind, eventingtlstesting.IssuerName), + eventshub.StartReceiverTLS)) + f.Setup("event transform has HTTPS address", eventtransform.ValidateAddress(transformName, addressable.AssertHTTPSAddress)) + + f.Requirement("install source", eventshub.Install(source, + eventshub.InputEvent(event), + eventshub.StartSenderToResourceTLS(eventtransform.GVR(), transformName, nil)), + ) + + f.Assert("expected transformed event", assert.OnStore(sink). + MatchReceivedEvent( + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType("transformed-event"), + cetest.HasExtension("reason", reason), + cetest.HasExtension("message", message), + cetest.DataContains(event.Type()), + cetest.DataContains(event.Source()), + cetest.DataContains(event.ID()), + cetest.DataContains(reason), + cetest.DataContains(message), + ).Exact(1), + ) + + f.Assert("source received a 2xx status code", assert.OnStore(source). + Match( + assert.MatchKind(eventshub.EventResponse), + PositiveStatusCode, + ). + Exact(1), + ) + + return f +} + +func PositiveStatusCode(info eventshub.EventInfo) error { + if info.StatusCode < 200 || info.StatusCode >= 300 { + return fmt.Errorf("expected 2xx status code, got %d", info.StatusCode) + } + return nil +} diff --git a/vendor/knative.dev/eventing/test/rekt/resources/eventtransform/eventtransform.go b/vendor/knative.dev/eventing/test/rekt/resources/eventtransform/eventtransform.go new file mode 100644 index 0000000000..a3da24311e --- /dev/null +++ b/vendor/knative.dev/eventing/test/rekt/resources/eventtransform/eventtransform.go @@ -0,0 +1,178 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtransform + +import ( + "context" + "embed" + "encoding/json" + "strings" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" + "knative.dev/reconciler-test/pkg/manifest" + "sigs.k8s.io/yaml" + + eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/test/rekt/resources/addressable" +) + +//go:embed eventtransform.yaml +var yamlEmbed embed.FS + +func GVR() schema.GroupVersionResource { + return schema.GroupVersionResource{Group: "eventing.knative.dev", Version: "v1alpha1", Resource: "eventtransforms"} +} + +// WithAnnotations adds annotations to the JobSink. +func WithAnnotations(annotations map[string]interface{}) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if annotations != nil { + cfg["annotations"] = annotations + } + } +} + +// Install will create a resource, augmented with the config fn options. +func Install(name string, opts ...manifest.CfgFn) feature.StepFn { + + return func(ctx context.Context, t feature.T) { + cfg := map[string]interface{}{ + "name": name, + "namespace": environment.FromContext(ctx).Namespace(), + "image": eventshub.ImageFromContext(ctx), + eventshub.ConfigLoggingEnv: knative.LoggingConfigFromContext(ctx), + eventshub.ConfigTracingEnv: knative.TracingConfigFromContext(ctx), + } + for _, fn := range opts { + fn(cfg) + } + + if err := registerImage(ctx); err != nil { + t.Fatal(err) + } + if _, err := manifest.InstallYamlFS(ctx, yamlEmbed, cfg); err != nil { + t.Fatal(err) + } + } +} + +type SpecOption func(spec *eventing.EventTransformSpec) + +func WithSpec(options ...SpecOption) manifest.CfgFn { + spec := eventing.EventTransformSpec{} + for _, opt := range options { + opt(&spec) + } + + specBytes, err := json.Marshal(spec) + if err != nil { + panic(err) + } + + yamlBytes, err := yaml.JSONToYAML(specBytes) + if err != nil { + panic(err) + } + + specYaml := string(yamlBytes) + + lines := strings.Split(specYaml, "\n") + out := make([]string, 0, len(lines)) + for i := range lines { + out = append(out, " "+lines[i]) + } + + return func(m map[string]interface{}) { + m["spec"] = strings.Join(out, "\n") + } +} + +func WithSink(sink *duckv1.Destination) SpecOption { + return func(spec *eventing.EventTransformSpec) { + spec.Sink = sink + } +} + +func WithReplyJsonata(jsonata eventing.JsonataEventTransformationSpec) SpecOption { + return func(spec *eventing.EventTransformSpec) { + if spec.Reply == nil { + spec.Reply = &eventing.ReplySpec{} + } + spec.Reply.Jsonata = &jsonata + } +} + +func WithJsonata(jsonata eventing.JsonataEventTransformationSpec) SpecOption { + return func(spec *eventing.EventTransformSpec) { + spec.Jsonata = &jsonata + } +} + +// IsReady tests to see if a JobSink becomes ready within the time given. +func IsReady(name string, timing ...time.Duration) feature.StepFn { + return k8s.IsReady(GVR(), name, timing...) +} + +// IsNotReady tests to see if a JobSink becomes NotReady within the time given. +func IsNotReady(name string, timing ...time.Duration) feature.StepFn { + return k8s.IsNotReady(GVR(), name, timing...) +} + +// IsAddressable tests to see if a JobSink becomes addressable within the time +// given. +func IsAddressable(name string, timings ...time.Duration) feature.StepFn { + return k8s.IsAddressable(GVR(), name, timings...) +} + +// ValidateAddress validates the address retured by Address +func ValidateAddress(name string, validate addressable.ValidateAddressFn, timings ...time.Duration) feature.StepFn { + return addressable.ValidateAddress(GVR(), name, validate, timings...) +} + +// Address returns a JobSink's address. +func Address(ctx context.Context, name string, timings ...time.Duration) (*duckv1.Addressable, error) { + return addressable.Address(ctx, GVR(), name, timings...) +} + +func AsDestinationRef(name string) *duckv1.Destination { + return &duckv1.Destination{ + Ref: AsKReference(name), + } +} + +// AsKReference returns a KReference for a JobSink without namespace. +func AsKReference(name string) *duckv1.KReference { + return &duckv1.KReference{ + Kind: "EventTransform", + Name: name, + APIVersion: GVR().GroupVersion().String(), + } +} + +func registerImage(ctx context.Context) error { + im := eventshub.ImageFromContext(ctx) + reg := environment.RegisterPackage(im) + _, err := reg(ctx, environment.FromContext(ctx)) + return err +} diff --git a/vendor/knative.dev/eventing/test/rekt/resources/eventtransform/eventtransform.yaml b/vendor/knative.dev/eventing/test/rekt/resources/eventtransform/eventtransform.yaml new file mode 100644 index 0000000000..4e78aecc09 --- /dev/null +++ b/vendor/knative.dev/eventing/test/rekt/resources/eventtransform/eventtransform.yaml @@ -0,0 +1,26 @@ +# Copyright 2024 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: eventing.knative.dev/v1alpha1 +kind: EventTransform +metadata: + name: {{ .name }} + namespace: {{ .namespace }} + {{ if .annotations }} + {{ range $key, $value := .annotations }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} +spec: +{{ .spec }} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6822eeb84f..557205dda2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1472,6 +1472,7 @@ knative.dev/eventing/test/lib/sender knative.dev/eventing/test/rekt/features knative.dev/eventing/test/rekt/features/broker knative.dev/eventing/test/rekt/features/channel +knative.dev/eventing/test/rekt/features/eventtransform knative.dev/eventing/test/rekt/features/featureflags knative.dev/eventing/test/rekt/features/jobsink knative.dev/eventing/test/rekt/features/knconf @@ -1487,6 +1488,7 @@ knative.dev/eventing/test/rekt/resources/channel_impl knative.dev/eventing/test/rekt/resources/channel_template knative.dev/eventing/test/rekt/resources/containersource knative.dev/eventing/test/rekt/resources/delivery +knative.dev/eventing/test/rekt/resources/eventtransform knative.dev/eventing/test/rekt/resources/eventtype knative.dev/eventing/test/rekt/resources/jobsink knative.dev/eventing/test/rekt/resources/parallel