Skip to content

Commit dad69f3

Browse files
committed
USC: Drop unknown insights after grace period
When we first see an insight in the API that is no longer known by the informer that originally reported it, mark it for expiration instead of dropping it right away. If it becomes known again, cancel the expiration. If it does not become known again before the expiration time passes, drop it from the API.
1 parent 11ca71a commit dad69f3

File tree

2 files changed

+190
-26
lines changed

2 files changed

+190
-26
lines changed

pkg/updatestatus/updatestatuscontroller.go

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ import (
2525
"github.com/openshift/library-go/pkg/operator/events"
2626
)
2727

28+
const (
29+
unknownInsightGracePeriod = 60 * time.Second
30+
)
31+
2832
// informerMsg is the communication structure between informers and the update status controller. It contains the UID of
2933
// the insight and the insight itself, serialized as YAML. Passing serialized avoids shared data access problems. Until
3034
// we have the Status API we need to serialize ourselves anyway.
@@ -72,6 +76,9 @@ func isStatusInsightKey(k string) bool {
7276
return strings.HasPrefix(k, "usc.")
7377
}
7478

79+
// insightExpirations maps an insight UID to the time at which that insight expires
80+
type insightExpirations map[string]time.Time
81+
7582
// updateStatusController is a controller that collects insights from informers and maintains a ConfigMap with the insights
7683
// until we have a proper UpdateStatus API. The controller maintains an internal desired content of the ConfigMap (even
7784
// if it does not exist in the cluster) and updates it in the cluster when new insights are received, or when the ConfigMap
@@ -98,11 +105,20 @@ type updateStatusController struct {
98105
sync.Mutex
99106
cm *corev1.ConfigMap
100107

108+
// unknownInsightExpirations is a map of informer -> map of UID -> expiration time. It is used to track insights
109+
// that were reported by informers but are no longer known to them. The API keeps unknown insights until they
110+
// expire. If an insight is reported as known again before it expires, it is removed from the map.
111+
// TODO (muller): Needs to be periodically rebuilt to avoid leaking memory
112+
unknownInsightExpirations map[string]insightExpirations
113+
101114
// processed is the number of insights processed, used for testing
102115
processed int
103116
}
104117

105118
recorder events.Recorder
119+
// TODO: Get rid of this and use `clock.Clock` in all controllers, passed from start.go main function's
120+
// controllercmd.ControllerContext
121+
now func() time.Time
106122
}
107123

