Skip to content

Commit e1c1794

Browse files
craig[bot]shubhamdhama
andcommitted
Merge #147822
147822: drpc: move drpc server back to pkg/rpc r=cthumuluru-crdb a=shubhamdhama `NewRaftTransport` needs a `drpc.Mux` to register its server implementations, so the straightforward approach is to pass `drpcServer.Mux`. However, some tests within the `kvserver` package (not `kvserver_test` itself) also require this `drpcServer.Mux`. This would necessitate exporting `newDRPCServer` and `drpcServer.mux` as `NewDRPCServer` and `drpcServer.Mux` respectively. But even then, this won't resolve the situation; it will introduce a cyclic dependency. Moving these tests to `kvserver_test` is not a viable option, as they rely on private methods within `kvserver`. Even if we managed to untangle this dependency mess, the whole setup just feels wrong and screams for a proper refactor. We should simply align with how gRPC handles this. Additionally, as part of this change, it's more appropriate for the DRPC server implementation residing in `pkg/rpc` to be exposed as an interface to other packages, rather than as a concrete object. Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents 314510c + f203631 commit e1c1794

File tree

6 files changed

+113
-93
lines changed

6 files changed

+113
-93
lines changed

pkg/rpc/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ go_library(
8080
"@io_storj_drpc//drpcconn",
8181
"@io_storj_drpc//drpcmanager",
8282
"@io_storj_drpc//drpcmigrate",
83+
"@io_storj_drpc//drpcmux",
8384
"@io_storj_drpc//drpcpool",
85+
"@io_storj_drpc//drpcserver",
8486
"@io_storj_drpc//drpcstream",
8587
"@io_storj_drpc//drpcwire",
8688
"@org_golang_google_grpc//:grpc",

pkg/rpc/drpc.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,19 @@ import (
99
"context"
1010
"crypto/tls"
1111
"math"
12+
"net"
1213
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/cockroachdb/errors"
1518
"storj.io/drpc"
1619
"storj.io/drpc/drpcconn"
1720
"storj.io/drpc/drpcmanager"
1821
"storj.io/drpc/drpcmigrate"
22+
"storj.io/drpc/drpcmux"
1923
"storj.io/drpc/drpcpool"
24+
"storj.io/drpc/drpcserver"
2025
"storj.io/drpc/drpcstream"
2126
"storj.io/drpc/drpcwire"
2227
)
@@ -93,3 +98,79 @@ func (c *closeEntirePoolConn) Close() error {
9398
}
9499

95100
type DRPCConnection = Connection[drpc.Conn]
101+
102+
// ErrDRPCDisabled is returned from hosts that in principle could but do not
103+
// have the DRPC server enabled.
104+
var ErrDRPCDisabled = errors.New("DRPC is not enabled")
105+
106+
// DRPCServer defines the interface for a DRPC server, which can serve
107+
// connections and provides a drpc.Mux for registering handlers.
108+
type DRPCServer interface {
109+
// Serve starts serving DRPC requests on the provided listener.
110+
Serve(ctx context.Context, lis net.Listener) error
111+
drpc.Mux
112+
}
113+
114+
// drpcServer implements the DRPCServer interface.
115+
type drpcServer struct {
116+
*drpcserver.Server
117+
drpc.Mux
118+
}
119+
120+
// NewDRPCServer creates a new DRPCServer with the provided rpc context.
121+
func NewDRPCServer(_ context.Context, rpcCtx *Context) (DRPCServer, error) {
122+
d := &drpcServer{}
123+
mux := drpcmux.New()
124+
d.Server = drpcserver.NewWithOptions(mux, drpcserver.Options{
125+
Log: func(err error) {
126+
log.Warningf(context.Background(), "drpc server error %v", err)
127+
},
128+
// The reader's max buffer size defaults to 4mb, and if it is exceeded (such
129+
// as happens with AddSSTable) the RPCs fail.
130+
Manager: drpcmanager.Options{Reader: drpcwire.ReaderOptions{MaximumBufferSize: math.MaxInt}},
131+
})
132+
d.Mux = mux
133+
134+
// NB: any server middleware (server interceptors in gRPC parlance) would go
135+
// here:
136+
// dmux = whateverMiddleware1(dmux)
137+
// dmux = whateverMiddleware2(dmux)
138+
// ...
139+
//
140+
// Each middleware must implement the Handler interface:
141+
//
142+
// HandleRPC(stream Stream, rpc string) error
143+
//
144+
// where Stream
145+
// See here for an example:
146+
// https://github.com/bryk-io/pkg/blob/4da5fbfef47770be376e4022eab5c6c324984bf7/net/drpc/server.go#L91-L101
147+
return d, nil
148+
}
149+
150+
// NewDummyDRPCServer returns a DRPCServer that is disabled and always returns
151+
// ErrDRPCDisabled.
152+
func NewDummyDRPCServer() DRPCServer {
153+
return &drpcOffServer{}
154+
}
155+
156+
// drpcOffServer is a disabled DRPC server implementation. It immediately closes
157+
// accepted connections and returns ErrDRPCDisabled for all Serve calls.
158+
// Register is a no-op.
159+
type drpcOffServer struct{}
160+
161+
// Serve implements the DRPCServer interface for drpcOffServer. It closes any
162+
// accepted connection and returns ErrDRPCDisabled.
163+
func (srv *drpcOffServer) Serve(_ context.Context, lis net.Listener) error {
164+
conn, err := lis.Accept()
165+
if err != nil {
166+
return err
167+
}
168+
_ = conn.Close()
169+
return ErrDRPCDisabled
170+
}
171+
172+
// Register implements the drpc.Mux interface for drpcOffServer. It is a no-op
173+
// when DRPC is disabled.
174+
func (srv *drpcOffServer) Register(interface{}, drpc.Description) error {
175+
return nil
176+
}

pkg/server/BUILD.bazel

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,13 +375,8 @@ go_library(
375375
"@com_github_prometheus_client_model//go",
376376
"@com_github_prometheus_common//expfmt",
377377
"@in_gopkg_yaml_v2//:yaml_v2",
378-
"@io_storj_drpc//:drpc",
379378
"@io_storj_drpc//drpcerr",
380-
"@io_storj_drpc//drpcmanager",
381379
"@io_storj_drpc//drpcmigrate",
382-
"@io_storj_drpc//drpcmux",
383-
"@io_storj_drpc//drpcserver",
384-
"@io_storj_drpc//drpcwire",
385380
"@org_golang_google_grpc//:grpc",
386381
"@org_golang_google_grpc//codes",
387382
"@org_golang_google_grpc//metadata",

pkg/server/drpc_server.go

Lines changed: 27 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -8,116 +8,58 @@ package server
88
import (
99
"context"
1010
"crypto/tls"
11-
"math"
12-
"net"
1311

1412
"github.com/cockroachdb/cockroach/pkg/rpc"
1513
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
16-
"github.com/cockroachdb/cockroach/pkg/util/log"
1714
"github.com/cockroachdb/errors"
1815
"google.golang.org/grpc/codes"
19-
"storj.io/drpc"
2016
"storj.io/drpc/drpcerr"
21-
"storj.io/drpc/drpcmanager"
22-
"storj.io/drpc/drpcmux"
23-
"storj.io/drpc/drpcserver"
24-
"storj.io/drpc/drpcwire"
2517
)
2618

27-
// ErrDRPCDisabled is returned from hosts that in principle could but do not
28-
// have the DRPC server enabled.
29-
var ErrDRPCDisabled = errors.New("DRPC is not enabled")
30-
31-
type drpcServerI interface {
32-
Serve(ctx context.Context, lis net.Listener) error
33-
}
34-
35-
type drpcMuxI interface {
36-
Register(srv interface{}, desc drpc.Description) error
37-
}
38-
19+
// drpcServer wraps a DRPCServer and manages its enabled state and TLS
20+
// configuration. It also tracks the current serving mode for health checks.
3921
type drpcServer struct {
22+
// Embeds logic for managing server mode (initializing, draining, operational).
4023
serveModeHandler
41-
srv drpcServerI
42-
mux drpcMuxI
43-
tlsCfg *tls.Config
24+
// Underlying DRPC server implementation.
25+
rpc.DRPCServer
26+
// Indicates if DRPC is enabled for this server.
4427
enabled bool
28+
// TLS configuration for secure connections.
29+
tlsCfg *tls.Config
4530
}
4631

47-
var _ drpcServerI = (*drpcserver.Server)(nil)
48-
var _ drpcServerI = (*drpcOffServer)(nil)
49-
32+
// newDRPCServer creates and configures a new drpcServer instance. It enables
33+
// DRPC if the experimental setting is on, otherwise returns a dummy server.
34+
//
5035
// TODO: Register DRPC Heartbeat service
51-
func newDRPCServer(_ context.Context, rpcCtx *rpc.Context) (*drpcServer, error) {
52-
var dmux drpcMuxI = &drpcOffServer{}
53-
var dsrv drpcServerI = &drpcOffServer{}
54-
var tlsCfg *tls.Config
55-
enabled := false
56-
36+
func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) {
37+
drpcServer := &drpcServer{}
5738
if rpc.ExperimentalDRPCEnabled.Get(&rpcCtx.Settings.SV) {
58-
enabled = true
59-
mux := drpcmux.New()
60-
dsrv = drpcserver.NewWithOptions(mux, drpcserver.Options{
61-
Log: func(err error) {
62-
log.Warningf(context.Background(), "drpc server error %v", err)
63-
},
64-
// The reader's max buffer size defaults to 4mb, and if it is exceeded (such
65-
// as happens with AddSSTable) the RPCs fail.
66-
Manager: drpcmanager.Options{Reader: drpcwire.ReaderOptions{MaximumBufferSize: math.MaxInt}},
67-
})
68-
dmux = mux
69-
70-
var err error
71-
tlsCfg, err = rpcCtx.GetServerTLSConfig()
39+
d, err := rpc.NewDRPCServer(ctx, rpcCtx)
7240
if err != nil {
7341
return nil, err
7442
}
75-
76-
// NB: any server middleware (server interceptors in gRPC parlance) would go
77-
// here:
78-
// dmux = whateverMiddleware1(dmux)
79-
// dmux = whateverMiddleware2(dmux)
80-
// ...
81-
//
82-
// Each middleware must implement the Handler interface:
83-
//
84-
// HandleRPC(stream Stream, rpc string) error
85-
//
86-
// where Stream
87-
// See here for an example:
88-
// https://github.com/bryk-io/pkg/blob/4da5fbfef47770be376e4022eab5c6c324984bf7/net/drpc/server.go#L91-L101
89-
}
90-
91-
d := &drpcServer{
92-
srv: dsrv,
93-
mux: dmux,
94-
tlsCfg: tlsCfg,
95-
enabled: enabled,
43+
drpcServer.DRPCServer = d
44+
drpcServer.enabled = true
45+
} else {
46+
drpcServer.DRPCServer = rpc.NewDummyDRPCServer()
47+
drpcServer.enabled = false
9648
}
9749

98-
d.setMode(modeInitializing)
99-
100-
return d, nil
101-
}
102-
103-
// drpcOffServer is used for drpcServerI and drpcMuxI if the DRPC server is
104-
// disabled. It immediately closes accepted connections and returns
105-
// ErrDRPCDisabled.
106-
type drpcOffServer struct{}
107-
108-
func (srv *drpcOffServer) Serve(_ context.Context, lis net.Listener) error {
109-
conn, err := lis.Accept()
50+
tlsCfg, err := rpcCtx.GetServerTLSConfig()
11051
if err != nil {
111-
return err
52+
return nil, err
11253
}
113-
_ = conn.Close()
114-
return ErrDRPCDisabled
115-
}
11654

117-
func (srv *drpcOffServer) Register(interface{}, drpc.Description) error {
118-
return nil
55+
drpcServer.tlsCfg = tlsCfg
56+
drpcServer.setMode(modeInitializing)
57+
58+
return drpcServer, nil
11959
}
12060

61+
// health returns an error if the server is not operational, encoding the error
62+
// with a DRPC code. Returns nil if the server is healthy and operational.
12163
func (s *drpcServer) health(ctx context.Context) error {
12264
sm := s.mode.get()
12365
switch sm {

pkg/server/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
975975
cfg.LicenseEnforcer,
976976
)
977977
kvpb.RegisterInternalServer(grpcServer.Server, node)
978-
if err := kvpb.DRPCRegisterKVBatch(drpcServer.mux, node.AsDRPCKVBatchServer()); err != nil {
978+
if err := kvpb.DRPCRegisterKVBatch(drpcServer, node.AsDRPCKVBatchServer()); err != nil {
979979
return nil, err
980980
}
981981
kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)

pkg/server/start_listen.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,9 @@ func startListenRPCAndSQL(
207207
_ = stopper.RunAsyncTask(drpcCtx, "serve-drpc", func(ctx context.Context) {
208208
if cfg := drpc.tlsCfg; cfg != nil {
209209
drpcTLSL := tls.NewListener(drpcL, cfg)
210-
netutil.FatalIfUnexpected(drpc.srv.Serve(ctx, drpcTLSL))
210+
netutil.FatalIfUnexpected(drpc.Serve(ctx, drpcTLSL))
211211
} else {
212-
netutil.FatalIfUnexpected(drpc.srv.Serve(ctx, drpcL))
212+
netutil.FatalIfUnexpected(drpc.Serve(ctx, drpcL))
213213
}
214214
})
215215
_ = stopper.RunAsyncTask(workersCtx, "serve-loopback-grpc", func(context.Context) {

0 commit comments

Comments
 (0)