Skip to content

Commit ad1a2e4

Browse files
authored
Merge branch 'main' into feat/resources
2 parents acd1b3f + acdeca4 commit ad1a2e4

File tree

5 files changed

+654
-9
lines changed

5 files changed

+654
-9
lines changed

README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,47 @@ The library provides a wrapper around `logr.Logger`, exposing additional helper
270270
- There are several `FromContext...` functions for retrieving a logger from a `context.Context` object.
271271
- `InitFlags(...)` can be used to add the configuration flags for this logger to a cobra `FlagSet`.
272272

273+
### threads
274+
275+
The `threads` package provides a simple thread managing library. It can be used to run go routines in a non-blocking manner and provides the possibility to react if the routine has exited.
276+
277+
The most relevant use-case for this library in the context of k8s controllers is to handle dynamic watches on multiple clusters. To start a watch, that cluster's cache's `Start` method has to be used. Because this method is blocking, it has to be executed in a different go routine, and because it can return an error, a simple `go cache.Start(...)` is not enough, because it would hide the error.
278+
279+
#### Noteworthy Functions
280+
281+
- `NewThreadManager` creates a new thread manager.
282+
- The first argument is a `context.Context` used by the manager itself. Cancelling this context will stop the manager, and if the context contains a `logging.Logger`, the manager will use it for logging.
283+
- The second argument is an optional function that is executed after any go routine executed with this manager has finished. It is also possible to provide such a function for a specific go routine, instead for all of them, see below.
284+
- Use the `Run` method to start a new go routine.
285+
- Starting a go routine cancels the context of any running go routine with the same id.
286+
- This method also takes an optional function to be executed after the actual workload is done.
287+
- A on-finish function specified here is executed before the on-finish function of the manager is executed.
288+
- Note that go routines will wait for the thread manager to be started, if that has not yet happened. If the manager has been started, they will be executed immediately.
289+
- The thread manager will cancel the context that is passed into the workload function when the manager is being stopped. If any long-running commands are being run as part of the workload, it is strongly recommended to listen to the context's `Done` channel.
290+
- Use `Start()` to start the thread manager.
291+
- If any go routines have been added before this is called, they will be started now. New go routines added afterwards will be started immediately.
292+
- Calling this multiple times doesn't have any effect, unless the manager has already been stopped, in which case `Start()` will panic.
293+
- There are three ways to stop the thread manager again:
294+
- Use its `Stop()` method.
295+
- This is a blocking method that waits for all remaining go routines to finish. Their context is cancelled to notify them of the manager being stopped.
296+
- Cancel the context that was passed into `NewThreadManager` as the first argument.
297+
- Send a `SIGTERM` or `SIGINT` signal to the process.
298+
- The `TaskManager`'s `Restart`, `RestartOnError`, and `RestartOnSuccess` methods are pre-defined on-finish functions. They are not meant to be used directly, but instead be used as an argument to `Run`. See the example below.
299+
300+
#### Examples
301+
302+
```golang
303+
mgr := threads.NewThreadManager(ctx, nil)
304+
mgr.Start()
305+
// do other stuff
306+
// start a go routine that is restarted automatically if it finishes with an error
307+
mgr.Run(myCtx, "myTask", func(ctx context.Context) error {
308+
// my task coding
309+
}, mgr.RestartOnError)
310+
// do more other stuff
311+
mgr.Stop()
312+
```
313+
273314
### testing
274315

275316
This package contains useful functionality to aid with writing tests.

pkg/clusters/cluster.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ type Cluster struct {
2323
client client.Client
2424
// cluster
2525
cluster cluster.Cluster
26+
27+
clientOpts *client.Options
28+
clusterOpts []cluster.Option
2629
}
2730

2831
// Initializes a new cluster.
@@ -56,6 +59,36 @@ func (c *Cluster) RegisterConfigPathFlag(flags *flag.FlagSet) {
5659
flags.StringVar(&c.cfgPath, fmt.Sprintf("%s-cluster", c.id), "", fmt.Sprintf("Path to the %s cluster kubeconfig file or directory containing either a kubeconfig or host, token, and ca file. Leave empty to use in-cluster config.", c.id))
5760
}
5861

