Skip to content

Commit e131f0e

Browse files
pooknull, inelpandzic, hors
authored
K8SPSMDB-876: delete-psmdb-pods-in-order for cfg (#1267)
* K8SPSMDB-876: `delete-psmdb-pods-in-order` for cfg
  https://jira.percona.com/browse/K8SPSMDB-876
* shut down `cfg` replset at the end
* delete replsets fully in order
* stop balancer and delete mongos before rs deletion
* fix pause
* don't disable backup before pausing cluster

---------

Co-authored-by: Inel Pandzic <[email protected]>
Co-authored-by: Viacheslav Sarzhan <[email protected]>
1 parent ec9ce7e commit e131f0e

File tree

7 files changed

+181
-76
lines changed

7 files changed

+181
-76
lines changed

pkg/apis/psmdb/v1/psmdb_defaults.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,7 @@ func (cr *PerconaServerMongoDB) CheckNSetDefaults(platform version.Platform, log
134134
return errors.New("mongos should be specified")
135135
}
136136

137-
if cr.Spec.Pause {
138-
cr.Spec.Sharding.Mongos.Size = 0
139-
} else {
137+
if !cr.Spec.Pause && cr.DeletionTimestamp == nil {
140138
if !cr.Spec.UnsafeConf && cr.Spec.Sharding.Mongos.Size < minSafeMongosSize {
141139
log.Info("Safe config set, updating mongos size",
142140
"oldSize", cr.Spec.Sharding.Mongos.Size, "newSize", minSafeMongosSize)
@@ -443,20 +441,6 @@ func (cr *PerconaServerMongoDB) CheckNSetDefaults(platform version.Platform, log
443441
if err := replset.NonVoting.SetDefaults(cr, replset); err != nil {
444442
return errors.Wrap(err, "set nonvoting defaults")
445443
}
446-
447-
if cr.Spec.Pause {
448-
if cr.Status.State == AppStateStopping {
449-
log.Info("Pausing cluster", "replset", replset.Name, "oldSize", replset.Size, "newSize", 0)
450-
}
451-
replset.Size = 0
452-
replset.Arbiter.Enabled = false
453-
replset.NonVoting.Enabled = false
454-
}
455-
}
456-
457-
// there is shouldn't be any backups while pause
458-
if cr.Spec.Pause {
459-
cr.Spec.Backup.Enabled = false
460444
}
461445

462446
if cr.Spec.Backup.Enabled {
@@ -564,7 +548,7 @@ func (rs *ReplsetSpec) SetDefaults(platform version.Platform, cr *PerconaServerM
564548
rs.Arbiter.MultiAZ.reconcileOpts(cr)
565549
}
566550

567-
if !cr.Spec.UnsafeConf && cr.DeletionTimestamp == nil {
551+
if !cr.Spec.UnsafeConf && (cr.DeletionTimestamp == nil && !cr.Spec.Pause) {
568552
rs.setSafeDefaults(log)
569553
}
570554

pkg/controller/perconaservermongodb/balancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Context, cr *api.PerconaServerMongoDB) error {
1818
log := logf.FromContext(ctx)
1919

20-
if !cr.Spec.Sharding.Enabled || cr.Spec.Sharding.Mongos.Size == 0 || cr.Spec.Unmanaged {
20+
if !cr.Spec.Sharding.Enabled || cr.Spec.Sharding.Mongos.Size == 0 || cr.Spec.Unmanaged || cr.DeletionTimestamp != nil || cr.Spec.Pause {
2121
return nil
2222
}
2323

pkg/controller/perconaservermongodb/connections.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ func (r *ReconcilePerconaServerMongoDB) mongoClientWithRole(ctx context.Context,
6363
func (r *ReconcilePerconaServerMongoDB) mongosClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, role UserRole) (mongo.Client, error) {
6464
return r.MongoClientProvider().Mongos(ctx, cr, role)
6565
}
66+
67+
func (r *ReconcilePerconaServerMongoDB) standaloneClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, role UserRole, host string) (mongo.Client, error) {
68+
return r.MongoClientProvider().Standalone(ctx, cr, role, host)
69+
}

pkg/controller/perconaservermongodb/finalizers.go

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/pkg/errors"
7+
appsv1 "k8s.io/api/apps/v1"
78
corev1 "k8s.io/api/core/v1"
89
k8serrors "k8s.io/apimachinery/pkg/api/errors"
910
"k8s.io/apimachinery/pkg/labels"
@@ -55,44 +56,106 @@ func (r *ReconcilePerconaServerMongoDB) checkFinalizers(ctx context.Context, cr
5556
}
5657

5758
func (r *ReconcilePerconaServerMongoDB) deletePSMDBPods(ctx context.Context, cr *api.PerconaServerMongoDB) (err error) {
58-
done := true
59-
for _, rs := range cr.Spec.Replsets {
60-
sts, err := r.getRsStatefulset(ctx, cr, rs.Name)
61-
if err != nil {
62-
if k8serrors.IsNotFound(err) {
63-
continue
59+
if cr.Spec.Sharding.Enabled {
60+
cr.Spec.Sharding.Mongos.Size = 0
61+
62+
sts := new(appsv1.StatefulSet)
63+
err := r.client.Get(ctx, cr.MongosNamespacedName(), sts)
64+
if client.IgnoreNotFound(err) != nil {
65+
return errors.Wrap(err, "failed to get mongos statefulset")
66+
}
67+
if sts.Spec.Replicas != nil && *sts.Spec.Replicas > 0 {
68+
err = r.disableBalancer(ctx, cr)
69+
if err != nil {
70+
return errors.Wrap(err, "failed to disable balancer")
6471
}
65-
return errors.Wrap(err, "get rs statefulset")
6672
}
67-
68-
pods := &corev1.PodList{}
69-
err = r.client.List(ctx,
70-
pods,
71-
&client.ListOptions{
72-
Namespace: cr.Namespace,
73-
LabelSelector: labels.SelectorFromSet(sts.Spec.Selector.MatchLabels),
74-
},
75-
)
73+
list, err := r.getMongosPods(ctx, cr)
7674
if err != nil {
77-
if k8serrors.IsNotFound(err) {
78-
continue
79-
}
80-
return errors.Wrap(err, "get rs statefulset")
75+
return errors.Wrap(err, "get mongos pods")
8176
}
82-
if len(pods.Items) > int(*sts.Spec.Replicas) {
77+
if len(list.Items) != 0 {
8378
return errWaitingTermination
8479
}
85-
if *sts.Spec.Replicas != 1 {
86-
rs.Size = 1
87-
done = false
80+
}
81+
82+
replsetsDeleted := true
83+
for _, rs := range cr.Spec.Replsets {
84+
if err := r.deleteRSPods(ctx, cr, rs); err != nil {
85+
if err == errWaitingTermination {
86+
replsetsDeleted = false
87+
continue
88+
}
89+
return err
8890
}
8991
}
90-
if !done {
92+
if !replsetsDeleted {
9193
return errWaitingTermination
9294
}
95+
96+
if cr.Spec.Sharding.Enabled && cr.Spec.Sharding.ConfigsvrReplSet != nil {
97+
if err := r.deleteRSPods(ctx, cr, cr.Spec.Sharding.ConfigsvrReplSet); err != nil {
98+
return err
99+
}
100+
}
93101
return nil
94102
}
95103

104+
func (r *ReconcilePerconaServerMongoDB) deleteRSPods(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec) error {
105+
sts, err := r.getRsStatefulset(ctx, cr, rs.Name)
106+
if err != nil {
107+
if k8serrors.IsNotFound(err) {
108+
return nil
109+
}
110+
return errors.Wrap(err, "get rs statefulset")
111+
}
112+
113+
pods := &corev1.PodList{}
114+
err = r.client.List(ctx,
115+
pods,
116+
&client.ListOptions{
117+
Namespace: cr.Namespace,
118+
LabelSelector: labels.SelectorFromSet(sts.Spec.Selector.MatchLabels),
119+
},
120+
)
121+
if err != nil {
122+
if k8serrors.IsNotFound(err) {
123+
return nil
124+
}
125+
return errors.Wrap(err, "get rs statefulset")
126+
}
127+
128+
rs.Size = 1
129+
130+
switch *sts.Spec.Replicas {
131+
case 0:
132+
rs.Size = 0
133+
if len(pods.Items) == 0 {
134+
return nil
135+
}
136+
return errWaitingTermination
137+
case 1:
138+
// If there is one pod left, we should be sure that it's the primary
139+
if len(pods.Items) != 1 {
140+
return errWaitingTermination
141+
}
142+
143+
isPrimary, err := r.isPodPrimary(ctx, cr, pods.Items[0], rs)
144+
if err != nil {
145+
return errors.Wrap(err, "is pod primary")
146+
}
147+
if !isPrimary {
148+
return errWaitingTermination
149+
}
150+
151+
// If true, we should resize the replset to 0
152+
rs.Size = 0
153+
return errWaitingTermination
154+
default:
155+
return errWaitingTermination
156+
}
157+
}
158+
96159
func (r *ReconcilePerconaServerMongoDB) deletePvcFinalizer(ctx context.Context, cr *api.PerconaServerMongoDB) error {
97160
err := r.deleteAllStatefulsets(ctx, cr)
98161
if err != nil {

pkg/controller/perconaservermongodb/mgo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (r *ReconcilePerconaServerMongoDB) reconcileCluster(ctx context.Context, cr
165165
cr.Status.Mongos != nil &&
166166
cr.Status.Mongos.Status == api.AppStateReady &&
167167
replset.ClusterRole == api.ClusterRoleShardSvr &&
168-
len(mongosPods) > 0 {
168+
len(mongosPods) > 0 && cr.Spec.Sharding.Mongos.Size > 0 {
169169

170170
mongosSession, err := r.mongosClientWithRole(ctx, cr, roleClusterAdmin)
171171
if err != nil {

pkg/controller/perconaservermongodb/psmdb_controller.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,11 @@ func (r *ReconcilePerconaServerMongoDB) Reconcile(ctx context.Context, request r
274274
}
275275
}
276276

277+
err = r.reconcilePause(ctx, cr)
278+
if err != nil {
279+
return reconcile.Result{}, err
280+
}
281+
277282
err = r.checkConfiguration(ctx, cr)
278283
if err != nil {
279284
return reconcile.Result{}, err
@@ -553,6 +558,44 @@ func (r *ReconcilePerconaServerMongoDB) Reconcile(ctx context.Context, request r
553558
return rr, nil
554559
}
555560

561+
func (r *ReconcilePerconaServerMongoDB) reconcilePause(ctx context.Context, cr *api.PerconaServerMongoDB) error {
562+
if !cr.Spec.Pause || cr.DeletionTimestamp != nil {
563+
return nil
564+
}
565+
566+
log := logf.FromContext(ctx)
567+
568+
backupRunning, err := r.isBackupRunning(ctx, cr)
569+
if err != nil {
570+
return errors.Wrap(err, "check if backup is running")
571+
}
572+
if backupRunning {
573+
cr.Spec.Pause = false
574+
if err := cr.CheckNSetDefaults(r.serverVersion.Platform, log); err != nil {
575+
return errors.Wrap(err, "failed to set defaults")
576+
}
577+
log.Info("cluster will pause after all backups finished")
578+
return nil
579+
}
580+
581+
for _, rs := range cr.Spec.Replsets {
582+
if cr.Status.State == api.AppStateStopping {
583+
log.Info("Pausing cluster", "replset", rs.Name)
584+
}
585+
rs.Arbiter.Enabled = false
586+
rs.NonVoting.Enabled = false
587+
}
588+
589+
if err := r.deletePSMDBPods(ctx, cr); err != nil {
590+
if err == errWaitingTermination {
591+
log.Info("pausing cluster", "error", err.Error())
592+
return nil
593+
}
594+
return errors.Wrap(err, "delete psmdb pods")
595+
}
596+
return nil
597+
}
598+
556599
func (r *ReconcilePerconaServerMongoDB) setCRVersion(ctx context.Context, cr *api.PerconaServerMongoDB) error {
557600
if len(cr.Spec.CRVersion) > 0 {
558601
return nil

pkg/controller/perconaservermongodb/smart.go

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"sort"
7-
"strings"
87
"time"
98

109
"github.com/pkg/errors"
@@ -115,24 +114,6 @@ func (r *ReconcilePerconaServerMongoDB) smartUpdate(ctx context.Context, cr *api
115114
}
116115
}
117116

118-
client, err := r.mongoClientWithRole(ctx, cr, *replset, roleClusterAdmin)
119-
if err != nil {
120-
return fmt.Errorf("failed to get mongo client: %v", err)
121-
}
122-
123-
defer func() {
124-
err := client.Disconnect(ctx)
125-
if err != nil {
126-
log.Error(err, "failed to close connection")
127-
}
128-
}()
129-
130-
primary, err := psmdb.GetPrimaryPod(ctx, client)
131-
if err != nil {
132-
return fmt.Errorf("get primary pod: %v", err)
133-
}
134-
log.Info("Got primary pod", "name", primary)
135-
136117
waitLimit := int(replset.LivenessProbe.InitialDelaySeconds)
137118

138119
sort.Slice(list.Items, func(i, j int) bool {
@@ -141,19 +122,11 @@ func (r *ReconcilePerconaServerMongoDB) smartUpdate(ctx context.Context, cr *api
141122

142123
var primaryPod corev1.Pod
143124
for _, pod := range list.Items {
144-
if replset.Expose.Enabled {
145-
host, err := psmdb.MongoHost(ctx, r.client, cr, replset.Name, replset.Expose.Enabled, pod)
146-
if err != nil {
147-
return errors.Wrapf(err, "get mongo host for pod %s", pod.Name)
148-
}
149-
150-
if host == primary {
151-
primaryPod = pod
152-
continue
153-
}
125+
isPrimary, err := r.isPodPrimary(ctx, cr, pod, replset)
126+
if err != nil {
127+
return errors.Wrap(err, "is pod primary")
154128
}
155-
156-
if strings.HasPrefix(primary, fmt.Sprintf("%s.%s.%s", pod.Name, sfs.Name, sfs.Namespace)) {
129+
if isPrimary {
157130
primaryPod = pod
158131
continue
159132
}
@@ -180,6 +153,18 @@ func (r *ReconcilePerconaServerMongoDB) smartUpdate(ctx context.Context, cr *api
180153
if sfs.Labels["app.kubernetes.io/component"] != "nonVoting" && len(primaryPod.Name) > 0 {
181154
forceStepDown := replset.Size == 1
182155
log.Info("doing step down...", "force", forceStepDown)
156+
client, err := r.mongoClientWithRole(ctx, cr, *replset, roleClusterAdmin)
157+
if err != nil {
158+
return fmt.Errorf("failed to get mongo client: %v", err)
159+
}
160+
161+
defer func() {
162+
err := client.Disconnect(ctx)
163+
if err != nil {
164+
log.Error(err, "failed to close connection")
165+
}
166+
}()
167+
183168
err = client.StepDown(ctx, forceStepDown)
184169
if err != nil {
185170
return errors.Wrap(err, "failed to do step down")
@@ -196,6 +181,32 @@ func (r *ReconcilePerconaServerMongoDB) smartUpdate(ctx context.Context, cr *api
196181
return nil
197182
}
198183

184+
func (r *ReconcilePerconaServerMongoDB) isPodPrimary(ctx context.Context, cr *api.PerconaServerMongoDB, pod corev1.Pod, rs *api.ReplsetSpec) (bool, error) {
185+
log := logf.FromContext(ctx)
186+
187+
host, err := psmdb.MongoHost(ctx, r.client, cr, rs.Name, rs.Expose.Enabled, pod)
188+
if err != nil {
189+
return false, errors.Wrap(err, "failed to get mongo host")
190+
}
191+
mgoClient, err := r.standaloneClientWithRole(ctx, cr, roleClusterAdmin, host)
192+
if err != nil {
193+
return false, errors.Wrap(err, "failed to create standalone client")
194+
}
195+
defer func() {
196+
err := mgoClient.Disconnect(ctx)
197+
if err != nil {
198+
log.Error(err, "failed to close connection")
199+
}
200+
}()
201+
202+
isMaster, err := mgoClient.IsMaster(ctx)
203+
if err != nil {
204+
return false, errors.Wrap(err, "is master")
205+
}
206+
207+
return isMaster.IsMaster, nil
208+
}
209+
199210
func (r *ReconcilePerconaServerMongoDB) smartMongosUpdate(ctx context.Context, cr *api.PerconaServerMongoDB, sts *appsv1.StatefulSet) error {
200211
log := logf.FromContext(ctx)
201212

0 commit comments

Comments
 (0)