Skip to content

Commit dda5160

Browse files
craig[bot]yuzefovichcthumuluru-crdb
committed
148488: sql: avoid contention between long mutation and auto stats r=yuzefovich a=yuzefovich Previously, when processing large mutations in batches (see `BatchedNext` method), we would notify the stats refresher about the number of rows affected after each batch. If the mutation is long, then this could have triggered the auto stats collection that would contend with the mutation that triggered it. There is actually no reason to try collecting auto stats on those just-mutated rows since in most cases they haven't been committed yet. This commit fixes this issue by notifying the stats refresher about the total number of rows affected when the last batch of the mutation has just been processed. The bug has been present since about 19.1 version. Note that we generally don't perform well with long mutations, but it seems like a worthwhile improvement nonetheless. We can't remove contention with auto stats triggered by _other_ mutations, but at least we can do so for self-inflicted ones. (Also probably a more important retry reason is the closed TS interval.) Fixes: #148487. Release note (bug fix): Large mutation statements (INSERTs, UPDATEs, DELETEs, UPSERTs) are now less likely to encounter contention with automatic table stats collection in some cases. The bug has been present since 19.1 version. 148607: *: consolidate `InternalClient` RPC client creation r=cthumuluru-crdb a=cthumuluru-crdb This commit consolidates `InternalClient` RPC client creation logic and replaces RPC clients with adapters. It is a continuation of the work done in #147606. Epic: CRDB-48923 Fixes: #148353 Release note: none Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Chandra Thumuluru <[email protected]>
3 parents 68ba93b + e47ca5f + 8279a65 commit dda5160

20 files changed

+256
-321
lines changed

