Skip to content

Commit 34fc8bf

Browse files
inelpandzichorsegegunes
authored
K8SPS-43: Include async replication status in the status reconciliation (#667)
* Check is async replication ready. * Refactor - rename orchestrator '*exec' functions. * Update e2e test. * Update unit tests. * Check for specific replication problems and record a event if any issue happens. * Increase timeout. * Update e2e-tests/tests/init-deploy/06-check-async-repl-not-ready-cr-status.yaml Co-authored-by: Ege Güneş <[email protected]> * Update e2e-tests/tests/init-deploy/06-check-async-repl-not-ready-cr-status.yaml Co-authored-by: Ege Güneş <[email protected]> * Scaling down step timeout. * Fix wrong getOrcPod function. * Forget removed instances. * Cleanup --------- Co-authored-by: Viacheslav Sarzhan <[email protected]> Co-authored-by: Ege Güneş <[email protected]>
1 parent 1123dbe commit 34fc8bf

File tree

8 files changed

+259
-36
lines changed

8 files changed

+259
-36
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
2+
apiVersion: kuttl.dev/v1beta1
3+
kind: TestStep
4+
timeout: 30
5+
commands:
6+
- script: |-
7+
set -o errexit
8+
set -o xtrace
9+
10+
source ../../functions
11+
12+
state=$(kubectl -n ${NAMESPACE} get ps $(get_cluster_name) -o jsonpath='{.status.state}')
13+
if [[ "$state" != "ready" ]]; then
14+
echo "Status state should be ready, but is $state."
15+
exit 1
16+
fi
17+
18+
19+
run_mysqlsh "STOP REPLICA;" "-h localhost -P 33060 -uroot -proot_password" "init-deploy-mysql-2"
20+
21+
sleep 20
22+
23+
state=$(kubectl -n ${NAMESPACE} get ps $(get_cluster_name) -o jsonpath='{.status.state}')
24+
if [[ "$state" != "initializing" ]]; then
25+
echo "Status state should be initializing, but is $state."
26+
exit 1
27+
fi
28+
29+
run_mysqlsh "START REPLICA;" "-h localhost -P 33060 -uroot -proot_password" "init-deploy-mysql-2"
30+
31+
sleep 20
32+
33+
state=$(kubectl -n ${NAMESPACE} get ps $(get_cluster_name) -o jsonpath='{.status.state}')
34+
if [[ "$state" != "ready" ]]; then
35+
echo "Status state should be ready, but is $state."
36+
exit 1
37+
fi

e2e-tests/tests/scaling/08-scale-down.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
apiVersion: kuttl.dev/v1beta1
22
kind: TestStep
3+
timeout: 120
34
commands:
45
- script: |-
56
set -o errexit

pkg/controller/ps/controller.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
194194
}
195195

