@@ -31,11 +31,6 @@ import (
31
31
"k8s.io/utils/clock"
32
32
)
33
33
34
- // The frequency with which global timestamp of the cache is to
35
- // is to be updated periodically. If pod workers get stuck at cache.GetNewerThan
36
- // call, after this period it will be unblocked.
37
- const globalCacheUpdatePeriod = 1 * time .Second
38
-
39
34
var (
40
35
eventedPLEGUsage = false
41
36
eventedPLEGUsageMu = sync.RWMutex {}
@@ -76,6 +71,10 @@ type EventedPLEG struct {
76
71
eventedPlegMaxStreamRetries int
77
72
// Indicates relisting related parameters
78
73
relistDuration * RelistDuration
74
+ // The frequency with which global timestamp of the cache is to
75
+ // is to be updated periodically. If pod workers get stuck at cache.GetNewerThan
76
+ // call, after this period it will be unblocked.
77
+ globalCacheUpdatePeriod time.Duration
79
78
// Stop the Evented PLEG by closing the channel.
80
79
stopCh chan struct {}
81
80
// Stops the periodic update of the cache global timestamp.
@@ -87,7 +86,7 @@ type EventedPLEG struct {
87
86
// NewEventedPLEG instantiates a new EventedPLEG object and return it.
88
87
func NewEventedPLEG (runtime kubecontainer.Runtime , runtimeService internalapi.RuntimeService , eventChannel chan * PodLifecycleEvent ,
89
88
cache kubecontainer.Cache , genericPleg PodLifecycleEventGenerator , eventedPlegMaxStreamRetries int ,
90
- relistDuration * RelistDuration , clock clock.Clock ) (PodLifecycleEventGenerator , error ) {
89
+ relistDuration * RelistDuration , clock clock.Clock , cacheUpdatePeriod time. Duration ) (PodLifecycleEventGenerator , error ) {
91
90
handler , ok := genericPleg .(podLifecycleEventGeneratorHandler )
92
91
if ! ok {
93
92
return nil , fmt .Errorf ("%v doesn't implement podLifecycleEventGeneratorHandler interface" , genericPleg )
@@ -101,6 +100,7 @@ func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.Ru
101
100
eventedPlegMaxStreamRetries : eventedPlegMaxStreamRetries ,
102
101
relistDuration : relistDuration ,
103
102
clock : clock ,
103
+ globalCacheUpdatePeriod : cacheUpdatePeriod ,
104
104
}, nil
105
105
}
106
106
@@ -125,7 +125,7 @@ func (e *EventedPLEG) Start() {
125
125
e .stopCh = make (chan struct {})
126
126
e .stopCacheUpdateCh = make (chan struct {})
127
127
go wait .Until (e .watchEventsChannel , 0 , e .stopCh )
128
- go wait .Until (e .updateGlobalCache , globalCacheUpdatePeriod , e .stopCacheUpdateCh )
128
+ go wait .Until (e .updateGlobalCache , e . globalCacheUpdatePeriod , e .stopCacheUpdateCh )
129
129
}
130
130
131
131
// Stop stops the Evented PLEG
0 commit comments