Skip to content

Commit ae96aa6

Browse files
committed
tracing, rpc: add support for tracing interceptors in dRPC
This change brings the support for client and tracing interceptors in dRPC that start spans, propagate metadata via DRPC context and record error codes for unary and streaming RPCs . Epic: CRDB-49359 Fixes: #148350 and #144372 Release note: None
1 parent e860c36 commit ae96aa6

File tree

9 files changed

+793
-1
lines changed

9 files changed

+793
-1
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,7 @@ ALL_TESTS = [
830830
"//pkg/util/timeutil:timeutil_test",
831831
"//pkg/util/tochar:tochar_test",
832832
"//pkg/util/tracing/collector:collector_test",
833+
"//pkg/util/tracing/drpcinterceptor:drpcinterceptor_test",
833834
"//pkg/util/tracing/goexectrace:goexectrace_test",
834835
"//pkg/util/tracing/grpcinterceptor:grpcinterceptor_test",
835836
"//pkg/util/tracing/service:service_test",
@@ -2818,6 +2819,8 @@ GO_TARGETS = [
28182819
"//pkg/util/tochar:tochar_test",
28192820
"//pkg/util/tracing/collector:collector",
28202821
"//pkg/util/tracing/collector:collector_test",
2822+
"//pkg/util/tracing/drpcinterceptor:drpcinterceptor",
2823+
"//pkg/util/tracing/drpcinterceptor:drpcinterceptor_test",
28212824
"//pkg/util/tracing/goexectrace:goexectrace",
28222825
"//pkg/util/tracing/goexectrace:goexectrace_test",
28232826
"//pkg/util/tracing/grpcinterceptor:grpcinterceptor",

pkg/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ go_library(
6464
"//pkg/util/sysutil",
6565
"//pkg/util/timeutil",
6666
"//pkg/util/tracing",
67+
"//pkg/util/tracing/drpcinterceptor",
6768
"//pkg/util/tracing/grpcinterceptor",
6869
"//pkg/util/tracing/tracingutil",
6970
"//pkg/util/uuid",

pkg/rpc/context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3939
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
4040
"github.com/cockroachdb/cockroach/pkg/util/tracing"
41+
"github.com/cockroachdb/cockroach/pkg/util/tracing/drpcinterceptor"
4142
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
4243
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingutil"
4344
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -651,6 +652,10 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
651652
grpcinterceptor.ClientInterceptor(tracer, tagger))
652653
rpcCtx.clientStreamInterceptors = append(rpcCtx.clientStreamInterceptors,
653654
grpcinterceptor.StreamClientInterceptor(tracer, tagger))
655+
rpcCtx.clientUnaryInterceptorsDRPC = append(rpcCtx.clientUnaryInterceptorsDRPC,
656+
drpcinterceptor.ClientInterceptor(tracer, tagger))
657+
rpcCtx.clientStreamInterceptorsDRPC = append(rpcCtx.clientStreamInterceptorsDRPC,
658+
drpcinterceptor.StreamClientInterceptor(tracer, tagger))
654659
}
655660
// Note that we do not consult rpcCtx.Knobs.StreamClientInterceptor. That knob
656661
// can add another interceptor, but it can only do it dynamically, based on

pkg/rpc/drpc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1818
"github.com/cockroachdb/cockroach/pkg/util/log"
1919
"github.com/cockroachdb/cockroach/pkg/util/stop"
20+
"github.com/cockroachdb/cockroach/pkg/util/tracing/drpcinterceptor"
2021
"github.com/cockroachdb/errors"
2122
"storj.io/drpc"
2223
"storj.io/drpc/drpcclient"
@@ -232,6 +233,11 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR
232233
streamInterceptors = append(streamInterceptors, a.AuthDRPCStream())
233234
}
234235

236+
if tracer := rpcCtx.Stopper.Tracer(); tracer != nil {
237+
unaryInterceptors = append(unaryInterceptors, drpcinterceptor.ServerInterceptor(tracer))
238+
streamInterceptors = append(streamInterceptors, drpcinterceptor.StreamServerInterceptor(tracer))
239+
}
240+
235241
mux := drpcmux.NewWithInterceptors(unaryInterceptors, streamInterceptors)
236242

237243
d.Server = drpcserver.NewWithOptions(mux, drpcserver.Options{
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "drpcinterceptor",
5+
srcs = ["drpc_interceptor.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/util/tracing/drpcinterceptor",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/util/ctxutil",
10+
"//pkg/util/tracing",
11+
"//pkg/util/tracing/tracingutil",
12+
"@io_opentelemetry_go_otel//attribute",
13+
"@io_opentelemetry_go_otel//codes",
14+
"@io_storj_drpc//:drpc",
15+
"@io_storj_drpc//drpcclient",
16+
"@io_storj_drpc//drpcerr",
17+
"@io_storj_drpc//drpcmetadata",
18+
"@io_storj_drpc//drpcmux",
19+
],
20+
)
21+
22+
go_test(
23+
name = "drpcinterceptor_test",
24+
srcs = ["drpc_interceptor_test.go"],
25+
deps = [
26+
":drpcinterceptor",
27+
"//pkg/testutils",
28+
"//pkg/util",
29+
"//pkg/util/leaktest",
30+
"//pkg/util/protoutil",
31+
"//pkg/util/tracing",
32+
"//pkg/util/tracing/tracingpb",
33+
"//pkg/util/tracing/tracingutil",
34+
"@com_github_cockroachdb_errors//:errors",
35+
"@com_github_gogo_protobuf//types",
36+
"@com_github_stretchr_testify//require",
37+
"@io_storj_drpc//:drpc",
38+
"@io_storj_drpc//drpcclient",
39+
"@io_storj_drpc//drpcconn",
40+
"@io_storj_drpc//drpcmux",
41+
"@io_storj_drpc//drpcserver",
42+
],
43+
)

0 commit comments

Comments
 (0)