Skip to content

Commit 8279a65

Browse files
*: consolidate InternalClient RPC client creation
This commit consolidates `InternalClient` RPC client creation logic and replaces RPC clients with adapters. It is a continuation of the work done in #147606. Epic: CRDB-48923 Fixes: #148353 Release note: none
1 parent aed749a commit 8279a65

File tree

15 files changed

+233
-308
lines changed

15 files changed

+233
-308
lines changed

pkg/cli/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ go_library(
141141
"//pkg/raft/raftpb",
142142
"//pkg/roachpb",
143143
"//pkg/rpc",
144+
"//pkg/rpc/rpcbase",
144145
"//pkg/security",
145146
"//pkg/security/certnames",
146147
"//pkg/security/clientsecopts",
@@ -274,6 +275,7 @@ go_library(
274275
"@com_google_cloud_go_storage//:storage",
275276
"@in_gopkg_yaml_v2//:yaml_v2",
276277
"@in_gopkg_yaml_v3//:yaml_v3",
278+
"@io_storj_drpc//:drpc",
277279
"@org_golang_google_api//option",
278280
"@org_golang_google_grpc//:grpc",
279281
"@org_golang_google_grpc//codes",

pkg/cli/rpc_clients.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1212
"github.com/cockroachdb/cockroach/pkg/rpc"
13+
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1314
"github.com/cockroachdb/cockroach/pkg/server"
1415
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
1516
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
1617
"github.com/cockroachdb/errors"
1718
"google.golang.org/grpc"
19+
"storj.io/drpc"
1820
)
1921

2022
// rpcConn defines a common interface for creating RPC clients. It hides the
@@ -25,7 +27,7 @@ type rpcConn interface {
2527
NewAdminClient() serverpb.RPCAdminClient
2628
NewInitClient() serverpb.RPCInitClient
2729
NewTimeSeriesClient() tspb.RPCTimeSeriesClient
28-
NewInternalClient() kvpb.InternalClient
30+
NewInternalClient() kvpb.RPCInternalClient
2931
}
3032

3133
// grpcConn is an implementation of rpcConn that provides methods to create
@@ -51,8 +53,35 @@ func (c *grpcConn) NewTimeSeriesClient() tspb.RPCTimeSeriesClient {
5153
return tspb.NewGRPCTimeSeriesClientAdapter(c.conn)
5254
}
5355

54-
func (c *grpcConn) NewInternalClient() kvpb.InternalClient {
55-
return kvpb.NewInternalClient(c.conn)
56+
func (c *grpcConn) NewInternalClient() kvpb.RPCInternalClient {
57+
return kvpb.NewGRPCInternalClientAdapter(c.conn)
58+
}
59+
60+
// drpcConn is an implementation of rpcConn that provides methods to create
61+
// various RPC clients. This allows the CLI to interact with the server using
62+
// DRPC without exposing the underlying connection details.
63+
type drpcConn struct {
64+
conn drpc.Conn
65+
}
66+
67+
func (c *drpcConn) NewStatusClient() serverpb.RPCStatusClient {
68+
return serverpb.NewDRPCStatusClientAdapter(c.conn)
69+
}
70+
71+
func (c *drpcConn) NewAdminClient() serverpb.RPCAdminClient {
72+
return serverpb.NewDRPCAdminClientAdapter(c.conn)
73+
}
74+
75+
func (c *drpcConn) NewInitClient() serverpb.RPCInitClient {
76+
return serverpb.NewDRPCInitClientAdapter(c.conn)
77+
}
78+
79+
func (c *drpcConn) NewTimeSeriesClient() tspb.RPCTimeSeriesClient {
80+
return tspb.NewDRPCTimeSeriesClientAdapter(c.conn)
81+
}
82+
83+
func (c *drpcConn) NewInternalClient() kvpb.RPCInternalClient {
84+
return kvpb.NewDRPCInternalClientAdapter(c.conn)
5685
}
5786

5887
func makeRPCClientConfig(cfg server.Config) rpc.ClientConnConfig {
@@ -72,11 +101,19 @@ func makeRPCClientConfig(cfg server.Config) rpc.ClientConnConfig {
72101

73102
func newClientConn(ctx context.Context, cfg server.Config) (rpcConn, func(), error) {
74103
ccfg := makeRPCClientConfig(cfg)
75-
cc, finish, err := rpc.NewClientConn(ctx, ccfg)
76-
if err != nil {
77-
return nil, nil, errors.Wrap(err, "failed to connect to the node")
104+
if !rpcbase.TODODRPC {
105+
cc, finish, err := rpc.NewClientConn(ctx, ccfg)
106+
if err != nil {
107+
return nil, nil, errors.Wrap(err, "failed to connect to the node")
108+
}
109+
return &grpcConn{conn: cc}, finish, nil
110+
} else {
111+
dc, finish, err := rpc.NewDRPCClientConn(ctx, ccfg)
112+
if err != nil {
113+
return nil, nil, errors.Wrap(err, "failed to connect to the node")
114+
}
115+
return &drpcConn{conn: dc}, finish, nil
78116
}
79-
return &grpcConn{conn: cc}, finish, nil
80117
}
81118

82119
// dialAdminClient dials a client connection and returns an AdminClient and a

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
112112
// cancels the context and closes the range feed stream.
113113
if spec.expectRetry {
114114
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).MinTimes(1).Return([]roachpb.RangeDescriptor{desc}, nil, nil) //.FirstRange().Return(&desc, nil)
115-
client := kvpbmock.NewMockInternalClient(ctrl)
115+
client := kvpbmock.NewMockRPCInternalClient(ctrl)
116116

117-
stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl)
117+
stream := kvpbmock.NewMockRPCInternal_MuxRangeFeedClient(ctrl)
118118
stream.EXPECT().Send(gomock.Any()).Return(nil)
119119
stream.EXPECT().Recv().Do(func() {
120120
cancel()

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ type testRangefeedClient struct {
4949
}
5050

5151
func (c *testRangefeedClient) MuxRangeFeed(
52-
ctx context.Context, opts ...grpc.CallOption,
53-
) (kvpb.Internal_MuxRangeFeedClient, error) {
52+
ctx context.Context,
53+
) (kvpb.RPCInternal_MuxRangeFeedClient, error) {
5454
defer c.count()
55-
return c.RestrictedInternalClient.MuxRangeFeed(ctx, opts...)
55+
return c.RestrictedInternalClient.MuxRangeFeed(ctx)
5656
}
5757

5858
type internalClientCounts struct {

pkg/kv/kvclient/kvcoord/transport_test.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2020
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
2121
"github.com/stretchr/testify/require"
22-
"google.golang.org/grpc"
2322
)
2423

2524
func TestTransportMoveToFront(t *testing.T) {
@@ -221,17 +220,17 @@ type mockInternalClient struct {
221220
pErr *kvpb.Error
222221
}
223222

224-
var _ kvpb.InternalClient = &mockInternalClient{}
223+
var _ kvpb.RPCInternalClient = &mockInternalClient{}
225224

226225
func (*mockInternalClient) ResetQuorum(
227-
context.Context, *kvpb.ResetQuorumRequest, ...grpc.CallOption,
226+
context.Context, *kvpb.ResetQuorumRequest,
228227
) (*kvpb.ResetQuorumResponse, error) {
229228
panic("unimplemented")
230229
}
231230

232231
// Batch is part of the kvpb.InternalClient interface.
233232
func (m *mockInternalClient) Batch(
234-
ctx context.Context, in *kvpb.BatchRequest, opts ...grpc.CallOption,
233+
ctx context.Context, in *kvpb.BatchRequest,
235234
) (*kvpb.BatchResponse, error) {
236235
var sp *tracing.Span
237236
if m.tr != nil {
@@ -256,75 +255,75 @@ func (m *mockInternalClient) Batch(
256255
}
257256

258257
func (m *mockInternalClient) BatchStream(
259-
ctx context.Context, opts ...grpc.CallOption,
260-
) (kvpb.Internal_BatchStreamClient, error) {
258+
ctx context.Context,
259+
) (kvpb.RPCInternal_BatchStreamClient, error) {
261260
return nil, fmt.Errorf("unsupported BatchStream call")
262261
}
263262

264263
// RangeLookup implements the kvpb.InternalClient interface.
265264
func (m *mockInternalClient) RangeLookup(
266-
ctx context.Context, rl *kvpb.RangeLookupRequest, _ ...grpc.CallOption,
265+
ctx context.Context, rl *kvpb.RangeLookupRequest,
267266
) (*kvpb.RangeLookupResponse, error) {
268267
return nil, fmt.Errorf("unsupported RangeLookup call")
269268
}
270269

271270
func (m *mockInternalClient) MuxRangeFeed(
272-
ctx context.Context, opts ...grpc.CallOption,
273-
) (kvpb.Internal_MuxRangeFeedClient, error) {
271+
ctx context.Context,
272+
) (kvpb.RPCInternal_MuxRangeFeedClient, error) {
274273
return nil, fmt.Errorf("unsupported MuxRangeFeed call")
275274
}
276275

277276
// GossipSubscription is part of the kvpb.InternalClient interface.
278277
func (m *mockInternalClient) GossipSubscription(
279-
ctx context.Context, args *kvpb.GossipSubscriptionRequest, _ ...grpc.CallOption,
280-
) (kvpb.Internal_GossipSubscriptionClient, error) {
278+
ctx context.Context, args *kvpb.GossipSubscriptionRequest,
279+
) (kvpb.RPCInternal_GossipSubscriptionClient, error) {
281280
return nil, fmt.Errorf("unsupported GossipSubscripion call")
282281
}
283282

284283
func (m *mockInternalClient) Join(
285-
context.Context, *kvpb.JoinNodeRequest, ...grpc.CallOption,
284+
context.Context, *kvpb.JoinNodeRequest,
286285
) (*kvpb.JoinNodeResponse, error) {
287286
return nil, fmt.Errorf("unsupported Join call")
288287
}
289288

290289
func (m *mockInternalClient) TokenBucket(
291-
ctx context.Context, in *kvpb.TokenBucketRequest, _ ...grpc.CallOption,
290+
ctx context.Context, in *kvpb.TokenBucketRequest,
292291
) (*kvpb.TokenBucketResponse, error) {
293292
return nil, fmt.Errorf("unsupported TokenBucket call")
294293
}
295294

296295
func (m *mockInternalClient) GetSpanConfigs(
297-
_ context.Context, _ *roachpb.GetSpanConfigsRequest, _ ...grpc.CallOption,
296+
_ context.Context, _ *roachpb.GetSpanConfigsRequest,
298297
) (*roachpb.GetSpanConfigsResponse, error) {
299298
return nil, fmt.Errorf("unsupported GetSpanConfigs call")
300299
}
301300

302301
func (m *mockInternalClient) SpanConfigConformance(
303-
_ context.Context, _ *roachpb.SpanConfigConformanceRequest, _ ...grpc.CallOption,
302+
_ context.Context, _ *roachpb.SpanConfigConformanceRequest,
304303
) (*roachpb.SpanConfigConformanceResponse, error) {
305304
return nil, fmt.Errorf("unsupported SpanConfigConformance call")
306305
}
307306

308307
func (m *mockInternalClient) GetAllSystemSpanConfigsThatApply(
309-
context.Context, *roachpb.GetAllSystemSpanConfigsThatApplyRequest, ...grpc.CallOption,
308+
context.Context, *roachpb.GetAllSystemSpanConfigsThatApplyRequest,
310309
) (*roachpb.GetAllSystemSpanConfigsThatApplyResponse, error) {
311310
return nil, fmt.Errorf("unsupported GetAllSystemSpanConfigsThatApply call")
312311
}
313312

314313
func (m *mockInternalClient) UpdateSpanConfigs(
315-
_ context.Context, _ *roachpb.UpdateSpanConfigsRequest, _ ...grpc.CallOption,
314+
_ context.Context, _ *roachpb.UpdateSpanConfigsRequest,
316315
) (*roachpb.UpdateSpanConfigsResponse, error) {
317316
return nil, fmt.Errorf("unsupported UpdateSpanConfigs call")
318317
}
319318

320319
func (m *mockInternalClient) TenantSettings(
321-
context.Context, *kvpb.TenantSettingsRequest, ...grpc.CallOption,
322-
) (kvpb.Internal_TenantSettingsClient, error) {
320+
context.Context, *kvpb.TenantSettingsRequest,
321+
) (kvpb.RPCInternal_TenantSettingsClient, error) {
323322
return nil, fmt.Errorf("unsupported TenantSettings call")
324323
}
325324

326325
func (n *mockInternalClient) GetRangeDescriptors(
327-
context.Context, *kvpb.GetRangeDescriptorsRequest, ...grpc.CallOption,
328-
) (kvpb.Internal_GetRangeDescriptorsClient, error) {
326+
context.Context, *kvpb.GetRangeDescriptorsRequest,
327+
) (kvpb.RPCInternal_GetRangeDescriptorsClient, error) {
329328
return nil, fmt.Errorf("unsupported GetRangeDescriptors call")
330329
}

pkg/kv/kvclient/kvtenant/connector.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ type connector struct {
222222

223223
// client represents an RPC client that proxies to a KV instance.
224224
type client struct {
225-
kvpb.InternalClient
225+
kvpb.RPCInternalClient
226226
serverpb.RPCStatusClient
227227
serverpb.RPCAdminClient
228228
tspb.RPCTimeSeriesClient
@@ -983,7 +983,7 @@ func (c *connector) dialAddrs(ctx context.Context) (*client, error) {
983983
continue
984984
}
985985
return &client{
986-
InternalClient: kvpb.NewInternalClient(conn),
986+
RPCInternalClient: kvpb.NewGRPCInternalClientAdapter(conn),
987987
RPCStatusClient: serverpb.NewGRPCStatusClientAdapter(conn),
988988
RPCAdminClient: serverpb.NewGRPCAdminClientAdapter(conn),
989989
RPCTimeSeriesClient: tspb.NewGRPCTimeSeriesClientAdapter(conn),
@@ -1005,7 +1005,7 @@ func (c *connector) dialAddr(ctx context.Context, addr string) (conn *grpc.Clien
10051005
return conn, err
10061006
}
10071007

1008-
func (c *connector) tryForgetClient(ctx context.Context, client kvpb.InternalClient) {
1008+
func (c *connector) tryForgetClient(ctx context.Context, client *client) {
10091009
if ctx.Err() != nil {
10101010
// Error (may be) due to context. Don't forget client.
10111011
return

pkg/kv/kvpb/kvpbmock/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ gomock(
44
name = "mock_kvpb",
55
out = "mocks_generated.go",
66
interfaces = [
7-
"InternalClient",
8-
"Internal_MuxRangeFeedClient",
7+
"RPCInternalClient",
8+
"RPCInternal_MuxRangeFeedClient",
99
],
1010
library = "//pkg/kv/kvpb",
1111
package = "kvpbmock",

0 commit comments

Comments
 (0)