Skip to content

Commit fece670

Browse files
cli,kv,server: consolidate StatusClient RPC client creation
This commit consolidates `StatusClient` RPC client creation logic in cli, kv, and server packages. It is a continuation of the work done in [#147606]. Epic: CRDB-48923 Fixes: #148099 Release note: none
1 parent 5ccd93e commit fece670

File tree

20 files changed

+216
-128
lines changed

20 files changed

+216
-128
lines changed

pkg/acceptance/localcluster/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"//pkg/config/zonepb",
1515
"//pkg/roachpb",
1616
"//pkg/rpc",
17+
"//pkg/rpc/rpcbase",
1718
"//pkg/server/serverpb",
1819
"//pkg/settings/cluster",
1920
"//pkg/testutils",

pkg/acceptance/localcluster/cluster.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
2929
"github.com/cockroachdb/cockroach/pkg/roachpb"
3030
"github.com/cockroachdb/cockroach/pkg/rpc"
31+
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
3132
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
3233
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
3334
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -477,11 +478,14 @@ func (n *Node) StatusClient(ctx context.Context) serverpb.StatusClient {
477478
return existingClient
478479
}
479480

480-
conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr(), roachpb.Locality{}).Connect(ctx)
481-
if err != nil {
482-
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
481+
if !rpcbase.TODODRPC {
482+
conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr(), roachpb.Locality{}).Connect(ctx)
483+
if err != nil {
484+
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
485+
}
486+
return serverpb.NewStatusClient(conn)
483487
}
484-
return serverpb.NewStatusClient(conn)
488+
return nil // This should never happen
485489
}
486490

