Skip to content

Commit 14255d0

Browse files
authored
krt: Add JoinWithMergeCollection (#58332)
* Add JoinWithMergeCollection. * Check if Equal with values: use values when comparing for equality so either form of Equals can be used. * Code review: wait for collections to sync before registering; remove duplicate metadata assignment; remove incorrect comments.
1 parent d7e6caf commit 14255d0

File tree

3 files changed

+449
-28
lines changed

3 files changed

+449
-28
lines changed

pkg/kube/krt/mergejoin.go

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,24 @@ import (
1818
"fmt"
1919
"sync"
2020

21+
"k8s.io/client-go/tools/cache"
22+
23+
"istio.io/istio/pkg/kube"
2124
"istio.io/istio/pkg/kube/controllers"
2225
istiolog "istio.io/istio/pkg/log"
2326
"istio.io/istio/pkg/maps"
27+
"istio.io/istio/pkg/ptr"
2428
"istio.io/istio/pkg/queue"
29+
"istio.io/istio/pkg/slices"
2530
"istio.io/istio/pkg/util/sets"
2631
)
2732

2833
// collectionLister abstracts over a source of Collection[T] values.
// Implementations only need to be able to report the collections they hold.
type collectionLister[T any] interface {
2934
// getCollections returns the collections currently provided by this lister.
getCollections() []Collection[T]
3035
}
3136

37+
// Compile-time check that *mergejoin satisfies internalCollection.
var _ internalCollection[any] = (*mergejoin[any])(nil)
38+
3239
type mergejoin[T any] struct {
3340
id collectionUID
3441
collectionName string
@@ -169,6 +176,42 @@ func (j *mergejoin[T]) GetKey(k string) *T {
169176
return nil
170177
}
171178

179+
func (j *mergejoin[T]) Register(f func(e Event[T])) HandlerRegistration {
180+
return registerHandlerAsBatched(j, f)
181+
}
182+
183+
func (j *mergejoin[T]) RegisterBatch(f func(e []Event[T]), runExistingState bool) HandlerRegistration {
184+
j.mu.Lock()
185+
defer j.mu.Unlock()
186+
if !runExistingState {
187+
// If we don't to run the initial state this is simple, we just register the handler.
188+
return j.eventHandlers.Insert(f, j, nil, j.stop)
189+
}
190+
191+
// We need to run the initial state, but we don't want to get duplicate events.
192+
// We should get "ADD initialObject1, ADD initialObjectN, UPDATE someLaterUpdate" without mixing the initial ADDs
193+
// Create ADDs for the current state of the merge cache
194+
events := make([]Event[T], 0, len(j.outputs))
195+
for _, o := range j.outputs {
196+
events = append(events, Event[T]{
197+
New: &o,
198+
Event: controllers.EventAdd,
199+
})
200+
}
201+
202+
// Send out all the initial objects to the handler. We will then unlock the new events so it gets the future updates.
203+
return j.eventHandlers.Insert(f, j, events, j.stop)
204+
}
205+
206+
// nolint: unused // (not true, its to implement an interface)
207+
func (j *mergejoin[T]) dump() CollectionDump {
208+
return CollectionDump{
209+
Outputs: eraseMap(slices.GroupUnique(j.List(), getTypedKey)),
210+
Synced: j.HasSynced(),
211+
Inputs: nil,
212+
}
213+
}
214+
172215
func (j *mergejoin[T]) onSubCollectionEventHandler(o []Event[T]) {
173216
var events []Event[T]
174217
j.mu.Lock()
@@ -205,7 +248,7 @@ func (j *mergejoin[T]) onSubCollectionEventHandler(o []Event[T]) {
205248
oldObj := ev.Old
206249
if oldObj != nil && newObj != nil {
207250
// Update event
208-
if Equal(ev.New, ev.Old) {
251+
if Equal(*ev.New, *ev.Old) {
209252
// NOP change, skip
210253
continue
211254
}
@@ -313,3 +356,80 @@ func (j *mergejoin[T]) updateIndexLocked(e Event[T], key Key[T]) {
313356
}
314357
}
315358
}
359+
360+
func (j *mergejoin[T]) runQueue() {
361+
// Wait until all underlying collections are synced before registering
362+
syncers := slices.Map(j.collections.getCollections(), func(c Collection[T]) cache.InformerSynced {
363+
return c.HasSynced
364+
})
365+
if !kube.WaitForCacheSync(j.collectionName, j.stop, syncers...) {
366+
return
367+
}
368+
369+
// Register with the list of collections
370+
regs := []HandlerRegistration{}
371+
for _, c := range j.collections.getCollections() {
372+
regs = append(regs, c.RegisterBatch(func(events []Event[T]) {
373+
j.queue.Push(func() error {
374+
j.onSubCollectionEventHandler(events)
375+
return nil
376+
})
377+
}, true))
378+
}
379+
380+
syncers = slices.Map(regs, func(r HandlerRegistration) cache.InformerSynced {
381+
return r.HasSynced
382+
})
383+
if !kube.WaitForCacheSync(j.collectionName, j.stop, syncers...) {
384+
return
385+
}
386+
387+
j.queue.Run(j.stop)
388+
}
389+
390+
type collections[T any] []Collection[T]
391+
392+
// nolint: unused // (not true, its to implement an interface)
393+
func (jc collections[T]) getCollections() []Collection[T] {
394+
return jc
395+
}
396+
397+
func JoinWithMergeCollection[T any](cs []Collection[T], merge func(ts []T) *T, opts ...CollectionOption) Collection[T] {
398+
o := buildCollectionOptions(opts...)
399+
if o.name == "" {
400+
o.name = fmt.Sprintf("JoinWithMerge[%v]", ptr.TypeName[T]())
401+
}
402+
403+
synced := make(chan struct{})
404+
405+
j := &mergejoin[T]{
406+
collectionName: o.name,
407+
collections: collections[T](cs),
408+
id: nextUID(),
409+
log: log.WithLabels("owner", o.name),
410+
outputs: make(map[Key[T]]T),
411+
indexes: make(map[string]joinCollectionIndex[T]),
412+
eventHandlers: newHandlerSet[T](),
413+
metadata: o.metadata,
414+
merge: merge,
415+
synced: synced,
416+
stop: o.stop,
417+
syncer: channelSyncer{
418+
name: o.name,
419+
synced: synced,
420+
},
421+
}
422+
423+
maybeRegisterCollectionForDebugging(j, o.debugger)
424+
425+
// Create our queue. When it syncs (that is, all items that were present when Run() was called), we mark ourselves as synced.
426+
j.queue = queue.NewWithSync(func() {
427+
close(j.synced)
428+
j.log.Infof("%v synced (uid %v)", j.name(), j.uid())
429+
}, j.collectionName)
430+
431+
// The queue will process the initial state and mark ourselves as synced (from the NewWithSync callback)
432+
go j.runQueue()
433+
434+
return j
435+
}

0 commit comments

Comments
 (0)