Skip to content

Commit 44892b2

Browse files
authored
dm: add retry for dm-worker to join dm-master (#11701)
close #4287
1 parent 792da42 commit 44892b2

File tree

1 file changed

+44
-37
lines changed

1 file changed

+44
-37
lines changed

dm/worker/join.go

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"google.golang.org/grpc/credentials"
3232
)
3333

34+
// retryTimes is the maximum number of rounds JoinMaster makes over all endpoints before giving up.
const retryTimes = 5
35+
3436
// GetJoinURLs gets the endpoints from the join address.
3537
func GetJoinURLs(addrs string) []string {
3638
// TODO: handle pm1=xxxx:1234,pm2=xxxx:1234,pm3=xxxx:1234
@@ -63,46 +65,51 @@ func (s *Server) JoinMaster(endpoints []string) error {
6365
}
6466

6567
var errorStr string
66-
for _, endpoint := range endpoints {
67-
ctx1, cancel1 := context.WithTimeout(ctx, 3*time.Second)
68-
//nolint:staticcheck
69-
conn, err := grpc.DialContext(
70-
ctx1,
71-
utils.UnwrapScheme(endpoint),
72-
grpc.WithBlock(),
73-
grpcTLS,
74-
grpc.WithBackoffMaxDelay(3*time.Second),
75-
)
76-
cancel1()
77-
if err != nil {
78-
if conn != nil {
79-
conn.Close()
68+
// retry to connect master
69+
for i := 0; i < retryTimes; i++ {
70+
for _, endpoint := range endpoints {
71+
ctx1, cancel1 := context.WithTimeout(ctx, 3*time.Second)
72+
//nolint:staticcheck
73+
conn, err := grpc.DialContext(
74+
ctx1,
75+
utils.UnwrapScheme(endpoint),
76+
grpc.WithBlock(),
77+
grpcTLS,
78+
grpc.WithBackoffMaxDelay(3*time.Second),
79+
)
80+
cancel1()
81+
if err != nil {
82+
if conn != nil {
83+
conn.Close()
84+
}
85+
log.L().Error("fail to dial dm-master", zap.String("endpoint", endpoint), zap.Error(err))
86+
errorStr = err.Error()
87+
continue
88+
}
89+
client := pb.NewMasterClient(conn)
90+
ctx1, cancel1 = context.WithTimeout(ctx, 3*time.Second)
91+
resp, err := client.RegisterWorker(ctx1, req)
92+
cancel1()
93+
conn.Close()
94+
if err != nil {
95+
log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.Error(err))
96+
errorStr = err.Error()
97+
continue
98+
}
99+
if !resp.GetResult() {
100+
log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.String("error", resp.Msg))
101+
errorStr = resp.Msg
102+
continue
80103
}
81-
log.L().Error("fail to dial dm-master", zap.String("endpoint", endpoint), zap.Error(err))
82-
errorStr = err.Error()
83-
continue
84-
}
85-
client := pb.NewMasterClient(conn)
86-
ctx1, cancel1 = context.WithTimeout(ctx, 3*time.Second)
87-
resp, err := client.RegisterWorker(ctx1, req)
88-
cancel1()
89-
conn.Close()
90-
if err != nil {
91-
log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.Error(err))
92-
errorStr = err.Error()
93-
continue
94-
}
95-
if !resp.GetResult() {
96-
log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.String("error", resp.Msg))
97-
errorStr = resp.Msg
98-
continue
99-
}
100104

101-
// worker do calls decrypt, but the password is decrypted already,
102-
// but in case we need it later, init it.
103-
encrypt.InitCipher(resp.GetSecretKey())
105+
// the worker does call decrypt, but the password is already decrypted,
106+
// but in case we need it later, init it.
107+
encrypt.InitCipher(resp.GetSecretKey())
104108

105-
return nil
109+
return nil
110+
}
111+
log.L().Warn("retry to connect master", zap.Int("retry", i+1), zap.Int("total", retryTimes))
112+
time.Sleep(retryConnectSleepTime)
106113
}
107114
return terror.ErrWorkerFailConnectMaster.Generate(endpoints, errorStr)
108115
}

0 commit comments

Comments
 (0)