Skip to content

Commit ff23260

Browse files
rpc,server,tracing: refactor logic of tracing interceptors
These changes are part of DRPC migration. As part of a plan to implement DRPC compliant interceptors, the common logic of tracing interceptors are moved to a common interceptorutil module. This enables us to reuse tracing logic in drpc interceptors. Epic: none Part of: CRDB-49356 Release note: None
1 parent e5d1c94 commit ff23260

File tree

15 files changed

+160
-104
lines changed

15 files changed

+160
-104
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2754,6 +2754,7 @@ GO_TARGETS = [
27542754
"//pkg/util/tracing/tracingpb:tracingpb",
27552755
"//pkg/util/tracing/tracingservicepb:tracingservicepb",
27562756
"//pkg/util/tracing/tracingui:tracingui",
2757+
"//pkg/util/tracing/tracingutil:tracingutil",
27572758
"//pkg/util/tracing/zipper:zipper",
27582759
"//pkg/util/tracing:tracing",
27592760
"//pkg/util/tracing:tracing_test",

pkg/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ go_library(
6565
"//pkg/util/timeutil",
6666
"//pkg/util/tracing",
6767
"//pkg/util/tracing/grpcinterceptor",
68+
"//pkg/util/tracing/tracingutil",
6869
"//pkg/util/uuid",
6970
"@com_github_cockroachdb_errors//:errors",
7071
"@com_github_cockroachdb_logtags//:logtags",

pkg/rpc/context.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
4040
"github.com/cockroachdb/cockroach/pkg/util/tracing"
4141
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
42+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingutil"
4243
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4344
"github.com/cockroachdb/errors"
4445
"github.com/cockroachdb/logtags"
@@ -728,7 +729,7 @@ func makeInternalClientAdapter(
728729
batchServerHandler := chainUnaryServerInterceptors(
729730
&grpc.UnaryServerInfo{
730731
Server: server,
731-
FullMethod: grpcinterceptor.BatchMethodName,
732+
FullMethod: tracingutil.BatchMethodName,
732733
},
733734
serverUnaryInterceptors,
734735
func(ctx context.Context, req interface{}) (interface{}, error) {
@@ -812,7 +813,7 @@ func makeInternalClientAdapter(
812813
ctx = tracing.ContextWithSpan(ctx, nil)
813814
}
814815

815-
err := batchClientHandler(ctx, grpcinterceptor.BatchMethodName, ba, reply, nil /* ClientConn */)
816+
err := batchClientHandler(ctx, tracingutil.BatchMethodName, ba, reply, nil /* ClientConn */)
816817
return reply, err
817818
},
818819
}

pkg/server/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,11 @@ go_library(
342342
"//pkg/util/timeutil/ptp",
343343
"//pkg/util/tracing",
344344
"//pkg/util/tracing/collector",
345-
"//pkg/util/tracing/grpcinterceptor",
346345
"//pkg/util/tracing/service",
347346
"//pkg/util/tracing/tracingpb",
348347
"//pkg/util/tracing/tracingservicepb",
349348
"//pkg/util/tracing/tracingui",
349+
"//pkg/util/tracing/tracingutil",
350350
"//pkg/util/tracing/zipper",
351351
"//pkg/util/uint128",
352352
"//pkg/util/unique",

pkg/server/node.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ import (
7474
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
7575
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
7676
"github.com/cockroachdb/cockroach/pkg/util/tracing"
77-
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
7877
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
78+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingutil"
7979
"github.com/cockroachdb/cockroach/pkg/util/unique"
8080
"github.com/cockroachdb/cockroach/pkg/util/uuid"
8181
"github.com/cockroachdb/crlib/crtime"
@@ -2004,7 +2004,7 @@ func setupSpanForIncomingRPC(
20042004
// request that didn't specify tracing information. We make a child span
20052005
// if the incoming request would like to be traced.
20062006
ctx, newSpan = tracing.ChildSpan(ctx,
2007-
grpcinterceptor.BatchMethodName, tracing.WithServerSpanKind)
2007+
tracingutil.BatchMethodName, tracing.WithServerSpanKind)
20082008
} else {
20092009
// Non-local call. Tracing information comes from the request proto.
20102010

@@ -2016,7 +2016,7 @@ func setupSpanForIncomingRPC(
20162016
}
20172017

20182018
ctx, newSpan = tr.StartSpanCtx(
2019-
ctx, grpcinterceptor.BatchMethodName,
2019+
ctx, tracingutil.BatchMethodName,
20202020
tracing.WithRemoteParentFromTraceInfo(ba.TraceInfo),
20212021
tracing.WithServerSpanKind)
20222022
}

pkg/sql/distsql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ go_library(
3333
"//pkg/util/tochar",
3434
"//pkg/util/tracing",
3535
"//pkg/util/tracing/grpcinterceptor",
36+
"//pkg/util/tracing/tracingutil",
3637
"@com_github_cockroachdb_errors//:errors",
3738
"@com_github_cockroachdb_logtags//:logtags",
3839
"@com_github_cockroachdb_redact//:redact",

pkg/sql/distsql/server.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/util/tochar"
3838
"github.com/cockroachdb/cockroach/pkg/util/tracing"
3939
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
40+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingutil"
4041
"github.com/cockroachdb/errors"
4142
"github.com/cockroachdb/logtags"
4243
"github.com/cockroachdb/redact"
@@ -599,13 +600,13 @@ func (ds *ServerImpl) setupSpanForIncomingRPC(
599600
// It's not expected to have a span in the context since the gRPC server
600601
// interceptor that generally opens spans exempts this particular RPC. Note
601602
// that this method is not called for flows local to the gateway.
602-
return tr.StartSpanCtx(ctx, grpcinterceptor.SetupFlowMethodName,
603+
return tr.StartSpanCtx(ctx, tracingutil.SetupFlowMethodName,
603604
tracing.WithParent(parentSpan),
604605
tracing.WithServerSpanKind)
605606
}
606607

607608
if !req.TraceInfo.Empty() {
608-
return tr.StartSpanCtx(ctx, grpcinterceptor.SetupFlowMethodName,
609+
return tr.StartSpanCtx(ctx, tracingutil.SetupFlowMethodName,
609610
tracing.WithRemoteParentFromTraceInfo(req.TraceInfo),
610611
tracing.WithServerSpanKind)
611612
}
@@ -615,7 +616,7 @@ func (ds *ServerImpl) setupSpanForIncomingRPC(
615616
if err != nil {
616617
log.Warningf(ctx, "error extracting tracing info from gRPC: %s", err)
617618
}
618-
return tr.StartSpanCtx(ctx, grpcinterceptor.SetupFlowMethodName,
619+
return tr.StartSpanCtx(ctx, tracingutil.SetupFlowMethodName,
619620
tracing.WithRemoteParentFromSpanMeta(remoteParent),
620621
tracing.WithServerSpanKind)
621622
}

pkg/util/tracing/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ go_test(
8787
"//pkg/util/log",
8888
"//pkg/util/randutil",
8989
"//pkg/util/timeutil",
90-
"//pkg/util/tracing/grpcinterceptor",
9190
"//pkg/util/tracing/tracingpb",
91+
"//pkg/util/tracing/tracingutil",
9292
"@com_github_cockroachdb_errors//:errors",
9393
"@com_github_cockroachdb_logtags//:logtags",
9494
"@com_github_gogo_protobuf//proto",

pkg/util/tracing/grpcinterceptor/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"//pkg/util/ctxutil",
1010
"//pkg/util/grpcutil",
1111
"//pkg/util/tracing",
12+
"//pkg/util/tracing/tracingutil",
1213
"@com_github_cockroachdb_errors//:errors",
1314
"@io_opentelemetry_go_otel//attribute",
1415
"@io_opentelemetry_go_otel//codes",
@@ -30,6 +31,7 @@ go_test(
3031
"//pkg/util/stop",
3132
"//pkg/util/tracing",
3233
"//pkg/util/tracing/tracingpb",
34+
"//pkg/util/tracing/tracingutil",
3335
"@com_github_cockroachdb_errors//:errors",
3436
"@com_github_gogo_protobuf//types",
3537
"@com_github_stretchr_testify//require",

pkg/util/tracing/grpcinterceptor/grpc_interceptor.go

Lines changed: 28 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/util/ctxutil"
1414
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
1515
"github.com/cockroachdb/cockroach/pkg/util/tracing"
16+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingutil"
1617
"github.com/cockroachdb/errors"
1718
"go.opentelemetry.io/otel/attribute"
1819
"go.opentelemetry.io/otel/codes"
@@ -33,43 +34,6 @@ func ExtractSpanMetaFromGRPCCtx(
3334
return tracer.ExtractMetaFrom(tracing.MetadataCarrier{MD: md})
3435
}
3536

36-
// setGRPCErrorTag sets an error tag on the span.
37-
func setGRPCErrorTag(sp *tracing.Span, err error) {
38-
if err == nil {
39-
return
40-
}
41-
s, _ := status.FromError(err)
42-
sp.SetTag("response_code", attribute.IntValue(int(codes.Error)))
43-
sp.SetOtelStatus(codes.Error, s.Message())
44-
}
45-
46-
// BatchMethodName is the method name of Internal.Batch RPC.
47-
const BatchMethodName = "/cockroach.roachpb.Internal/Batch"
48-
49-
// BatchStreamMethodName is the method name of the Internal.BatchStream RPC.
50-
const BatchStreamMethodName = "/cockroach.roachpb.Internal/BatchStream"
51-
52-
// sendKVBatchMethodName is the method name for adminServer.SendKVBatch.
53-
const sendKVBatchMethodName = "/cockroach.server.serverpb.Admin/SendKVBatch"
54-
55-
// SetupFlowMethodName is the method name of DistSQL.SetupFlow RPC.
56-
const SetupFlowMethodName = "/cockroach.sql.distsqlrun.DistSQL/SetupFlow"
57-
const flowStreamMethodName = "/cockroach.sql.distsqlrun.DistSQL/FlowStream"
58-
59-
// methodExcludedFromTracing returns true if a call to the given RPC method does
60-
// not need to propagate tracing info. Some RPCs (Internal.Batch,
61-
// DistSQL.SetupFlow) have dedicated fields for passing along the tracing
62-
// context in the request, which is more efficient than letting the RPC
63-
// interceptors deal with it. Others (DistSQL.FlowStream) are simply exempt from
64-
// tracing because it's not worth it.
65-
func methodExcludedFromTracing(method string) bool {
66-
return method == BatchMethodName ||
67-
method == BatchStreamMethodName ||
68-
method == sendKVBatchMethodName ||
69-
method == SetupFlowMethodName ||
70-
method == flowStreamMethodName
71-
}
72-
7337
// ServerInterceptor returns a grpc.UnaryServerInterceptor suitable
7438
// for use in a grpc.NewServer call.
7539
//
@@ -92,7 +56,7 @@ func ServerInterceptor(tracer *tracing.Tracer) grpc.UnaryServerInterceptor {
9256
info *grpc.UnaryServerInfo,
9357
handler grpc.UnaryHandler,
9458
) (interface{}, error) {
95-
if methodExcludedFromTracing(info.FullMethod) {
59+
if tracingutil.MethodExcludedFromTracing(info.FullMethod) {
9660
return handler(ctx, req)
9761
}
9862

@@ -139,7 +103,7 @@ func ServerInterceptor(tracer *tracing.Tracer) grpc.UnaryServerInterceptor {
139103
// application-specific gRPC handler(s) to access.
140104
func StreamServerInterceptor(tracer *tracing.Tracer) grpc.StreamServerInterceptor {
141105
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
142-
if methodExcludedFromTracing(info.FullMethod) {
106+
if tracingutil.MethodExcludedFromTracing(info.FullMethod) {
143107
return handler(srv, ss)
144108
}
145109
spanMeta, err := ExtractSpanMetaFromGRPCCtx(ss.Context(), tracer)
@@ -188,19 +152,6 @@ func (ss *tracingServerStream) Context() context.Context {
188152
return ss.ctx
189153
}
190154

191-
func injectSpanMeta(
192-
ctx context.Context, tracer *tracing.Tracer, clientSpan *tracing.Span,
193-
) context.Context {
194-
md, ok := metadata.FromOutgoingContext(ctx)
195-
if !ok {
196-
md = metadata.New(nil)
197-
} else {
198-
md = md.Copy()
199-
}
200-
tracer.InjectMetaInto(clientSpan.Meta(), tracing.MetadataCarrier{MD: md})
201-
return metadata.NewOutgoingContext(ctx, md)
202-
}
203-
204155
// ClientInterceptor returns a grpc.UnaryClientInterceptor suitable
205156
// for use in a grpc.Dial call.
206157
//
@@ -229,17 +180,13 @@ func ClientInterceptor(
229180
invoker grpc.UnaryInvoker,
230181
opts ...grpc.CallOption,
231182
) error {
232-
// Local RPCs don't need any special tracing, since the caller's context
233-
// will be used on the "server".
234-
_, localRequest := grpcutil.IsLocalRequestContext(ctx)
235-
if localRequest {
236-
return invoker(ctx, method, req, resp, cc, opts...)
237-
}
238-
parent := tracing.SpanFromContext(ctx)
239-
if !tracing.SpanInclusionFuncForClient(parent) {
183+
skipTracing := tracingutil.ShouldSkipClientTracing(ctx)
184+
if skipTracing {
240185
return invoker(ctx, method, req, resp, cc, opts...)
241186
}
242187

188+
// Create clientSpan here after determining that we shouldn't skip tracing
189+
parent := tracing.SpanFromContext(ctx)
243190
clientSpan := tracer.StartSpan(
244191
method,
245192
tracing.WithParent(parent),
@@ -248,11 +195,11 @@ func ClientInterceptor(
248195
init(clientSpan)
249196
defer clientSpan.Finish()
250197

251-
// For most RPCs we pass along tracing info as gRPC metadata. Some select
252-
// RPCs carry the tracing in the request protos, which is more efficient.
253-
if !methodExcludedFromTracing(method) {
254-
ctx = injectSpanMeta(ctx, tracer, clientSpan)
198+
// For most RPCs we pass along tracing info as metadata
199+
if !tracingutil.MethodExcludedFromTracing(method) {
200+
ctx = tracingutil.InjectSpanMeta(ctx, tracer, clientSpan)
255201
}
202+
256203
if invoker != nil {
257204
err := invoker(ctx, method, req, resp, cc, opts...)
258205
if err != nil {
@@ -294,27 +241,23 @@ func StreamClientInterceptor(
294241
streamer grpc.Streamer,
295242
opts ...grpc.CallOption,
296243
) (grpc.ClientStream, error) {
297-
// Local RPCs don't need any special tracing, since the caller's context
298-
// will be used on the "server".
299-
_, localRequest := grpcutil.IsLocalRequestContext(ctx)
300-
if localRequest {
301-
return streamer(ctx, desc, cc, method, opts...)
302-
}
303-
parent := tracing.SpanFromContext(ctx)
304-
if !tracing.SpanInclusionFuncForClient(parent) {
244+
skipTracing := tracingutil.ShouldSkipClientTracing(ctx)
245+
if skipTracing {
305246
return streamer(ctx, desc, cc, method, opts...)
306247
}
307248

308-
// Create a span that will live for the life of the stream.
249+
// Create clientSpan here after determining that we shouldn't skip tracing
250+
parent := tracing.SpanFromContext(ctx)
309251
clientSpan := tracer.StartSpan(
310252
method,
311253
tracing.WithParent(parent),
312254
tracing.WithClientSpanKind,
313255
)
314256
init(clientSpan)
315257

316-
if !methodExcludedFromTracing(method) {
317-
ctx = injectSpanMeta(ctx, tracer, clientSpan)
258+
// For most RPCs we pass along tracing info as metadata
259+
if !tracingutil.MethodExcludedFromTracing(method) {
260+
ctx = tracingutil.InjectSpanMeta(ctx, tracer, clientSpan)
318261
}
319262

320263
cs, err := streamer(ctx, desc, cc, method, opts...)
@@ -421,3 +364,13 @@ func (cs *tracingClientStream) CloseSend() error {
421364
}
422365
return errors.Wrap(err, "close send error")
423366
}
367+
368+
// setGRPCErrorTag sets an error tag on the span.
369+
func setGRPCErrorTag(sp *tracing.Span, err error) {
370+
if err == nil {
371+
return
372+
}
373+
s, _ := status.FromError(err)
374+
sp.SetTag("response_code", attribute.IntValue(int(codes.Error)))
375+
sp.SetOtelStatus(codes.Error, s.Message())
376+
}

0 commit comments

Comments
 (0)