Skip to content

Commit 67de345

Browse files
server,sql: use generic dialers to create RPC clients
Epic: CRDB-48935 Informs: None Release note: None
1 parent c93eab1 commit 67de345

File tree

7 files changed

+35
-32
lines changed

7 files changed

+35
-32
lines changed

pkg/sql/colflow/colrpc/colrpc_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/base"
1919
"github.com/cockroachdb/cockroach/pkg/col/coldata"
2020
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
21+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2122
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
2223
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
2324
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
@@ -866,7 +867,12 @@ func TestOutboxStreamIDPropagation(t *testing.T) {
866867
outboxMemAcc := testMemMonitor.MakeBoundAccount()
867868
defer outboxMemAcc.Close(ctx)
868869
outbox, err := NewOutbox(
869-
&execinfra.FlowCtx{Gateway: false},
870+
&execinfra.FlowCtx{
871+
Gateway: false,
872+
Cfg: &execinfra.ServerConfig{
873+
Settings: cluster.MakeTestingClusterSettings(),
874+
},
875+
},
870876
0, /* processorID */
871877
colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory),
872878
testMemAcc, colexecargs.OpWithMetaInfo{Root: input}, typs, nil, /* getStats */

pkg/sql/colflow/colrpc/outbox.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func (o *Outbox) Run(
180180

181181
var stream execinfrapb.RPCDistSQL_FlowStreamClient
182182
if err := func() error {
183-
client, err := execinfra.GetDistSQLClientForOutbox(ctx, dialer, sqlInstanceID, connectionTimeout)
183+
client, err := execinfra.GetDistSQLClientForOutbox(ctx, dialer, o.flowCtx.Cfg.Settings, sqlInstanceID, connectionTimeout)
184184
if err != nil {
185185
log.Dev.VWarningf(ctx, 1, "Outbox Dial connection error, distributed query will fail: %+v", err)
186186
return err

pkg/sql/colflow/vectorized_flow_shutdown_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ func TestVectorizedFlowShutdown(t *testing.T) {
136136
flowCtx := &execinfra.FlowCtx{
137137
EvalCtx: &evalCtx,
138138
Mon: evalCtx.TestingMon,
139-
Cfg: &execinfra.ServerConfig{Settings: st},
139+
Cfg: &execinfra.ServerConfig{
140+
Settings: st,
141+
},
140142
}
141143
rng, _ := randutil.NewTestRand()
142144
var (
@@ -194,7 +196,12 @@ func TestVectorizedFlowShutdown(t *testing.T) {
194196
toDrain[i] = createMetadataSourceForID(i)
195197
}
196198
hashRouter, hashRouterOutputs := colflow.NewHashRouter(
197-
&execinfra.FlowCtx{Gateway: false},
199+
&execinfra.FlowCtx{
200+
Gateway: false,
201+
Cfg: &execinfra.ServerConfig{
202+
Settings: st,
203+
},
204+
},
198205
0, /* processorID */
199206
allocators,
200207
colexecargs.OpWithMetaInfo{
@@ -238,7 +245,12 @@ func TestVectorizedFlowShutdown(t *testing.T) {
238245
outboxMetadataSources []colexecop.MetadataSource,
239246
) {
240247
outbox, err := colrpc.NewOutbox(
241-
&execinfra.FlowCtx{Gateway: false},
248+
&execinfra.FlowCtx{
249+
Gateway: false,
250+
Cfg: &execinfra.ServerConfig{
251+
Settings: st,
252+
},
253+
},
242254
0, /* processorID */
243255
colmem.NewAllocator(outboxCtx, outboxMemAcc, testColumnFactory),
244256
outboxConverterMemAcc,

pkg/sql/execinfra/outboxbase.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/base"
1313
"github.com/cockroachdb/cockroach/pkg/roachpb"
1414
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
15+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1516
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1617
"github.com/cockroachdb/cockroach/pkg/util/retry"
1718
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -30,12 +31,13 @@ import (
3031
func GetDistSQLClientForOutbox(
3132
ctx context.Context,
3233
dialer rpcbase.NodeDialerNoBreaker,
34+
cs *cluster.Settings,
3335
sqlInstanceID base.SQLInstanceID,
3436
timeout time.Duration,
3537
) (client execinfrapb.RPCDistSQLClient, err error) {
3638
firstConnectionAttempt := timeutil.Now()
3739
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
38-
client, err = execinfrapb.DialDistSQLClientNoBreaker(dialer, ctx, roachpb.NodeID(sqlInstanceID), rpcbase.DefaultClass)
40+
client, err = execinfrapb.DialDistSQLClientNoBreaker(dialer, ctx, roachpb.NodeID(sqlInstanceID), rpcbase.DefaultClass, cs)
3941
if err == nil || timeutil.Since(firstConnectionAttempt) > timeout {
4042
break
4143
}

pkg/sql/execinfrapb/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ go_library(
2020
deps = [
2121
"//pkg/base",
2222
"//pkg/roachpb",
23+
"//pkg/rpc/nodedialer",
2324
"//pkg/rpc/rpcbase",
2425
"//pkg/security/username",
26+
"//pkg/settings/cluster",
2527
"//pkg/sql/catalog/catalogkeys",
2628
"//pkg/sql/catalog/catenumpb",
2729
"//pkg/sql/catalog/colinfo",

pkg/sql/execinfrapb/rpc_clients.go

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,18 @@ import (
99
context "context"
1010

1111
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
12+
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
1213
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
14+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1315
)
1416

1517
// DialDistSQLClient establishes a DRPC connection if enabled; otherwise,
1618
// it falls back to gRPC. The established connection is used to create a
1719
// RPCDistSQLClient.
1820
func DialDistSQLClient(
19-
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
21+
nd *nodedialer.Dialer, ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
2022
) (RPCDistSQLClient, error) {
21-
if !rpcbase.TODODRPC {
22-
conn, err := nd.Dial(ctx, nodeID, class)
23-
if err != nil {
24-
return nil, err
25-
}
26-
return NewGRPCDistSQLClientAdapter(conn), nil
27-
}
28-
conn, err := nd.DRPCDial(ctx, nodeID, class)
29-
if err != nil {
30-
return nil, err
31-
}
32-
return NewDRPCDistSQLClientAdapter(conn), nil
23+
return nodedialer.DialRPCClient(nd, ctx, nodeID, class, NewGRPCDistSQLClientAdapter, NewDRPCDistSQLClientAdapter)
3324
}
3425

3526
// DialDistSQLClientNoBreaker establishes a DRPC connection if enabled;
@@ -41,17 +32,7 @@ func DialDistSQLClientNoBreaker(
4132
ctx context.Context,
4233
nodeID roachpb.NodeID,
4334
class rpcbase.ConnectionClass,
35+
cs *cluster.Settings,
4436
) (RPCDistSQLClient, error) {
45-
if !rpcbase.TODODRPC {
46-
conn, err := nd.DialNoBreaker(ctx, nodeID, class)
47-
if err != nil {
48-
return nil, err
49-
}
50-
return NewGRPCDistSQLClientAdapter(conn), nil
51-
}
52-
conn, err := nd.DRPCDialNoBreaker(ctx, nodeID, class)
53-
if err != nil {
54-
return nil, err
55-
}
56-
return NewDRPCDistSQLClientAdapter(conn), nil
37+
return rpcbase.DialRPCClientNoBreaker(nd, ctx, nodeID, class, NewGRPCDistSQLClientAdapter, NewDRPCDistSQLClientAdapter, cs)
5738
}

pkg/sql/flowinfra/outbox.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error
232232

233233
if err := func() error {
234234
client, err := execinfra.GetDistSQLClientForOutbox(
235-
ctx, m.flowCtx.Cfg.SQLInstanceDialer, m.sqlInstanceID, SettingFlowStreamTimeout.Get(&m.flowCtx.Cfg.Settings.SV),
235+
ctx, m.flowCtx.Cfg.SQLInstanceDialer, m.flowCtx.Cfg.Settings, m.sqlInstanceID, SettingFlowStreamTimeout.Get(&m.flowCtx.Cfg.Settings.SV),
236236
)
237237
if err != nil {
238238
log.Dev.VWarningf(ctx, 1, "Outbox Dial connection error, distributed query will fail: %+v", err)

0 commit comments

Comments
 (0)