196196
log.Info("Ensuring oldest mysql node is the primary")
197-
err = orchestrator.EnsureNodeIsPrimaryExec(ctx, r.ClientCmd, orcPod, cr.ClusterHint(), firstPod.GetName(), mysql.DefaultPort)
197+
err = orchestrator.EnsureNodeIsPrimary(ctx, r.ClientCmd, orcPod, cr.ClusterHint(), firstPod.GetName(), mysql.DefaultPort)
198198
if err != nil {
199199
return errors.Wrap(err, "ensure node is primary")
200200
}
@@ -645,7 +645,7 @@ func (r *PerconaServerMySQLReconciler) reconcileOrchestrator(ctx context.Context
645645
for _, peer := range newPeers {
646646
p := peer
647647
g.Go(func() error {
648-
return orchestrator.AddPeerExec(gCtx, r.ClientCmd, orcPod, p)
648+
return orchestrator.AddPeer(gCtx, r.ClientCmd, orcPod, p)
649649
})
650650
}
651651

@@ -656,7 +656,7 @@ func (r *PerconaServerMySQLReconciler) reconcileOrchestrator(ctx context.Context
656656
for _, peer := range oldPeers {
657657
p := peer
658658
g.Go(func() error {
659-
return orchestrator.RemovePeerExec(gCtx, r.ClientCmd, orcPod, p)
659+
return orchestrator.RemovePeer(gCtx, r.ClientCmd, orcPod, p)
660660
})
661661
}
662662

@@ -789,7 +789,7 @@ func (r *PerconaServerMySQLReconciler) reconcileReplication(ctx context.Context,
789789
return nil
790790
}
791791

792-
if err := orchestrator.DiscoverExec(ctx, r.ClientCmd, pod, mysql.ServiceName(cr), mysql.DefaultPort); err != nil {
792+
if err := orchestrator.Discover(ctx, r.ClientCmd, pod, mysql.ServiceName(cr), mysql.DefaultPort); err != nil {
793793
switch err.Error() {
794794
case "Unauthorized":
795795
log.Info("mysql is not ready, unauthorized orchestrator discover response. skip")
@@ -801,7 +801,7 @@ func (r *PerconaServerMySQLReconciler) reconcileReplication(ctx context.Context,
801801
return errors.Wrap(err, "failed to discover cluster")
802802
}
803803

804-
primary, err := orchestrator.ClusterPrimaryExec(ctx, r.ClientCmd, pod, cr.ClusterHint())
804+
primary, err := orchestrator.ClusterPrimary(ctx, r.ClientCmd, pod, cr.ClusterHint())
805805
if err != nil {
806806
return errors.Wrap(err, "get cluster primary")
807807
}
@@ -812,11 +812,40 @@ func (r *PerconaServerMySQLReconciler) reconcileReplication(ctx context.Context,
812812

813813
// orchestrator doesn't attempt to recover from NonWriteableMaster if there's only 1 MySQL pod
814814
if cr.MySQLSpec().Size == 1 && primary.ReadOnly {
815-
if err := orchestrator.SetWriteableExec(ctx, r.ClientCmd, pod, primary.Key.Hostname, int(primary.Key.Port)); err != nil {
815+
if err := orchestrator.SetWriteable(ctx, r.ClientCmd, pod, primary.Key.Hostname, int(primary.Key.Port)); err != nil {
816816
return errors.Wrapf(err, "set %s writeable", primary.Key.Hostname)
817817
}
818818
}
819819

820+
clusterInstances, err := orchestrator.Cluster(ctx, r.ClientCmd, pod, cr.ClusterHint())
821+
if err != nil {
822+
return errors.Wrap(err, "get cluster instances")
823+
}
824+
825+
// In the case of a cluster downscale, we need to forget replicas that are not part of the cluster
826+
if len(clusterInstances) > int(cr.MySQLSpec().Size) {
827+
mysqlPods, err := k8s.PodsByLabels(ctx, r.Client, mysql.MatchLabels(cr))
828+
if err != nil {
829+
return errors.Wrap(err, "get mysql pods")
830+
}
831+
832+
podSet := make(map[string]struct{}, len(mysqlPods))
833+
for _, p := range mysqlPods {
834+
podSet[p.Name] = struct{}{}
835+
}
836+
for _, instance := range clusterInstances {
837+
if _, ok := podSet[instance.Alias]; ok {
838+
continue
839+
}
840+
841+
log.Info("Forgeting replica", "replica", instance.Alias)
842+
err := orchestrator.ForgetInstance(ctx, r.ClientCmd, pod, instance.Alias, int(instance.Key.Port))
843+
if err != nil {
844+
return errors.Wrapf(err, "forget replica %s", instance.Alias)
845+
}
846+
}
847+
}
848+
820849
return nil
821850
}
822851

@@ -1027,7 +1056,7 @@ func (r *PerconaServerMySQLReconciler) getPrimaryFromOrchestrator(ctx context.Co
10271056
if err != nil {
10281057
return nil, err
10291058
}
1030-
primary, err := orchestrator.ClusterPrimaryExec(ctx, r.ClientCmd, pod, cr.ClusterHint())
1059+
primary, err := orchestrator.ClusterPrimary(ctx, r.ClientCmd, pod, cr.ClusterHint())
10311060
if err != nil {
10321061
return nil, errors.Wrap(err, "get cluster primary")
10331062
}
@@ -1100,7 +1129,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
11001129
}
11011130
repDb := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
11021131

1103-
if err := orchestrator.StopReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
1132+
if err := orchestrator.StopReplication(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
11041133
return errors.Wrapf(err, "stop replica %s", hostname)
11051134
}
11061135

@@ -1159,7 +1188,7 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context
11591188
return errors.Wrapf(err, "change replication source on %s", hostname)
11601189
}
11611190

1162-
if err := orchestrator.StartReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
1191+
if err := orchestrator.StartReplication(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
11631192
return errors.Wrapf(err, "start replication on %s", hostname)
11641193
}
11651194

pkg/controller/ps/status.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package ps
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"strings"
78

89
"github.com/pkg/errors"
@@ -65,13 +66,28 @@ func (r *PerconaServerMySQLReconciler) reconcileCRStatus(ctx context.Context, cr
6566
}
6667
cr.Status.MySQL = mysqlStatus
6768

68-
if mysqlStatus.State == apiv1alpha1.StateReady && cr.Spec.MySQL.IsGR() {
69-
ready, err := r.isGRReady(ctx, cr)
70-
if err != nil {
71-
return errors.Wrap(err, "check if GR ready")
69+
if mysqlStatus.State == apiv1alpha1.StateReady {
70+
if cr.Spec.MySQL.IsGR() {
71+
ready, err := r.isGRReady(ctx, cr)
72+
if err != nil {
73+
return errors.Wrap(err, "check if GR is ready")
74+
}
75+
if !ready {
76+
mysqlStatus.State = apiv1alpha1.StateInitializing
77+
}
7278
}
73-
if !ready {
74-
mysqlStatus.State = apiv1alpha1.StateInitializing
79+
80+
if cr.Spec.MySQL.IsAsync() && cr.OrchestratorEnabled() {
81+
ready, msg, err := r.isAsyncReady(ctx, cr)
82+
if err != nil {
83+
return errors.Wrap(err, "check if async is ready")
84+
}
85+
if !ready {
86+
mysqlStatus.State = apiv1alpha1.StateInitializing
87+
88+
r.Recorder.Event(cr, "Warning", "AsyncReplicationNotReady", msg)
89+
90+
}
7591
}
7692
}
7793
cr.Status.MySQL = mysqlStatus
@@ -257,6 +273,41 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a
257273
return true, nil
258274
}
259275

276+
func (r *PerconaServerMySQLReconciler) isAsyncReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, string, error) {
277+
pod, err := getReadyOrcPod(ctx, r.Client, cr)
278+
if err != nil {
279+
return false, "", err
280+
}
281+
282+
instances, err := orchestrator.Cluster(ctx, r.ClientCmd, pod, cr.ClusterHint())
283+
if err != nil {
284+
return false, "", err
285+
}
286+
287+
problems := make(map[string][]string)
288+
289+
for _, i := range instances {
290+
if len(i.Problems) > 0 {
291+
problems[i.Alias] = i.Problems
292+
}
293+
}
294+
295+
// formatMessage formats a map of problems to a message like
296+
// 'cluster1-mysql-1:[not_replicating, replication_lag], cluster1-mysql-2:[not_replicating]'
297+
formatMessage := func(problems map[string][]string) string {
298+
var sb strings.Builder
299+
for k, v := range problems {
300+
joinedValues := strings.Join(v, ", ")
301+
sb.WriteString(fmt.Sprintf("%s: [%s], ", k, joinedValues))
302+
}
303+
304+
return strings.TrimRight(sb.String(), ", ")
305+
}
306+
307+
msg := formatMessage(problems)
308+
return msg == "", msg, nil
309+
}
310+
260311
func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) {
261312
opts := &client.ListOptions{Namespace: cr.Namespace, LabelSelector: labels.SelectorFromSet(cr.Labels())}
262313
svcList := &corev1.ServiceList{}

pkg/controller/ps/status_test.go

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"database/sql"
77
"encoding/csv"
8+
"encoding/json"
89
"fmt"
910
"io"
1011
"reflect"
@@ -97,9 +98,12 @@ func TestReconcileStatusAsync(t *testing.T) {
9798
},
9899
},
99100
{
100-
name: "with 3 ready mysql pods",
101-
cr: cr,
102-
objects: makeFakeReadyPods(cr, 3, "mysql"),
101+
name: "with 3 ready mysql pods",
102+
cr: cr,
103+
objects: appendSlices(
104+
makeFakeReadyPods(cr, 3, "mysql"),
105+
makeFakeReadyPods(cr, 3, "orchestrator"),
106+
),
103107
expected: apiv1alpha1.PerconaServerMySQLStatus{
104108
MySQL: apiv1alpha1.StatefulAppStatus{
105109
Size: 3,
@@ -108,7 +112,8 @@ func TestReconcileStatusAsync(t *testing.T) {
108112
},
109113
Orchestrator: apiv1alpha1.StatefulAppStatus{
110114
Size: 3,
111-
State: apiv1alpha1.StateInitializing,
115+
Ready: 3,
116+
State: apiv1alpha1.StateReady,
112117
},
113118
HAProxy: apiv1alpha1.StatefulAppStatus{
114119
Size: 3,
@@ -366,14 +371,21 @@ func TestReconcileStatusAsync(t *testing.T) {
366371
cr := tt.cr.DeepCopy()
367372
cb := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cr).WithStatusSubresource(cr).WithObjects(tt.objects...).WithStatusSubresource(tt.objects...)
368373

374+
cliCmd, err := getFakeOrchestratorClient(cr)
375+
if err != nil {
376+
t.Fatal(err)
377+
}
378+
369379
r := &PerconaServerMySQLReconciler{
370-
Client: cb.Build(),
371-
Scheme: scheme,
380+
Client: cb.Build(),
381+
Scheme: scheme,
382+
ClientCmd: cliCmd,
372383
ServerVersion: &platform.ServerVersion{
373384
Platform: platform.PlatformKubernetes,
374385
},
375386
}
376-
err := r.reconcileCRStatus(ctx, cr, nil)
387+
388+
err = r.reconcileCRStatus(ctx, cr, nil)
377389
if err != nil {
378390
t.Fatal(err)
379391
}
@@ -1121,6 +1133,48 @@ func getFakeClient(cr *apiv1alpha1.PerconaServerMySQL, mysqlMemberStates []innod
11211133
stderr: []byte("No such file or directory"),
11221134
err: errors.New("fake error"),
11231135
})
1136+
1137+
return &fakeClient{
1138+
scripts: scripts,
1139+
}, nil
1140+
}
1141+
1142+
func getFakeOrchestratorClient(cr *apiv1alpha1.PerconaServerMySQL) (clientcmd.Client, error) {
1143+
var scripts []fakeClientScript
1144+
1145+
// Get async cluster from Orchestrator
1146+
orcClusterScript := func() fakeClientScript {
1147+
instances := []*orchestrator.Instance{
1148+
{
1149+
Alias: "mysql-host-0",
1150+
Problems: []string{},
1151+
},
1152+
{
1153+
Alias: "mysql-host-1",
1154+
Problems: []string{},
1155+
},
1156+
{
1157+
Alias: "mysql-host-2",
1158+
Problems: []string{},
1159+
},
1160+
}
1161+
1162+
res, err := json.Marshal(&instances)
1163+
if err != nil {
1164+
panic(err)
1165+
}
1166+
1167+
return fakeClientScript{
1168+
cmd: []string{
1169+
"curl",
1170+
fmt.Sprintf("localhost:%d/%s", 3000, fmt.Sprintf("api/cluster/%s", cr.Name+"."+cr.Namespace)),
1171+
},
1172+
stdout: res,
1173+
}
1174+
}
1175+
1176+
scripts = append(scripts, orcClusterScript())
1177+
11241178
return &fakeClient{
11251179
scripts: scripts,
11261180
}, nil

pkg/controller/psbackup/topology.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func getDBTopology(ctx context.Context, cli client.Client, cliCmd clientcmd.Clie
5555
return topology{}, err
5656
}
5757

58-
primary, err := orchestrator.ClusterPrimaryExec(ctx, cliCmd, pod, cluster.ClusterHint())
58+
primary, err := orchestrator.ClusterPrimary(ctx, cliCmd, pod, cluster.ClusterHint())
5959

6060
if err != nil {
6161
return topology{}, errors.Wrap(err, "get primary")

0 commit comments

Comments
 (0)