Skip to content

Commit 7e3c23d

Browse files
committed
Pod syncers no found error should be skiped, add some tests, fixes #160
1 parent 0f1f41c commit 7e3c23d

File tree

8 files changed

+243
-13
lines changed

8 files changed

+243
-13
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2018 Pressinfra SRL
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package mysqlcluster
18+
19+
import "fmt"
20+
21+
// SyncErrCode is the error code
22+
type SyncErrCode int
23+
24+
const (
25+
// PodNotFound represents the error when pod is not found
26+
PodNotFound SyncErrCode = iota
27+
)
28+
29+
// SyncError is the error type for pod syncer
30+
type SyncError struct {
31+
syncer string
32+
code SyncErrCode
33+
details string
34+
}
35+
36+
func (e *SyncError) Error() string {
37+
return fmt.Sprintf("%s(%d: %s)", e.syncer, e.code, e.details)
38+
}
39+
40+
// NewError returns a syncer error
41+
func NewError(code SyncErrCode, syncer, details string) error {
42+
return &SyncError{
43+
syncer: syncer,
44+
code: code,
45+
details: details,
46+
}
47+
}
48+
49+
// IsError check if a error is of given code
50+
func IsError(err error, code SyncErrCode) bool {
51+
switch t := err.(type) {
52+
default:
53+
return false
54+
case *SyncError:
55+
return t.code == code
56+
}
57+
}

