Skip to content

Commit 49532df

Browse files
authored
Move dapr/concurrency to kit (dapr#72)
* Move dapr/concurrency to kit Does not include any code change Signed-off-by: ItalyPaleAle <[email protected]> * Fixed copyright year Signed-off-by: ItalyPaleAle <[email protected]> * Improved memory usage in error collection Signed-off-by: ItalyPaleAle <[email protected]> --------- Signed-off-by: ItalyPaleAle <[email protected]>
1 parent c0ebd07 commit 49532df

File tree

5 files changed

+1613
-0
lines changed

5 files changed

+1613
-0
lines changed

concurrency/closer.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package concurrency
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"io"
21+
"sync/atomic"
22+
"time"
23+
24+
"k8s.io/utils/clock"
25+
26+
"github.com/dapr/kit/logger"
27+
)
28+
29+
var (
30+
ErrManagerAlreadyClosed = errors.New("runner manager already closed")
31+
32+
log = logger.NewLogger("dapr.kit.concurrency")
33+
)
34+
35+
// RunnerCloserManager is a RunnerManager that also implements Closing of the
36+
// added closers once the main runners are done.
37+
type RunnerCloserManager struct {
38+
// mngr implements the main RunnerManager.
39+
mngr *RunnerManager
40+
41+
// closers are the closers to be closed once the main runners are done.
42+
closers []func() error
43+
44+
// retErr is the error returned by the main runners and closers. Used to
45+
// return the resulting error from Close().
46+
retErr error
47+
48+
// fatalShutdownFn is called if the grace period is exceeded.
49+
// Defined if the grace period is not nil.
50+
fatalShutdownFn func()
51+
52+
// closeFatalShutdown closes the fatal shutdown goroutine. Closing is a no-op
53+
// if fatalShutdownFn is nil.
54+
closeFatalShutdown chan struct{}
55+
56+
clock clock.Clock
57+
running atomic.Bool
58+
closing atomic.Bool
59+
closed atomic.Bool
60+
closeCh chan struct{}
61+
stopped chan struct{}
62+
}
63+
64+
// NewRunnerCloserManager creates a new RunnerCloserManager with the given
65+
// grace period and runners.
66+
// If gracePeriod is nil, the grace period is infinite.
67+
func NewRunnerCloserManager(gracePeriod *time.Duration, runners ...Runner) *RunnerCloserManager {
68+
c := &RunnerCloserManager{
69+
mngr: NewRunnerManager(runners...),
70+
clock: clock.RealClock{},
71+
stopped: make(chan struct{}),
72+
closeCh: make(chan struct{}),
73+
closeFatalShutdown: make(chan struct{}),
74+
}
75+
76+
if gracePeriod == nil {
77+
log.Warn("Graceful shutdown timeout is infinite, will wait indefinitely to shutdown")
78+
return c
79+
}
80+
81+
c.fatalShutdownFn = func() {
82+
log.Fatal("Graceful shutdown timeout exceeded, forcing shutdown")
83+
}
84+
85+
c.AddCloser(func() {
86+
log.Debugf("Graceful shutdown timeout: %s", *gracePeriod)
87+
88+
t := c.clock.NewTimer(*gracePeriod)
89+
defer t.Stop()
90+
91+
select {
92+
case <-t.C():
93+
c.fatalShutdownFn()
94+
case <-c.closeFatalShutdown:
95+
}
96+
})
97+
98+
return c
99+
}
100+
101+
// Add implements RunnerManager.Add.
102+
func (c *RunnerCloserManager) Add(runner ...Runner) error {
103+
if c.running.Load() {
104+
return ErrManagerAlreadyStarted
105+
}
106+
107+
return c.mngr.Add(runner...)
108+
}
109+
110+
// AddCloser adds a closer to the list of closers to be closed once the main
111+
// runners are done.
112+
func (c *RunnerCloserManager) AddCloser(closers ...any) error {
113+
if c.closing.Load() {
114+
return ErrManagerAlreadyClosed
115+
}
116+
117+
c.mngr.lock.Lock()
118+
defer c.mngr.lock.Unlock()
119+
120+
var errs []error
121+
for _, cl := range closers {
122+
switch v := cl.(type) {
123+
case io.Closer:
124+
c.closers = append(c.closers, v.Close)
125+
case func(context.Context) error:
126+
c.closers = append(c.closers, func() error {
127+
// We use a background context here since the fatalShutdownFn will kill
128+
// the program if the grace period is exceeded.
129+
return v(context.Background())
130+
})
131+
case func() error:
132+
c.closers = append(c.closers, v)
133+
case func():
134+
c.closers = append(c.closers, func() error {
135+
v()
136+
return nil
137+
})
138+
default:
139+
errs = append(errs, fmt.Errorf("unsupported closer type: %T", v))
140+
}
141+
}
142+
143+
return errors.Join(errs...)
144+
}
145+
146+
// Add implements RunnerManager.Run.
147+
func (c *RunnerCloserManager) Run(ctx context.Context) error {
148+
if !c.running.CompareAndSwap(false, true) {
149+
return ErrManagerAlreadyStarted
150+
}
151+
152+
// Signal the manager is stopped.
153+
defer close(c.stopped)
154+
155+
// If the main runner has at least one runner, add a closer that will
156+
// close the context once Close() is called.
157+
if len(c.mngr.runners) > 0 {
158+
c.mngr.Add(func(ctx context.Context) error {
159+
select {
160+
case <-ctx.Done():
161+
case <-c.closeCh:
162+
}
163+
return nil
164+
})
165+
}
166+
167+
errCh := make(chan error, len(c.closers))
168+
go func() {
169+
errCh <- c.mngr.Run(ctx)
170+
}()
171+
172+
rErr := <-errCh
173+
174+
c.mngr.lock.Lock()
175+
defer c.mngr.lock.Unlock()
176+
c.closing.Store(true)
177+
178+
errs := make([]error, len(c.closers)+1)
179+
errs[0] = rErr
180+
181+
for _, closer := range c.closers {
182+
go func(closer func() error) {
183+
errCh <- closer()
184+
}(closer)
185+
}
186+
187+
// Wait for all closers to be done.
188+
for i := 1; i < len(c.closers)+1; i++ {
189+
// Close the fatal shutdown goroutine if all closers are done. This is a
190+
// no-op if the fatal go routine is not defined.
191+
if i == len(c.closers) {
192+
close(c.closeFatalShutdown)
193+
}
194+
errs[i] = <-errCh
195+
}
196+
197+
c.retErr = errors.Join(errs...)
198+
199+
return c.retErr
200+
}
201+
202+
// Close will close the main runners and then the closers.
203+
func (c *RunnerCloserManager) Close() error {
204+
if c.closed.CompareAndSwap(false, true) {
205+
close(c.closeCh)
206+
}
207+
// If the manager is not running yet, we stop immediately.
208+
if c.running.CompareAndSwap(false, true) {
209+
close(c.stopped)
210+
}
211+
c.WaitUntilShutdown()
212+
return c.retErr
213+
}
214+
215+
// WaitUntilShutdown will block until the main runners and closers are done.
216+
func (c *RunnerCloserManager) WaitUntilShutdown() {
217+
<-c.stopped
218+
}

0 commit comments

Comments
 (0)