Skip to content

Commit e005120

Browse files
committed
drpc, server: implement filtering rules for server initialization state
Previously, in DRPC, the `batch` RPC was able to access `node.Descriptor` which was yet to be initialized. This led to a data race, described in #153948. This wasn't the case for gRPC, because of an interceptor that could filter RPCs during different server states. Particularly, it allows only bootstrap, heartbeat, health and gossip methods during initialization to prevent calls to potentially uninitialized services. This patch adds this interceptor support into drpc as well. In-depth discussion on this: #153948 (comment) Epic: None Fixes: #153948 Release note: None
1 parent 3476899 commit e005120

File tree

3 files changed

+31
-25
lines changed

3 files changed

+31
-25
lines changed

pkg/server/drpc_server.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,17 @@ type drpcServer struct {
3131
// newDRPCServer creates and configures a new drpcServer instance. It enables
3232
// DRPC if the experimental setting is on, otherwise returns a dummy server.
3333
func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) {
34-
d, err := rpc.NewDRPCServer(ctx, rpcCtx)
34+
drpcServer := &drpcServer{}
35+
drpcServer.setMode(modeInitializing)
36+
37+
d, err := rpc.NewDRPCServer(
38+
ctx,
39+
rpcCtx,
40+
rpc.WithInterceptor(
41+
func(path string) error {
42+
return drpcServer.intercept(path)
43+
}),
44+
)
3545
if err != nil {
3646
return nil, err
3747
}
@@ -41,12 +51,8 @@ func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error
4151
return nil, err
4252
}
4353

44-
drpcServer := &drpcServer{
45-
DRPCServer: d,
46-
tlsCfg: tlsCfg,
47-
}
48-
49-
drpcServer.setMode(modeInitializing)
54+
drpcServer.DRPCServer = d
55+
drpcServer.tlsCfg = tlsCfg
5056

5157
if err := rpc.DRPCRegisterHeartbeat(drpcServer, rpcCtx.NewHeartbeatService()); err != nil {
5258
return nil, err

pkg/server/grpc_server.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,24 +68,6 @@ func (s *grpcServer) health(ctx context.Context) error {
6868
}
6969
}
7070

71-
var rpcsAllowedWhileBootstrapping = map[string]struct{}{
72-
"/cockroach.rpc.Heartbeat/Ping": {},
73-
"/cockroach.gossip.Gossip/Gossip": {},
74-
"/cockroach.server.serverpb.Init/Bootstrap": {},
75-
"/cockroach.server.serverpb.Admin/Health": {},
76-
}
77-
78-
// intercept implements filtering rules for each server state.
79-
func (s *grpcServer) intercept(fullName string) error {
80-
if s.operational() {
81-
return nil
82-
}
83-
if _, allowed := rpcsAllowedWhileBootstrapping[fullName]; !allowed {
84-
return NewWaitingForInitError(fullName)
85-
}
86-
return nil
87-
}
88-
8971
// NewWaitingForInitError creates an error indicating that the server cannot run
9072
// the specified method until the node has been initialized.
9173
func NewWaitingForInitError(methodName string) error {

pkg/server/serve_mode.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,24 @@ const (
3636
modeDraining
3737
)
3838

39+
var rpcsAllowedWhileBootstrapping = map[string]struct{}{
40+
"/cockroach.rpc.Heartbeat/Ping": {},
41+
"/cockroach.gossip.Gossip/Gossip": {},
42+
"/cockroach.server.serverpb.Init/Bootstrap": {},
43+
"/cockroach.server.serverpb.Admin/Health": {},
44+
}
45+
46+
// intercept implements filtering rules for each server state.
47+
func (s *serveModeHandler) intercept(fullName string) error {
48+
if s.operational() {
49+
return nil
50+
}
51+
if _, allowed := rpcsAllowedWhileBootstrapping[fullName]; !allowed {
52+
return NewWaitingForInitError(fullName)
53+
}
54+
return nil
55+
}
56+
3957
func (s *serveMode) set(mode serveMode) {
4058
atomic.StoreInt32((*int32)(s), int32(mode))
4159
}

0 commit comments

Comments
 (0)