Skip to content

Commit 5f848b3

Browse files
committed
feat: Add shutdown interception and readiness check
1 parent 569c00a commit 5f848b3

File tree

5 files changed

+92
-4
lines changed

5 files changed

+92
-4
lines changed

internal/health/health.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ func AddProbe(probe Probe) {
4343
defaultHealthManager.addProbe(probe)
4444
}
4545

46+
// IsReady return global comboHealthManager status.
47+
func IsReady() bool {
48+
return defaultHealthManager.IsReady()
49+
}
50+
4651
// CreateHttpHandler create health http handler base on given probe.
4752
func CreateHttpHandler(healthResponse string) http.HandlerFunc {
4853
return func(w http.ResponseWriter, _ *http.Request) {

zrpc/internal/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type (
2323
StatConf StatConf `json:",optional"`
2424
Prometheus bool `json:",default=true"`
2525
Breaker bool `json:",default=true"`
26+
Shutdown bool `json:",default=true"`
2627
}
2728

2829
// MethodTimeoutConf defines specified timeout for gRPC methods.

zrpc/internal/rpcpubserver.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
11
package internal
22

33
import (
4+
"context"
5+
"errors"
46
"os"
57
"strings"
8+
"time"
69

710
"github.com/zeromicro/go-zero/core/discov"
11+
"github.com/zeromicro/go-zero/core/logx"
812
"github.com/zeromicro/go-zero/core/netx"
13+
"github.com/zeromicro/go-zero/internal/health"
914
)
1015

1116
const (
1217
allEths = "0.0.0.0"
1318
envPodIp = "POD_IP"
1419
)
1520

21+
var notReadyError = errors.New("service is not ready for a limited time")
22+
1623
// NewRpcPubServer returns a Server.
1724
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string,
1825
opts ...ServerOption) (Server, error) {
@@ -46,9 +53,27 @@ type keepAliveServer struct {
4653
}
4754

4855
func (s keepAliveServer) Start(fn RegisterFn) error {
49-
if err := s.registerEtcd(); err != nil {
50-
return err
51-
}
56+
go func() {
57+
// Wait for the service to start successfully, otherwise the registration service will fail.
58+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
59+
defer cancel()
60+
61+
ticker := time.NewTicker(time.Second)
62+
defer ticker.Stop()
63+
64+
for {
65+
select {
66+
case <-ticker.C:
67+
if health.IsReady() {
68+
logx.Must(s.registerEtcd())
69+
return
70+
}
71+
case <-ctx.Done():
72+
logx.Must(notReadyError)
73+
return
74+
}
75+
}
76+
}()
5277

5378
return s.Server.Start(fn)
5479
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package serverinterceptors
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
7+
"google.golang.org/grpc"
8+
"google.golang.org/grpc/codes"
9+
"google.golang.org/grpc/status"
10+
11+
"github.com/zeromicro/go-zero/core/proc"
12+
)
13+
14+
var (
15+
shutdown int32
16+
shutdownError = status.Error(codes.Unavailable, "server is shutting down")
17+
)
18+
19+
func init() {
20+
proc.AddWrapUpListener(func() {
21+
atomic.StoreInt32(&shutdown, 1)
22+
})
23+
}
24+
25+
// StreamShutdownInterceptor returns a grpc.StreamServerInterceptor that handles server shutdown.
26+
func StreamShutdownInterceptor(svr any, stream grpc.ServerStream, _ *grpc.StreamServerInfo,
27+
handler grpc.StreamHandler) error {
28+
29+
return handleShutdown(func() error {
30+
return handler(svr, stream)
31+
})
32+
}
33+
34+
// UnaryShutdownInterceptor returns a grpc.UnaryServerInterceptor that handles server shutdown.
35+
func UnaryShutdownInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo,
36+
handler grpc.UnaryHandler) (resp any, err error) {
37+
38+
err = handleShutdown(func() error {
39+
resp, err = handler(ctx, req)
40+
return err
41+
})
42+
return
43+
}
44+
45+
func handleShutdown(f func() error) error {
46+
if atomic.LoadInt32(&shutdown) == 1 {
47+
return shutdownError
48+
}
49+
return f()
50+
}

zrpc/server.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package zrpc
33
import (
44
"time"
55

6+
"google.golang.org/grpc"
7+
68
"github.com/zeromicro/go-zero/core/load"
79
"github.com/zeromicro/go-zero/core/logx"
810
"github.com/zeromicro/go-zero/core/stat"
911
"github.com/zeromicro/go-zero/core/stores/redis"
1012
"github.com/zeromicro/go-zero/zrpc/internal"
1113
"github.com/zeromicro/go-zero/zrpc/internal/auth"
1214
"github.com/zeromicro/go-zero/zrpc/internal/serverinterceptors"
13-
"google.golang.org/grpc"
1415
)
1516

1617
// A RpcServer is a rpc server.
@@ -139,6 +140,9 @@ func setupStreamInterceptors(svr internal.Server, c RpcServerConf) {
139140
if c.Middlewares.Breaker {
140141
svr.AddStreamInterceptors(serverinterceptors.StreamBreakerInterceptor)
141142
}
143+
if c.Middlewares.Shutdown {
144+
svr.AddStreamInterceptors(serverinterceptors.StreamShutdownInterceptor)
145+
}
142146
}
143147

144148
func setupUnaryInterceptors(svr internal.Server, c RpcServerConf, metrics *stat.Metrics) {
@@ -157,6 +161,9 @@ func setupUnaryInterceptors(svr internal.Server, c RpcServerConf, metrics *stat.
157161
if c.Middlewares.Breaker {
158162
svr.AddUnaryInterceptors(serverinterceptors.UnaryBreakerInterceptor)
159163
}
164+
if c.Middlewares.Shutdown {
165+
svr.AddUnaryInterceptors(serverinterceptors.UnaryShutdownInterceptor)
166+
}
160167
if c.CpuThreshold > 0 {
161168
shedder := load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
162169
svr.AddUnaryInterceptors(serverinterceptors.UnarySheddingInterceptor(shedder, metrics))

0 commit comments

Comments
 (0)