@@ -48,6 +48,12 @@ func BuildEventMesh(ctx context.Context, clientset versioned.Interface, dynamicC
4848 return EventMesh {}, err
4949 }
5050
51+ convertedSourceEntries , err := fetchSources (ctx , dynamicClient , logger )
52+ if err != nil {
53+ logger .Errorw ("Error fetching and converting sources" , "error" , err )
54+ return EventMesh {}, err
55+ }
56+
5157 // build a broker map and a subscribable map for easier access.
5258 // we need this map to register the event types in the brokers when we are processing the event types.
5359 // map key: "<namespace>/<name>"
@@ -73,9 +79,9 @@ func BuildEventMesh(ctx context.Context, clientset versioned.Interface, dynamicC
7379 // register the event types in the brokers and channels
7480 for _ , et := range convertedEventTypes {
7581 if et .Reference != nil {
76- if br , ok := brokerMap [* et .Reference ]; ok {
82+ if br , ok := brokerMap [et .Reference . String () ]; ok {
7783 br .ProvidedEventTypes = append (br .ProvidedEventTypes , et .NamespacedName ())
78- } else if subscribable , ok := subscribableMap [* et .Reference ]; ok {
84+ } else if subscribable , ok := subscribableMap [et .Reference . String () ]; ok {
7985 subscribable .ProvidedEventTypes = append (subscribable .ProvidedEventTypes , et .NamespacedName ())
8086 } else {
8187 logger .Infow ("Event type reference not found" , "eventType" , et .NamespacedName (), "reference" , * et .Reference )
@@ -92,6 +98,29 @@ func BuildEventMesh(ctx context.Context, clientset versioned.Interface, dynamicC
9298 etByNamespacedName [et .NamespacedName ()] = et
9399 }
94100
101+ // build a map for easier access to the ETs by their type.
102+ // there can be multiple ETs with the same type but different names.
103+ // we need this map when processing the sources to find out ET definitions for the ET types.
104+ // map key does not have a namespace: "<eventType.type>"
105+ etsByType := make (map [string ][]* EventType )
106+ for _ , et := range convertedEventTypes {
107+ etsByType [et .Type ] = append (etsByType [et .Type ], et )
108+ }
109+
110+ // register the event types in the sources
111+ for _ , source := range convertedSourceEntries {
112+ if source .ProvidedEventTypeTypes == nil {
113+ continue
114+ }
115+ for _ , providedType := range * source .ProvidedEventTypeTypes {
116+ if ets , ok := etsByType [providedType ]; ok {
117+ for _ , et := range ets {
118+ source .ProvidedEventTypes = append (source .ProvidedEventTypes , et .NamespacedName ())
119+ }
120+ }
121+ }
122+ }
123+
95124 // fetch the triggers we will process them later
96125 triggers , err := clientset .EventingV1 ().Triggers (metav1 .NamespaceAll ).List (context .Background (), metav1.ListOptions {})
97126 if err != nil {
@@ -135,11 +164,16 @@ func BuildEventMesh(ctx context.Context, clientset versioned.Interface, dynamicC
135164 for _ , s := range convertedSubscribables {
136165 outputSubscribables = append (outputSubscribables , * s )
137166 }
167+ outputSources := make ([]Source , 0 , len (convertedSourceEntries ))
168+ for _ , s := range convertedSourceEntries {
169+ outputSources = append (outputSources , * s )
170+ }
138171
139172 eventMesh := EventMesh {
140173 EventTypes : outputEventTypes ,
141174 Brokers : outputBrokers ,
142175 Subscribables : outputSubscribables ,
176+ Sources : outputSources ,
143177 }
144178
145179 return eventMesh , nil
@@ -341,6 +375,58 @@ func fetchSubscribables(ctx context.Context, dynamicClient dynamic.Interface, lo
341375 return subscribables , nil
342376}
343377
378+ func fetchSources (ctx context.Context , dynamicClient dynamic.Interface , logger * zap.SugaredLogger ) ([]* Source , error ) {
379+ // first, fetch the source CRDs
380+ sourceCRDs , err := dynamicClient .Resource (
381+ schema.GroupVersionResource {
382+ Group : "apiextensions.k8s.io" ,
383+ Version : "v1" ,
384+ Resource : "customresourcedefinitions" ,
385+ },
386+ ).List (ctx , metav1.ListOptions {LabelSelector : labels.Set {"duck.knative.dev/source" : "true" }.String ()})
387+
388+ if errors .IsNotFound (err ) {
389+ return nil , nil
390+ }
391+
392+ if err != nil {
393+ logger .Errorw ("Error listing source CRDs" , "error" , err )
394+ return nil , err
395+ }
396+
397+ // then, fetch the sources
398+ sources := make ([]* Source , 0 )
399+ for _ , crd := range sourceCRDs .Items {
400+ gvr , err := util .GVRFromUnstructured (& crd )
401+ if err != nil {
402+ logger .Errorw ("Error getting GVR from CRD" , "error" , err )
403+ return nil , err
404+ }
405+
406+ sourceResources , err := dynamicClient .Resource (gvr ).Namespace (metav1 .NamespaceAll ).List (ctx , metav1.ListOptions {})
407+
408+ if errors .IsNotFound (err ) {
409+ continue
410+ }
411+
412+ if err != nil {
413+ logger .Errorw ("Error listing source resources" , "error" , err )
414+ return nil , err
415+ }
416+
417+ for _ , resource := range sourceResources .Items {
418+ sourceEntry , err := convertSource (gvr , crd , & resource )
419+ if err != nil {
420+ logger .Errorw ("Error converting source" , "error" , err )
421+ return nil , err
422+ }
423+ sources = append (sources , & sourceEntry )
424+ }
425+ }
426+
427+ return sources , nil
428+ }
429+
344430// fetchEventTypes fetches the event types and converts them to the representation that's consumed by the Backstage plugin.
345431func fetchEventTypes (clientset versioned.Interface , logger * zap.SugaredLogger ) ([]* EventType , error ) {
346432 eventTypeResponse , err := clientset .EventingV1beta2 ().EventTypes (metav1 .NamespaceAll ).List (context .Background (), metav1.ListOptions {})
0 commit comments