@@ -21,6 +21,7 @@ package e2e
2121
2222import (
2323 "context"
24+ "encoding/json"
2425 "fmt"
2526 "strings"
2627 "testing"
@@ -31,16 +32,20 @@ import (
3132 batchv1 "k8s.io/api/batch/v1"
3233 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334 "k8s.io/apimachinery/pkg/runtime"
35+ "k8s.io/apimachinery/pkg/runtime/schema"
3436 "k8s.io/apimachinery/pkg/types"
3537 "k8s.io/apimachinery/pkg/util/wait"
3638 "k8s.io/apiserver/pkg/storage/names"
3739 "k8s.io/client-go/dynamic"
3840 "k8s.io/utils/pointer"
39- sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
40- "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
41+ eventing "knative.dev/eventing/pkg/apis/eventing/v1"
4142 messaging "knative.dev/eventing/pkg/apis/messaging/v1"
4243 eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
4344 testlib "knative.dev/eventing/test/lib"
45+ "knative.dev/pkg/ptr"
46+
47+ sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
48+ "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
4449
4550 pkgtest "knative.dev/eventing-kafka-broker/test/pkg"
4651 kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka"
@@ -64,6 +69,8 @@ type SacuraTestConfig struct {
6469 // Namespace is the test namespace.
6570 Namespace string
6671
72+ ConsumerResourceGVR schema.GroupVersionResource
73+
6774 // BrokerTopic is the expected Broker topic.
6875 // It's used to verify the committed offset.
6976 BrokerTopic * string
@@ -79,14 +86,16 @@ type SacuraTestConfig struct {
7986
8087func TestSacuraSinkSourceJob (t * testing.T ) {
8188 runSacuraTest (t , SacuraTestConfig {
82- Namespace : "sacura-sink-source" ,
83- SourceTopic : pointer .String ("sacura-sink-source-topic" ),
89+ Namespace : "sacura-sink-source" ,
90+ ConsumerResourceGVR : sources .SchemeGroupVersion .WithResource ("kafkasources" ),
91+ SourceTopic : pointer .String ("sacura-sink-source-topic" ),
8492 })
8593}
8694
8795func TestSacuraBrokerJob (t * testing.T ) {
8896 runSacuraTest (t , SacuraTestConfig {
8997 Namespace : "sacura" ,
98+ ConsumerResourceGVR : eventing .SchemeGroupVersion .WithResource ("triggers" ),
9099 BrokerTopic : pointer .String ("knative-broker-sacura-sink-source-broker" ),
91100 })
92101}
@@ -98,6 +107,21 @@ func runSacuraTest(t *testing.T, config SacuraTestConfig) {
98107
99108 ctx := context .Background ()
100109
110+ w , err := c .Dynamic .Resource (config .ConsumerResourceGVR ).
111+ Namespace (config .Namespace ).
112+ Watch (ctx , metav1.ListOptions {Watch : true , SendInitialEvents : ptr .Bool (true )})
113+ if err != nil {
114+ t .Fatal (err )
115+ }
116+ defer w .Stop ()
117+
118+ go func () {
119+ for e := range w .ResultChan () {
120+ bytes , _ := json .MarshalIndent (e , "" , " " )
121+ t .Logf ("Consumer resource %q changed:\n %s\n \n " , config .ConsumerResourceGVR .String (), string (bytes ))
122+ }
123+ }()
124+
101125 jobPollError := wait .Poll (pollInterval , pollTimeout , func () (done bool , err error ) {
102126 job , err := c .Kube .BatchV1 ().Jobs (config .Namespace ).Get (ctx , app , metav1.GetOptions {})
103127 assert .Nil (t , err )
0 commit comments