Skip to content

Commit dde2221

Browse files
craig[bot]cthumuluru-crdb
andcommitted
Merge #152948
152948: *: add generic dialers to create RPC clients r=cthumuluru-crdb a=cthumuluru-crdb This PR has changes to move `ExperimentalDRPCEnabled` flag to `rpcbase` package and add generic dial methods to create RPC clients. This refactor reduces the number of places where the DRPC experimental flag must be enabled, and simplifies adding version guards by limiting them to only a few locations. Epic: CRDB-48935 Informs: None Release note: None Co-authored-by: Chandra Thumuluru <[email protected]>
2 parents 79a2404 + b61418c commit dde2221

File tree

10 files changed

+130
-44
lines changed

10 files changed

+130
-44
lines changed

pkg/rpc/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ go_library(
2020
"keepalive.go",
2121
"metrics.go",
2222
"peer.go",
23-
"peer_drpc.go",
2423
"peer_map.go",
2524
"restricted_internal_client.go",
2625
"settings.go",

pkg/rpc/nodedialer/nodedialer.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (n *Dialer) DialInternalClient(
171171

172172
var client rpc.RestrictedInternalClient
173173
useStreamPoolClient := shouldUseBatchStreamPoolClient(ctx, n.rpcContext.Settings)
174-
if !rpc.ExperimentalDRPCEnabled.Get(&n.rpcContext.Settings.SV) {
174+
if !rpcbase.ExperimentalDRPCEnabled.Get(&n.rpcContext.Settings.SV) {
175175
gc, conn, err := dial(ctx, n.resolver, n.rpcContext.GRPCDialNode, nodeID, class, true /* checkBreaker */)
176176
if err != nil {
177177
return nil, errors.Wrapf(err, "gRPC")
@@ -430,3 +430,32 @@ func (c *tracingInternalClient) Batch(
430430
}
431431
return c.RestrictedInternalClient.Batch(ctx, ba)
432432
}
433+
434+
// DialRPCClient establishes a connection to a node identified by its ID and
435+
// returns a client for the requested service type. When DRPC is enabled, it
436+
// creates a DRPC client; otherwise, it falls back to a gRPC client.
437+
func DialRPCClient[C any](
438+
nd *Dialer,
439+
ctx context.Context,
440+
nodeID roachpb.NodeID,
441+
class rpcbase.ConnectionClass,
442+
grpcClientFn func(*grpc.ClientConn) C,
443+
drpcClientFn func(drpc.Conn) C,
444+
) (C, error) {
445+
return rpcbase.DialRPCClient(nd, ctx, nodeID, class, grpcClientFn,
446+
drpcClientFn, nd.rpcContext.Settings)
447+
}
448+
449+
// DialRPCClientNoBreaker is like DialRPCClient, but will not check the
450+
// circuit breaker before trying to connect.
451+
func DialRPCClientNoBreaker[C any](
452+
nd *Dialer,
453+
ctx context.Context,
454+
nodeID roachpb.NodeID,
455+
class rpcbase.ConnectionClass,
456+
grpcClientFn func(*grpc.ClientConn) C,
457+
drpcClientFn func(drpc.Conn) C,
458+
) (C, error) {
459+
return rpcbase.DialRPCClientNoBreaker(nd, ctx, nodeID, class, grpcClientFn,
460+
drpcClientFn, nd.rpcContext.Settings)
461+
}

pkg/rpc/peer_drpc.go

Lines changed: 0 additions & 34 deletions
This file was deleted.

pkg/rpc/rpcbase/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ go_library(
2525
"//pkg/keys",
2626
"//pkg/roachpb",
2727
"//pkg/rpc/rpcpb",
28+
"//pkg/settings",
29+
"//pkg/settings/cluster",
30+
"//pkg/util/buildutil",
2831
"//pkg/util/envutil",
32+
"@com_github_cockroachdb_errors//:errors",
2933
"@io_storj_drpc//:drpc",
3034
"@org_golang_google_grpc//:grpc",
3135
],

pkg/rpc/rpcbase/nodedialer.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,36 @@ import (
99
"context"
1010

1111
"github.com/cockroachdb/cockroach/pkg/roachpb"
12+
"github.com/cockroachdb/cockroach/pkg/settings"
13+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
14+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
15+
"github.com/cockroachdb/cockroach/pkg/util/envutil"
16+
"github.com/cockroachdb/errors"
1217
"google.golang.org/grpc"
1318
"storj.io/drpc"
1419
)
1520

21+
var envExperimentalDRPCEnabled = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_DRPC_ENABLED", false)
22+
23+
// ExperimentalDRPCEnabled determines whether a drpc server accepting BatchRequest
24+
// is enabled. This server is experimental and completely unsuitable to production
25+
// usage (for example, does not implement authorization checks).
26+
var ExperimentalDRPCEnabled = settings.RegisterBoolSetting(
27+
settings.ApplicationLevel,
28+
"rpc.experimental_drpc.enabled",
29+
"if true, use drpc to execute Batch RPCs (instead of gRPC)",
30+
envExperimentalDRPCEnabled,
31+
settings.WithValidateBool(func(values *settings.Values, b bool) error {
32+
// drpc support is highly experimental and should not be enabled in production.
33+
// Since authorization is not implemented, we only even host the server if the
34+
// env var is set or it's a CRDB test build. Consequently, these are prereqs
35+
// for setting the cluster setting.
36+
if b && !(envExperimentalDRPCEnabled || buildutil.CrdbTestBuild) {
37+
return errors.New("experimental drpc is not allowed in this environment")
38+
}
39+
return nil
40+
}))
41+
1642
// TODODRPC is a marker to identify each RPC client creation site that needs to
1743
// be updated to support DRPC.
1844
const TODODRPC = false
@@ -31,3 +57,62 @@ type NodeDialerNoBreaker interface {
3157
DialNoBreaker(context.Context, roachpb.NodeID, ConnectionClass) (_ *grpc.ClientConn, err error)
3258
DRPCDialNoBreaker(context.Context, roachpb.NodeID, ConnectionClass) (_ drpc.Conn, err error)
3359
}
60+
61+
// DialRPCClient establishes a connection to a node identified by its ID and
62+
// returns a client for the requested service type. When DRPC is enabled, it
63+
// creates a DRPC client; otherwise, it falls back to a gRPC client.
64+
func DialRPCClient[C any](
65+
nd NodeDialer,
66+
ctx context.Context,
67+
nodeID roachpb.NodeID,
68+
class ConnectionClass,
69+
grpcClientFn func(*grpc.ClientConn) C,
70+
drpcClientFn func(drpc.Conn) C,
71+
st *cluster.Settings,
72+
) (C, error) {
73+
useDRPC := ExperimentalDRPCEnabled.Get(&st.SV)
74+
75+
var nilC C
76+
if !TODODRPC && !useDRPC {
77+
conn, err := nd.Dial(ctx, nodeID, class)
78+
if err != nil {
79+
return nilC, err
80+
}
81+
return grpcClientFn(conn), nil
82+
}
83+
84+
conn, err := nd.DRPCDial(ctx, nodeID, class)
85+
if err != nil {
86+
return nilC, err
87+
}
88+
return drpcClientFn(conn), nil
89+
}
90+
91+
// DialRPCClientNoBreaker is like DialRPCClient, but will not check the
92+
// circuit breaker before trying to connect.
93+
func DialRPCClientNoBreaker[C any](
94+
nd NodeDialerNoBreaker,
95+
ctx context.Context,
96+
nodeID roachpb.NodeID,
97+
class ConnectionClass,
98+
grpcClientFn func(*grpc.ClientConn) C,
99+
drpcClientFn func(drpc.Conn) C,
100+
st *cluster.Settings,
101+
) (C, error) {
102+
useDRPC := ExperimentalDRPCEnabled.Get(&st.SV)
103+
104+
var nilC C
105+
if !TODODRPC && !useDRPC {
106+
conn, err := nd.DialNoBreaker(ctx, nodeID, class)
107+
if err != nil {
108+
return nilC, err
109+
}
110+
return grpcClientFn(conn), nil
111+
}
112+
113+
conn, err := nd.DRPCDialNoBreaker(ctx, nodeID, class)
114+
if err != nil {
115+
return nilC, err
116+
}
117+
return drpcClientFn(conn), nil
118+
}

pkg/server/drpc_server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"crypto/tls"
1111

1212
"github.com/cockroachdb/cockroach/pkg/rpc"
13+
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1314
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
1415
"github.com/cockroachdb/errors"
1516
"google.golang.org/grpc/codes"
@@ -34,7 +35,7 @@ type drpcServer struct {
3435
// DRPC if the experimental setting is on, otherwise returns a dummy server.
3536
func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) {
3637
drpcServer := &drpcServer{}
37-
if rpc.ExperimentalDRPCEnabled.Get(&rpcCtx.Settings.SV) {
38+
if rpcbase.ExperimentalDRPCEnabled.Get(&rpcCtx.Settings.SV) {
3839
d, err := rpc.NewDRPCServer(ctx, rpcCtx)
3940
if err != nil {
4041
return nil, err

pkg/server/drpc_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestDRPCBatchServer(t *testing.T) {
4343
args.ServerArgs.Insecure = insecure
4444
args.ReplicationMode = base.ReplicationManual
4545
args.ServerArgs.Settings = cluster.MakeClusterSettings()
46-
rpc.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
46+
rpcbase.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
4747
c := testcluster.StartTestCluster(t, numNodes, args)
4848
defer c.Stopper().Stop(ctx)
4949

@@ -99,7 +99,7 @@ func TestStreamContextCancel(t *testing.T) {
9999
}
100100

101101
ctx := context.Background()
102-
rpc.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
102+
rpcbase.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
103103
c := testcluster.StartTestCluster(t, numNodes, args)
104104
defer c.Stopper().Stop(ctx)
105105

@@ -225,6 +225,7 @@ func TestDialDRPC_InterceptorsAreSet(t *testing.T) {
225225
},
226226
}
227227

228+
rpcbase.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
228229
c := testcluster.StartTestCluster(t, numNodes, args)
229230
defer c.Stopper().Stop(ctx)
230231
rpcAddr := c.Server(0).RPCAddr()

pkg/server/server_drpc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"testing"
1212

1313
"github.com/cockroachdb/cockroach/pkg/base"
14-
"github.com/cockroachdb/cockroach/pkg/rpc"
14+
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1515
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1616
"github.com/cockroachdb/cockroach/pkg/testutils"
1717
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -29,7 +29,7 @@ func TestDRPCSelectQuery(t *testing.T) {
2929
defer cancel()
3030

3131
st := cluster.MakeTestingClusterSettings()
32-
rpc.ExperimentalDRPCEnabled.Override(ctx, &st.SV, true)
32+
rpcbase.ExperimentalDRPCEnabled.Override(ctx, &st.SV, true)
3333

3434
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
3535
ServerArgs: base.TestServerArgs{

pkg/server/start_listen.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cockroachdb/cmux"
1616
"github.com/cockroachdb/cockroach/pkg/rpc"
17+
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1718
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
1819
"github.com/cockroachdb/cockroach/pkg/util/log"
1920
"github.com/cockroachdb/cockroach/pkg/util/netutil"
@@ -137,8 +138,8 @@ func startListenRPCAndSQL(
137138
// Host drpc only if it's _possible_ to turn it on (this requires a test build
138139
// or env var). If the setting _is_ on, then it was overridden in testing and
139140
// we want to host the server too.
140-
hostDRPC := rpc.ExperimentalDRPCEnabled.Validate(nil /* not used */, true) == nil ||
141-
rpc.ExperimentalDRPCEnabled.Get(&cfg.Settings.SV)
141+
hostDRPC := rpcbase.ExperimentalDRPCEnabled.Validate(nil /* not used */, true) == nil ||
142+
rpcbase.ExperimentalDRPCEnabled.Get(&cfg.Settings.SV)
142143

143144
// If we're not hosting drpc, make a listener that never accepts anything.
144145
// We will start the dRPC server all the same; it barely consumes any

pkg/server/testserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config {
334334
}
335335

336336
if params.DefaultDRPCOption == base.TestDRPCEnabled {
337-
rpc.ExperimentalDRPCEnabled.Override(context.Background(), &st.SV, true)
337+
rpcbase.ExperimentalDRPCEnabled.Override(context.Background(), &st.SV, true)
338338
}
339339

340340
return cfg

0 commit comments

Comments
 (0)