Skip to content

Commit 8b761fd

Browse files
craig[bot]Nukittaa-joshicthumuluru-crdb
committed
152818: tracing, rpc: add support for tracing interceptors in dRPC r=Nukitt a=Nukitt This change brings the support for client and server tracing interceptors in dRPC that start spans, propagate metadata and record error codes for unary and streaming RPCs. Epic: CRDB-49359 Fixes: #148350 and #144372 Release note: None 153519: util: introduce LabelSliceCache in metric r=aa-joshi a=aa-joshi Previously, we were persisting label value slice at each child metrics of aggregated metric type. This was inadequate because it will create redundant memory allocations for same label value slice across difference metrics. To address this, this patch introduces `LabelSliceCache` which would persist the label values at registry level and metrics would reference them through the key. The LabelSliceCache is referenced in metrics through `PrometheusEvictable` interface. The LabelSliceCache contains 2 critical methods: 1. Upsert: This method implements the add reference part of the reference counting mechanism. This method increments the reference count for label values by 1, if already exists. Otherwise, It will create a new entry for the label values with default value as 1. 2. DecrementAndDeleteIfZero: This method decrements the reference counter for the given label values by 1. If the count is reached to zero then it means that no metrics are relying on the particular label values. In that case, it deletes the entry from cache. These methods ensure that the cache tracks how many metrics are currently using each label combination, enabling proper cleanup when metrics are no longer needed. Epic: CRDB-53398 Part of: CRDB-53830 Release note: None 153698: *: drop `TODODRPC` override in favor of cluster setting r=cthumuluru-crdb a=cthumuluru-crdb Currently, at many call sites, we disable DRPC using TODODRPC override. This change drops `TODODRPC` override and ensure DRPC is controlled exclusively through the `rpc.experimental_drpc.enabled` cluster setting. Epic: CRDB-48935 Informs: None Release note: None Co-authored-by: Nukitt <[email protected]> Co-authored-by: Akshay Joshi <[email protected]> Co-authored-by: Chandra Thumuluru <[email protected]>
4 parents c34c845 + ae96aa6 + c6717b0 + 722a04e commit 8b761fd

File tree

27 files changed

+1433
-38
lines changed

27 files changed