pkg/controller/mysqlcluster/internal/syncer/healty_service.go renamed to pkg/controller/mysqlcluster/internal/syncer/healthy_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func NewHealthySVCSyncer(c client.Client, scheme *runtime.Scheme, cluster *mysql
4040

4141
out.Spec.Type = "ClusterIP"
4242
out.Spec.Selector = cluster.GetLabels()
43-
out.Spec.Selector["healty"] = "yes"
43+
out.Spec.Selector["healthy"] = "yes"
4444

4545
if len(out.Spec.Ports) != 1 {
4646
out.Spec.Ports = make([]core.ServicePort, 1)

pkg/controller/mysqlcluster/internal/syncer/pod.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (s *podSyncer) SyncFn(in runtime.Object) error {
6565

6666
// raise error if pod is not created
6767
if out.CreationTimestamp.IsZero() {
68-
return fmt.Errorf("pod is not created")
68+
return NewError(PodNotFound, "PodSyncer", "pod is not found")
6969
}
7070

7171
master := s.cluster.GetNodeCondition(s.hostname, api.NodeConditionMaster)
@@ -86,18 +86,18 @@ func (s *podSyncer) SyncFn(in runtime.Object) error {
8686
role = labelMaster
8787
}
8888

89-
// set healty label
90-
healty := labelNotHealty
89+
// set healthy label
90+
healthy := labelNotHealty
9191
if isMaster || !isMaster && isReplicating && !isLagged {
92-
healty = labelHealty
92+
healthy = labelHealty
9393
}
9494

9595
if len(out.ObjectMeta.Labels) == 0 {
9696
out.ObjectMeta.Labels = map[string]string{}
9797
}
9898

9999
out.ObjectMeta.Labels["role"] = role
100-
out.ObjectMeta.Labels["healty"] = healty
100+
out.ObjectMeta.Labels["healthy"] = healthy
101101

102102
return nil
103103
}

pkg/controller/mysqlcluster/internal/syncer/pod_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ var _ = Describe("Pod syncer", func() {
107107
Expect(err).ToNot(Succeed())
108108
})
109109

110-
It("should mark pods as healty", func() {
110+
It("should mark pods as healthy", func() {
111111
cluster.UpdateNodeConditionStatus(cluster.GetPodHostname(1), api.NodeConditionLagged, core.ConditionFalse)
112112
cluster.UpdateNodeConditionStatus(cluster.GetPodHostname(1), api.NodeConditionReplicating, core.ConditionTrue)
113113
// update cluster

pkg/controller/mysqlcluster/mysqlcluster_controller.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,22 +188,31 @@ func (r *ReconcileMysqlCluster) Reconcile(request reconcile.Request) (reconcile.
188188
syncers = append(syncers, clustersyncer.NewPDBSyncer(r.Client, r.scheme, cluster))
189189
}
190190

191-
syncers = append(syncers, r.getPodSyncers(cluster)...)
192-
193-
// add pods syncers for every node status
191+
// run the syncers
194192
for _, sync := range syncers {
195193
if err = syncer.Sync(context.TODO(), sync, r.recorder); err != nil {
196194
return reconcile.Result{}, err
197195
}
198196
}
199197

198+
// run the pod syncers
199+
log.Info("cluster status", "status", cluster.Status)
200+
for _, sync := range r.getPodSyncers(cluster) {
201+
if err = syncer.Sync(context.TODO(), sync, r.recorder); err != nil {
202+
// if it's pod not found then skip the error
203+
if !clustersyncer.IsError(err, clustersyncer.PodNotFound) {
204+
return reconcile.Result{}, err
205+
}
206+
}
207+
}
208+
200209
// Perform any cleanup
201210
pvcCleaner := cleaner.NewPvcCleaner(cluster, r.opt)
202211
err = pvcCleaner.Run(context.TODO(), r.Client, r.scheme, r.recorder)
203-
204212
if err != nil {
205213
return reconcile.Result{}, err
206214
}
215+
207216
return reconcile.Result{}, nil
208217
}
209218

pkg/controller/mysqlcluster/mysqlcluster_controller_suite_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ import (
2828
"sigs.k8s.io/controller-runtime/pkg/envtest"
2929
"sigs.k8s.io/controller-runtime/pkg/manager"
3030
"sigs.k8s.io/controller-runtime/pkg/reconcile"
31+
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
3132

3233
"github.com/presslabs/mysql-operator/pkg/apis"
34+
"github.com/presslabs/mysql-operator/pkg/controller/internal/testutil"
3335
)
3436

3537
var cfg *rest.Config
@@ -43,6 +45,8 @@ func TestMysqlClusterController(t *testing.T) {
4345
var _ = BeforeSuite(func() {
4446
var err error
4547

48+
logf.SetLogger(testutil.NewTestLogger(GinkgoWriter))
49+
4650
t = &envtest.Environment{
4751
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")},
4852
}

pkg/controller/mysqlcluster/mysqlcluster_controller_test.go

Lines changed: 161 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
1313
limitations under the License.
1414
*/
1515

16-
// nolint: errcheck
16+
// nolint: errcheck, unparam
1717
package mysqlcluster
1818

1919
import (
@@ -81,6 +81,7 @@ var _ = Describe("MysqlCluster controller", func() {
8181
var (
8282
expectedRequest reconcile.Request
8383
cluster *mysqlcluster.MysqlCluster
84+
clusterKey types.NamespacedName
8485
secret *corev1.Secret
8586
components clusterComponents
8687
)
@@ -107,6 +108,10 @@ var _ = Describe("MysqlCluster controller", func() {
107108
SecretName: secret.Name,
108109
},
109110
})
111+
clusterKey = types.NamespacedName{
112+
Name: cluster.Name,
113+
Namespace: cluster.Namespace,
114+
}
110115

111116
components = []runtime.Object{
112117
&appsv1.StatefulSet{
@@ -147,6 +152,7 @@ var _ = Describe("MysqlCluster controller", func() {
147152
},
148153
}
149154

155+
By("create the MySQL cluster")
150156
Expect(c.Create(context.TODO(), secret)).To(Succeed())
151157
Expect(c.Create(context.TODO(), cluster.Unwrap())).To(Succeed())
152158

@@ -169,6 +175,7 @@ var _ = Describe("MysqlCluster controller", func() {
169175

170176
// We need to make sure that the controller does not create infinite
171177
// loops
178+
By("wait for no more reconcile requests")
172179
Consistently(requests).ShouldNot(Receive(Equal(expectedRequest)))
173180
})
174181

@@ -247,6 +254,92 @@ var _ = Describe("MysqlCluster controller", func() {
247254
Eventually(requests, timeout).Should(Receive(Equal(expectedRequest)))
248255
Eventually(getClusterConditions(c, cluster), timeout).Should(haveCondWithStatus(api.ClusterConditionReady, corev1.ConditionTrue))
249256
})
257+
It("should label pods as healthy and as master accordingly", func() {
258+
pod0 := getPod(cluster, 0)
259+
Expect(c.Create(context.TODO(), pod0)).To(Succeed())
260+
pod1 := getPod(cluster, 1)
261+
Expect(c.Create(context.TODO(), pod1)).To(Succeed())
262+
pod2 := getPod(cluster, 2)
263+
Expect(c.Create(context.TODO(), pod2)).To(Succeed())
264+
265+
// update cluster conditions
266+
By("update cluster status")
267+
Expect(c.Get(context.TODO(), clusterKey, cluster.Unwrap())).To(Succeed())
268+
cluster.Status.Nodes = []api.NodeStatus{
269+
nodeStatusForPod(cluster, pod0, true, false, false),
270+
nodeStatusForPod(cluster, pod1, false, false, true),
271+
nodeStatusForPod(cluster, pod2, false, false, false),
272+
}
273+
Expect(c.Status().Update(context.TODO(), cluster.Unwrap())).To(Succeed())
274+
275+
Eventually(requests, timeout).Should(Receive(Equal(expectedRequest)))
276+
277+
// assert pods labels
278+
// master
279+
Expect(c.Get(context.TODO(), objToKey(pod0), pod0)).To(Succeed())
280+
Expect(pod0).To(haveLabelWithValue("role", "master"))
281+
Expect(pod0).To(haveLabelWithValue("healthy", "yes"))
282+
283+
// replica
284+
Expect(c.Get(context.TODO(), objToKey(pod1), pod1)).To(Succeed())
285+
Expect(pod1).To(haveLabelWithValue("role", "replica"))
286+
Expect(pod1).To(haveLabelWithValue("healthy", "yes"))
287+
})
288+
It("should label pods as master eaven if pods does not exists", func() {
289+
pod0 := getPod(cluster, 0)
290+
Expect(c.Create(context.TODO(), pod0)).To(Succeed())
291+
pod1 := getPod(cluster, 1)
292+
pod2 := getPod(cluster, 2)
293+
294+
// update cluster conditions
295+
By("update cluster status")
296+
Expect(c.Get(context.TODO(), clusterKey, cluster.Unwrap())).To(Succeed())
297+
cluster.Status.Nodes = []api.NodeStatus{
298+
nodeStatusForPod(cluster, pod0, true, false, false),
299+
nodeStatusForPod(cluster, pod1, false, false, false),
300+
nodeStatusForPod(cluster, pod2, false, false, false),
301+
}
302+
Expect(c.Status().Update(context.TODO(), cluster.Unwrap())).To(Succeed())
303+
304+
Eventually(requests, timeout).Should(Receive(Equal(expectedRequest)))
305+
306+
// assert pods labels
307+
// master
308+
Expect(c.Get(context.TODO(), objToKey(pod0), pod0)).To(Succeed())
309+
Expect(pod0).To(haveLabelWithValue("role", "master"))
310+
Expect(pod0).To(haveLabelWithValue("healthy", "yes"))
311+
312+
// check pod is not created
313+
Expect(c.Get(context.TODO(), objToKey(pod1), pod1)).ToNot(Succeed())
314+
})
315+
It("should label as unhealthy if lagged", func() {
316+
pod0 := getPod(cluster, 0)
317+
Expect(c.Create(context.TODO(), pod0)).To(Succeed())
318+
pod1 := getPod(cluster, 1)
319+
Expect(c.Create(context.TODO(), pod1)).To(Succeed())
320+
321+
// update cluster conditions
322+
By("update cluster status")
323+
Expect(c.Get(context.TODO(), clusterKey, cluster.Unwrap())).To(Succeed())
324+
cluster.Status.Nodes = []api.NodeStatus{
325+
nodeStatusForPod(cluster, pod0, false, true, false),
326+
nodeStatusForPod(cluster, pod1, true, false, true),
327+
}
328+
Expect(c.Status().Update(context.TODO(), cluster.Unwrap())).To(Succeed())
329+
330+
Eventually(requests, timeout).Should(Receive(Equal(expectedRequest)))
331+
332+
// assert pods labels
333+
// master
334+
Expect(c.Get(context.TODO(), objToKey(pod0), pod0)).To(Succeed())
335+
Expect(pod0).To(haveLabelWithValue("role", "replica"))
336+
Expect(pod0).To(haveLabelWithValue("healthy", "no"))
337+
338+
// replica
339+
Expect(c.Get(context.TODO(), objToKey(pod1), pod1)).To(Succeed())
340+
Expect(pod1).To(haveLabelWithValue("role", "master"))
341+
Expect(pod1).To(haveLabelWithValue("healthy", "yes"))
342+
})
250343
})
251344
})
252345
})
@@ -257,6 +350,65 @@ func removeAllCreatedResource(c client.Client, clusterComps []runtime.Object) {
257350
}
258351
}
259352

353+
func objToKey(o runtime.Object) types.NamespacedName {
354+
obj, _ := o.(*corev1.Pod)
355+
return types.NamespacedName{
356+
Name: obj.Name,
357+
Namespace: obj.Namespace,
358+
}
359+
}
360+
361+
func getPod(cluster *mysqlcluster.MysqlCluster, index int) *corev1.Pod {
362+
return &corev1.Pod{
363+
ObjectMeta: metav1.ObjectMeta{
364+
Name: fmt.Sprintf("%s-%d", cluster.GetNameForResource(mysqlcluster.StatefulSet), index),
365+
Namespace: cluster.Namespace,
366+
},
367+
Spec: corev1.PodSpec{
368+
Containers: []corev1.Container{
369+
corev1.Container{
370+
Name: "dummy",
371+
Image: "dummy",
372+
},
373+
},
374+
},
375+
}
376+
}
377+
378+
func nodeStatusForPod(cluster *mysqlcluster.MysqlCluster, pod *corev1.Pod, master, lagged, replicating bool) api.NodeStatus {
379+
name := fmt.Sprintf("%s.%s.%s", pod.Name, cluster.GetNameForResource(mysqlcluster.HeadlessSVC), pod.Namespace)
380+
381+
boolToStatus := func(c bool) corev1.ConditionStatus {
382+
if c {
383+
return corev1.ConditionTrue
384+
}
385+
return corev1.ConditionFalse
386+
}
387+
388+
t := time.Now()
389+
390+
return api.NodeStatus{
391+
Name: name,
392+
Conditions: []api.NodeCondition{
393+
api.NodeCondition{
394+
Type: api.NodeConditionMaster,
395+
Status: boolToStatus(master),
396+
LastTransitionTime: metav1.NewTime(t),
397+
},
398+
api.NodeCondition{
399+
Type: api.NodeConditionLagged,
400+
Status: boolToStatus(lagged),
401+
LastTransitionTime: metav1.NewTime(t),
402+
},
403+
api.NodeCondition{
404+
Type: api.NodeConditionReplicating,
405+
Status: boolToStatus(replicating),
406+
LastTransitionTime: metav1.NewTime(t),
407+
},
408+
},
409+
}
410+
}
411+
260412
// getClusterConditions is a helper func that returns a functions that returns cluster status conditions
261413
func getClusterConditions(c client.Client, cluster *mysqlcluster.MysqlCluster) func() []api.ClusterCondition {
262414
return func() []api.ClusterCondition {
@@ -273,3 +425,11 @@ func haveCondWithStatus(condType api.ClusterConditionType, status corev1.Conditi
273425
"Status": Equal(status),
274426
}))
275427
}
428+
429+
func haveLabelWithValue(label, value string) gomegatypes.GomegaMatcher {
430+
return PointTo(MatchFields(IgnoreExtras, Fields{
431+
"ObjectMeta": MatchFields(IgnoreExtras, Fields{
432+
"Labels": HaveKeyWithValue(label, value),
433+
}),
434+
}))
435+
}

pkg/controller/orchestrator/orchestrator_reconcile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ func (ou *orcUpdater) getRecoveriesToAck(recoveries []orc.TopologyRecovery) []or
295295
}
296296

297297
func (ou *orcUpdater) acknowledgeRecoveries(toAck []orc.TopologyRecovery) error {
298-
comment := fmt.Sprintf("Statefulset '%s' is healty for more then %d seconds",
298+
comment := fmt.Sprintf("Statefulset '%s' is healthy for more then %d seconds",
299299
ou.cluster.GetNameForResource(mysqlcluster.StatefulSet), recoveryGraceTime,
300300
)
301301

0 commit comments

Comments
 (0)