Skip to content

Commit 495f882

Browse files
committed
feat: readiness check
1 parent 569c00a commit 495f882

File tree

4 files changed

+54
-5
lines changed

4 files changed

+54
-5
lines changed

internal/health/health.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ func AddProbe(probe Probe) {
4343
defaultHealthManager.addProbe(probe)
4444
}
4545

46+
// IsReady return global comboHealthManager status.
47+
func IsReady() bool {
48+
return defaultHealthManager.IsReady()
49+
}
50+
4651
// CreateHttpHandler create health http handler base on given probe.
4752
func CreateHttpHandler(healthResponse string) http.HandlerFunc {
4853
return func(w http.ResponseWriter, _ *http.Request) {

zrpc/internal/rpcpubserver.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
package internal
22

33
import (
4+
"context"
5+
"errors"
46
"os"
57
"strings"
8+
"time"
69

710
"github.com/zeromicro/go-zero/core/discov"
811
"github.com/zeromicro/go-zero/core/netx"
12+
"github.com/zeromicro/go-zero/internal/health"
913
)
1014

1115
const (
1216
allEths = "0.0.0.0"
1317
envPodIp = "POD_IP"
1418
)
1519

20+
var errNotReady = errors.New("service is not ready for a limited time")
21+
1622
// NewRpcPubServer returns a Server.
1723
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string,
1824
opts ...ServerOption) (Server, error) {
@@ -46,11 +52,45 @@ type keepAliveServer struct {
4652
}
4753

4854
func (s keepAliveServer) Start(fn RegisterFn) error {
49-
if err := s.registerEtcd(); err != nil {
50-
return err
55+
errCh := make(chan error)
56+
stopCh := make(chan struct{})
57+
defer close(stopCh)
58+
59+
go func() {
60+
defer close(errCh)
61+
select {
62+
case errCh <- s.Server.Start(fn):
63+
case <-stopCh:
64+
// prevent goroutine leak
65+
}
66+
}()
67+
68+
// Wait for the service to start successfully, otherwise the registration service will fail.
69+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
70+
defer cancel()
71+
ticker := time.NewTicker(time.Second)
72+
73+
l:
74+
for {
75+
select {
76+
case <-ticker.C:
77+
if health.IsReady() {
78+
err := s.registerEtcd()
79+
if err != nil {
80+
return err
81+
}
82+
// break for loop
83+
break l
84+
}
85+
case <-ctx.Done():
86+
return errNotReady
87+
case err := <-errCh:
88+
return err
89+
}
5190
}
91+
ticker.Stop()
5292

53-
return s.Server.Start(fn)
93+
return <-errCh
5494
}
5595

5696
func figureOutListenOn(listenOn string) string {

zrpc/internal/rpcpubserver_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/assert"
7+
"google.golang.org/grpc"
8+
79
"github.com/zeromicro/go-zero/core/discov"
810
"github.com/zeromicro/go-zero/core/netx"
911
)
@@ -16,7 +18,8 @@ func TestNewRpcPubServer(t *testing.T) {
1618
}, "")
1719
assert.NoError(t, err)
1820
assert.NotPanics(t, func() {
19-
s.Start(nil)
21+
s.Start(func(server *grpc.Server) {
22+
})
2023
})
2124
}
2225

zrpc/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package zrpc
33
import (
44
"time"
55

6+
"google.golang.org/grpc"
7+
68
"github.com/zeromicro/go-zero/core/load"
79
"github.com/zeromicro/go-zero/core/logx"
810
"github.com/zeromicro/go-zero/core/stat"
911
"github.com/zeromicro/go-zero/core/stores/redis"
1012
"github.com/zeromicro/go-zero/zrpc/internal"
1113
"github.com/zeromicro/go-zero/zrpc/internal/auth"
1214
"github.com/zeromicro/go-zero/zrpc/internal/serverinterceptors"
13-
"google.golang.org/grpc"
1415
)
1516

1617
// A RpcServer is a rpc server.

0 commit comments

Comments
 (0)