Skip to content

Commit f5f608c

Browse files
authored
Merge pull request #856 from n3wscott/gate-until
Adding Gate and a CustomResourceGate reconciler
2 parents 3029c4a + 990e45c commit f5f608c

File tree

7 files changed

+1111
-0
lines changed

7 files changed

+1111
-0
lines changed

pkg/controller/gate.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package controller
2+
3+
import (
4+
"k8s.io/apimachinery/pkg/runtime/schema"
5+
)
6+
7+
// A Gate is an interface to allow reconcilers to delay a callback until a set of GVKs are set to true inside the gate.
8+
type Gate interface {
9+
// Register to call a callback function when all given GVKs are marked true. If the callback is unblocked, the
10+
// registration is removed.
11+
Register(callback func(), gvks ...schema.GroupVersionKind)
12+
// Set marks the associated condition to the given value. If the condition is already set as
13+
// that value, then this is a no-op. Returns true if there was an update detected.
14+
Set(gvk schema.GroupVersionKind, ready bool) bool
15+
}

pkg/controller/options.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ type Options struct {
6969

7070
// ChangeLogOptions for recording change logs.
7171
ChangeLogOptions *ChangeLogOptions
72+
73+
// Gate implements a gated function callback pattern.
74+
Gate Gate
7275
}
7376

7477
// ForControllerRuntime extracts options for controller-runtime.

pkg/gate/gate.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright 2025 The Crossplane Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package gate contains a gated function callback registration implementation.
18+
package gate
19+
20+
import (
21+
"slices"
22+
"sync"
23+
)
24+
25+
// Gate implements a gated function callback registration with comparable conditions.
26+
type Gate[T comparable] struct {
27+
mux sync.RWMutex
28+
satisfied map[T]bool
29+
fns []gated[T]
30+
}
31+
32+
// gated is an internal tracking resource.
33+
type gated[T comparable] struct {
34+
// fn is the function callback we will invoke when all the dependent conditions are true.
35+
fn func()
36+
// depends is the list of conditions this gated function is waiting on. This is an AND.
37+
depends []T
38+
// released means the gated function has been invoked and we can garbage collect this gated function.
39+
released bool
40+
}
41+
42+
// Register a callback function that will be called when all the provided dependent conditions are true.
43+
// After all conditions are true, the callback function is removed from the registration and will not be called again.
44+
// Thread Safe.
45+
func (g *Gate[T]) Register(fn func(), depends ...T) {
46+
g.mux.Lock()
47+
g.fns = append(g.fns, gated[T]{fn: fn, depends: depends})
48+
g.mux.Unlock()
49+
50+
g.process()
51+
}
52+
53+
// Set marks the associated condition to the given value. If the condition is already set as that value, then this is a
54+
// no-op. Returns true if there was an update detected. Thread safe.
55+
func (g *Gate[T]) Set(condition T, value bool) bool {
56+
g.mux.Lock()
57+
58+
if g.satisfied == nil {
59+
g.satisfied = make(map[T]bool)
60+
}
61+
62+
old, found := g.satisfied[condition]
63+
64+
updated := false
65+
if !found || old != value {
66+
updated = true
67+
g.satisfied[condition] = value
68+
}
69+
// process() would also like to lock the mux, so we must unlock here directly and not use defer.
70+
g.mux.Unlock()
71+
72+
if updated {
73+
g.process()
74+
}
75+
76+
return updated
77+
}
78+
79+
func (g *Gate[T]) process() {
80+
g.mux.Lock()
81+
defer g.mux.Unlock()
82+
83+
for i := range g.fns {
84+
// release controls if we should release the function.
85+
release := true
86+
87+
for _, dep := range g.fns[i].depends {
88+
if !g.satisfied[dep] {
89+
release = false
90+
}
91+
}
92+
93+
if release {
94+
fn := g.fns[i].fn
95+
// mark the function released so we can garbage collect after we are done with the loop.
96+
g.fns[i].released = true
97+
// Need to capture a copy of fn or else we would be accessing a deleted member when the go routine runs.
98+
go fn()
99+
}
100+
}
101+
102+
// garbage collect released functions.
103+
g.fns = slices.DeleteFunc(g.fns, func(a gated[T]) bool {
104+
return a.released
105+
})
106+
}

0 commit comments

Comments
 (0)