Skip to content

Commit 27dc928

Browse files
committed
move thread context from ThreadManager constructor to Run method
1 parent f17e27b commit 27dc928

File tree

3 files changed

+128
-68
lines changed

3 files changed

+128
-68
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,13 +280,13 @@ The most relevant use-case for this library in the context of k8s controllers is
280280

281281
- `NewThreadManager` creates a new thread manager.
282282
- The first argument is a `context.Context` used by the manager itself. Cancelling this context will stop the manager, and if the context contains a `logging.Logger`, the manager will use it for logging.
283-
- The second argument is a `context.Context` that is used as a base context for the executed go routines.
284-
- The third argument is an optional function that is executed after any go routine executed with this manager has finished. It is also possible to provide such a function for a specific go routine, instead for all of them, see below.
283+
- The second argument is an optional function that is executed after any go routine executed with this manager has finished. It is also possible to provide such a function for a specific go routine, instead for all of them, see below.
285284
- Use the `Run` method to start a new go routine.
285+
- Starting a go routine cancels the context of any running go routine with the same id.
286286
- This method also takes an optional function to be executed after the actual workload is done.
287287
- A on-finish function specified here is executed before the on-finish function of the manager is executed.
288288
- Note that go routines will wait for the thread manager to be started, if that has not yet happened. If the manager has been started, they will be executed immediately.
289-
- The thread manager will cancel the context that is passed into the workload function when the manager is being stopped. If any long-running commands are being run as part of the workload, it is recommended to listen to the context's `Done` channel.
289+
- The thread manager will cancel the context that is passed into the workload function when the manager is being stopped. If any long-running commands are being run as part of the workload, it is strongly recommended to listen to the context's `Done` channel.
290290
- Use `Start()` to start the thread manager.
291291
- If any go routines have been added before this is called, they will be started now. New go routines added afterwards will be started immediately.
292292
- Calling this multiple times doesn't have any effect, unless the manager has already been stopped, in which case `Start()` will panic.
@@ -300,11 +300,11 @@ The most relevant use-case for this library in the context of k8s controllers is
300300
#### Examples
301301

