Skip to content

Commit e42de35

Browse files
craig[bot] and shubhamdhama committed
Merge #147287
147287: server: refactor drpc server out of grpcServer r=cthumuluru-crdb a=shubhamdhama This change extracts the DRPC server from grpcServer. It also introduces a `serve` mode, mirroring the gRPC implementation. Fixes: #147285 Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents 5aa19ef + a183f84 commit e42de35

File tree

14 files changed

+265
-154
lines changed

14 files changed

+265
-154
lines changed

pkg/rpc/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,7 @@ go_library(
8181
"@io_storj_drpc//drpcconn",
8282
"@io_storj_drpc//drpcmanager",
8383
"@io_storj_drpc//drpcmigrate",
84-
"@io_storj_drpc//drpcmux",
8584
"@io_storj_drpc//drpcpool",
86-
"@io_storj_drpc//drpcserver",
8785
"@io_storj_drpc//drpcstream",
8886
"@io_storj_drpc//drpcwire",
8987
"@org_golang_google_grpc//:grpc",

pkg/rpc/context.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import (
5656
// either expects incoming connections from KV nodes, or from tenant SQL
5757
// servers.
5858
func NewServer(ctx context.Context, rpcCtx *Context, opts ...ServerOption) (*grpc.Server, error) {
59-
srv, _, _, err := NewServerEx(ctx, rpcCtx, opts...)
59+
srv, _, err := NewServerEx(ctx, rpcCtx, opts...)
6060
return srv, err
6161
}
6262

@@ -84,7 +84,7 @@ type ClientInterceptorInfo struct {
8484
// internalClientAdapter does).
8585
func NewServerEx(
8686
ctx context.Context, rpcCtx *Context, opts ...ServerOption,
87-
) (s *grpc.Server, d *DRPCServer, sii ServerInterceptorInfo, err error) {
87+
) (s *grpc.Server, sii ServerInterceptorInfo, err error) {
8888
var o serverOpts
8989
for _, f := range opts {
9090
f(&o)
@@ -113,7 +113,7 @@ func NewServerEx(
113113
if !rpcCtx.ContextOptions.Insecure {
114114
tlsConfig, err := rpcCtx.GetServerTLSConfig()
115115
if err != nil {
116-
return nil, nil, sii, err
116+
return nil, sii, err
117117
}
118118
grpcOpts = append(grpcOpts, grpc.Creds(newTLSCipherRestrictCred(tlsConfig)))
119119
}
@@ -194,13 +194,10 @@ func NewServerEx(
194194
grpcOpts = append(grpcOpts, grpc.ChainStreamInterceptor(streamInterceptor...))
195195

196196
s = grpc.NewServer(grpcOpts...)
197-
d, err = newDRPCServer(ctx, rpcCtx)
198-
if err != nil {
199-
return nil, nil, ServerInterceptorInfo{}, err
200-
}
197+
201198
RegisterHeartbeatServer(s, rpcCtx.NewHeartbeatService())
202199

203-
return s, d, ServerInterceptorInfo{
200+
return s, ServerInterceptorInfo{
204201
UnaryInterceptors: unaryInterceptor,
205202
StreamInterceptors: streamInterceptor,
206203
}, nil

pkg/rpc/context_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func TestInternalClientAdapterRunsInterceptors(t *testing.T) {
435435
serverCtx.AdvertiseAddr = "127.0.0.1:8888"
436436
serverCtx.NodeID.Set(context.Background(), 1)
437437

438-
_ /* gRPC server */, _ /* drpc server */, serverInterceptors, err := NewServerEx(ctx, serverCtx)
438+
_ /* gRPC server */, serverInterceptors, err := NewServerEx(ctx, serverCtx)
439439
require.NoError(t, err)
440440

441441
// Pile on one more interceptor to make sure it's called.
@@ -536,7 +536,7 @@ func TestInternalClientAdapterWithClientStreamInterceptors(t *testing.T) {
536536
serverCtx.AdvertiseAddr = "127.0.0.1:8888"
537537
serverCtx.NodeID.Set(context.Background(), 1)
538538

539-
_ /* gRPC server */, _ /* drpc server */, serverInterceptors, err := NewServerEx(ctx, serverCtx)
539+
_ /* gRPC server */, serverInterceptors, err := NewServerEx(ctx, serverCtx)
540540
require.NoError(t, err)
541541
var clientInterceptors ClientInterceptorInfo
542542
var s *testClientStream
@@ -599,7 +599,7 @@ func TestInternalClientAdapterWithServerStreamInterceptors(t *testing.T) {
599599
serverCtx.AdvertiseAddr = "127.0.0.1:8888"
600600
serverCtx.NodeID.Set(context.Background(), 1)
601601

602-
_ /* gRPC server */, _ /* drpc server */, serverInterceptors, err := NewServerEx(ctx, serverCtx)
602+
_ /* gRPC server */, serverInterceptors, err := NewServerEx(ctx, serverCtx)
603603
require.NoError(t, err)
604604

605605
const int1Name = "interceptor 1"
@@ -737,7 +737,7 @@ func BenchmarkInternalClientAdapter(b *testing.B) {
737737
serverCtx.AdvertiseAddr = "127.0.0.1:8888"
738738
serverCtx.NodeID.Set(context.Background(), 1)
739739

740-
_ /* gRPC server */, _ /* drpc server */, interceptors, err := NewServerEx(ctx, serverCtx)
740+
_ /* gRPC server */, interceptors, err := NewServerEx(ctx, serverCtx)
741741
require.NoError(b, err)
742742

743743
internal := &internalServer{}
@@ -2369,7 +2369,7 @@ func TestMetricsInterceptor(t *testing.T) {
23692369
return nil, nil
23702370
}
23712371

2372-
_, _, serverInterceptors, err := NewServerEx(ctx, serverCtx, WithMetricsServerInterceptor(interceptor))
2372+
_, serverInterceptors, err := NewServerEx(ctx, serverCtx, WithMetricsServerInterceptor(interceptor))
23732373
require.NoError(t, err)
23742374
require.GreaterOrEqual(t, len(serverInterceptors.UnaryInterceptors), 2)
23752375
// make sure that the RequestMetricsInterceptor is the second registered

pkg/rpc/drpc.go

Lines changed: 0 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -9,91 +9,20 @@ import (
99
"context"
1010
"crypto/tls"
1111
"math"
12-
"net"
1312
"time"
1413

15-
"github.com/cockroachdb/cockroach/pkg/util/log"
16-
"github.com/cockroachdb/errors"
1714
"storj.io/drpc"
1815
"storj.io/drpc/drpcconn"
1916
"storj.io/drpc/drpcmanager"
2017
"storj.io/drpc/drpcmigrate"
21-
"storj.io/drpc/drpcmux"
2218
"storj.io/drpc/drpcpool"
23-
"storj.io/drpc/drpcserver"
2419
"storj.io/drpc/drpcstream"
2520
"storj.io/drpc/drpcwire"
2621
)
2722

28-
// ErrDRPCDisabled is returned from hosts that in principle could but do not
29-
// have the DRPC server enabled.
30-
var ErrDRPCDisabled = errors.New("DRPC is not enabled")
31-
3223
// Default idle connection timeout for DRPC connections in the pool.
3324
var defaultDRPCConnIdleTimeout = 5 * time.Minute
3425

35-
type drpcServerI interface {
36-
Serve(ctx context.Context, lis net.Listener) error
37-
}
38-
39-
type drpcMuxI interface {
40-
Register(srv interface{}, desc drpc.Description) error
41-
}
42-
43-
type DRPCServer struct {
44-
Srv drpcServerI
45-
Mux drpcMuxI
46-
TLSCfg *tls.Config
47-
}
48-
49-
var _ drpcServerI = (*drpcserver.Server)(nil)
50-
var _ drpcServerI = (*drpcOffServer)(nil)
51-
52-
func newDRPCServer(_ context.Context, rpcCtx *Context) (*DRPCServer, error) {
53-
var dmux drpcMuxI = &drpcOffServer{}
54-
var dsrv drpcServerI = &drpcOffServer{}
55-
var tlsCfg *tls.Config
56-
57-
if ExperimentalDRPCEnabled.Get(&rpcCtx.Settings.SV) {
58-
mux := drpcmux.New()
59-
dsrv = drpcserver.NewWithOptions(mux, drpcserver.Options{
60-
Log: func(err error) {
61-
log.Warningf(context.Background(), "drpc server error %v", err)
62-
},
63-
// The reader's max buffer size defaults to 4mb, and if it is exceeded (such
64-
// as happens with AddSSTable) the RPCs fail.
65-
Manager: drpcmanager.Options{Reader: drpcwire.ReaderOptions{MaximumBufferSize: math.MaxInt}},
66-
})
67-
dmux = mux
68-
69-
var err error
70-
tlsCfg, err = rpcCtx.GetServerTLSConfig()
71-
if err != nil {
72-
return nil, err
73-
}
74-
75-
// NB: any server middleware (server interceptors in gRPC parlance) would go
76-
// here:
77-
// dmux = whateverMiddleware1(dmux)
78-
// dmux = whateverMiddleware2(dmux)
79-
// ...
80-
//
81-
// Each middleware must implement the Handler interface:
82-
//
83-
// HandleRPC(stream Stream, rpc string) error
84-
//
85-
// where Stream
86-
// See here for an example:
87-
// https://github.com/bryk-io/pkg/blob/4da5fbfef47770be376e4022eab5c6c324984bf7/net/drpc/server.go#L91-L101
88-
}
89-
90-
return &DRPCServer{
91-
Srv: dsrv,
92-
Mux: dmux,
93-
TLSCfg: tlsCfg,
94-
}, nil
95-
}
96-
9726
func dialDRPC(
9827
rpcCtx *Context,
9928
) func(ctx context.Context, target string, _ ConnectionClass) (drpc.Conn, error) {
@@ -162,22 +91,4 @@ func (c *closeEntirePoolConn) Close() error {
16291
return c.pool.Close()
16392
}
16493

165-
// drpcOffServer is used for drpcServerI and drpcMuxI if the DRPC server is
166-
// disabled. It immediately closes accepted connections and returns
167-
// ErrDRPCDisabled.
168-
type drpcOffServer struct{}
169-
170-
func (srv *drpcOffServer) Serve(_ context.Context, lis net.Listener) error {
171-
conn, err := lis.Accept()
172-
if err != nil {
173-
return err
174-
}
175-
_ = conn.Close()
176-
return ErrDRPCDisabled
177-
}
178-
179-
func (srv *drpcOffServer) Register(interface{}, drpc.Description) error {
180-
return nil
181-
}
182-
18394
type DRPCConnection = Connection[drpc.Conn]

pkg/server/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ go_library(
2424
"distsql_flows.go",
2525
"doc.go",
2626
"drain.go",
27+
"drpc_server.go",
2728
"env_sampler.go",
2829
"external_storage_builder.go",
2930
"fanout_clients.go",
@@ -51,6 +52,7 @@ go_library(
5152
"rlimit_bsd.go",
5253
"rlimit_darwin.go",
5354
"rlimit_unix.go",
55+
"serve_mode.go",
5456
"server.go",
5557
"server_controller.go",
5658
"server_controller_channel_orchestrator.go",
@@ -377,7 +379,13 @@ go_library(
377379
"@com_github_prometheus_client_model//go",
378380
"@com_github_prometheus_common//expfmt",
379381
"@in_gopkg_yaml_v2//:yaml_v2",
382+
"@io_storj_drpc//:drpc",
383+
"@io_storj_drpc//drpcerr",
384+
"@io_storj_drpc//drpcmanager",
380385
"@io_storj_drpc//drpcmigrate",
386+
"@io_storj_drpc//drpcmux",
387+
"@io_storj_drpc//drpcserver",
388+
"@io_storj_drpc//drpcwire",
381389
"@org_golang_google_grpc//:grpc",
382390
"@org_golang_google_grpc//codes",
383391
"@org_golang_google_grpc//metadata",

pkg/server/admin.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ type adminServer struct {
117117
rpcContext *rpc.Context
118118
clock *hlc.Clock
119119
grpc *grpcServer
120+
drpc *drpcServer
120121
db *kv.DB
121122
drainServer *drainServer
122123
}
@@ -156,6 +157,7 @@ func newSystemAdminServer(
156157
clock *hlc.Clock,
157158
distSender *kvcoord.DistSender,
158159
grpc *grpcServer,
160+
drpc *drpcServer,
159161
drainServer *drainServer,
160162
s *topLevelServer,
161163
) *systemAdminServer {
@@ -172,6 +174,7 @@ func newSystemAdminServer(
172174
clock,
173175
distSender,
174176
grpc,
177+
drpc,
175178
drainServer,
176179
)
177180
return &systemAdminServer{
@@ -199,6 +202,7 @@ func newAdminServer(
199202
clock *hlc.Clock,
200203
distSender *kvcoord.DistSender,
201204
grpc *grpcServer,
205+
drpc *drpcServer,
202206
drainServer *drainServer,
203207
) *adminServer {
204208
server := &adminServer{
@@ -217,6 +221,7 @@ func newAdminServer(
217221
rpcContext: rpcCtx,
218222
clock: clock,
219223
grpc: grpc,
224+
drpc: drpc,
220225
db: db,
221226
drainServer: drainServer,
222227
}
@@ -2100,10 +2105,18 @@ func (s *adminServer) Health(
21002105

21012106
// checkReadinessForHealthCheck returns a gRPC error.
21022107
func (s *adminServer) checkReadinessForHealthCheck(ctx context.Context) error {
2108+
// A gRPC server will always be running, so ensure that we check its health
2109+
until it is completely removed after the gRPC to DRPC migration.
21032110
if err := s.grpc.health(ctx); err != nil {
21042111
return err
21052112
}
21062113

2114+
if s.drpc.enabled {
2115+
if err := s.drpc.health(ctx); err != nil {
2116+
return err
2117+
}
2118+
}
2119+
21072120
if !s.sqlServer.isReady.Load() {
21082121
return grpcstatus.Errorf(codes.Unavailable, "node is not accepting SQL clients")
21092122
}
@@ -2138,10 +2151,18 @@ func (s *systemAdminServer) Health(
21382151

21392152
// checkReadinessForHealthCheck returns a gRPC error.
21402153
func (s *systemAdminServer) checkReadinessForHealthCheck(ctx context.Context) error {
2154+
// A gRPC server will always be running, so ensure that we check its health
2155+
until it is completely removed after the gRPC to DRPC migration.
21412156
if err := s.grpc.health(ctx); err != nil {
21422157
return err
21432158
}
21442159

2160+
if s.drpc.enabled {
2161+
if err := s.drpc.health(ctx); err != nil {
2162+
return err
2163+
}
2164+
}
2165+
21452166
status := s.nodeLiveness.GetNodeVitalityFromCache(roachpb.NodeID(s.serverIterator.getID()))
21462167
if !status.IsLive(livenesspb.AdminHealthCheck) {
21472168
return grpcstatus.Errorf(codes.Unavailable, "node is not healthy")

pkg/server/drain.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ type drainServer struct {
119119
// stopTrigger is used to request that the server is shut down.
120120
stopTrigger *stopTrigger
121121
grpc *grpcServer
122+
drpc *drpcServer
122123
sqlServer *SQLServer
123124
drainSleepFn func(time.Duration)
124125
serverCtl *serverController
@@ -135,6 +136,7 @@ func newDrainServer(
135136
stopper *stop.Stopper,
136137
stopTrigger *stopTrigger,
137138
grpc *grpcServer,
139+
drpc *drpcServer,
138140
sqlServer *SQLServer,
139141
) *drainServer {
140142
var drainSleepFn = time.Sleep
@@ -147,6 +149,7 @@ func newDrainServer(
147149
stopper: stopper,
148150
stopTrigger: stopTrigger,
149151
grpc: grpc,
152+
drpc: drpc,
150153
sqlServer: sqlServer,
151154
drainSleepFn: drainSleepFn,
152155
}
@@ -384,6 +387,7 @@ func (s *drainServer) drainClients(
384387
// Set the gRPC mode of the node to "draining" and mark the node as "not ready".
385388
// Probes to /health?ready=1 will now notice the change in the node's readiness.
386389
s.grpc.setMode(modeDraining)
390+
s.drpc.setMode(modeDraining)
387391
s.sqlServer.isReady.Store(false)
388392

389393
// Log the number of connections periodically.

0 commit comments

Comments
 (0)