+1433
-38
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,7 @@ ALL_TESTS = [
830830
"//pkg/util/timeutil:timeutil_test",
831831
"//pkg/util/tochar:tochar_test",
832832
"//pkg/util/tracing/collector:collector_test",
833+
"//pkg/util/tracing/drpcinterceptor:drpcinterceptor_test",
833834
"//pkg/util/tracing/goexectrace:goexectrace_test",
834835
"//pkg/util/tracing/grpcinterceptor:grpcinterceptor_test",
835836
"//pkg/util/tracing/service:service_test",
@@ -2818,6 +2819,8 @@ GO_TARGETS = [
28182819
"//pkg/util/tochar:tochar_test",
28192820
"//pkg/util/tracing/collector:collector",
28202821
"//pkg/util/tracing/collector:collector_test",
2822+
"//pkg/util/tracing/drpcinterceptor:drpcinterceptor",
2823+
"//pkg/util/tracing/drpcinterceptor:drpcinterceptor_test",
28212824
"//pkg/util/tracing/goexectrace:goexectrace",
28222825
"//pkg/util/tracing/goexectrace:goexectrace_test",
28232826
"//pkg/util/tracing/grpcinterceptor:grpcinterceptor",

pkg/acceptance/localcluster/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ func (n *Node) StatusClient(ctx context.Context) serverpb.RPCStatusClient {
478478
return existingClient
479479
}
480480

481-
if !rpcbase.TODODRPC {
481+
if !rpcbase.DRPCEnabled(ctx, n.rpcCtx.Settings) {
482482
conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr(), roachpb.Locality{}).Connect(ctx)
483483
if err != nil {
484484
log.Dev.Fatalf(context.Background(), "failed to initialize status client: %s", err)

pkg/cli/rpc_clients.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func makeRPCClientConfig(cfg server.Config) rpc.ClientConnConfig {
101101

102102
func newClientConn(ctx context.Context, cfg server.Config) (rpcConn, func(), error) {
103103
ccfg := makeRPCClientConfig(cfg)
104-
if !rpcbase.TODODRPC {
104+
if !rpcbase.DRPCEnabled(ctx, cfg.Settings) {
105105
cc, finish, err := rpc.NewClientConn(ctx, ccfg)
106106
if err != nil {
107107
return nil, nil, errors.Wrap(err, "failed to connect to the node")

pkg/gossip/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func (c *client) drpcDial(ctx context.Context, rpcCtx *rpc.Context) (drpc.Conn,
437437
func (c *client) dialGossipClient(
438438
ctx context.Context, rpcCtx *rpc.Context,
439439
) (RPCGossipClient, error) {
440-
if !rpcbase.TODODRPC {
440+
if !rpcbase.DRPCEnabled(ctx, rpcCtx.Settings) {
441441
conn, err := c.dial(ctx, rpcCtx)
442442
if err != nil {
443443
return nil, err

pkg/kv/kvclient/kvtenant/connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,7 @@ func (c *connector) dialAddrs(ctx context.Context) (*client, error) {
979979
// Try each address on each retry iteration (in random order).
980980
for _, i := range rand.Perm(len(c.addrs)) {
981981
addr := c.addrs[i]
982-
if !rpcbase.TODODRPC {
982+
if !rpcbase.DRPCEnabled(ctx, c.rpcContext.Settings) {
983983
conn, err := c.dialAddr(ctx, addr)
984984
if err != nil {
985985
log.Dev.Warningf(ctx, "error dialing tenant KV address %s: %v", addr, err)

pkg/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ go_library(
6464
"//pkg/util/sysutil",
6565
"//pkg/util/timeutil",
6666
"//pkg/util/tracing",
67+
"//pkg/util/tracing/drpcinterceptor",
6768
"//pkg/util/tracing/grpcinterceptor",
6869
"//pkg/util/tracing/tracingutil",
6970
"//pkg/util/uuid",

pkg/rpc/context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3939
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
4040
"github.com/cockroachdb/cockroach/pkg/util/tracing"
41+
"github.com/cockroachdb/cockroach/pkg/util/tracing/drpcinterceptor"
4142
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
4243
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingutil"
4344
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -651,6 +652,10 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
651652
grpcinterceptor.ClientInterceptor(tracer, tagger))
652653
rpcCtx.clientStreamInterceptors = append(rpcCtx.clientStreamInterceptors,
653654
grpcinterceptor.StreamClientInterceptor(tracer, tagger))
655+
rpcCtx.clientUnaryInterceptorsDRPC = append(rpcCtx.clientUnaryInterceptorsDRPC,
656+
drpcinterceptor.ClientInterceptor(tracer, tagger))
657+
rpcCtx.clientStreamInterceptorsDRPC = append(rpcCtx.clientStreamInterceptorsDRPC,
658+
drpcinterceptor.StreamClientInterceptor(tracer, tagger))
654659
}
655660
// Note that we do not consult rpcCtx.Knobs.StreamClientInterceptor. That knob
656661
// can add another interceptor, but it can only do it dynamically, based on

pkg/rpc/drpc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1818
"github.com/cockroachdb/cockroach/pkg/util/log"
1919
"github.com/cockroachdb/cockroach/pkg/util/stop"
20+
"github.com/cockroachdb/cockroach/pkg/util/tracing/drpcinterceptor"
2021
"github.com/cockroachdb/errors"
2122
"storj.io/drpc"
2223
"storj.io/drpc/drpcclient"
@@ -232,6 +233,11 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR
232233
streamInterceptors = append(streamInterceptors, a.AuthDRPCStream())
233234
}
234235

236+
if tracer := rpcCtx.Stopper.Tracer(); tracer != nil {
237+
unaryInterceptors = append(unaryInterceptors, drpcinterceptor.ServerInterceptor(tracer))
238+
streamInterceptors = append(streamInterceptors, drpcinterceptor.StreamServerInterceptor(tracer))
239+
}
240+
235241
mux := drpcmux.NewWithInterceptors(unaryInterceptors, streamInterceptors)
236242

237243
d.Server = drpcserver.NewWithOptions(mux, drpcserver.Options{

pkg/rpc/nodedialer/nodedialer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (n *Dialer) DialInternalClient(
171171

172172
var client rpc.RestrictedInternalClient
173173
useStreamPoolClient := shouldUseBatchStreamPoolClient(ctx, n.rpcContext.Settings)
174-
if !rpcbase.ExperimentalDRPCEnabled.Get(&n.rpcContext.Settings.SV) {
174+
if !rpcbase.DRPCEnabled(ctx, n.rpcContext.Settings) {
175175
gc, conn, err := dial(ctx, n.resolver, n.rpcContext.GRPCDialNode, nodeID, class, true /* checkBreaker */)
176176
if err != nil {
177177
return nil, errors.Wrapf(err, "gRPC")

pkg/rpc/rpcbase/nodedialer.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ var ExperimentalDRPCEnabled = settings.RegisterBoolSetting(
3939
return nil
4040
}))
4141

42-
// TODODRPC is a marker to identify each RPC client creation site that needs to
43-
// be updated to support DRPC.
44-
const TODODRPC = false
45-
4642
// NodeDialer interface defines methods for dialing peer nodes using their
4743
// node IDs.
4844
type NodeDialer interface {
@@ -70,10 +66,8 @@ func DialRPCClient[C any](
7066
drpcClientFn func(drpc.Conn) C,
7167
st *cluster.Settings,
7268
) (C, error) {
73-
useDRPC := ExperimentalDRPCEnabled.Get(&st.SV)
74-
7569
var nilC C
76-
if !TODODRPC && !useDRPC {
70+
if !DRPCEnabled(ctx, st) {
7771
conn, err := nd.Dial(ctx, nodeID, class)
7872
if err != nil {
7973
return nilC, err
@@ -99,10 +93,8 @@ func DialRPCClientNoBreaker[C any](
9993
drpcClientFn func(drpc.Conn) C,
10094
st *cluster.Settings,
10195
) (C, error) {
102-
useDRPC := ExperimentalDRPCEnabled.Get(&st.SV)
103-
10496
var nilC C
105-
if !TODODRPC && !useDRPC {
97+
if !DRPCEnabled(ctx, st) {
10698
conn, err := nd.DialNoBreaker(ctx, nodeID, class)
10799
if err != nil {
108100
return nilC, err
@@ -116,3 +108,7 @@ func DialRPCClientNoBreaker[C any](
116108
}
117109
return drpcClientFn(conn), nil
118110
}
111+
112+
func DRPCEnabled(ctx context.Context, st *cluster.Settings) bool {
113+
return st != nil && ExperimentalDRPCEnabled.Get(&st.SV)
114+
}

0 commit comments

Comments
 (0)