pkg/cli/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ go_library(
141141
"//pkg/raft/raftpb",
142142
"//pkg/roachpb",
143143
"//pkg/rpc",
144+
"//pkg/rpc/rpcbase",
144145
"//pkg/security",
145146
"//pkg/security/certnames",
146147
"//pkg/security/clientsecopts",
@@ -274,6 +275,7 @@ go_library(
274275
"@com_google_cloud_go_storage//:storage",
275276
"@in_gopkg_yaml_v2//:yaml_v2",
276277
"@in_gopkg_yaml_v3//:yaml_v3",
278+
"@io_storj_drpc//:drpc",
277279
"@org_golang_google_api//option",
278280
"@org_golang_google_grpc//:grpc",
279281
"@org_golang_google_grpc//codes",

pkg/cli/rpc_clients.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1212
"github.com/cockroachdb/cockroach/pkg/rpc"
13+
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1314
"github.com/cockroachdb/cockroach/pkg/server"
1415
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
1516
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
1617
"github.com/cockroachdb/errors"
1718
"google.golang.org/grpc"
19+
"storj.io/drpc"
1820
)
1921

2022
// rpcConn defines a common interface for creating RPC clients. It hides the
@@ -25,7 +27,7 @@ type rpcConn interface {
2527
NewAdminClient() serverpb.RPCAdminClient
2628
NewInitClient() serverpb.RPCInitClient
2729
NewTimeSeriesClient() tspb.RPCTimeSeriesClient
28-
NewInternalClient() kvpb.InternalClient
30+
NewInternalClient() kvpb.RPCInternalClient
2931
}
3032

3133
// grpcConn is an implementation of rpcConn that provides methods to create
@@ -51,8 +53,35 @@ func (c *grpcConn) NewTimeSeriesClient() tspb.RPCTimeSeriesClient {
5153
return tspb.NewGRPCTimeSeriesClientAdapter(c.conn)
5254
}
5355

54-
func (c *grpcConn) NewInternalClient() kvpb.InternalClient {
55-
return kvpb.NewInternalClient(c.conn)
56+
func (c *grpcConn) NewInternalClient() kvpb.RPCInternalClient {
57+
return kvpb.NewGRPCInternalClientAdapter(c.conn)
58+
}
59+
60+
// drpcConn is an implementation of rpcConn that provides methods to create
61+
// various RPC clients. This allows the CLI to interact with the server using
62+
// DRPC without exposing the underlying connection details.
63+
type drpcConn struct {
64+
conn drpc.Conn
65+
}
66+
67+
func (c *drpcConn) NewStatusClient() serverpb.RPCStatusClient {
68+
return serverpb.NewDRPCStatusClientAdapter(c.conn)
69+
}
70+
71+
func (c *drpcConn) NewAdminClient() serverpb.RPCAdminClient {
72+
return serverpb.NewDRPCAdminClientAdapter(c.conn)
73+
}
74+
75+
func (c *drpcConn) NewInitClient() serverpb.RPCInitClient {
76+
return serverpb.NewDRPCInitClientAdapter(c.conn)
77+
}
78+
79+
func (c *drpcConn) NewTimeSeriesClient() tspb.RPCTimeSeriesClient {
80+
return tspb.NewDRPCTimeSeriesClientAdapter(c.conn)
81+
}
82+
83+
func (c *drpcConn) NewInternalClient() kvpb.RPCInternalClient {
84+
return kvpb.NewDRPCInternalClientAdapter(c.conn)
5685
}
5786

5887
func makeRPCClientConfig(cfg server.Config) rpc.ClientConnConfig {
@@ -72,11 +101,19 @@ func makeRPCClientConfig(cfg server.Config) rpc.ClientConnConfig {
72101

73102
func newClientConn(ctx context.Context, cfg server.Config) (rpcConn, func(), error) {
74103
ccfg := makeRPCClientConfig(cfg)
75-
cc, finish, err := rpc.NewClientConn(ctx, ccfg)
76-
if err != nil {
77-
return nil, nil, errors.Wrap(err, "failed to connect to the node")
104+
if !rpcbase.TODODRPC {
105+
cc, finish, err := rpc.NewClientConn(ctx, ccfg)
106+
if err != nil {
107+
return nil, nil, errors.Wrap(err, "failed to connect to the node")
108+
}
109+
return &grpcConn{conn: cc}, finish, nil
110+
} else {
111+
dc, finish, err := rpc.NewDRPCClientConn(ctx, ccfg)
112+
if err != nil {
113+
return nil, nil, errors.Wrap(err, "failed to connect to the node")
114+
}
115+
return &drpcConn{conn: dc}, finish, nil
78116
}
79-
return &grpcConn{conn: cc}, finish, nil
80117
}
81118

82119
// dialAdminClient dials a client connection and returns an AdminClient and a

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
112112
// cancels the context and closes the range feed stream.
113113
if spec.expectRetry {
114114
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).MinTimes(1).Return([]roachpb.RangeDescriptor{desc}, nil, nil) //.FirstRange().Return(&desc, nil)
115-
client := kvpbmock.NewMockInternalClient(ctrl)
115+
client := kvpbmock.NewMockRPCInternalClient(ctrl)
116116

117-
stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl)
117+
stream := kvpbmock.NewMockRPCInternal_MuxRangeFeedClient(ctrl)
118118
stream.EXPECT().Send(gomock.Any()).Return(nil)
119119
stream.EXPECT().Recv().Do(func() {
120120
cancel()

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ type testRangefeedClient struct {
4949
}
5050

5151
func (c *testRangefeedClient) MuxRangeFeed(
52-
ctx context.Context, opts ...grpc.CallOption,
53-
) (kvpb.Internal_MuxRangeFeedClient, error) {
52+
ctx context.Context,
53+
) (kvpb.RPCInternal_MuxRangeFeedClient, error) {
5454
defer c.count()
55-
return c.RestrictedInternalClient.MuxRangeFeed(ctx, opts...)
55+
return c.RestrictedInternalClient.MuxRangeFeed(ctx)
5656
}
5757

5858
type internalClientCounts struct {

pkg/kv/kvclient/kvcoord/transport_test.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2020
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
2121
"github.com/stretchr/testify/require"
22-
"google.golang.org/grpc"
2322
)
2423

2524
func TestTransportMoveToFront(t *testing.T) {
@@ -221,17 +220,17 @@ type mockInternalClient struct {
221220
pErr *kvpb.Error
222221
}
223222

224-
var _ kvpb.InternalClient = &mockInternalClient{}
223+
var _ kvpb.RPCInternalClient = &mockInternalClient{}
225224

226225
func (*mockInternalClient) ResetQuorum(
227-
context.Context, *kvpb.ResetQuorumRequest, ...grpc.CallOption,
226+
context.Context, *kvpb.ResetQuorumRequest,
228227
) (*kvpb.ResetQuorumResponse, error) {
229228
panic("unimplemented")
230229
}
231230

232231
// Batch is part of the kvpb.InternalClient interface.
233232
func (m *mockInternalClient) Batch(
234-
ctx context.Context, in *kvpb.BatchRequest, opts ...grpc.CallOption,
233+
ctx context.Context, in *kvpb.BatchRequest,
235234
) (*kvpb.BatchResponse, error) {
236235
var sp *tracing.Span
237236
if m.tr != nil {
@@ -256,75 +255,75 @@ func (m *mockInternalClient) Batch(
256255
}
257256

258257
func (m *mockInternalClient) BatchStream(
259-
ctx context.Context, opts ...grpc.CallOption,
260-
) (kvpb.Internal_BatchStreamClient, error) {
258+
ctx context.Context,
259+
) (kvpb.RPCInternal_BatchStreamClient, error) {
261260
return nil, fmt.Errorf("unsupported BatchStream call")
262261
}
263262

264263
// RangeLookup implements the kvpb.InternalClient interface.
265264
func (m *mockInternalClient) RangeLookup(
266-
ctx context.Context, rl *kvpb.RangeLookupRequest, _ ...grpc.CallOption,
265+
ctx context.Context, rl *kvpb.RangeLookupRequest,
267266
) (*kvpb.RangeLookupResponse, error) {
268267
return nil, fmt.Errorf("unsupported RangeLookup call")
269268
}
270269

271270
func (m *mockInternalClient) MuxRangeFeed(
272-
ctx context.Context, opts ...grpc.CallOption,
273-
) (kvpb.Internal_MuxRangeFeedClient, error) {
271+
ctx context.Context,
272+
) (kvpb.RPCInternal_MuxRangeFeedClient, error) {
274273
return nil, fmt.Errorf("unsupported MuxRangeFeed call")
275274
}
276275

277276
// GossipSubscription is part of the kvpb.InternalClient interface.
278277
func (m *mockInternalClient) GossipSubscription(
279-
ctx context.Context, args *kvpb.GossipSubscriptionRequest, _ ...grpc.CallOption,
280-
) (kvpb.Internal_GossipSubscriptionClient, error) {
278+
ctx context.Context, args *kvpb.GossipSubscriptionRequest,
279+
) (kvpb.RPCInternal_GossipSubscriptionClient, error) {
281280
return nil, fmt.Errorf("unsupported GossipSubscripion call")
282281
}
283282

284283
func (m *mockInternalClient) Join(
285-
context.Context, *kvpb.JoinNodeRequest, ...grpc.CallOption,
284+
context.Context, *kvpb.JoinNodeRequest,
286285
) (*kvpb.JoinNodeResponse, error) {
287286
return nil, fmt.Errorf("unsupported Join call")
288287
}
289288

290289
func (m *mockInternalClient) TokenBucket(
291-
ctx context.Context, in *kvpb.TokenBucketRequest, _ ...grpc.CallOption,
290+
ctx context.Context, in *kvpb.TokenBucketRequest,
292291
) (*kvpb.TokenBucketResponse, error) {
293292
return nil, fmt.Errorf("unsupported TokenBucket call")
294293
}
295294

296295
func (m *mockInternalClient) GetSpanConfigs(
297-
_ context.Context, _ *roachpb.GetSpanConfigsRequest, _ ...grpc.CallOption,
296+
_ context.Context, _ *roachpb.GetSpanConfigsRequest,
298297
) (*roachpb.GetSpanConfigsResponse, error) {
299298
return nil, fmt.Errorf("unsupported GetSpanConfigs call")
300299
}
301300

302301
func (m *mockInternalClient) SpanConfigConformance(
303-
_ context.Context, _ *roachpb.SpanConfigConformanceRequest, _ ...grpc.CallOption,
302+
_ context.Context, _ *roachpb.SpanConfigConformanceRequest,
304303
) (*roachpb.SpanConfigConformanceResponse, error) {
305304
return nil, fmt.Errorf("unsupported SpanConfigConformance call")
306305
}
307306

308307
func (m *mockInternalClient) GetAllSystemSpanConfigsThatApply(
309-
context.Context, *roachpb.GetAllSystemSpanConfigsThatApplyRequest, ...grpc.CallOption,
308+
context.Context, *roachpb.GetAllSystemSpanConfigsThatApplyRequest,
310309
) (*roachpb.GetAllSystemSpanConfigsThatApplyResponse, error) {
311310
return nil, fmt.Errorf("unsupported GetAllSystemSpanConfigsThatApply call")
312311
}
313312

314313
func (m *mockInternalClient) UpdateSpanConfigs(
315-
_ context.Context, _ *roachpb.UpdateSpanConfigsRequest, _ ...grpc.CallOption,
314+
_ context.Context, _ *roachpb.UpdateSpanConfigsRequest,
316315
) (*roachpb.UpdateSpanConfigsResponse, error) {
317316
return nil, fmt.Errorf("unsupported UpdateSpanConfigs call")
318317
}
319318

320319
func (m *mockInternalClient) TenantSettings(
321-
context.Context, *kvpb.TenantSettingsRequest, ...grpc.CallOption,
322-
) (kvpb.Internal_TenantSettingsClient, error) {
320+
context.Context, *kvpb.TenantSettingsRequest,
321+
) (kvpb.RPCInternal_TenantSettingsClient, error) {
323322
return nil, fmt.Errorf("unsupported TenantSettings call")
324323
}
325324

326325
func (n *mockInternalClient) GetRangeDescriptors(
327-
context.Context, *kvpb.GetRangeDescriptorsRequest, ...grpc.CallOption,
328-
) (kvpb.Internal_GetRangeDescriptorsClient, error) {
326+
context.Context, *kvpb.GetRangeDescriptorsRequest,
327+
) (kvpb.RPCInternal_GetRangeDescriptorsClient, error) {
329328
return nil, fmt.Errorf("unsupported GetRangeDescriptors call")
330329
}

pkg/kv/kvclient/kvtenant/connector.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ type connector struct {
222222

223223
// client represents an RPC client that proxies to a KV instance.
224224
type client struct {
225-
kvpb.InternalClient
225+
kvpb.RPCInternalClient
226226
serverpb.RPCStatusClient
227227
serverpb.RPCAdminClient
228228
tspb.RPCTimeSeriesClient
@@ -983,7 +983,7 @@ func (c *connector) dialAddrs(ctx context.Context) (*client, error) {
983983
continue
984984
}
985985
return &client{
986-
InternalClient: kvpb.NewInternalClient(conn),
986+
RPCInternalClient: kvpb.NewGRPCInternalClientAdapter(conn),
987987
RPCStatusClient: serverpb.NewGRPCStatusClientAdapter(conn),
988988
RPCAdminClient: serverpb.NewGRPCAdminClientAdapter(conn),
989989
RPCTimeSeriesClient: tspb.NewGRPCTimeSeriesClientAdapter(conn),
@@ -1005,7 +1005,7 @@ func (c *connector) dialAddr(ctx context.Context, addr string) (conn *grpc.Clien
10051005
return conn, err
10061006
}
10071007

1008-
func (c *connector) tryForgetClient(ctx context.Context, client kvpb.InternalClient) {
1008+
func (c *connector) tryForgetClient(ctx context.Context, client *client) {
10091009
if ctx.Err() != nil {
10101010
// Error (may be) due to context. Don't forget client.
10111011
return

pkg/kv/kvpb/kvpbmock/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ gomock(
44
name = "mock_kvpb",
55
out = "mocks_generated.go",
66
interfaces = [
7-
"InternalClient",
8-
"Internal_MuxRangeFeedClient",
7+
"RPCInternalClient",
8+
"RPCInternal_MuxRangeFeedClient",
99
],
1010
library = "//pkg/kv/kvpb",
1111
package = "kvpbmock",

0 commit comments

Comments
 (0)