Skip to content

Commit dc0568c

Browse files
OTA-1418: USC: Make messages more robust and decoupled (#1146)
* OTA-1418: Identify the informer sending insight message Make USC aware of the identities of the informers that send insights to it, by including the identifier in the message sent to USC. In the future API, we will have separated containers per informer: ``` informers: - name: name-1 insights: [...] - name: name-2 insights: [...] ``` We do not have this in our ConfigMap API mock so for now we encode the informer into the key of the item stored in the ConfigMap. USC also takes ownership of the `usc.` prefix (formerly `usc-`) which identifies insight keys, and the informers no longer need to maintain this contract. * USC: Drop & log invalid messages Co-authored-by: David Hurta <[email protected]> * USC: Validate insight message creation in producers Co-authored-by: David Hurta <[email protected]> * USC: Producers no longer need to prefix insight UIDs Co-authored-by: David Hurta <[email protected]> * USC: Address review --------- Co-authored-by: David Hurta <[email protected]>
1 parent da45e1a commit dc0568c

File tree

6 files changed

+261
-103
lines changed

6 files changed

+261
-103
lines changed

pkg/updatestatus/controlplaneinformer.go

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"strings"
1010
"time"
1111

12-
"gopkg.in/yaml.v3"
1312
kerrors "k8s.io/apimachinery/pkg/api/errors"
1413
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1514
"k8s.io/apimachinery/pkg/runtime"
@@ -87,8 +86,9 @@ func clusterOperatorEventFilterFunc(obj interface{}) bool {
8786
}
8887

8988
const (
90-
clusterVersionKindName = "ClusterVersion"
91-
clusterOperatorKindName = "ClusterOperator"
89+
clusterVersionKindName = "ClusterVersion"
90+
clusterOperatorKindName = "ClusterOperator"
91+
controlPlaneInformerName = "cpi"
9292
)
9393

9494
// sync is called for any controller event. It will assess the state and health of the control plane, indicated by
@@ -118,9 +118,19 @@ func (c *controlPlaneInformerController) sync(ctx context.Context, syncCtx facto
118118

119119
now := c.now()
120120
cvInsight, healthInsights := assessClusterVersion(clusterVersion, now)
121-
msgs = append(msgs, makeInsightMsgForClusterVersion(cvInsight, now))
121+
msg, err := makeInsightMsgForClusterVersion(cvInsight, now)
122+
if err != nil {
123+
klog.Errorf("BUG: Could not create insight message: %v", err)
124+
return nil
125+
}
126+
msgs = append(msgs, msg)
122127
for item := range healthInsights {
123-
msgs = append(msgs, makeInsightMsgForHealthInsight(healthInsights[item], now))
128+
msg, err := makeInsightMsgForHealthInsight(healthInsights[item], now)
129+
if err != nil {
130+
klog.Errorf("BUG: Could not create insight message: %v", err)
131+
return nil
132+
}
133+
msgs = append(msgs, msg)
124134
}
125135

126136
case clusterOperatorKindName:
@@ -144,7 +154,12 @@ func (c *controlPlaneInformerController) sync(ctx context.Context, syncCtx facto
144154
if err != nil {
145155
return fmt.Errorf("failed to assess cluster operator %s: %w", name, err)
146156
}
147-
msgs = append(msgs, makeInsightMsgForClusterOperator(insight, now))
157+
msg, err := makeInsightMsgForClusterOperator(insight, now)
158+
if err != nil {
159+
klog.Errorf("BUG: Could not create insight message: %v", err)
160+
return nil
161+
}
162+
msgs = append(msgs, msg)
148163
default:
149164
return fmt.Errorf("invalid queue key %s with unexpected type %s", queueKey, t)
150165
}
@@ -161,22 +176,16 @@ func (c *controlPlaneInformerController) sync(ctx context.Context, syncCtx facto
161176
return nil
162177
}
163178

164-
func makeInsightMsgForClusterOperator(coInsight *ClusterOperatorStatusInsight, acquiredAt metav1.Time) informerMsg {
165-
uid := fmt.Sprintf("usc-co-%s", coInsight.Name)
179+
func makeInsightMsgForClusterOperator(coInsight *ClusterOperatorStatusInsight, acquiredAt metav1.Time) (informerMsg, error) {
166180
insight := ControlPlaneInsight{
167-
UID: uid,
181+
UID: fmt.Sprintf("co-%s", coInsight.Name),
168182
AcquiredAt: acquiredAt,
169183
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
170184
Type: ClusterOperatorStatusInsightType,
171185
ClusterOperatorStatusInsight: coInsight,
172186
},
173187
}
174-
// Should handle errors, but ultimately we will have a proper API and won’t need to serialize ourselves
175-
rawInsight, _ := yaml.Marshal(insight)
176-
return informerMsg{
177-
uid: uid,
178-
insight: rawInsight,
179-
}
188+
return makeControlPlaneInsightMsg(insight, controlPlaneInformerName)
180189
}
181190

182191
func assessClusterOperator(ctx context.Context, operator *configv1.ClusterOperator, targetVersion string, appsClient appsv1client.AppsV1Interface, now metav1.Time) (*ClusterOperatorStatusInsight, error) {
@@ -300,22 +309,16 @@ func getImagePullSpec(ctx context.Context, name string, appsClient appsv1client.
300309
// makeInsightMsgForClusterVersion creates an informerMsg for the given ClusterVersionStatusInsight. It defines an uid
301310
// name and serializes the insight as YAML. Serialization is convenient because it prevents any data sharing issues
302311
// between controllers.
303-
func makeInsightMsgForClusterVersion(cvInsight *ClusterVersionStatusInsight, acquiredAt metav1.Time) informerMsg {
304-
uid := fmt.Sprintf("usc-cv-%s", cvInsight.Resource.Name)
312+
func makeInsightMsgForClusterVersion(cvInsight *ClusterVersionStatusInsight, acquiredAt metav1.Time) (informerMsg, error) {
305313
insight := ControlPlaneInsight{
306-
UID: uid,
314+
UID: fmt.Sprintf("cv-%s", cvInsight.Resource.Name),
307315
AcquiredAt: acquiredAt,
308316
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
309317
Type: ClusterVersionStatusInsightType,
310318
ClusterVersionStatusInsight: cvInsight,
311319
},
312320
}
313-
// Should handle errors, but ultimately we will have a proper API and won’t need to serialize ourselves
314-
rawInsight, _ := yaml.Marshal(insight)
315-
return informerMsg{
316-
uid: uid,
317-
insight: rawInsight,
318-
}
321+
return makeControlPlaneInsightMsg(insight, controlPlaneInformerName)
319322
}
320323

321324
func uidForHealthInsight(healthInsight *HealthInsight) string {
@@ -332,26 +335,19 @@ func uidForHealthInsight(healthInsight *HealthInsight) string {
332335
encoded := base64.StdEncoding.EncodeToString(sum)
333336
encoded = strings.TrimRight(encoded, "=")
334337

335-
return fmt.Sprintf("usc-%s", encoded)
338+
return encoded
336339
}
337340

338-
func makeInsightMsgForHealthInsight(healthInsight *HealthInsight, acquiredAt metav1.Time) informerMsg {
339-
uid := uidForHealthInsight(healthInsight)
341+
func makeInsightMsgForHealthInsight(healthInsight *HealthInsight, acquiredAt metav1.Time) (informerMsg, error) {
340342
insight := ControlPlaneInsight{
341-
UID: uid,
343+
UID: uidForHealthInsight(healthInsight),
342344
AcquiredAt: acquiredAt,
343345
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
344346
Type: HealthInsightType,
345347
HealthInsight: healthInsight,
346348
},
347349
}
348-
349-
// Should handle errors, but ultimately we will have a proper API and won’t need to serialize ourselves
350-
rawInsight, _ := yaml.Marshal(insight)
351-
return informerMsg{
352-
uid: uid,
353-
insight: rawInsight,
354-
}
350+
return makeControlPlaneInsightMsg(insight, controlPlaneInformerName)
355351
}
356352

357353
// assessClusterVersion produces a ClusterVersion status insight from the current state of the ClusterVersion resource.

pkg/updatestatus/controlplaneinformer_test.go

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ func Test_sync_with_cv(t *testing.T) {
9696
cvProgressing: &progressingTrue,
9797
cvHistory: []configv1.UpdateHistory{inProgress418},
9898
expectedMsgs: map[string]ControlPlaneInsight{
99-
"usc-cv-version": {
100-
UID: "usc-cv-version",
99+
"cv-version": {
100+
UID: "cv-version",
101101
AcquiredAt: now,
102102
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
103103
Type: ClusterVersionStatusInsightType,
@@ -131,8 +131,8 @@ func Test_sync_with_cv(t *testing.T) {
131131
cvProgressing: &progressingFalse,
132132
cvHistory: []configv1.UpdateHistory{completed418},
133133
expectedMsgs: map[string]ControlPlaneInsight{
134-
"usc-cv-version": {
135-
UID: "usc-cv-version",
134+
"cv-version": {
135+
UID: "cv-version",
136136
AcquiredAt: now,
137137
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
138138
Type: ClusterVersionStatusInsightType,
@@ -167,8 +167,8 @@ func Test_sync_with_cv(t *testing.T) {
167167
cvProgressing: &progressingTrue,
168168
cvHistory: []configv1.UpdateHistory{inProgress419, completed418},
169169
expectedMsgs: map[string]ControlPlaneInsight{
170-
"usc-cv-version": {
171-
UID: "usc-cv-version",
170+
"cv-version": {
171+
UID: "cv-version",
172172
AcquiredAt: now,
173173
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
174174
Type: ClusterVersionStatusInsightType,
@@ -203,8 +203,8 @@ func Test_sync_with_cv(t *testing.T) {
203203
uscForceHealthInsightAnnotation: "value-does-not-matter",
204204
},
205205
expectedMsgs: map[string]ControlPlaneInsight{
206-
"usc-0kmuaUQRUJDOAIAF1KWTmg": {
207-
UID: "usc-0kmuaUQRUJDOAIAF1KWTmg",
206+
"0kmuaUQRUJDOAIAF1KWTmg": {
207+
UID: "0kmuaUQRUJDOAIAF1KWTmg",
208208
AcquiredAt: now,
209209
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
210210
Type: HealthInsightType,
@@ -228,8 +228,8 @@ func Test_sync_with_cv(t *testing.T) {
228228
},
229229
},
230230
},
231-
"usc-cv-version": {
232-
UID: "usc-cv-version",
231+
"cv-version": {
232+
UID: "cv-version",
233233
AcquiredAt: now,
234234
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
235235
Type: ClusterVersionStatusInsightType,
@@ -293,8 +293,9 @@ func Test_sync_with_cv(t *testing.T) {
293293
t.Fatalf("Failed to marshal expected insight: %v", err)
294294
}
295295
expectedMsgs = append(expectedMsgs, informerMsg{
296-
uid: uid,
297-
insight: raw,
296+
informer: controlPlaneInformerName,
297+
uid: uid,
298+
insight: raw,
298299
})
299300
}
300301

@@ -304,6 +305,11 @@ func Test_sync_with_cv(t *testing.T) {
304305
if diff := cmp.Diff(expectedMsgs, actualMsgs, ignoreOrder, cmp.AllowUnexported(informerMsg{})); diff != "" {
305306
t.Errorf("Sync messages differ from expected:\n%s", diff)
306307
}
308+
for _, msg := range actualMsgs {
309+
if err := msg.validate(); err != nil {
310+
t.Errorf("Received message is invalid: %v\nMessage content: %v", err, msg)
311+
}
312+
}
307313
})
308314
}
309315
}
@@ -525,8 +531,8 @@ func Test_sync_with_co(t *testing.T) {
525531
{
526532
name: "Cluster during installation",
527533
expectedMsgs: map[string]ControlPlaneInsight{
528-
"usc-co-some-co": {
529-
UID: "usc-co-some-co",
534+
"co-some-co": {
535+
UID: "co-some-co",
530536
AcquiredAt: now,
531537
ControlPlaneInsightUnion: ControlPlaneInsightUnion{
532538
Type: ClusterOperatorStatusInsightType,
@@ -585,14 +591,24 @@ func Test_sync_with_co(t *testing.T) {
585591
t.Fatalf("Failed to marshal expected insight: %v", err)
586592
}
587593
expectedMsgs = append(expectedMsgs, informerMsg{
588-
uid: uid,
589-
insight: raw,
594+
informer: controlPlaneInformerName,
595+
uid: uid,
596+
insight: raw,
590597
})
591598
}
592599

593-
if diff := cmp.Diff(expectedMsgs, actualMsgs, cmp.AllowUnexported(informerMsg{})); diff != "" {
600+
ignoreOrder := cmpopts.SortSlices(func(a, b informerMsg) bool {
601+
return a.uid < b.uid
602+
})
603+
if diff := cmp.Diff(expectedMsgs, actualMsgs, ignoreOrder, cmp.AllowUnexported(informerMsg{})); diff != "" {
594604
t.Errorf("Sync messages differ from expected:\n%s", diff)
595605
}
606+
607+
for _, msg := range actualMsgs {
608+
if err := msg.validate(); err != nil {
609+
t.Errorf("Received message is invalid: %v\nMessage content: %v", err, msg)
610+
}
611+
}
596612
})
597613
}
598614
}

pkg/updatestatus/nodeinformer.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"strings"
77
"time"
88

9-
"gopkg.in/yaml.v3"
10-
119
corev1 "k8s.io/api/core/v1"
1210
kerrors "k8s.io/apimachinery/pkg/api/errors"
1311
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -126,7 +124,11 @@ func (c *nodeInformerController) sync(ctx context.Context, syncCtx factory.SyncC
126124

127125
now := c.now()
128126
if insight := assessNode(node, mcp, machineConfigVersions, mostRecentVersionInCVHistory, now); insight != nil {
129-
msg = makeInsightMsgForNode(insight, now)
127+
msg, err = makeInsightMsgForNode(insight, now)
128+
if err != nil {
129+
klog.Errorf("BUG: Could not create insight message: %v", err)
130+
return nil
131+
}
130132
}
131133
default:
132134
return fmt.Errorf("invalid queue key %s with unexpected type %s", queueKey, t)
@@ -140,22 +142,17 @@ func (c *nodeInformerController) sync(ctx context.Context, syncCtx factory.SyncC
140142
return nil
141143
}
142144

143-
func makeInsightMsgForNode(nodeInsight *NodeStatusInsight, acquiredAt metav1.Time) informerMsg {
144-
uid := fmt.Sprintf("usc-node-%s", nodeInsight.Resource.Name)
145+
func makeInsightMsgForNode(nodeInsight *NodeStatusInsight, acquiredAt metav1.Time) (informerMsg, error) {
145146
insight := WorkerPoolInsight{
146-
UID: uid,
147+
UID: fmt.Sprintf("node-%s", nodeInsight.Resource.Name),
147148
AcquiredAt: acquiredAt,
148149
WorkerPoolInsightUnion: WorkerPoolInsightUnion{
149150
Type: NodeStatusInsightType,
150151
NodeStatusInsight: nodeInsight,
151152
},
152153
}
153-
// Should handle errors, but ultimately we will have a proper API and won’t need to serialize ourselves
154-
rawInsight, _ := yaml.Marshal(insight)
155-
return informerMsg{
156-
uid: uid,
157-
insight: rawInsight,
158-
}
154+
155+
return makeWorkerPoolsInsightMsg(insight, nodesInformerName)
159156
}
160157

161158
func whichMCP(node *corev1.Node, pools []*machineconfigv1.MachineConfigPool) (*machineconfigv1.MachineConfigPool, error) {
@@ -376,7 +373,8 @@ func assessNode(node *corev1.Node, mcp *machineconfigv1.MachineConfigPool, machi
376373
}
377374

378375
const (
379-
nodeKindName = "Node"
376+
nodeKindName = "Node"
377+
nodesInformerName = "ni"
380378
)
381379

382380
func parseNodeInformerControllerQueueKey(queueKey string) (string, string, error) {

pkg/updatestatus/nodeinformer_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/google/go-cmp/cmp"
10+
"github.com/google/go-cmp/cmp/cmpopts"
1011
"gopkg.in/yaml.v3"
1112

1213
corev1 "k8s.io/api/core/v1"
@@ -915,8 +916,8 @@ func Test_sync_with_node(t *testing.T) {
915916
},
916917
},
917918
expectedMsgs: map[string]WorkerPoolInsight{
918-
"usc-node-worker-1": {
919-
UID: "usc-node-worker-1",
919+
"node-worker-1": {
920+
UID: "node-worker-1",
920921
AcquiredAt: now,
921922
WorkerPoolInsightUnion: WorkerPoolInsightUnion{
922923
Type: NodeStatusInsightType,
@@ -957,8 +958,8 @@ func Test_sync_with_node(t *testing.T) {
957958
},
958959
},
959960
expectedMsgs: map[string]WorkerPoolInsight{
960-
"usc-node-worker-1": {
961-
UID: "usc-node-worker-1",
961+
"node-worker-1": {
962+
UID: "node-worker-1",
962963
AcquiredAt: now,
963964
WorkerPoolInsightUnion: WorkerPoolInsightUnion{
964965
Type: NodeStatusInsightType,
@@ -1040,14 +1041,25 @@ func Test_sync_with_node(t *testing.T) {
10401041
t.Fatalf("Failed to marshal expected insight: %v", err)
10411042
}
10421043
expectedMsgs = append(expectedMsgs, informerMsg{
1043-
uid: uid,
1044-
insight: raw,
1044+
informer: nodesInformerName,
1045+
uid: uid,
1046+
insight: raw,
10451047
})
10461048
}
10471049

1048-
if diff := cmp.Diff(expectedMsgs, actualMsgs, cmp.AllowUnexported(informerMsg{})); diff != "" {
1050+
ignoreOrder := cmpopts.SortSlices(func(a, b informerMsg) bool {
1051+
return a.uid < b.uid
1052+
})
1053+
1054+
if diff := cmp.Diff(expectedMsgs, actualMsgs, ignoreOrder, cmp.AllowUnexported(informerMsg{})); diff != "" {
10491055
t.Errorf("Sync messages differ from expected:\n%s", diff)
10501056
}
1057+
1058+
for _, msg := range actualMsgs {
1059+
if err := msg.validate(); err != nil {
1060+
t.Errorf("Received message is invalid: %v\nMessage content: %v", err, msg)
1061+
}
1062+
}
10511063
})
10521064
}
10531065
}

0 commit comments

Comments
 (0)