77 "sync/atomic"
88 "time"
99
10- "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/cryptorand"
1110 "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
1211
1312 ipamclaimslister "github.com/k8snetworkplumbingwg/ipamclaims/pkg/crd/ipamclaims/v1alpha1/apis/listers/ipamclaims/v1alpha1"
@@ -26,6 +25,7 @@ import (
2625
2726 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2827 ktypes "k8s.io/apimachinery/pkg/types"
28+ "k8s.io/apimachinery/pkg/util/rand"
2929 listers "k8s.io/client-go/listers/core/v1"
3030 discoverylisters "k8s.io/client-go/listers/discovery/v1"
3131 netlisters "k8s.io/client-go/listers/networking/v1"
@@ -107,20 +107,22 @@ type queueMapEntry struct {
107107
108108type internalInformer struct {
109109 sync.RWMutex
110- oType reflect.Type
110+ oType reflect.Type
111+ // keyed by priority - used to track the handler's priority of being invoked.
112+ // example: a handler with priority 0 will process the received event first
113+ // before a handler with priority 1, 0 being the highest priority.
114+ // NOTE: we can have multiple handlers with the same priority hence the value
115+ // is a map of handlers keyed by its unique id.
111116 handlers map [int ]map [uint64 ]* Handler
112117 // queueMap handles distributing events across a queued handler's queues
113118 queueMap * queueMap
119+ // hasHandlers is an atomic used to determine if this internal informer actually has handlers attached to it or not
120+ hasHandlers uint32
114121}
115122
116123type informer struct {
117- oType reflect.Type
118- inf cache.SharedIndexInformer
119- // keyed by priority - used to track the handler's priority of being invoked.
120- // example: a handler with priority 0 will process the received event first
121- // before a handler with priority 1, 0 being the higest priority.
122- // NOTE: we can have multiple handlers with the same priority hence the value
123- // is a map of handlers keyed by its unique id.
124+ oType reflect.Type
125+ inf cache.SharedIndexInformer
124126 lister listerInterface
125127 // initialAddFunc will be called to deliver the initial list of objects
126128 // when a handler is added
@@ -133,7 +135,7 @@ type informer struct {
133135func (inf * internalInformer ) forEachQueuedHandler (f func (h * Handler )) {
134136 inf .RLock ()
135137 defer inf .RUnlock ()
136- for priority := 0 ; priority <= minHandlerPriority ; priority ++ { // loop over priority higest to lowest
138+ for priority := 0 ; priority <= minHandlerPriority ; priority ++ { // loop over priority highest to lowest
137139 for _ , handler := range inf .handlers [priority ] {
138140 f (handler )
139141 }
@@ -192,19 +194,28 @@ func (i *informer) removeHandler(handler *Handler) {
192194
193195 intInf .Lock ()
194196 defer intInf .Unlock ()
195- removed := 0
197+ removed := false
198+ // track overall how many handlers this internal informer has
199+ numHandlers := 0
196200 for priority := range intInf .handlers { // loop over priority
197201 if _ , ok := intInf .handlers [priority ]; ! ok {
198202 continue // protection against nil map as value
199203 }
200204 if _ , ok := intInf.handlers [priority ][handler.id ]; ok {
201205 // Remove the handler
202206 delete (intInf .handlers [priority ], handler .id )
203- removed = 1
207+ removed = true
204208 klog .V (5 ).Infof ("Removed %v event handler %d" , i .oType , handler .id )
205209 }
210+ numHandlers += len (intInf .handlers [priority ])
206211 }
207- if removed == 0 {
212+
213+ // if this internal informer has no handlers, update the atomic
214+ if numHandlers == 0 {
215+ atomic .StoreUint32 (& intInf .hasHandlers , hasNoHandler )
216+ }
217+
218+ if ! removed {
208219 klog .Warningf ("Tried to remove unknown object type %v event handler %d" , i .oType , handler .id )
209220 }
210221 }()
@@ -260,7 +271,7 @@ func (qm *queueMap) getNewQueueNum() uint32 {
260271 if numEventQueues == 1 {
261272 return 0
262273 }
263- startIdx = uint32 (cryptorand .Intn (int64 (numEventQueues - 1 )))
274+ startIdx = uint32 (rand .Intn (int (numEventQueues - 1 )))
264275 queueIdx = startIdx
265276 lowestNum := len (qm .queues [startIdx ])
266277 for j = 0 ; j < numEventQueues ; j ++ {
@@ -381,6 +392,10 @@ func (i *informer) newFederatedQueuedHandler(internalInformerIndex int) cache.Re
381392 intInf := i .internalInformers [internalInformerIndex ]
382393 return cache.ResourceEventHandlerFuncs {
383394 AddFunc : func (obj interface {}) {
395+ // do not enqueue events to internal informer that has no handlers for better performance
396+ if atomic .LoadUint32 (& intInf .hasHandlers ) == hasNoHandler {
397+ return
398+ }
384399 intInf .queueMap .enqueueEvent (nil , obj , i .oType , false , func (e * event ) {
385400 metrics .MetricResourceUpdateCount .WithLabelValues (name , "add" ).Inc ()
386401 start := time .Now ()
@@ -391,6 +406,10 @@ func (i *informer) newFederatedQueuedHandler(internalInformerIndex int) cache.Re
391406 })
392407 },
393408 UpdateFunc : func (oldObj , newObj interface {}) {
409+ // do not enqueue events to internal informer that has no handlers for better performance
410+ if atomic .LoadUint32 (& intInf .hasHandlers ) == hasNoHandler {
411+ return
412+ }
394413 intInf .queueMap .enqueueEvent (oldObj , newObj , i .oType , false , func (e * event ) {
395414 metrics .MetricResourceUpdateCount .WithLabelValues (name , "update" ).Inc ()
396415 start := time .Now ()
@@ -415,6 +434,10 @@ func (i *informer) newFederatedQueuedHandler(internalInformerIndex int) cache.Re
415434 klog .Errorf (err .Error ())
416435 return
417436 }
437+ // do not enqueue events to internal informer that has no handlers for better performance
438+ if atomic .LoadUint32 (& intInf .hasHandlers ) == hasNoHandler {
439+ return
440+ }
418441 intInf .queueMap .enqueueEvent (nil , realObj , i .oType , true , func (e * event ) {
419442 metrics .MetricResourceUpdateCount .WithLabelValues (name , "delete" ).Inc ()
420443 start := time .Now ()
0 commit comments