62+
// WithClientOptions allows to overwrite the default client options.
63+
// It must be called before InitializeClient().
64+
// Note that using this method disables the the scheme injection during client initialization.
65+
// This means that the required scheme should already be set in the options that are passed into this method.
66+
// Returns the cluster for chaining.
67+
func (c *Cluster) WithClientOptions(opts client.Options) *Cluster {
68+
c.clientOpts = &opts
69+
return c
70+
}
71+
72+
// WithClusterOptions allows to overwrite the default cluster options.
73+
// It must be called before InitializeClient().
74+
// Note that using this method disables the the scheme injection during client initialization.
75+
// This means that the required scheme should be set by the cluster options that are passed into this method.
76+
// The DefaultClusterOptions function can be passed in as a cluster option to set the scheme.
77+
// Returns the cluster for chaining.
78+
func (c *Cluster) WithClusterOptions(opts ...cluster.Option) *Cluster {
79+
c.clusterOpts = opts
80+
return c
81+
}
82+
83+
// DefaultClusterOptions returns the default cluster options.
84+
// This is useful when one wants to add custom cluster options without overwriting the default ones via WithClusterOptions().
85+
func DefaultClusterOptions(scheme *runtime.Scheme) cluster.Option {
86+
return func(o *cluster.Options) {
87+
o.Scheme = scheme
88+
o.Cache.Scheme = scheme
89+
}
90+
}
91+
5992
///////////////////
6093
// STATUS CHECKS //
6194
///////////////////
@@ -92,15 +125,11 @@ func (c *Cluster) InitializeID(id string) {
92125
}
93126

94127
// InitializeRESTConfig loads the cluster's REST config.
95-
// If the config has already been loaded, this is a no-op.
96128
// Panics if the cluster's id is not set (InitializeID must be called first).
97129
func (c *Cluster) InitializeRESTConfig() error {
98130
if !c.HasID() {
99131
panic("cluster id must be set before loading the config")
100132
}
101-
if c.HasRESTConfig() {
102-
return nil
103-
}
104133
cfg, err := controller.LoadKubeconfig(c.cfgPath)
105134
if err != nil {
106135
return fmt.Errorf("failed to load '%s' cluster kubeconfig: %w", c.ID(), err)
@@ -111,20 +140,29 @@ func (c *Cluster) InitializeRESTConfig() error {
111140

112141
// InitializeClient creates a new client for the cluster.
113142
// This also initializes the cluster's controller-runtime 'Cluster' representation.
114-
// If the client has already been initialized, this is a no-op.
115143
// Panics if the cluster's REST config has not been loaded (InitializeRESTConfig must be called first).
116144
func (c *Cluster) InitializeClient(scheme *runtime.Scheme) error {
117145
if !c.HasRESTConfig() {
118146
panic("cluster REST config must be set before creating the client")
119147
}
120-
if c.HasClient() {
121-
return nil
148+
if c.clientOpts == nil {
149+
c.clientOpts = &client.Options{
150+
Scheme: scheme,
151+
}
122152
}
123-
cli, err := client.New(c.restCfg, client.Options{Scheme: scheme})
153+
cli, err := client.New(c.restCfg, *c.clientOpts)
124154
if err != nil {
125155
return fmt.Errorf("failed to create '%s' cluster client: %w", c.ID(), err)
126156
}
127-
clu, err := cluster.New(c.restCfg, func(o *cluster.Options) { o.Scheme = scheme; o.Cache.Scheme = scheme })
157+
if c.clusterOpts == nil {
158+
c.clusterOpts = []cluster.Option{
159+
func(o *cluster.Options) {
160+
o.Scheme = scheme
161+
o.Cache.Scheme = scheme
162+
},
163+
}
164+
}
165+
clu, err := cluster.New(c.restCfg, c.clusterOpts...)
128166
if err != nil {
129167
return fmt.Errorf("failed to create '%s' cluster Cluster representation: %w", c.ID(), err)
130168
}

0 commit comments

Comments
 (0)