Skip to content

Commit 9581c0f

Browse files
[shell-operator] fix: stop informer correctly (#805)
Signed-off-by: Timur Tuktamyshev <[email protected]>
Signed-off-by: Pavel Okhlopkov <[email protected]>
Co-authored-by: Pavel Okhlopkov <[email protected]>
1 parent cb26d01 commit 9581c0f

File tree

7 files changed

+154
-41
lines changed

7 files changed

+154
-41
lines changed

pkg/kube_events_manager/factory.go

Lines changed: 94 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import (
1515
"k8s.io/client-go/tools/cache"
1616
)
1717

18+
const (
	// FactoryShutdownTimeout is the timeout FactoryStore.WaitStopped uses
	// while waiting for a shared informer factory to stop.
	FactoryShutdownTimeout = 30 * time.Second
)
21+
1822
var (
1923
DefaultFactoryStore *FactoryStore
2024
DefaultSyncTime = 100 * time.Millisecond
@@ -36,38 +40,46 @@ type Factory struct {
3640
handlerRegistrations map[string]cache.ResourceEventHandlerRegistration
3741
ctx context.Context
3842
cancel context.CancelFunc
43+
// done is closed when the underlying informer.Run returns
44+
done chan struct{}
3945
}
4046

4147
type FactoryStore struct {
42-
mu sync.Mutex
43-
data map[FactoryIndex]Factory
48+
mu sync.Mutex
49+
data map[FactoryIndex]*Factory
50+
stoppedCh map[FactoryIndex]chan struct{}
4451
}
4552

4653
func NewFactoryStore() *FactoryStore {
47-
return &FactoryStore{
48-
data: make(map[FactoryIndex]Factory),
54+
fs := &FactoryStore{
55+
data: make(map[FactoryIndex]*Factory),
56+
stoppedCh: make(map[FactoryIndex]chan struct{}),
4957
}
58+
return fs
5059
}
5160

5261
// Reset drops every stored factory and every pending stop-notification
// channel by replacing both maps with fresh empty ones.
// It does not cancel the dropped factories and does not close the dropped
// channels.
func (c *FactoryStore) Reset() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.data = make(map[FactoryIndex]*Factory)
	c.stoppedCh = make(map[FactoryIndex]chan struct{})
}
5767

58-
func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
59-
ctx, cancel := context.WithCancel(context.Background())
60-
c.data[index] = Factory{
68+
func (c *FactoryStore) add(ctx context.Context, index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
69+
ctx, cancel := context.WithCancel(ctx)
70+
c.data[index] = &Factory{
6171
shared: f,
6272
handlerRegistrations: make(map[string]cache.ResourceEventHandlerRegistration),
6373
ctx: ctx,
6474
cancel: cancel,
75+
done: nil,
6576
}
77+
6678
log.Debug("Factory store: added a new factory for index",
6779
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
6880
}
6981

70-
func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory {
82+
func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index FactoryIndex) *Factory {
7183
f, ok := c.data[index]
7284
if ok {
7385
log.Debug("Factory store: the factory with index found",
@@ -91,51 +103,68 @@ func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory
91103
client, resyncPeriod, index.Namespace, tweakListOptions)
92104
factory.ForResource(index.GVR)
93105

94-
c.add(index, factory)
106+
c.add(ctx, index, factory)
107+
95108
return c.data[index]
96109
}
97110

98111
func (c *FactoryStore) Start(ctx context.Context, informerId string, client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error {
99112
c.mu.Lock()
100113
defer c.mu.Unlock()
101114

102-
factory := c.get(client, index)
115+
factory := c.get(ctx, client, index)
103116

104117
informer := factory.shared.ForResource(index.GVR).Informer()
105118
// Add error handler, ignore "already started" error.
106119
_ = informer.SetWatchErrorHandler(errorHandler.handler)
120+
107121
registration, err := informer.AddEventHandler(handler)
108122
if err != nil {
109123
log.Warn("Factory store: couldn't add event handler to the factory's informer",
110124
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()),
111125
log.Err(err))
112126
}
127+
113128
factory.handlerRegistrations[informerId] = registration
129+
114130
log.Debug("Factory store: increased usage counter of the factory",
115131
slog.Int("value", len(factory.handlerRegistrations)),
116132
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
117133

118-
if !informer.HasSynced() {
119-
go informer.Run(factory.ctx.Done())
134+
// Ensure informer.Run is started once and tracked
135+
if factory.done == nil {
136+
factory.done = make(chan struct{})
137+
138+
go func() {
139+
informer.Run(factory.ctx.Done())
120140

141+
close(factory.done)
142+
143+
log.Debug("Factory store: informer goroutine exited",
144+
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
145+
}()
146+
}
147+
148+
if !informer.HasSynced() {
121149
if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
122150
return informer.HasSynced(), nil
123151
}); err != nil {
124152
return err
125153
}
126154
}
155+
127156
log.Debug("Factory store: started informer",
128157
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
158+
129159
return nil
130160
}
131161

132162
func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {
133163
c.mu.Lock()
134-
defer c.mu.Unlock()
135-
136164
f, ok := c.data[index]
137165
if !ok {
138166
// already deleted
167+
c.mu.Unlock()
139168
return
140169
}
141170

@@ -146,15 +175,65 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) {
146175
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()),
147176
log.Err(err))
148177
}
178+
149179
delete(f.handlerRegistrations, informerId)
180+
150181
log.Debug("Factory store: decreased usage counter of the factory",
151182
slog.Int("value", len(f.handlerRegistrations)),
152183
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
184+
153185
if len(f.handlerRegistrations) == 0 {
186+
log.Debug("Factory store: last handler removed, canceling shared informer",
187+
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
188+
189+
done := f.done
190+
154191
f.cancel()
192+
c.mu.Unlock()
193+
if done != nil {
194+
<-done
195+
}
196+
197+
c.mu.Lock()
155198
delete(c.data, index)
199+
156200
log.Debug("Factory store: deleted factory",
157201
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
202+
203+
if ch, ok := c.stoppedCh[index]; ok {
204+
close(ch)
205+
delete(c.stoppedCh, index)
206+
}
207+
}
208+
}
209+
210+
c.mu.Unlock()
211+
}
212+
213+
// WaitStopped blocks until there is no factory for the index or timeout
214+
func (c *FactoryStore) WaitStopped(index FactoryIndex) {
215+
c.mu.Lock()
216+
217+
if _, ok := c.data[index]; !ok {
218+
c.mu.Unlock()
219+
return
220+
}
221+
222+
ch, ok := c.stoppedCh[index]
223+
if !ok {
224+
ch = make(chan struct{})
225+
c.stoppedCh[index] = ch
226+
}
227+
228+
c.mu.Unlock()
229+
230+
for {
231+
select {
232+
case <-ch:
233+
return
234+
case <-time.After(FactoryShutdownTimeout):
235+
log.Warn("timeout waiting for factory to stop",
236+
slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()))
158237
}
159238
}
160239
}

pkg/kube_events_manager/kube_events_manager.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ type KubeEventsManager interface {
2424
StopMonitor(monitorID string) error
2525

2626
Ch() chan kemtypes.KubeEvent
27-
PauseHandleEvents()
27+
Stop()
28+
Wait()
2829
}
2930

3031
// kubeEventsManager is a main implementation of KubeEventsManager.
@@ -138,14 +139,20 @@ func (mgr *kubeEventsManager) Ch() chan kemtypes.KubeEvent {
138139
return mgr.KubeEventCh
139140
}
140141

141-
// PauseHandleEvents set flags for all informers to ignore incoming events.
142-
// Useful for shutdown without panicking.
143-
// Calling cancel() leads to a race and panicking, see https://github.com/kubernetes/kubernetes/issues/59822
144-
func (mgr *kubeEventsManager) PauseHandleEvents() {
142+
// Stop the kube events manager and all the informers inside monitors.
143+
func (mgr *kubeEventsManager) Stop() {
144+
mgr.cancel()
145+
}
146+
147+
func (mgr *kubeEventsManager) Wait() {
145148
mgr.m.RLock()
146-
defer mgr.m.RUnlock()
147-
for _, monitor := range mgr.Monitors {
148-
monitor.PauseHandleEvents()
149+
monitors := make([]Monitor, 0, len(mgr.Monitors))
150+
for _, mon := range mgr.Monitors {
151+
monitors = append(monitors, mon)
152+
}
153+
mgr.m.RUnlock()
154+
for _, mon := range monitors {
155+
mon.Wait()
149156
}
150157
}
151158

pkg/kube_events_manager/monitor.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type Monitor interface {
1919
CreateInformers() error
2020
Start(context.Context)
2121
Stop()
22-
PauseHandleEvents()
22+
Wait()
2323
Snapshot() []kemtypes.ObjectAndFilterResult
2424
EnableKubeEventCb()
2525
GetConfig() *MonitorConfig
@@ -373,22 +373,18 @@ func (m *monitor) Stop() {
373373
}
374374
}
375375

376-
// PauseHandleEvents set flags for all informers to ignore incoming events.
377-
// Useful for shutdown without panicking.
378-
// Calling cancel() leads to a race and panicking, see https://github.com/kubernetes/kubernetes/issues/59822
379-
func (m *monitor) PauseHandleEvents() {
376+
// Wait waits for all started informers to stop
377+
func (m *monitor) Wait() {
380378
for _, informer := range m.ResourceInformers {
381-
informer.pauseHandleEvents()
379+
informer.wait()
382380
}
383-
384381
m.VaryingInformers.RangeValue(func(value []*resourceInformer) {
385382
for _, informer := range value {
386-
informer.pauseHandleEvents()
383+
informer.wait()
387384
}
388385
})
389-
390386
if m.NamespaceInformer != nil {
391-
m.NamespaceInformer.pauseHandleEvents()
387+
m.NamespaceInformer.wait()
392388
}
393389
}
394390

pkg/kube_events_manager/namespace_informer.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"fmt"
88
"log/slog"
9+
"time"
910

1011
"github.com/deckhouse/deckhouse/pkg/log"
1112
v1 "k8s.io/api/core/v1"
@@ -17,10 +18,15 @@ import (
1718
klient "github.com/flant/kube-client/client"
1819
)
1920

21+
const (
	// NamespaceInformerShutdownTimeout is the timeout namespaceInformer.wait
	// uses while waiting for the namespace informer goroutine to exit.
	NamespaceInformerShutdownTimeout = 30 * time.Second
)
24+
2025
type namespaceInformer struct {
2126
ctx context.Context
2227
cancel context.CancelFunc
2328
stopped bool
29+
done chan struct{}
2430

2531
KubeClient *klient.Client
2632
Monitor *MonitorConfig
@@ -128,13 +134,18 @@ func (ni *namespaceInformer) start() {
128134
return
129135
}
130136
cctx, cancel := context.WithCancel(ni.ctx)
137+
ni.done = make(chan struct{})
131138
go func() {
132139
<-ni.ctx.Done()
133140
ni.stopped = true
134141
cancel()
135142
}()
136143

137-
go ni.SharedInformer.Run(cctx.Done())
144+
go func() {
145+
ni.SharedInformer.Run(cctx.Done())
146+
close(ni.done)
147+
log.Debug("Namespace informer goroutine exited", slog.String("name", ni.Monitor.Metadata.DebugName))
148+
}()
138149

139150
if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
140151
return ni.SharedInformer.HasSynced(), nil
@@ -146,6 +157,16 @@ func (ni *namespaceInformer) start() {
146157
log.Debug("Informer is ready", slog.String("debugName", ni.Monitor.Metadata.DebugName))
147158
}
148159

149-
func (ni *namespaceInformer) pauseHandleEvents() {
150-
ni.stopped = true
160+
func (ni *namespaceInformer) wait() {
161+
if ni.done != nil {
162+
for {
163+
select {
164+
case <-ni.done:
165+
log.Debug("Namespace informer stopped", slog.String("name", ni.Monitor.Metadata.DebugName))
166+
return
167+
case <-time.After(NamespaceInformerShutdownTimeout):
168+
log.Warn("timeout waiting for namespace informer to stop", slog.String("name", ni.Monitor.Metadata.DebugName))
169+
}
170+
}
171+
}
151172
}

pkg/kube_events_manager/resource_informer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,9 +470,9 @@ func (ei *resourceInformer) start() {
470470
log.Debug("informer is ready", slog.String("debugName", ei.Monitor.Metadata.DebugName))
471471
}
472472

473-
func (ei *resourceInformer) pauseHandleEvents() {
474-
log.Debug("PAUSE resource informer", slog.String("debugName", ei.Monitor.Metadata.DebugName))
475-
ei.stopped = true
473+
// wait blocks until the underlying shared informer for this FactoryIndex is stopped
474+
func (ei *resourceInformer) wait() {
475+
DefaultFactoryStore.WaitStopped(ei.FactoryIndex)
476476
}
477477

478478
// CachedObjectsInfo returns info accumulated from start.

pkg/shell-operator/operator.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,9 +990,18 @@ func (op *ShellOperator) initAndStartHookQueues() {
990990

991991
// Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop.
992992
func (op *ShellOperator) Shutdown() {
993+
op.logger.Info("shutdown begin", slog.String("phase", "shutdown"))
993994
op.ScheduleManager.Stop()
994-
op.KubeEventsManager.PauseHandleEvents()
995+
op.logger.Info("schedule manager stopped", slog.String("phase", "shutdown"))
996+
997+
op.KubeEventsManager.Stop()
998+
op.logger.Info("waiting informers", slog.String("phase", "shutdown"))
999+
op.KubeEventsManager.Wait()
1000+
op.logger.Info("informers stopped", slog.String("phase", "shutdown"))
1001+
9951002
op.TaskQueues.Stop()
1003+
op.logger.Info("waiting task queues", slog.String("phase", "shutdown"))
9961004
// Wait for queues to stop, but no more than 10 seconds
9971005
op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout)
1006+
op.logger.Info("task queues stopped", slog.String("phase", "shutdown"))
9981007
}

test/integration/kube_event_manager/kube_event_manager_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ var _ = Describe("Binding 'kubernetes' with kind 'Pod' should emit KubeEvent obj
5757

5858
AfterEach(func() {
5959
fmt.Fprintf(GinkgoWriter, "Starting AfterEach\n")
60-
KubeEventsManager.PauseHandleEvents()
60+
KubeEventsManager.Stop()
61+
KubeEventsManager.Wait()
6162
fmt.Fprintf(GinkgoWriter, "Finished AfterEach\n")
6263
})
6364

0 commit comments

Comments
 (0)