@@ -51,6 +51,9 @@ const (
51
51
// storageWatchListPageSize is the cacher's request chunk size of
52
52
// initial and resync watch lists to storage.
53
53
storageWatchListPageSize = int64 (10000 )
54
+ // defaultBookmarkFrequency defines how frequently watch bookmarks should be send
55
+ // in addition to sending a bookmark right before watch deadline
56
+ defaultBookmarkFrequency = time .Minute
54
57
)
55
58
56
59
// Config contains the configuration for a given Cache.
@@ -154,24 +157,26 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
154
157
// second in a bucket, and pop up them once at the timeout. To be more specific,
155
158
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
156
159
type watcherBookmarkTimeBuckets struct {
157
- lock sync.Mutex
158
- watchersBuckets map [int64 ][]* cacheWatcher
159
- startBucketID int64
160
- clock clock.Clock
160
+ lock sync.Mutex
161
+ watchersBuckets map [int64 ][]* cacheWatcher
162
+ startBucketID int64
163
+ clock clock.Clock
164
+ bookmarkFrequency time.Duration
161
165
}
162
166
163
- func newTimeBucketWatchers (clock clock.Clock ) * watcherBookmarkTimeBuckets {
167
+ func newTimeBucketWatchers (clock clock.Clock , bookmarkFrequency time. Duration ) * watcherBookmarkTimeBuckets {
164
168
return & watcherBookmarkTimeBuckets {
165
- watchersBuckets : make (map [int64 ][]* cacheWatcher ),
166
- startBucketID : clock .Now ().Unix (),
167
- clock : clock ,
169
+ watchersBuckets : make (map [int64 ][]* cacheWatcher ),
170
+ startBucketID : clock .Now ().Unix (),
171
+ clock : clock ,
172
+ bookmarkFrequency : bookmarkFrequency ,
168
173
}
169
174
}
170
175
171
176
// adds a watcher to the bucket, if the deadline is before the start, it will be
172
177
// added to the first one.
173
178
func (t * watcherBookmarkTimeBuckets ) addWatcher (w * cacheWatcher ) bool {
174
- nextTime , ok := w .nextBookmarkTime (t .clock .Now ())
179
+ nextTime , ok := w .nextBookmarkTime (t .clock .Now (), t . bookmarkFrequency )
175
180
if ! ok {
176
181
return false
177
182
}
@@ -339,7 +344,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
339
344
stopCh : stopCh ,
340
345
clock : clock ,
341
346
timer : time .NewTimer (time .Duration (0 )),
342
- bookmarkWatchers : newTimeBucketWatchers (clock ),
347
+ bookmarkWatchers : newTimeBucketWatchers (clock , defaultBookmarkFrequency ),
343
348
}
344
349
345
350
// Ensure that timer is stopped.
@@ -914,9 +919,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() {
914
919
continue
915
920
}
916
921
c .watchersBuffer = append (c .watchersBuffer , watcher )
917
- // Given that we send bookmark event once at deadline-2s, never push again
918
- // after the watcher pops up from the buckets. Once we decide to change the
919
- // strategy to more sophisticated, we may need it here.
922
+ // Requeue the watcher for the next bookmark if needed.
923
+ c .bookmarkWatchers .addWatcher (watcher )
920
924
}
921
925
}
922
926
}
@@ -1219,13 +1223,28 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
1219
1223
}
1220
1224
}
1221
1225
1222
- func (c * cacheWatcher ) nextBookmarkTime (now time.Time ) (time.Time , bool ) {
1223
- // For now we return 2s before deadline (and maybe +infinity is now already passed this time)
1224
- // but it gives us extensibility for the future(false when deadline is not set).
1226
+ func (c * cacheWatcher ) nextBookmarkTime (now time.Time , bookmarkFrequency time.Duration ) (time.Time , bool ) {
1227
+ // We try to send bookmarks:
1228
+ // (a) roughly every minute
1229
+ // (b) right before the watcher timeout - for now we simply set it 2s before
1230
+ // the deadline
1231
+ // The former gives us periodicity if the watch breaks due to unexpected
1232
+ // conditions, the later ensures that on timeout the watcher is as close to
1233
+ // now as possible - this covers 99% of cases.
1234
+ heartbeatTime := now .Add (bookmarkFrequency )
1225
1235
if c .deadline .IsZero () {
1226
- return c .deadline , false
1236
+ // Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
1237
+ // apiserver if properly configured. So this shoudln't happen in practice.
1238
+ return heartbeatTime , true
1239
+ }
1240
+ if pretimeoutTime := c .deadline .Add (- 2 * time .Second ); pretimeoutTime .Before (heartbeatTime ) {
1241
+ heartbeatTime = pretimeoutTime
1242
+ }
1243
+
1244
+ if heartbeatTime .Before (now ) {
1245
+ return time.Time {}, false
1227
1246
}
1228
- return c . deadline . Add ( - 2 * time . Second ) , true
1247
+ return heartbeatTime , true
1229
1248
}
1230
1249
1231
1250
func getEventObject (object runtime.Object ) runtime.Object {
0 commit comments