487491
func (n *Node) logDir() string {

pkg/cli/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ go_library(
4646
"node.go",
4747
"nodelocal.go",
4848
"prefixer.go",
49-
"rpc_client.go",
49+
"rpc_clients.go",
5050
"rpc_node_shutdown.go",
5151
"sql_client.go",
5252
"sql_shell_cmd.go",

pkg/cli/debug.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1037,12 +1037,13 @@ func runDebugGossipValues(cmd *cobra.Command, args []string) error {
10371037
return errors.Wrap(err, "failed to parse provided file as gossip.InfoStatus")
10381038
}
10391039
} else {
1040-
status, finish, err := getStatusClient(ctx, serverCfg)
1040+
conn, finish, err := newClientConn(ctx, serverCfg)
10411041
if err != nil {
10421042
return err
10431043
}
10441044
defer finish()
10451045

1046+
status := conn.NewStatusClient()
10461047
gossipInfo, err = status.Gossip(ctx, &serverpb.GossipRequest{})
10471048
if err != nil {
10481049
return errors.Wrap(err, "failed to retrieve gossip from server")

pkg/cli/debug_list_files.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ func runDebugListFiles(cmd *cobra.Command, _ []string) error {
4141
defer cancel()
4242

4343
// Connect to the node pointed to in the command line.
44-
status, finish, err := getStatusClient(ctx, serverCfg)
44+
conn, finish, err := newClientConn(ctx, serverCfg)
4545
if err != nil {
4646
return err
4747
}
4848
defer finish()
4949

5050
// Retrieve the details for the head node.
51+
status := conn.NewStatusClient()
5152
firstNodeDetails, err := status.Details(ctx, &serverpb.DetailsRequest{NodeId: "local"})
5253
if err != nil {
5354
return err

pkg/cli/haproxy.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,12 +216,13 @@ func runGenHAProxyCmd(cmd *cobra.Command, args []string) error {
216216
return err
217217
}
218218

219-
c, finish, err := getStatusClient(ctx, serverCfg)
219+
conn, finish, err := newClientConn(ctx, serverCfg)
220220
if err != nil {
221221
return err
222222
}
223223
defer finish()
224224

225+
c := conn.NewStatusClient()
225226
nodeStatuses, err := c.Nodes(ctx, &serverpb.NodesRequest{})
226227
if err != nil {
227228
return err

pkg/cli/node.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -348,15 +348,14 @@ func runDecommissionNode(cmd *cobra.Command, args []string) error {
348348
return err
349349
}
350350

351-
conn, finish, err := getClientGRPCConn(ctx, serverCfg)
351+
conn, finish, err := newClientConn(ctx, serverCfg)
352352
if err != nil {
353-
return errors.Wrap(err, "failed to connect to the node")
353+
return err
354354
}
355355
defer finish()
356356

357-
s := serverpb.NewStatusClient(conn)
358-
359-
localNodeID, err := getLocalNodeID(ctx, s)
357+
statusClient := conn.NewStatusClient()
358+
localNodeID, err := getLocalNodeID(ctx, statusClient)
360359
if err != nil {
361360
return err
362361
}
@@ -366,12 +365,12 @@ func runDecommissionNode(cmd *cobra.Command, args []string) error {
366365
return err
367366
}
368367

369-
if err := expectNodesDecommissioned(ctx, s, nodeIDs, false /* expDecommissioned */); err != nil {
368+
if err := expectNodesDecommissioned(ctx, statusClient, nodeIDs, false /* expDecommissioned */); err != nil {
370369
return err
371370
}
372371

373-
c := serverpb.NewAdminClient(conn)
374-
if err := runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait,
372+
adminClient := conn.NewAdminClient()
373+
if err := runDecommissionNodeImpl(ctx, adminClient, nodeCtx.nodeDecommissionWait,
375374
nodeCtx.nodeDecommissionChecks, nodeCtx.nodeDecommissionDryRun,
376375
nodeIDs, localNodeID,
377376
); err != nil {
@@ -831,15 +830,14 @@ func runRecommissionNode(cmd *cobra.Command, args []string) error {
831830
return err
832831
}
833832

834-
conn, finish, err := getClientGRPCConn(ctx, serverCfg)
833+
conn, finish, err := newClientConn(ctx, serverCfg)
835834
if err != nil {
836-
return errors.Wrap(err, "failed to connect to the node")
835+
return err
837836
}
838837
defer finish()
839838

840-
s := serverpb.NewStatusClient(conn)
841-
842-
localNodeID, err := getLocalNodeID(ctx, s)
839+
statusClient := conn.NewStatusClient()
840+
localNodeID, err := getLocalNodeID(ctx, statusClient)
843841
if err != nil {
844842
return err
845843
}
@@ -849,16 +847,16 @@ func runRecommissionNode(cmd *cobra.Command, args []string) error {
849847
return err
850848
}
851849

852-
if err := expectNodesDecommissioned(ctx, s, nodeIDs, true /* expDecommissioned */); err != nil {
850+
if err := expectNodesDecommissioned(ctx, statusClient, nodeIDs, true /* expDecommissioned */); err != nil {
853851
return err
854852
}
855853

856-
c := serverpb.NewAdminClient(conn)
854+
adminClient := conn.NewAdminClient()
857855
req := &serverpb.DecommissionRequest{
858856
NodeIDs: nodeIDs,
859857
TargetMembership: livenesspb.MembershipStatus_ACTIVE,
860858
}
861-
resp, err := c.Decommission(ctx, req)
859+
resp, err := adminClient.Decommission(ctx, req)
862860
if err != nil {
863861
cause := errors.UnwrapAll(err)
864862
// If it's a specific illegal membership transition error, we try to
@@ -922,21 +920,22 @@ func runDrain(cmd *cobra.Command, args []string) (err error) {
922920
}
923921

924922
// Establish a RPC connection.
925-
c, finish, err := getAdminClient(ctx, serverCfg)
923+
conn, finish, err := newClientConn(ctx, serverCfg)
926924
if err != nil {
927925
return err
928926
}
929927
defer finish()
930928

931-
if _, _, err := doDrain(ctx, c, targetNode); err != nil {
929+
adminClient := conn.NewAdminClient()
930+
if _, _, err := doDrain(ctx, adminClient, targetNode); err != nil {
932931
return err
933932
}
934933

935934
// Report "ok" if there was no error.
936935
fmt.Println("drain ok")
937936

938937
if drainCtx.shutdown {
939-
if _, err := doShutdown(ctx, c, targetNode); err != nil {
938+
if _, err := doShutdown(ctx, adminClient, targetNode); err != nil {
940939
return err
941940
}
942941
// Report "ok" if there was no error.

pkg/cli/rpc_client.go

Lines changed: 0 additions & 58 deletions
This file was deleted.

pkg/cli/rpc_clients.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2022 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package cli
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
12+
"github.com/cockroachdb/cockroach/pkg/rpc"
13+
"github.com/cockroachdb/cockroach/pkg/server"
14+
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
15+
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
16+
"github.com/cockroachdb/errors"
17+
"google.golang.org/grpc"
18+
)
19+
20+
// rpcConn defines a common interface for creating RPC clients. It hides the
21+
// underlying RPC connection (gRPC or DRPC), making it easy to swap
22+
// them without changing the caller code.
23+
type rpcConn interface {
24+
NewStatusClient() serverpb.StatusClient
25+
NewAdminClient() serverpb.AdminClient
26+
NewInitClient() serverpb.InitClient
27+
NewTimeSeriesClient() tspb.TimeSeriesClient
28+
NewInternalClient() kvpb.InternalClient
29+
}
30+
31+
// grpcConn is an implementation of rpcConn that provides methods to create
32+
// various RPC clients. This allows the CLI to interact with the server using
33+
// gRPC without exposing the underlying connection details.
34+
type grpcConn struct {
35+
conn *grpc.ClientConn
36+
}
37+
38+
func (c *grpcConn) NewStatusClient() serverpb.StatusClient {
39+
return serverpb.NewStatusClient(c.conn)
40+
}
41+
42+
func (c *grpcConn) NewAdminClient() serverpb.AdminClient {
43+
return serverpb.NewAdminClient(c.conn)
44+
}
45+
46+
func (c *grpcConn) NewInitClient() serverpb.InitClient {
47+
return serverpb.NewInitClient(c.conn)
48+
}
49+
50+
func (c *grpcConn) NewTimeSeriesClient() tspb.TimeSeriesClient {
51+
return tspb.NewTimeSeriesClient(c.conn)
52+
}
53+
54+
func (c *grpcConn) NewInternalClient() kvpb.InternalClient {
55+
return kvpb.NewInternalClient(c.conn)
56+
}
57+
58+
func makeRPCClientConfig(cfg server.Config) rpc.ClientConnConfig {
59+
var knobs rpc.ContextTestingKnobs
60+
if sknobs := cfg.TestingKnobs.Server; sknobs != nil {
61+
knobs = sknobs.(*server.TestingKnobs).ContextTestingKnobs
62+
}
63+
return rpc.MakeClientConnConfigFromBaseConfig(
64+
*cfg.Config,
65+
cfg.Config.User,
66+
cfg.BaseConfig.Tracer,
67+
cfg.BaseConfig.Settings,
68+
nil, /* clock */
69+
knobs,
70+
)
71+
}
72+
73+
func newClientConn(ctx context.Context, cfg server.Config) (rpcConn, func(), error) {
74+
ccfg := makeRPCClientConfig(cfg)
75+
cc, finish, err := rpc.NewClientConn(ctx, ccfg)
76+
return &grpcConn{conn: cc}, finish, errors.Wrap(err, "failed to connect to the node")
77+
}
78+
79+
func getClientGRPCConn(ctx context.Context, cfg server.Config) (*grpc.ClientConn, func(), error) {
80+
ccfg := makeRPCClientConfig(cfg)
81+
return rpc.NewClientConn(ctx, ccfg)
82+
}
83+
84+
// getAdminClient returns an AdminClient and a closure that must be invoked
85+
// to free associated resources.
86+
func getAdminClient(ctx context.Context, cfg server.Config) (serverpb.AdminClient, func(), error) {
87+
conn, finish, err := getClientGRPCConn(ctx, cfg)
88+
if err != nil {
89+
return nil, nil, errors.Wrap(err, "failed to connect to the node")
90+
}
91+
return serverpb.NewAdminClient(conn), finish, nil
92+
}

0 commit comments

Comments
 (0)