Skip to content

Commit 1e79df1

Browse files
authored
Merge branch 'main' into release-0-7-0
2 parents 8898b43 + 524567b commit 1e79df1

32 files changed

+2520
-15
lines changed

README.md

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,47 @@ The library provides a wrapper around `logr.Logger`, exposing additional helper
270270
- There are several `FromContext...` functions for retrieving a logger from a `context.Context` object.
271271
- `InitFlags(...)` can be used to add the configuration flags for this logger to a cobra `FlagSet`.
272272

273+
### threads
274+
275+
The `threads` package provides a simple thread managing library. It can be used to run go routines in a non-blocking manner and provides the possibility to react if the routine has exited.
276+
277+
The most relevant use-case for this library in the context of k8s controllers is to handle dynamic watches on multiple clusters. To start a watch, that cluster's cache's `Start` method has to be used. Because this method is blocking, it has to be executed in a different go routine, and because it can return an error, a simple `go cache.Start(...)` is not enough, because it would hide the error.
278+
279+
#### Noteworthy Functions
280+
281+
- `NewThreadManager` creates a new thread manager.
282+
- The first argument is a `context.Context` used by the manager itself. Cancelling this context will stop the manager, and if the context contains a `logging.Logger`, the manager will use it for logging.
283+
- The second argument is an optional function that is executed after any go routine executed with this manager has finished. It is also possible to provide such a function for a specific go routine, instead for all of them, see below.
284+
- Use the `Run` method to start a new go routine.
285+
- Starting a go routine cancels the context of any running go routine with the same id.
286+
- This method also takes an optional function to be executed after the actual workload is done.
287+
- A on-finish function specified here is executed before the on-finish function of the manager is executed.
288+
- Note that go routines will wait for the thread manager to be started, if that has not yet happened. If the manager has been started, they will be executed immediately.
289+
- The thread manager will cancel the context that is passed into the workload function when the manager is being stopped. If any long-running commands are being run as part of the workload, it is strongly recommended to listen to the context's `Done` channel.
290+
- Use `Start()` to start the thread manager.
291+
- If any go routines have been added before this is called, they will be started now. New go routines added afterwards will be started immediately.
292+
- Calling this multiple times doesn't have any effect, unless the manager has already been stopped, in which case `Start()` will panic.
293+
- There are three ways to stop the thread manager again:
294+
- Use its `Stop()` method.
295+
- This is a blocking method that waits for all remaining go routines to finish. Their context is cancelled to notify them of the manager being stopped.
296+
- Cancel the context that was passed into `NewThreadManager` as the first argument.
297+
- Send a `SIGTERM` or `SIGINT` signal to the process.
298+
- The `TaskManager`'s `Restart`, `RestartOnError`, and `RestartOnSuccess` methods are pre-defined on-finish functions. They are not meant to be used directly, but instead be used as an argument to `Run`. See the example below.
299+
300+
#### Examples
301+
302+
```golang
303+
mgr := threads.NewThreadManager(ctx, nil)
304+
mgr.Start()
305+
// do other stuff
306+
// start a go routine that is restarted automatically if it finishes with an error
307+
mgr.Run(myCtx, "myTask", func(ctx context.Context) error {
308+
// my task coding
309+
}, mgr.RestartOnError)
310+
// do more other stuff
311+
mgr.Stop()
312+
```
313+
273314
### testing
274315