108124
// newUpdateStatusController creates a new update status controller and returns it. The second return value is a function
@@ -117,6 +133,7 @@ func newUpdateStatusController(
117133
c := &updateStatusController{
118134
configMaps: coreClient.CoreV1().ConfigMaps(uscNamespace),
119135
recorder: uscRecorder,
136+
now: time.Now,
120137
}
121138

122139
startInsightReceiver, sendInsight := c.setupInsightReceiver()
@@ -223,18 +240,66 @@ func (c *updateStatusController) updateInsightInStatusApi(msg informerMsg) {
223240
}
224241

225242
// removeUnknownInsights removes insights from the status API that are no longer reported as known to the informer
226-
// that originally reported them.
243+
// that originally reported them. The insights are kept for a grace period after they are no longer reported as known
244+
// and eventually dropped if they are not reported as known again within that period.
227245
// Assumes the statusApi field is locked.
228246
func (c *updateStatusController) removeUnknownInsights(message informerMsg) {
229247
known := sets.New(message.knownInsights...)
230248
known.Insert(message.uid)
231249
informerPrefix := fmt.Sprintf("usc.%s.", message.informer)
232250
for key := range c.statusApi.cm.Data {
233-
if strings.HasPrefix(key, informerPrefix) && !known.Has(strings.TrimPrefix(key, informerPrefix)) {
234-
delete(c.statusApi.cm.Data, key)
235-
klog.V(2).Infof("USC :: Collector :: Dropped insight %q because it is no longer reported as known by informer %q", key, message.informer)
251+
if strings.HasPrefix(key, informerPrefix) {
252+
uid := strings.TrimPrefix(key, informerPrefix)
253+
c.handleInsightExpiration(message.informer, known.Has(uid), uid)
236254
}
237255
}
256+
257+
if len(c.statusApi.unknownInsightExpirations) > 0 && len(c.statusApi.unknownInsightExpirations[message.informer]) == 0 {
258+
delete(c.statusApi.unknownInsightExpirations, message.informer)
259+
}
260+
if len(c.statusApi.unknownInsightExpirations) == 0 {
261+
c.statusApi.unknownInsightExpirations = nil
262+
}
263+
}
264+
265+
// handleInsightExpiration considers potential expiration of an insight present in the API based on whether the informer
266+
// knows about it.
267+
// If the informer knows about the insight, it is not dropped from the API and any previous expiration is cancelled.
268+
// If the informer does not know about the insight then it is either set to expire in the future if no expiration is
269+
// set yet, or the expiration is checked to see whether the insight should be dropped.
270+
func (c *updateStatusController) handleInsightExpiration(informer string, knows bool, uid string) {
271+
now := c.now()
272+
273+
if knows {
274+
if c.statusApi.unknownInsightExpirations != nil && c.statusApi.unknownInsightExpirations[informer] != nil {
275+
delete(c.statusApi.unknownInsightExpirations[informer], uid)
276+
}
277+
return
278+
}
279+
280+
expireIn := now.Add(unknownInsightGracePeriod)
281+
keepLog := func(expire time.Time) {
282+
klog.V(2).Infof("USC :: Collector :: Keeping insight %q until %s after it is no longer reported as known by informer %q", uid, expire, informer)
283+
}
284+
switch {
285+
// Two cases when we first consider an insight as unknown -> set expiration
286+
case c.statusApi.unknownInsightExpirations == nil:
287+
c.statusApi.unknownInsightExpirations = map[string]insightExpirations{informer: {uid: expireIn}}
288+
keepLog(expireIn)
289+
case c.statusApi.unknownInsightExpirations[informer][uid].IsZero():
290+
c.statusApi.unknownInsightExpirations[informer] = insightExpirations{uid: expireIn}
291+
keepLog(expireIn)
292+
293+
// Already set for expiration but still in grace period -> keep insight
294+
case c.statusApi.unknownInsightExpirations[informer][uid].After(now):
295+
keepLog(c.statusApi.unknownInsightExpirations[informer][uid])
296+
297+
// Already set for expiration and grace period expired -> drop insight
298+
default:
299+
delete(c.statusApi.unknownInsightExpirations[informer], uid)
300+
delete(c.statusApi.cm.Data, fmt.Sprintf("usc.%s.%s", informer, uid))
301+
klog.V(2).Infof("USC :: Collector :: Dropped insight %q because it is no longer reported as known by informer %q", uid, informer)
302+
}
238303
}
239304

240305
func (c *updateStatusController) commitStatusApiAsConfigMap(ctx context.Context) error {

pkg/updatestatus/updatestatuscontroller_test.go

Lines changed: 121 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,35 @@ import (
1616
)
1717

1818
func Test_updateStatusController(t *testing.T) {
19+
var now = time.Now()
20+
var minus90sec = now.Add(-90 * time.Second)
21+
var minus30sec = now.Add(-30 * time.Second)
22+
var plus30sec = now.Add(30 * time.Second)
23+
var plus60sec = now.Add(1 * time.Minute)
24+
1925
testCases := []struct {
20-
name string
26+
name string
27+
2128
controllerConfigMap *corev1.ConfigMap
29+
unknownExpirations map[string]insightExpirations
2230

2331
informerMsg []informerMsg
24-
expected *corev1.ConfigMap
32+
33+
expectedControllerConfigMap *corev1.ConfigMap
34+
expectedUnknownExpirations map[string]insightExpirations
2535
}{
2636
{
27-
name: "no messages, no state -> no state",
28-
controllerConfigMap: nil,
29-
informerMsg: []informerMsg{},
30-
expected: nil,
37+
name: "no messages, no state -> no state",
38+
controllerConfigMap: nil,
39+
informerMsg: []informerMsg{},
40+
expectedControllerConfigMap: nil,
3141
},
3242
{
3343
name: "no messages, empty state -> empty state",
3444
controllerConfigMap: &corev1.ConfigMap{
3545
Data: map[string]string{},
3646
},
37-
expected: &corev1.ConfigMap{
47+
expectedControllerConfigMap: &corev1.ConfigMap{
3848
Data: map[string]string{},
3949
},
4050
},
@@ -45,7 +55,7 @@ func Test_updateStatusController(t *testing.T) {
4555
"usc.cpi.cv-version": "value",
4656
},
4757
},
48-
expected: &corev1.ConfigMap{
58+
expectedControllerConfigMap: &corev1.ConfigMap{
4959
Data: map[string]string{
5060
"usc.cpi.cv-version": "value",
5161
},
@@ -61,7 +71,7 @@ func Test_updateStatusController(t *testing.T) {
6171
insight: []byte("value"),
6272
},
6373
},
64-
expected: &corev1.ConfigMap{
74+
expectedControllerConfigMap: &corev1.ConfigMap{
6575
Data: map[string]string{
6676
"usc.cpi.cv-version": "value",
6777
},
@@ -101,7 +111,7 @@ func Test_updateStatusController(t *testing.T) {
101111
knownInsights: []string{"kept", "new-item", "another"},
102112
},
103113
},
104-
expected: &corev1.ConfigMap{
114+
expectedControllerConfigMap: &corev1.ConfigMap{
105115
Data: map[string]string{
106116
"usc.cpi.kept": "kept",
107117
"usc.cpi.new-item": "msg1",
@@ -124,7 +134,7 @@ func Test_updateStatusController(t *testing.T) {
124134
insight: []byte("msg from informer two"),
125135
},
126136
},
127-
expected: &corev1.ConfigMap{
137+
expectedControllerConfigMap: &corev1.ConfigMap{
128138
Data: map[string]string{
129139
"usc.one.item": "msg from informer one",
130140
"usc.two.item": "msg from informer two",
@@ -141,7 +151,7 @@ func Test_updateStatusController(t *testing.T) {
141151
insight: []byte("msg from informer one"),
142152
},
143153
},
144-
expected: nil,
154+
expectedControllerConfigMap: nil,
145155
},
146156
{
147157
name: "empty uid -> message gets dropped",
@@ -153,7 +163,7 @@ func Test_updateStatusController(t *testing.T) {
153163
insight: []byte("msg from informer one"),
154164
},
155165
},
156-
expected: nil,
166+
expectedControllerConfigMap: nil,
157167
},
158168
{
159169
name: "empty insight payload -> message gets dropped",
@@ -165,7 +175,7 @@ func Test_updateStatusController(t *testing.T) {
165175
insight: []byte{},
166176
},
167177
},
168-
expected: nil,
178+
expectedControllerConfigMap: nil,
169179
},
170180
{
171181
name: "nil insight payload -> message gets dropped",
@@ -177,10 +187,10 @@ func Test_updateStatusController(t *testing.T) {
177187
insight: nil,
178188
},
179189
},
180-
expected: nil,
190+
expectedControllerConfigMap: nil,
181191
},
182192
{
183-
name: "unknown message gets removed from state",
193+
name: "unknown insight -> not removed from state immediately but set for expiration",
184194
controllerConfigMap: &corev1.ConfigMap{
185195
Data: map[string]string{
186196
"usc.one.old": "payload",
@@ -192,11 +202,88 @@ func Test_updateStatusController(t *testing.T) {
192202
insight: []byte("new payload"),
193203
knownInsights: nil,
194204
}},
195-
expected: &corev1.ConfigMap{
205+
expectedControllerConfigMap: &corev1.ConfigMap{
196206
Data: map[string]string{
207+
"usc.one.old": "payload",
197208
"usc.one.new": "new payload",
198209
},
199210
},
211+
expectedUnknownExpirations: map[string]insightExpirations{
212+
"one": {"old": plus60sec},
213+
},
214+
},
215+
{
216+
name: "unknown insight already set for expiration -> not removed from state while not expired yet",
217+
controllerConfigMap: &corev1.ConfigMap{
218+
Data: map[string]string{
219+
"usc.one.old": "payload",
220+
},
221+
},
222+
unknownExpirations: map[string]insightExpirations{
223+
"one": {"old": plus30sec},
224+
},
225+
informerMsg: []informerMsg{{
226+
informer: "one",
227+
uid: "new",
228+
insight: []byte("new payload"),
229+
knownInsights: nil,
230+
}},
231+
expectedControllerConfigMap: &corev1.ConfigMap{
232+
Data: map[string]string{
233+
"usc.one.old": "payload",
234+
"usc.one.new": "new payload",
235+
},
236+
},
237+
expectedUnknownExpirations: map[string]insightExpirations{
238+
"one": {"old": plus30sec},
239+
},
240+
},
241+
{
242+
name: "previously unknown insight set for expiration is known again -> kept in state and expire dropped",
243+
controllerConfigMap: &corev1.ConfigMap{
244+
Data: map[string]string{
245+
"usc.one.old": "payload",
246+
},
247+
},
248+
unknownExpirations: map[string]insightExpirations{
249+
"one": {"old": minus30sec},
250+
},
251+
informerMsg: []informerMsg{{
252+
informer: "one",
253+
uid: "new",
254+
insight: []byte("new payload"),
255+
knownInsights: []string{"old"},
256+
}},
257+
expectedControllerConfigMap: &corev1.ConfigMap{
258+
Data: map[string]string{
259+
"usc.one.old": "payload",
260+
"usc.one.new": "new payload",
261+
},
262+
},
263+
expectedUnknownExpirations: nil,
264+
},
265+
{
266+
name: "previously unknown insight expired and never became known again -> dropped from state and expire dropped",
267+
controllerConfigMap: &corev1.ConfigMap{
268+
Data: map[string]string{
269+
"usc.one.old": "payload",
270+
},
271+
},
272+
unknownExpirations: map[string]insightExpirations{
273+
"one": {"old": minus90sec},
274+
},
275+
informerMsg: []informerMsg{{
276+
informer: "one",
277+
uid: "new",
278+
insight: []byte("new payload"),
279+
knownInsights: nil,
280+
}},
281+
expectedControllerConfigMap: &corev1.ConfigMap{
282+
Data: map[string]string{
283+
"usc.one.new": "new payload",
284+
},
285+
},
286+
expectedUnknownExpirations: nil,
200287
},
201288
}
202289

@@ -208,9 +295,16 @@ func Test_updateStatusController(t *testing.T) {
208295

209296
controller := updateStatusController{
210297
configMaps: kubeClient.CoreV1().ConfigMaps(uscNamespace),
298+
now: func() time.Time { return now },
211299
}
212300
controller.statusApi.Lock()
213301
controller.statusApi.cm = tc.controllerConfigMap
302+
for informer, expirations := range tc.unknownExpirations {
303+
if controller.statusApi.unknownInsightExpirations == nil {
304+
controller.statusApi.unknownInsightExpirations = make(map[string]insightExpirations)
305+
}
306+
controller.statusApi.unknownInsightExpirations[informer] = expirations
307+
}
214308
controller.statusApi.Unlock()
215309

216310
startInsightReceiver, sendInsight := controller.setupInsightReceiver()
@@ -227,19 +321,24 @@ func Test_updateStatusController(t *testing.T) {
227321

228322
expectedProcessed := len(tc.informerMsg)
229323
var sawProcessed int
230-
var diff string
324+
var diffConfigMap string
325+
var diffExpirations string
231326
backoff := wait.Backoff{Duration: 5 * time.Millisecond, Factor: 2, Steps: 10}
232327
if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
233328
controller.statusApi.Lock()
234329
defer controller.statusApi.Unlock()
235330

236331
sawProcessed = controller.statusApi.processed
237-
diff = cmp.Diff(tc.expected, controller.statusApi.cm)
332+
diffConfigMap = cmp.Diff(tc.expectedControllerConfigMap, controller.statusApi.cm)
333+
diffExpirations = cmp.Diff(tc.expectedUnknownExpirations, controller.statusApi.unknownInsightExpirations)
238334

239-
return diff == "" && sawProcessed == expectedProcessed, nil
335+
return diffConfigMap == "" && diffExpirations == "" && sawProcessed == expectedProcessed, nil
240336
}); err != nil {
241-
if diff != "" {
242-
t.Errorf("controller config map differs from expected:\n%s", diff)
337+
if diffConfigMap != "" {
338+
t.Errorf("controller config map differs from expected:\n%s", diffConfigMap)
339+
}
340+
if diffExpirations != "" {
341+
t.Errorf("expirations differ from expected:\n%s", diffExpirations)
243342
}
244343
if controller.statusApi.processed != len(tc.informerMsg) {
245344
t.Errorf("controller processed %d messages, expected %d", controller.statusApi.processed, len(tc.informerMsg))

0 commit comments

Comments
 (0)