Skip to content

Commit 9aa122b

Browse files
cli,kv,server: consolidate AdminClient RPC client creation
This commit consolidates `AdminClient` RPC client creation logic in cli, kv, and server packages. It is a continuation of the work done in #147606. Epic: CRDB-48923 Fixes: #148158 Release note: none
1 parent 98ab43e commit 9aa122b

File tree

12 files changed

+140
-48
lines changed

12 files changed

+140
-48
lines changed

pkg/cli/debug_recover_loss_of_quorum.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,11 +308,12 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
308308
var stats loqrecovery.CollectionStats
309309

310310
if len(debugRecoverCollectInfoOpts.Stores.Specs) == 0 {
311-
c, finish, err := getAdminClient(ctx, serverCfg)
311+
c, finish, err := dialAdminClient(ctx, serverCfg)
312312
if err != nil {
313313
return errors.Wrapf(err, "failed to get admin connection to cluster")
314314
}
315315
defer finish()
316+
316317
replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c,
317318
debugRecoverCollectInfoOpts.maxConcurrency, stderr /* logOutput */)
318319
if err != nil {
@@ -426,11 +427,12 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
426427
if len(args) == 0 {
427428
// If no replica info is provided, try to connect to a cluster default or
428429
// explicitly provided to retrieve replica info.
429-
c, finish, err := getAdminClient(ctx, serverCfg)
430+
c, finish, err := dialAdminClient(ctx, serverCfg)
430431
if err != nil {
431432
return errors.Wrapf(err, "failed to get admin connection to cluster")
432433
}
433434
defer finish()
435+
434436
replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c,
435437
debugRecoverPlanOpts.maxConcurrency, stderr /* logOutput */)
436438
if err != nil {
@@ -675,7 +677,7 @@ func stageRecoveryOntoCluster(
675677
ignoreInternalVersion bool,
676678
maxConcurrency int,
677679
) error {
678-
c, finish, err := getAdminClient(ctx, serverCfg)
680+
c, finish, err := dialAdminClient(ctx, serverCfg)
679681
if err != nil {
680682
return errors.Wrapf(err, "failed to get admin connection to cluster")
681683
}
@@ -686,6 +688,7 @@ func stageRecoveryOntoCluster(
686688
nodeID roachpb.NodeID
687689
planID string
688690
}
691+
689692
vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{
690693
MaxConcurrency: int32(maxConcurrency),
691694
})
@@ -948,11 +951,12 @@ func runDebugVerify(cmd *cobra.Command, args []string) error {
948951
_, _ = fmt.Printf("Checking application of recovery plan %s\n", updatePlan.PlanID)
949952
}
950953

951-
c, finish, err := getAdminClient(ctx, serverCfg)
954+
c, finish, err := dialAdminClient(ctx, serverCfg)
952955
if err != nil {
953956
return errors.Wrapf(err, "failed to get admin connection to cluster")
954957
}
955958
defer finish()
959+
956960
req := serverpb.RecoveryVerifyRequest{
957961
DecommissionedNodeIDs: updatePlan.DecommissionedNodeIDs,
958962
MaxReportedRanges: 20,

pkg/cli/debug_reset_quorum.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ func runDebugResetQuorum(cmd *cobra.Command, args []string) error {
4545
}
4646

4747
// Set up GRPC Connection for running ResetQuorum.
48-
cc, finish, err := getClientGRPCConn(ctx, serverCfg)
48+
conn, finish, err := newClientConn(ctx, serverCfg)
4949
if err != nil {
5050
log.Errorf(ctx, "connection to server failed: %v", err)
5151
return err
5252
}
5353
defer finish()
5454

5555
// Call ResetQuorum to reset quorum for given range on target node.
56-
_, err = kvpb.NewInternalClient(cc).ResetQuorum(ctx, &kvpb.ResetQuorumRequest{
56+
_, err = conn.NewInternalClient().ResetQuorum(ctx, &kvpb.ResetQuorumRequest{
5757
RangeID: int32(rangeID),
5858
})
5959
if err != nil {

pkg/cli/debug_send_kv_batch.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,11 @@ func runSendKVBatch(cmd *cobra.Command, args []string) error {
166166
// Send BatchRequest.
167167
ctx, cancel := context.WithCancel(context.Background())
168168
defer cancel()
169-
conn, finish, err := getClientGRPCConn(ctx, serverCfg)
169+
admin, finish, err := dialAdminClient(ctx, serverCfg)
170170
if err != nil {
171171
return errors.Wrap(err, "failed to connect to the node")
172172
}
173173
defer finish()
174-
admin := serverpb.NewAdminClient(conn)
175174

176175
br, rec, err := sendKVBatchRequestWithTracingOption(ctx, enableTracing, admin, &ba)
177176
if err != nil {

pkg/cli/init.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func dialAndCheckHealth(ctx context.Context) error {
8989
// (Attempt to) establish the gRPC connection. If that fails,
9090
// it may be that the server hasn't started to listen yet, in
9191
// which case we'll retry.
92-
conn, finish, err := getClientGRPCConn(ctx, serverCfg)
92+
conn, finish, err := newClientConn(ctx, serverCfg)
9393
if err != nil {
9494
return err
9595
}
@@ -98,7 +98,7 @@ func dialAndCheckHealth(ctx context.Context) error {
9898
// Access the /health endpoint. Until/unless this succeeds, the
9999
// node is not yet fully initialized and ready to accept
100100
// Bootstrap requests.
101-
ac := serverpb.NewAdminClient(conn)
101+
ac := conn.NewAdminClient()
102102
_, err = ac.Health(ctx, &serverpb.HealthRequest{})
103103
return err
104104
}

pkg/cli/rpc_clients.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,20 +73,18 @@ func makeRPCClientConfig(cfg server.Config) rpc.ClientConnConfig {
7373
func newClientConn(ctx context.Context, cfg server.Config) (rpcConn, func(), error) {
7474
ccfg := makeRPCClientConfig(cfg)
7575
cc, finish, err := rpc.NewClientConn(ctx, ccfg)
76-
return &grpcConn{conn: cc}, finish, errors.Wrap(err, "failed to connect to the node")
77-
}
78-
79-
func getClientGRPCConn(ctx context.Context, cfg server.Config) (*grpc.ClientConn, func(), error) {
80-
ccfg := makeRPCClientConfig(cfg)
81-
return rpc.NewClientConn(ctx, ccfg)
76+
if err != nil {
77+
return nil, nil, errors.Wrap(err, "failed to connect to the node")
78+
}
79+
return &grpcConn{conn: cc}, finish, nil
8280
}
8381

84-
// getAdminClient returns an AdminClient and a closure that must be invoked
85-
// to free associated resources.
86-
func getAdminClient(ctx context.Context, cfg server.Config) (serverpb.AdminClient, func(), error) {
87-
conn, finish, err := getClientGRPCConn(ctx, cfg)
82+
// dialAdminClient dials a client connection and returns an AdminClient and a
83+
// closure that must be invoked to free associated resources.
84+
func dialAdminClient(ctx context.Context, cfg server.Config) (serverpb.AdminClient, func(), error) {
85+
cc, finish, err := newClientConn(ctx, cfg)
8886
if err != nil {
89-
return nil, nil, errors.Wrap(err, "failed to connect to the node")
87+
return nil, nil, err
9088
}
91-
return serverpb.NewAdminClient(conn), finish, nil
89+
return cc.NewAdminClient(), finish, nil
9290
}

pkg/kv/kvclient/kvtenant/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ go_library(
2626
"//pkg/roachpb",
2727
"//pkg/rpc",
2828
"//pkg/rpc/nodedialer",
29+
"//pkg/rpc/rpcbase",
2930
"//pkg/server/serverpb",
3031
"//pkg/server/settingswatcher",
3132
"//pkg/settings",

pkg/kv/kvclient/kvtenant/connector.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/roachpb"
2929
"github.com/cockroachdb/cockroach/pkg/rpc"
3030
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
31+
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
3132
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
3233
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
3334
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -975,17 +976,19 @@ func (c *connector) dialAddrs(ctx context.Context) (*client, error) {
975976
// Try each address on each retry iteration (in random order).
976977
for _, i := range rand.Perm(len(c.addrs)) {
977978
addr := c.addrs[i]
978-
conn, err := c.dialAddr(ctx, addr)
979-
if err != nil {
980-
log.Warningf(ctx, "error dialing tenant KV address %s: %v", addr, err)
981-
continue
979+
if !rpcbase.TODODRPC {
980+
conn, err := c.dialAddr(ctx, addr)
981+
if err != nil {
982+
log.Warningf(ctx, "error dialing tenant KV address %s: %v", addr, err)
983+
continue
984+
}
985+
return &client{
986+
InternalClient: kvpb.NewInternalClient(conn),
987+
StatusClient: serverpb.NewStatusClient(conn),
988+
AdminClient: serverpb.NewAdminClient(conn),
989+
TimeSeriesClient: tspb.NewTimeSeriesClient(conn),
990+
}, nil
982991
}
983-
return &client{
984-
InternalClient: kvpb.NewInternalClient(conn),
985-
StatusClient: serverpb.NewStatusClient(conn),
986-
AdminClient: serverpb.NewAdminClient(conn),
987-
TimeSeriesClient: tspb.NewTimeSeriesClient(conn),
988-
}, nil
989992
}
990993
}
991994
return nil, errors.Wrap(ctx.Err(), "dial addrs")

pkg/kv/kvserver/loqrecovery/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ go_library(
5353
"@com_github_cockroachdb_errors//:errors",
5454
"@com_github_cockroachdb_pebble//vfs",
5555
"@com_github_cockroachdb_redact//:redact",
56-
"@org_golang_google_grpc//:grpc",
5756
"@org_golang_x_sync//errgroup",
5857
],
5958
)

pkg/kv/kvserver/loqrecovery/server.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"fmt"
1111
"io"
12+
"net"
1213
"sync/atomic"
1314
"time"
1415

@@ -35,7 +36,6 @@ import (
3536
"github.com/cockroachdb/errors"
3637
"github.com/cockroachdb/redact"
3738
"golang.org/x/sync/errgroup"
38-
"google.golang.org/grpc"
3939
)
4040

4141
const rangeMetadataScanChunkSize = 100
@@ -720,6 +720,22 @@ func makeVisitAvailableNodesInParallel(
720720
return err
721721
}
722722

723+
// Initialize a nodedialer to establish connections with nodes. We'll
724+
// create a custom resolver that uses the already available node list,
725+
// which is more efficient than fetching node information again. Node
726+
// dialer allows us to reuse utility methods to create RPC connections.
727+
resolver := func(nodeID roachpb.NodeID) (net.Addr, roachpb.Locality, error) {
728+
for _, node := range nodes {
729+
if node.NodeID == nodeID {
730+
addr := node.AddressForLocality(loc)
731+
return addr, node.Locality, nil
732+
}
733+
}
734+
// This should not happen since the visitor visits the exact same nodes.
735+
return nil, roachpb.Locality{}, errors.Newf("node n%d not found in gossip", nodeID)
736+
}
737+
nd := nodedialer.New(rpcCtx, resolver)
738+
723739
var g errgroup.Group
724740
if maxConcurrency == 0 {
725741
// "A value of 0 disables concurrency."
@@ -729,7 +745,7 @@ func makeVisitAvailableNodesInParallel(
729745
for _, node := range nodes {
730746
node := node // copy for closure
731747
g.Go(func() error {
732-
return visitNodeWithRetry(ctx, loc, rpcCtx, retryOpts, visitor, node)
748+
return visitNodeWithRetry(ctx, nd, retryOpts, visitor, node)
733749
})
734750
}
735751
return g.Wait()
@@ -738,21 +754,19 @@ func makeVisitAvailableNodesInParallel(
738754

739755
func visitNodeWithRetry(
740756
ctx context.Context,
741-
loc roachpb.Locality,
742-
rpcCtx *rpc.Context,
757+
nd rpcbase.NodeDialerNoBreaker,
743758
retryOpts retry.Options,
744759
visitor visitNodeAdminFn,
745760
node roachpb.NodeDescriptor,
746761
) error {
747762
var err error
763+
var ac serverpb.AdminClient
748764
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
749765
log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt())
750-
addr := node.AddressForLocality(loc)
751-
var conn *grpc.ClientConn
752766
// Note that we use ConnectNoBreaker here to avoid any race with probe
753767
// running on current node and target node restarting. Errors from circuit
754768
// breaker probes could confuse us and present node as unavailable.
755-
conn, _, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpcbase.DefaultClass).ConnectNoBreaker(ctx)
769+
ac, err = serverpb.DialAdminClientNoBreaker(nd, ctx, node.NodeID)
756770
// Nodes would contain dead nodes that we don't need to visit. We can skip
757771
// them and let caller handle incomplete info.
758772
if err != nil {
@@ -765,8 +779,7 @@ func visitNodeWithRetry(
765779
// live.
766780
continue
767781
}
768-
client := serverpb.NewAdminClient(conn)
769-
err = visitor(node.NodeID, client)
782+
err = visitor(node.NodeID, ac)
770783
if err == nil {
771784
return nil
772785
}

pkg/server/admin.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3723,11 +3723,7 @@ func (s *adminServer) queryTableID(
37233723
func (s *adminServer) dialNode(
37243724
ctx context.Context, nodeID roachpb.NodeID,
37253725
) (serverpb.AdminClient, error) {
3726-
conn, err := s.serverIterator.dialNode(ctx, serverID(nodeID))
3727-
if err != nil {
3728-
return nil, err
3729-
}
3730-
return serverpb.NewAdminClient(conn), nil
3726+
return serverpb.DialAdminClient(s.nd, ctx, nodeID)
37313727
}
37323728

37333729
func (s *adminServer) ListTracingSnapshots(

0 commit comments

Comments
 (0)