275316
This package contains useful functionality to aid with writing tests.
@@ -298,6 +339,108 @@ env := testing.NewEnvironmentBuilder().
298339
env.ShouldReconcile(testing.RequestFromStrings("testresource"))
299340
```
300341

342+
### Readiness Checks
343+
344+
The `pkg/readiness` package provides a simple way to check if a kubernetes resource is ready.
345+
The meaning of readiness depends on the resource type.
346+
347+
#### Example
348+
349+
```go
350+
deployment := &appsv1.Deployment{}
351+
err := r.Client.Get(ctx, types.NamespacedName{
352+
Name: "my-deployment",
353+
Namespace: "my-namespace",
354+
}, deployment)
355+
356+
if err != nil {
357+
return err
358+
}
359+
360+
readiness := readiness.CheckDeployment(deployment)
361+
362+
if readiness.IsReady() {
363+
fmt.Println("Deployment is ready")
364+
} else {
365+
fmt.Printf("Deployment is not ready: %s\n", readiness.Message())
366+
}
367+
```
368+
369+
### Kubernetes resource management
370+
371+
The `pkg/resource` package contains some useful functions for working with Kubernetes resources. The `Mutator` interface can be used to modify resources in a generic way. It is used by the `Mutate` function, which takes a resource and a mutator and applies the mutator to the resource.
372+
The package also contains convenience types for the most common resource types, e.g. `ConfigMap`, `Secret`, `ClusterRole`, `ClusterRoleBinding`, etc. These types implement the `Mutator` interface and can be used to modify the corresponding resources.
373+
374+
#### Examples
375+
376+
Create or update a `ConfigMap`, a `ServiceAccount` and a `Deployment` using the `Mutator` interface:
377+
378+
```go
379+
type myDeploymentMutator struct {
380+
}
381+
382+
var _ resource.Mutator[*appsv1.Deployment] = &myDeploymentMutator{}
383+
384+
func newDeploymentMutator() resources.Mutator[*appsv1.Deployment] {
385+
return &MyDeploymentMutator{}
386+
}
387+
388+
func (m *MyDeploymentMutator) String() string {
389+
return "deployment default/test"
390+
}
391+
392+
func (m *MyDeploymentMutator) Empty() *appsv1.Deployment {
393+
return &appsv1.Deployment{
394+
ObjectMeta: metav1.ObjectMeta{
395+
Name: "test",
396+
Namespace: "default",
397+
},
398+
}
399+
}
400+
401+
func (m *MyDeploymentMutator) Mutate(deployment *appsv1.Deployment) error {
402+
// create one container with an image
403+
deployment.Spec.Template.Spec.Containers = []corev1.Container{
404+
{
405+
Name: "test",
406+
Image: "test-image:latest",
407+
},
408+
}
409+
return nil
410+
}
411+
412+
413+
func ReconcileResources(ctx context.Context, client client.Client) error {
414+
configMapResource := resource.NewConfigMap("my-configmap", "my-namespace", map[string]string{)
415+
"label1": "value1",
416+
"label2": "value2",
417+
}, nil)
418+
419+
serviceAccountResource := resource.NewServiceAccount("my-serviceaccount", "my-namespace", nil, nil)
420+
421+
myDeploymentMutator := newDeploymentMutator()
422+
423+
var err error
424+
425+
err = resources.CreateOrUpdateResource(ctx, client, configMapResource)
426+
if err != nil {
427+
return err
428+
}
429+
430+
resources.CreateOrUpdateResource(ctx, client, serviceAccountResource)
431+
if err != nil {
432+
return err
433+
}
434+
435+
err = resources.CreateOrUpdateResource(ctx, client, myDeploymentMutator)
436+
if err != nil {
437+
return err
438+
}
439+
440+
return nil
441+
}
442+
```
443+
301444
## Support, Feedback, Contributing
302445

303446
This project is open to feature requests/suggestions, bug reports etc. via [GitHub issues](https://github.com/openmcp-project/controller-utils/issues). Contribution and feedback are encouraged and always welcome. For more information about how to contribute, the project structure, as well as additional contribution information, see our [Contribution Guidelines](CONTRIBUTING.md).

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
k8s.io/client-go v0.33.0
2020
k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979
2121
sigs.k8s.io/controller-runtime v0.20.4
22+
sigs.k8s.io/gateway-api v1.3.0
2223
sigs.k8s.io/yaml v1.4.0
2324
)
2425

@@ -66,13 +67,13 @@ require (
6667
golang.org/x/time v0.10.0 // indirect
6768
golang.org/x/tools v0.33.0 // indirect
6869
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
69-
google.golang.org/protobuf v1.36.5 // indirect
70+
google.golang.org/protobuf v1.36.6 // indirect
7071
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
7172
gopkg.in/inf.v0 v0.9.1 // indirect
7273
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
7374
k8s.io/klog/v2 v2.130.1 // indirect
7475
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
7576
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
7677
sigs.k8s.io/randfill v1.0.0 // indirect
77-
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect
78+
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
7879
)

go.sum

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,8 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ
193193
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
194194
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
195195
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
196-
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
197-
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
196+
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
197+
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
198198
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
199199
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
200200
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
@@ -226,12 +226,14 @@ k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 h1:jgJW5IePPXLGB8e/1wvd0Ich9QE97
226226
k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
227227
sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU=
228228
sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY=
229+
sigs.k8s.io/gateway-api v1.3.0 h1:q6okN+/UKDATola4JY7zXzx40WO4VISk7i9DIfOvr9M=
230+
sigs.k8s.io/gateway-api v1.3.0/go.mod h1:d8NV8nJbaRbEKem+5IuxkL8gJGOZ+FJ+NvOIltV8gDk=
229231
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE=
230232
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
231233
sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
232234
sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=
233235
sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
234-
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 h1:IUA9nvMmnKWcj5jl84xn+T5MnlZKThmUW1TdblaLVAc=
235-
sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps=
236+
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 h1:qPeWmscJcXP0snki5IYF79Z8xrl8ETFxgMd7wez1XkI=
237+
sigs.k8s.io/structured-merge-diff/v4 v4.7.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps=
236238
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
237239
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=

pkg/clusters/cluster.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ type Cluster struct {
2323
client client.Client
2424
// cluster
2525
cluster cluster.Cluster
26+
27+
clientOpts *client.Options
28+
clusterOpts []cluster.Option
2629
}
2730

2831
// Initializes a new cluster.
@@ -56,6 +59,36 @@ func (c *Cluster) RegisterConfigPathFlag(flags *flag.FlagSet) {
5659
flags.StringVar(&c.cfgPath, fmt.Sprintf("%s-cluster", c.id), "", fmt.Sprintf("Path to the %s cluster kubeconfig file or directory containing either a kubeconfig or host, token, and ca file. Leave empty to use in-cluster config.", c.id))
5760
}
5861

62+
// WithClientOptions allows to overwrite the default client options.
63+
// It must be called before InitializeClient().
64+
// Note that using this method disables the the scheme injection during client initialization.
65+
// This means that the required scheme should already be set in the options that are passed into this method.
66+
// Returns the cluster for chaining.
67+
func (c *Cluster) WithClientOptions(opts client.Options) *Cluster {
68+
c.clientOpts = &opts
69+
return c
70+
}
71+
72+
// WithClusterOptions allows to overwrite the default cluster options.
73+
// It must be called before InitializeClient().
74+
// Note that using this method disables the the scheme injection during client initialization.
75+
// This means that the required scheme should be set by the cluster options that are passed into this method.
76+
// The DefaultClusterOptions function can be passed in as a cluster option to set the scheme.
77+
// Returns the cluster for chaining.
78+
func (c *Cluster) WithClusterOptions(opts ...cluster.Option) *Cluster {
79+
c.clusterOpts = opts
80+
return c
81+
}
82+
83+
// DefaultClusterOptions returns the default cluster options.
84+
// This is useful when one wants to add custom cluster options without overwriting the default ones via WithClusterOptions().
85+
func DefaultClusterOptions(scheme *runtime.Scheme) cluster.Option {
86+
return func(o *cluster.Options) {
87+
o.Scheme = scheme
88+
o.Cache.Scheme = scheme
89+
}
90+
}
91+
5992
///////////////////
6093
// STATUS CHECKS //
6194
///////////////////
@@ -92,15 +125,11 @@ func (c *Cluster) InitializeID(id string) {
92125
}
93126

94127
// InitializeRESTConfig loads the cluster's REST config.
95-
// If the config has already been loaded, this is a no-op.
96128
// Panics if the cluster's id is not set (InitializeID must be called first).
97129
func (c *Cluster) InitializeRESTConfig() error {
98130
if !c.HasID() {
99131
panic("cluster id must be set before loading the config")
100132
}
101-
if c.HasRESTConfig() {
102-
return nil
103-
}
104133
cfg, err := controller.LoadKubeconfig(c.cfgPath)
105134
if err != nil {
106135
return fmt.Errorf("failed to load '%s' cluster kubeconfig: %w", c.ID(), err)
@@ -111,20 +140,29 @@ func (c *Cluster) InitializeRESTConfig() error {
111140

112141
// InitializeClient creates a new client for the cluster.
113142
// This also initializes the cluster's controller-runtime 'Cluster' representation.
114-
// If the client has already been initialized, this is a no-op.
115143
// Panics if the cluster's REST config has not been loaded (InitializeRESTConfig must be called first).
116144
func (c *Cluster) InitializeClient(scheme *runtime.Scheme) error {
117145
if !c.HasRESTConfig() {
118146
panic("cluster REST config must be set before creating the client")
119147
}
120-
if c.HasClient() {
121-
return nil
148+
if c.clientOpts == nil {
149+
c.clientOpts = &client.Options{
150+
Scheme: scheme,
151+
}
122152
}
123-
cli, err := client.New(c.restCfg, client.Options{Scheme: scheme})
153+
cli, err := client.New(c.restCfg, *c.clientOpts)
124154
if err != nil {
125155
return fmt.Errorf("failed to create '%s' cluster client: %w", c.ID(), err)
126156
}
127-
clu, err := cluster.New(c.restCfg, func(o *cluster.Options) { o.Scheme = scheme; o.Cache.Scheme = scheme })
157+
if c.clusterOpts == nil {
158+
c.clusterOpts = []cluster.Option{
159+
func(o *cluster.Options) {
160+
o.Scheme = scheme
161+
o.Cache.Scheme = scheme
162+
},
163+
}
164+
}
165+
clu, err := cluster.New(c.restCfg, c.clusterOpts...)
128166
if err != nil {
129167
return fmt.Errorf("failed to create '%s' cluster Cluster representation: %w", c.ID(), err)
130168
}

0 commit comments

Comments
 (0)