Skip to content

Commit c93eab1

Browse files
craig[bot]cthumuluru-crdb
andcommitted
Merge #153034
153034: kv,server,upgrade: use generic dialers to create RPC clients r=cthumuluru-crdb a=cthumuluru-crdb Changes in this PR are a followup to #152948. Epic: CRDB-48935 Informs: None Release note: None Co-authored-by: Chandra Thumuluru <[email protected]>
2 parents 59c096a + 9fe154b commit c93eab1

File tree

11 files changed

+52
-80
lines changed

11 files changed

+52
-80
lines changed

pkg/kv/kvserver/loqrecovery/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@ func makeVisitAvailableNodesInParallel(
758758

759759
func visitNodeWithRetry(
760760
ctx context.Context,
761-
nd rpcbase.NodeDialerNoBreaker,
761+
nd *nodedialer.Dialer,
762762
retryOpts retry.Options,
763763
visitor visitNodeAdminFn,
764764
node roachpb.NodeDescriptor,
@@ -807,7 +807,7 @@ func visitNodeWithRetry(
807807
//
808808
// For latter, errors marked with errMarkRetry marker are retried. It is up
809809
// to the visitor to mark appropriate errors are retryable.
810-
func makeVisitNode(nd rpcbase.NodeDialerNoBreaker) visitNodeStatusFn {
810+
func makeVisitNode(nd *nodedialer.Dialer) visitNodeStatusFn {
811811
return func(ctx context.Context, nodeID roachpb.NodeID, retryOpts retry.Options,
812812
visitor func(client serverpb.RPCStatusClient) error,
813813
) error {

pkg/server/admin.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
3535
"github.com/cockroachdb/cockroach/pkg/roachpb"
3636
"github.com/cockroachdb/cockroach/pkg/rpc"
37-
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
3837
"github.com/cockroachdb/cockroach/pkg/security/username"
3938
"github.com/cockroachdb/cockroach/pkg/server/apiconstants"
4039
"github.com/cockroachdb/cockroach/pkg/server/authserver"
@@ -116,7 +115,7 @@ type adminServer struct {
116115
statsLimiter *quotapool.IntPool
117116
st *cluster.Settings
118117
serverIterator ServerIterator
119-
nd rpcbase.NodeDialer
118+
nd *nodeDialer
120119
distSender *kvcoord.DistSender
121120
rpcContext *rpc.Context
122121
clock *hlc.Clock
@@ -221,7 +220,7 @@ func newAdminServer(
221220
),
222221
st: cs,
223222
serverIterator: serverIterator,
224-
nd: &nodeDialer{si: serverIterator},
223+
nd: &nodeDialer{cs: cs, si: serverIterator},
225224
distSender: distSender,
226225
rpcContext: rpcCtx,
227226
clock: clock,
@@ -1357,7 +1356,7 @@ func (s *adminServer) statsForSpan(
13571356
var spanResponse *roachpb.SpanStatsResponse
13581357
err := timeutil.RunWithTimeout(ctx, "request remote stats", 20*time.Second,
13591358
func(ctx context.Context) error {
1360-
client, err := serverpb.DialStatusClient(s.nd, ctx, nodeID)
1359+
client, err := serverpb.DialStatusClient(s.nd, ctx, nodeID, s.nd.cs)
13611360
if err == nil {
13621361
req := roachpb.SpanStatsRequest{
13631362
Spans: []roachpb.Span{span},
@@ -3770,7 +3769,7 @@ func (s *adminServer) queryTableID(
37703769
func (s *adminServer) dialNode(
37713770
ctx context.Context, nodeID roachpb.NodeID,
37723771
) (serverpb.RPCAdminClient, error) {
3773-
return serverpb.DialAdminClient(s.nd, ctx, nodeID)
3772+
return serverpb.DialAdminClient(s.nd, ctx, nodeID, s.nd.cs)
37743773
}
37753774

37763775
func (s *adminServer) ListTracingSnapshots(

pkg/server/fanout_clients.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type ServerIterator interface {
8686
// implementation that can be use to create RPC clients. nodeDialer allows
8787
// reusing utity function in serverpb package to create RPC clients.
8888
type nodeDialer struct {
89+
cs *cluster.Settings
8990
si ServerIterator
9091
}
9192

pkg/server/server_sql.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,13 +1270,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
12701270
Dialer: cfg.kvNodeDialer,
12711271
RangeDescScanner: rangedesc.NewScanner(cfg.db),
12721272
DB: cfg.db,
1273+
Settings: cfg.Settings,
12731274
})
12741275
} else {
12751276
c = upgradecluster.NewTenantCluster(
12761277
upgradecluster.TenantClusterConfig{
12771278
Dialer: cfg.sqlInstanceDialer,
12781279
InstanceReader: cfg.sqlInstanceReader,
12791280
DB: cfg.db,
1281+
Settings: cfg.Settings,
12801282
})
12811283
}
12821284
systemDeps = upgrade.SystemDeps{

pkg/server/serverpb/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ go_library(
113113
deps = [
114114
"//pkg/gossip",
115115
"//pkg/roachpb",
116+
"//pkg/rpc/nodedialer",
116117
"//pkg/rpc/rpcbase",
118+
"//pkg/settings/cluster",
117119
"//pkg/util/errorutil",
118120
"//pkg/util/metric",
119121
"@com_github_prometheus_client_model//go",

pkg/server/serverpb/rpc_clients.go

Lines changed: 20 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -9,110 +9,62 @@ import (
99
context "context"
1010

1111
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
12+
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
1213
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
14+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1315
)
1416

1517
// DialMigrationClient establishes a DRPC connection if enabled; otherwise,
1618
// it falls back to gRPC. The established connection is used to create a
1719
// RPCMigrationClient.
1820
func DialMigrationClient(
19-
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
21+
nd rpcbase.NodeDialer,
22+
ctx context.Context,
23+
nodeID roachpb.NodeID,
24+
class rpcbase.ConnectionClass,
25+
cs *cluster.Settings,
2026
) (RPCMigrationClient, error) {
21-
if !rpcbase.TODODRPC {
22-
conn, err := nd.Dial(ctx, nodeID, class)
23-
if err != nil {
24-
return nil, err
25-
}
26-
return NewGRPCMigrationClientAdapter(conn), nil
27-
}
28-
conn, err := nd.DRPCDial(ctx, nodeID, class)
29-
if err != nil {
30-
return nil, err
31-
}
32-
return NewDRPCMigrationClientAdapter(conn), nil
27+
return rpcbase.DialRPCClient(nd, ctx, nodeID, class, NewGRPCMigrationClientAdapter, NewDRPCMigrationClientAdapter, cs)
3328
}
3429

3530
// DialStatusClientNoBreaker establishes a DRPC connection if enabled;
3631
// otherwise, it falls back to gRPC. The established connection is used
3732
// to create a StatusClient. This method is same as DialStatusClient, but it
3833
// does not check the breaker before dialing the connection.
3934
func DialStatusClientNoBreaker(
40-
nd rpcbase.NodeDialerNoBreaker,
41-
ctx context.Context,
42-
nodeID roachpb.NodeID,
43-
class rpcbase.ConnectionClass,
35+
nd *nodedialer.Dialer, ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
4436
) (RPCStatusClient, error) {
45-
if !rpcbase.TODODRPC {
46-
conn, err := nd.DialNoBreaker(ctx, nodeID, class)
47-
if err != nil {
48-
return nil, err
49-
}
50-
return NewGRPCStatusClientAdapter(conn), nil
51-
}
52-
conn, err := nd.DRPCDialNoBreaker(ctx, nodeID, class)
53-
if err != nil {
54-
return nil, err
55-
}
56-
return NewDRPCStatusClientAdapter(conn), nil
37+
return nodedialer.DialRPCClientNoBreaker(nd, ctx, nodeID, class,
38+
NewGRPCStatusClientAdapter, NewDRPCStatusClientAdapter)
5739
}
5840

5941
// DialStatusClient establishes a DRPC connection if enabled; otherwise, it
6042
// falls back to gRPC. The established connection is used to create a
6143
// RPCStatusClient.
6244
func DialStatusClient(
63-
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID,
45+
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID, cs *cluster.Settings,
6446
) (RPCStatusClient, error) {
65-
if !rpcbase.TODODRPC {
66-
conn, err := nd.Dial(ctx, nodeID, rpcbase.DefaultClass)
67-
if err != nil {
68-
return nil, err
69-
}
70-
return NewGRPCStatusClientAdapter(conn), nil
71-
}
72-
conn, err := nd.DRPCDial(ctx, nodeID, rpcbase.DefaultClass)
73-
if err != nil {
74-
return nil, err
75-
}
76-
return NewDRPCStatusClientAdapter(conn), nil
47+
return rpcbase.DialRPCClient(nd, ctx, nodeID, rpcbase.DefaultClass,
48+
NewGRPCStatusClientAdapter, NewDRPCStatusClientAdapter, cs)
7749
}
7850

7951
// DialAdminClient establishes a DRPC connection if enabled; otherwise, it
8052
// falls back to gRPC. The established connection is used to create a
8153
// RPCAdminClient.
8254
func DialAdminClient(
83-
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID,
55+
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID, cs *cluster.Settings,
8456
) (RPCAdminClient, error) {
85-
if !rpcbase.TODODRPC {
86-
conn, err := nd.Dial(ctx, nodeID, rpcbase.DefaultClass)
87-
if err != nil {
88-
return nil, err
89-
}
90-
return NewGRPCAdminClientAdapter(conn), nil
91-
}
92-
conn, err := nd.DRPCDial(ctx, nodeID, rpcbase.DefaultClass)
93-
if err != nil {
94-
return nil, err
95-
}
96-
return NewDRPCAdminClientAdapter(conn), nil
57+
return rpcbase.DialRPCClient(nd, ctx, nodeID, rpcbase.DefaultClass,
58+
NewGRPCAdminClientAdapter, NewDRPCAdminClientAdapter, cs)
9759
}
9860

9961
// DialAdminClientNoBreaker establishes a DRPC connection if enabled;
10062
// otherwise, it falls back to gRPC. The established connection is used to
10163
// create a AdminClient. This method is same as DialAdminClient, but it
10264
// does not check the breaker before dialing the connection.
10365
func DialAdminClientNoBreaker(
104-
nd rpcbase.NodeDialerNoBreaker, ctx context.Context, nodeID roachpb.NodeID,
66+
nd *nodedialer.Dialer, ctx context.Context, nodeID roachpb.NodeID,
10567
) (RPCAdminClient, error) {
106-
if !rpcbase.TODODRPC {
107-
conn, err := nd.DialNoBreaker(ctx, nodeID, rpcbase.DefaultClass)
108-
if err != nil {
109-
return nil, err
110-
}
111-
return NewGRPCAdminClientAdapter(conn), nil
112-
}
113-
conn, err := nd.DRPCDialNoBreaker(ctx, nodeID, rpcbase.DefaultClass)
114-
if err != nil {
115-
return nil, err
116-
}
117-
return NewDRPCAdminClientAdapter(conn), nil
68+
return nodedialer.DialRPCClientNoBreaker(nd, ctx, nodeID, rpcbase.DefaultClass,
69+
NewGRPCAdminClientAdapter, NewDRPCAdminClientAdapter)
11870
}

pkg/server/status.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ type baseStatusServer struct {
150150
rpcCtx *rpc.Context
151151
stopper *stop.Stopper
152152
serverIterator ServerIterator
153-
nd rpcbase.NodeDialer
153+
nd *nodeDialer
154154
clock *hlc.Clock
155155
}
156156

@@ -649,7 +649,7 @@ func newStatusServer(
649649
rpcCtx: rpcCtx,
650650
stopper: stopper,
651651
serverIterator: serverIterator,
652-
nd: &nodeDialer{si: serverIterator},
652+
nd: &nodeDialer{cs: st, si: serverIterator},
653653
clock: clock,
654654
},
655655
cfg: cfg,
@@ -776,7 +776,7 @@ func (s *statusServer) dialNode(
776776
return nil, err
777777
}
778778
}
779-
return serverpb.DialStatusClient(s.nd, ctx, nodeID)
779+
return serverpb.DialStatusClient(s.nd, ctx, nodeID, s.nd.cs)
780780
}
781781

782782
// Gossip returns current state of gossip information on the given node

pkg/upgrade/upgradecluster/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ go_library(
1717
"//pkg/rpc",
1818
"//pkg/rpc/rpcbase",
1919
"//pkg/server/serverpb",
20+
"//pkg/settings/cluster",
2021
"//pkg/sql/sqlinstance",
2122
"//pkg/sql/sqlinstance/instancestorage",
2223
"//pkg/util/ctxgroup",
@@ -47,6 +48,7 @@ go_test(
4748
"//pkg/security/securitytest",
4849
"//pkg/server",
4950
"//pkg/server/serverpb",
51+
"//pkg/settings/cluster",
5052
"//pkg/testutils",
5153
"//pkg/testutils/serverutils",
5254
"//pkg/testutils/testcluster",

pkg/upgrade/upgradecluster/cluster.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
1616
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1717
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
18+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1819
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
1920
"github.com/cockroachdb/cockroach/pkg/util/log"
2021
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
@@ -47,6 +48,9 @@ type ClusterConfig struct {
4748
// to expose only relevant, vetted bits of kv.DB. It'll make our tests less
4849
// "integration-ey".
4950
DB *kv.DB
51+
52+
// Cluster settings allow access to version and other settings.
53+
Settings *cluster.Settings
5054
}
5155

5256
// New constructs a new Cluster with the provided dependencies.
@@ -132,7 +136,7 @@ func (c *Cluster) ForEveryNodeOrServer(
132136
grp.GoCtx(func(ctx context.Context) error {
133137
defer alloc.Release()
134138

135-
client, err := serverpb.DialMigrationClient(c.c.Dialer, ctx, node.ID, rpcbase.DefaultClass)
139+
client, err := serverpb.DialMigrationClient(c.c.Dialer, ctx, node.ID, rpcbase.DefaultClass, c.c.Settings)
136140
if err != nil {
137141
return err
138142
}

pkg/upgrade/upgradecluster/helper_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/roachpb"
1515
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1616
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
17+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1718
"github.com/cockroachdb/cockroach/pkg/testutils"
1819
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1920
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -52,6 +53,7 @@ func TestHelperEveryNode(t *testing.T) {
5253
h := New(ClusterConfig{
5354
NodeLiveness: tc,
5455
Dialer: NoopDialer{},
56+
Settings: cluster.MakeClusterSettings(),
5557
})
5658
opCount := 0
5759
err := h.UntilClusterStable(ctx, retry.Options{
@@ -91,6 +93,7 @@ func TestHelperEveryNode(t *testing.T) {
9193
h := New(ClusterConfig{
9294
NodeLiveness: tc,
9395
Dialer: NoopDialer{},
96+
Settings: cluster.MakeClusterSettings(),
9497
})
9598
opCount := 0
9699
err := h.UntilClusterStable(ctx, retry.Options{
@@ -131,6 +134,7 @@ func TestHelperEveryNode(t *testing.T) {
131134
h := New(ClusterConfig{
132135
NodeLiveness: tc,
133136
Dialer: NoopDialer{},
137+
Settings: cluster.MakeClusterSettings(),
134138
})
135139
expRe := "cluster not stable, nodes: n\\{1,2,3\\}, unavailable: n\\{2\\}"
136140
opCount := 0

0 commit comments

Comments
 (0)