302302
```golang
303-
mgr := threads.NewThreadManager(ctx1, ctx2, nil)
303+
mgr := threads.NewThreadManager(ctx, nil)
304304
mgr.Start()
305305
// do other stuff
306306
// start a go routine that is restarted automatically if it finishes with an error
307-
mgr.Run("myTask", func(ctx context.Context) error {
307+
mgr.Run(myCtx, "myTask", func(ctx context.Context) error {
308308
// my task coding
309309
}, mgr.RestartOnError)
310310
// do more other stuff

pkg/threads/manager.go

Lines changed: 84 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -31,34 +31,29 @@ type OnFinishFunc func(context.Context, ThreadReturn)
3131
// 1. If the context is cancelled, the ThreadManager is stopped. Alternatively, its Stop() method can be called.
3232
// 2. If the context contains a logger, it is used for logging.
3333
//
34-
// The threadCtx will be passed to the threads and is cancelled when the ThreadManager is stopped.
3534
// If onFinish is not nil, it will be called whenever a thread finishes. It is called after the thread's own onFinish function, if any.
36-
func NewThreadManager(mgrCtx, threadCtx context.Context, onFinish OnFinishFunc) *ThreadManager {
37-
threadCtx, stopFunc := context.WithCancel(threadCtx)
35+
func NewThreadManager(mgrCtx context.Context, onFinish OnFinishFunc) *ThreadManager {
3836
return &ThreadManager{
39-
threadCtx: threadCtx,
40-
returns: make(chan ThreadReturn, 100),
41-
onFinish: onFinish,
42-
log: logging.FromContextOrDiscard(mgrCtx),
43-
runOnStart: []*Thread{},
44-
ctxStop: mgrCtx.Done(),
45-
stopThreads: stopFunc,
37+
returns: make(chan ThreadReturn, 100),
38+
onFinish: onFinish,
39+
log: logging.FromContextOrDiscard(mgrCtx),
40+
runOnStart: map[string]*Thread{},
41+
mgrStop: mgrCtx.Done(),
42+
threadCancelFuncs: map[string]context.CancelFunc{},
4643
}
4744
}
4845

4946
type ThreadManager struct {
50-
lock sync.Mutex
51-
threadCtx context.Context // context that is passed to threads, is cancelled when the ThreadManager is stopped
52-
returns chan ThreadReturn // channel to receive thread returns
53-
onFinish OnFinishFunc // function to call when a thread finishes
54-
log logging.Logger // logger for the ThreadManager
55-
runOnStart []*Thread // is filled if threads are added before the ThreadManager is started
56-
ctxStop <-chan struct{} // channel to stop the ThreadManager
57-
stopThreads context.CancelFunc // function to cancel the context that is passed to threads
58-
stopSelf func() // convenience function to use the internalStop channel
59-
internalStop chan struct{} // if the Stop() method is called, we need to stop the internal main loop by using this channel
60-
stopped atomic.Bool // indicates if the ThreadManager is stopped
61-
waitForThreads sync.WaitGroup // used to wait for threads to finish when stopping the ThreadManager
47+
lock sync.Mutex // generic lock for the ThreadManager
48+
lockThreadMap sync.Mutex // lock specifically for the threadCancelFuncs map
49+
returns chan ThreadReturn // channel to receive thread returns
50+
onFinish OnFinishFunc // function to call when a thread finishes
51+
log logging.Logger // logger for the ThreadManager
52+
runOnStart map[string]*Thread // is filled if threads are added before the ThreadManager is started
53+
mgrStop <-chan struct{} // channel to stop the ThreadManager
54+
stopped atomic.Bool // indicates if the ThreadManager is stopped
55+
waitForThreads sync.WaitGroup // used to wait for threads to finish when stopping the ThreadManager
56+
threadCancelFuncs map[string]context.CancelFunc // map of thread ids to cancel functions
6257
}
6358

6459
// Start starts the ThreadManager.
@@ -94,10 +89,10 @@ func (tm *ThreadManager) Start() {
9489
}
9590
case sig := <-sigs:
9691
tm.log.Info("Received os signal, stopping ThreadManager", "signal", sig)
97-
tm.stop()
92+
tm.Stop()
9893
return
99-
case <-tm.ctxStop:
100-
tm.stop()
94+
case <-tm.mgrStop:
95+
tm.Stop()
10196
return
10297
}
10398
}
@@ -113,6 +108,7 @@ func (tm *ThreadManager) Start() {
113108
}
114109

115110
// Stop stops the ThreadManager.
111+
// Panics if the ThreadManager has not been started yet.
116112
// Calling Stop() multiple times is a no-op.
117113
// It is not possible to start the ThreadManager again after it has been stopped, a new instance must be created.
118114
// Adding threads after the ThreadManager has been stopped is a no-op.
@@ -133,7 +129,12 @@ func (tm *ThreadManager) stop() {
133129
}
134130
tm.log.Info("Stopping ThreadManager, waiting for remaining threads to finish")
135131
tm.stopped.Store(true)
136-
tm.stopThreads()
132+
tm.lockThreadMap.Lock()
133+
for id, cancel := range tm.threadCancelFuncs {
134+
tm.log.Debug("Cancelling thread", "thread", id)
135+
cancel()
136+
}
137+
tm.lockThreadMap.Unlock()
137138

138139
tm.waitForThreads.Wait()
139140
close(tm.returns)
@@ -142,12 +143,16 @@ func (tm *ThreadManager) stop() {
142143
}
143144

144145
// Run gives a new thread to run to the ThreadManager.
145-
// id is only used for logging and debugging purposes.
146+
// The context is used to create a new context with a cancel function for the thread.
147+
// id is used for logging and debugging purposes.
148+
// Note that when a thread with the same id as an already running thread is added, the running thread will be cancelled.
149+
// If the ThreadManager has not been started yet, the previously added thread with the conflicting id will be discarded and the newly added one will be run when the ThreadManager is started instead.
150+
// A thread MUST NOT start another thread with the same id as itself during its work function. If a thread wants to restart itself, this must happen in the onFinish function.
146151
// work is the actual workload of the thread.
147152
// onFinish can be used to react to the thread having finished.
148-
// Note that there are some pre-defined functions that can be used as onFinish functions, e.g. the ThreadManager's Restart method.
149-
func (tm *ThreadManager) Run(id string, work func(context.Context) error, onFinish OnFinishFunc) {
150-
tm.RunThread(NewThread(id, work, onFinish))
153+
// There are some pre-defined functions that can be used as onFinish functions, e.g. the ThreadManager's Restart method.
154+
func (tm *ThreadManager) Run(ctx context.Context, id string, work func(context.Context) error, onFinish OnFinishFunc) {
155+
tm.RunThread(NewThread(ctx, id, work, onFinish))
151156
}
152157

153158
// RunThread is the same as Run, but takes a Thread struct instead of the individual parameters.
@@ -167,28 +172,48 @@ func (tm *ThreadManager) run(t *Thread) {
167172
return
168173
}
169174
if !tm.isStarted() {
170-
tm.runOnStart = append(tm.runOnStart, t)
171175
tm.log.Debug("ThreadManager has not been started yet, enqueuing thread to run on start", "thread", t.ID())
176+
_, ok := tm.runOnStart[t.id]
177+
if ok {
178+
tm.log.Debug("Discarding thread with the same id that was already enqueued", "thread", t.id)
179+
}
180+
tm.runOnStart[t.id] = t
172181
return
173182
}
174183
tm.log.Debug("Running thread", "thread", t.id)
184+
tm.lockThreadMap.Lock()
185+
if cancel := tm.threadCancelFuncs[t.id]; cancel != nil {
186+
tm.log.Debug("A thread with the same id is already running, cancelling it", "thread", t.id)
187+
cancel()
188+
}
189+
tm.threadCancelFuncs[t.id] = t.cancel
190+
tm.lockThreadMap.Unlock()
175191
tm.waitForThreads.Add(1)
176192
go func() {
177193
defer tm.waitForThreads.Done()
178194
var err error
179195
if t.work != nil {
180-
err = t.work(tm.threadCtx)
196+
err = t.work(t.ctx)
181197
} else {
182198
tm.log.Debug("Thread has no work function", "thread", t.id)
183199
}
200+
tm.lockThreadMap.Lock()
201+
// thread must be removed from the internal map here, because otherwise the thread might be restarted before the cancel function is removed
202+
// which would then wrongfully remove the cancel function of the new thread
203+
if cancelOld := tm.threadCancelFuncs[t.id]; cancelOld != nil { // this should always be true
204+
// cancel the thread's context, just to be sure that no running thread can 'leak' by losing its cancel function
205+
cancelOld()
206+
delete(tm.threadCancelFuncs, t.id)
207+
}
208+
tm.lockThreadMap.Unlock()
184209
tr := NewThreadReturn(t, err)
185210
if t.onFinish != nil {
186211
tm.log.Debug("Calling the thread's onFinish function", "thread", t.id)
187-
t.onFinish(tm.threadCtx, tr)
212+
t.onFinish(t.ctx, tr)
188213
}
189214
if tm.onFinish != nil {
190215
tm.log.Debug("Calling the thread manager's onFinish function", "thread", tr.Thread.id)
191-
tm.onFinish(tm.threadCtx, tr)
216+
tm.onFinish(t.ctx, tr)
192217
}
193218
tm.returns <- tr
194219
tm.log.Debug("Thread finished", "thread", t.id)
@@ -225,7 +250,7 @@ var _ OnFinishFunc = (*ThreadManager)(nil).Restart
225250
// Restart is a pre-defined onFinish function that can be used to restart a thread after it has finished.
226251
// This method is not meant to be called directly, instead pass it to the ThreadManager's Run method as the onFinish parameter:
227252
//
228-
// tm.Run("myThread", myWorkFunc, tm.Restart)
253+
// tm.Run(ctx, "myThread", myWorkFunc, tm.Restart)
229254
func (tm *ThreadManager) Restart(_ context.Context, tr ThreadReturn) {
230255
if tm.stopped.Load() {
231256
return
@@ -239,10 +264,10 @@ var _ OnFinishFunc = (*ThreadManager)(nil).RestartOnError
239264
// It is the opposite of RestartOnSuccess.
240265
// This method is not meant to be called directly, instead pass it to the ThreadManager's Run method as the onFinish parameter:
241266
//
242-
// tm.Run("myThread", myWorkFunc, tm.RestartOnError)
243-
func (tm *ThreadManager) RestartOnError(_ context.Context, tr ThreadReturn) {
267+
// tm.Run(ctx, "myThread", myWorkFunc, tm.RestartOnError)
268+
func (tm *ThreadManager) RestartOnError(ctx context.Context, tr ThreadReturn) {
244269
if tr.Err != nil {
245-
tm.Restart(tm.threadCtx, tr)
270+
tm.Restart(ctx, tr)
246271
}
247272
}
248273

@@ -252,18 +277,22 @@ var _ OnFinishFunc = (*ThreadManager)(nil).RestartOnSuccess
252277
// It is the opposite of RestartOnError.
253278
// This method is not meant to be called directly, instead pass it to the ThreadManager's Run method as the onFinish parameter:
254279
//
255-
// tm.Run("myThread", myWorkFunc, tm.RestartOnSuccess)
256-
func (tm *ThreadManager) RestartOnSuccess(_ context.Context, tr ThreadReturn) {
280+
// tm.Run(ctx, "myThread", myWorkFunc, tm.RestartOnSuccess)
281+
func (tm *ThreadManager) RestartOnSuccess(ctx context.Context, tr ThreadReturn) {
257282
if tr.Err == nil {
258-
tm.Restart(tm.threadCtx, tr)
283+
tm.Restart(ctx, tr)
259284
}
260285
}
261286

262287
// NewThread creates a new thread with the given id, work function and onFinish function.
263288
// It is usually not required to call this function directly, instead use the ThreadManager's Run method.
289+
// A new context with a cancel function is derived from the context passed to the constructor.
264290
// The Thread's fields are considered immutable after creation.
265-
func NewThread(id string, work WorkFunc, onFinish OnFinishFunc) Thread {
291+
func NewThread(ctx context.Context, id string, work WorkFunc, onFinish OnFinishFunc) Thread {
292+
ctx, cancel := context.WithCancel(ctx)
266293
return Thread{
294+
ctx: ctx,
295+
cancel: cancel,
267296
id: id,
268297
work: work,
269298
onFinish: onFinish,
@@ -272,11 +301,24 @@ func NewThread(id string, work WorkFunc, onFinish OnFinishFunc) Thread {
272301

273302
// Thread represents a thread that can be run by the ThreadManager.
274303
type Thread struct {
304+
ctx context.Context
305+
cancel context.CancelFunc
275306
id string
276307
work WorkFunc
277308
onFinish OnFinishFunc
278309
}
279310

311+
// Context returns the context of the thread.
312+
func (t *Thread) Context() context.Context {
313+
return t.ctx
314+
}
315+
316+
// Cancel cancels the thread's context.
317+
// The thread manager cancels all threads' contexts when it is stopped, so calling this manually is usually not necessary.
318+
func (t *Thread) Cancel() {
319+
t.cancel()
320+
}
321+
280322
// ID returns the id of the thread.
281323
func (t *Thread) ID() string {
282324
return t.id

0 commit comments

Comments
 (0)