@@ -53,7 +53,8 @@ func BuildEventMesh(ctx context.Context, clientset versioned.Interface, dynamicC
5353 // map key: "<namespace>/<name>"
5454 brokerMap := make (map [string ]* Broker )
5555 for _ , cbr := range convertedBrokers {
56- brokerMap [cbr .GetNamespacedName ()] = cbr
56+ key := util .GKNamespacedName ("eventing.knative.dev" , "Broker" , cbr .Namespace , cbr .Name )
57+ brokerMap [key ] = cbr
5758 }
5859
5960 subscribableMap := make (map [string ]* Subscribable )
@@ -69,31 +70,35 @@ func BuildEventMesh(ctx context.Context, clientset versioned.Interface, dynamicC
6970 return EventMesh {}, err
7071 }
7172
72- // register the event types in the brokers
73+ // register the event types in the brokers and channels
7374 for _ , et := range convertedEventTypes {
7475 if et .Reference != nil {
7576 if br , ok := brokerMap [* et .Reference ]; ok {
7677 br .ProvidedEventTypes = append (br .ProvidedEventTypes , et .NamespacedName ())
78+ } else if subscribable , ok := subscribableMap [* et .Reference ]; ok {
79+ subscribable .ProvidedEventTypes = append (subscribable .ProvidedEventTypes , et .NamespacedName ())
80+ } else {
81+ logger .Infow ("Event type reference not found" , "eventType" , et .NamespacedName (), "reference" , * et .Reference )
7782 }
7883 }
7984 }
8085
81- // fetch the triggers we will process them later
82- triggers , err := clientset .EventingV1 ().Triggers (metav1 .NamespaceAll ).List (context .Background (), metav1.ListOptions {})
83- if err != nil {
84- logger .Errorw ("Error listing triggers" , "error" , err )
85- return EventMesh {}, err
86- }
87-
8886 // build a map for easier access to the ETs by their namespaced name.
8987 // we need this map when processing the triggers to find out ET definitions for the ET references
90- // brokers provide.
88+ // brokers and channels provide.
9189 // map key: "<namespace>/<eventType.name>"
9290 etByNamespacedName := make (map [string ]* EventType )
9391 for _ , et := range convertedEventTypes {
9492 etByNamespacedName [et .NamespacedName ()] = et
9593 }
9694
95+ // fetch the triggers we will process them later
96+ triggers , err := clientset .EventingV1 ().Triggers (metav1 .NamespaceAll ).List (context .Background (), metav1.ListOptions {})
97+ if err != nil {
98+ logger .Errorw ("Error listing triggers" , "error" , err )
99+ return EventMesh {}, err
100+ }
101+
97102 for _ , trigger := range triggers .Items {
98103 err := processTrigger (ctx , & trigger , brokerMap , etByNamespacedName , dynamicClient , logger )
99104 if err != nil {
@@ -110,7 +115,7 @@ func BuildEventMesh(ctx context.Context, clientset versioned.Interface, dynamicC
110115 }
111116
112117 for _ , subscription := range subscriptions .Items {
113- err := processSubscription (ctx , & subscription , subscribableMap , dynamicClient , logger )
118+ err := processSubscription (ctx , & subscription , subscribableMap , etByNamespacedName , dynamicClient , logger )
114119 if err != nil {
115120 logger .Errorw ("Error processing subscription" , "error" , err )
116121 // do not stop the Backstage plugin from rendering the rest of the data, e.g. because
@@ -166,7 +171,7 @@ func processTrigger(ctx context.Context, trigger *eventingv1.Trigger, brokerMap
166171 logger .Errorw ("Trigger has no broker" , "namespace" , trigger .Namespace , "trigger" , trigger .Name )
167172 return nil
168173 }
169- brokerRef := util .NamespacedName ( trigger .Namespace , trigger .Spec .Broker )
174+ brokerRef := util .GKNamespacedName ( "eventing.knative.dev" , "Broker" , trigger .Namespace , trigger .Spec .Broker )
170175 if _ , ok := brokerMap [brokerRef ]; ! ok {
171176 logger .Infow ("Broker not found" , "namespace" , trigger .Namespace , "trigger" , trigger .Name , "broker" , trigger .Spec .Broker )
172177 return nil
@@ -182,7 +187,7 @@ func processTrigger(ctx context.Context, trigger *eventingv1.Trigger, brokerMap
182187 return nil
183188}
184189
185- func processSubscription (ctx context.Context , subscription * v1.Subscription , subscribableMap map [string ]* Subscribable , dynamicClient dynamic.Interface , logger * zap.SugaredLogger ) error {
190+ func processSubscription (ctx context.Context , subscription * v1.Subscription , subscribableMap map [string ]* Subscribable , etByNamespacedName map [ string ] * EventType , dynamicClient dynamic.Interface , logger * zap.SugaredLogger ) error {
186191 // if the subscription has no subscriber, we can skip it, there's no relation to show on Backstage side
187192 if subscription .Spec .Subscriber .Ref == nil {
188193 logger .Debugw ("Subscription has no subscriber ref; cannot process this subscription" , "namespace" , subscription .Namespace , "subscription" , subscription .Name )
@@ -209,6 +214,16 @@ func processSubscription(ctx context.Context, subscription *v1.Subscription, sub
209214 return nil
210215 }
211216
217+ eventTypes := subscribableMap [channelRef ].ProvidedEventTypes
218+ logger .Infow ("Collected provided event types" , "namespace" , subscription .Namespace , "subscription" , subscription .Name , "channel" , channel .Name , "eventTypes" , eventTypes )
219+
220+ for _ , eventType := range eventTypes {
221+ key := util .NamespacedName (subscription .Namespace , eventType )
222+ if et , ok := etByNamespacedName [key ]; ok {
223+ et .ConsumedBy = append (et .ConsumedBy , subscriberBackstageId )
224+ }
225+ }
226+
212227 return nil
213228}
214229
0 commit comments