Skip to content

Commit 314c8bc

Browse files
authored
Merge pull request dapr#8649 from cicoyle/feat-baggage
Feat add baggage support
2 parents ede2057 + 4579e61 commit 314c8bc

File tree

23 files changed

+1770
-96
lines changed

23 files changed

+1770
-96
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ require (
1111
github.com/argoproj/argo-rollouts v1.4.1
1212
github.com/cenkalti/backoff/v4 v4.3.0
1313
github.com/cloudevents/sdk-go/v2 v2.15.2
14-
github.com/dapr/components-contrib v1.15.1-0.20250409220637-70c99725fd12
14+
github.com/dapr/components-contrib v1.15.1-0.20250423123324-b2c31ceba20b
1515
github.com/dapr/durabletask-go v0.6.5
1616
github.com/dapr/kit v0.15.2
1717
github.com/diagridio/go-etcd-cron v0.6.2
@@ -286,7 +286,7 @@ require (
286286
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
287287
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
288288
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
289-
github.com/hamba/avro/v2 v2.25.1 // indirect
289+
github.com/hamba/avro/v2 v2.28.0 // indirect
290290
github.com/hashicorp/consul/api v1.25.1 // indirect
291291
github.com/hashicorp/errwrap v1.1.0 // indirect
292292
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -502,8 +502,8 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm
502502
github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU=
503503
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
504504
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
505-
github.com/dapr/components-contrib v1.15.1-0.20250409220637-70c99725fd12 h1:VETULsWb8KIjzuqmEiXCv6BHw0QVgkjMkP7tztIv9NI=
506-
github.com/dapr/components-contrib v1.15.1-0.20250409220637-70c99725fd12/go.mod h1:0yFCUv1c6TMWvC6sQYdv0FMWmdCeEpWGE8AYp9cb6Ic=
505+
github.com/dapr/components-contrib v1.15.1-0.20250423123324-b2c31ceba20b h1:u1xUF4CuNsTlv8pZsrX6XH1FolmMCjJ0Kyfef0wJkzY=
506+
github.com/dapr/components-contrib v1.15.1-0.20250423123324-b2c31ceba20b/go.mod h1:mDA9c8w/KGAOx18yCTaMXHkfWS6mFHR/o3FjI6a14dA=
507507
github.com/dapr/durabletask-go v0.6.5 h1:aWcxMfYudojpgRjJRdUr7yyZ7rGcvLtWXUuA4cGHBR0=
508508
github.com/dapr/durabletask-go v0.6.5/go.mod h1:nTZ5fCbJLnZbVdi6Z2YxdDF1OgQZL3LroogGuetrwuA=
509509
github.com/dapr/kit v0.15.2 h1:5H9IhKScU/SpE2Hxvr5vUlmYN1e2MJN15RoT8/KSziU=
@@ -946,8 +946,8 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8
946946
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
947947
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
948948
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
949-
github.com/hamba/avro/v2 v2.25.1 h1:t8cOyv0wkNAPF6/khArMtR0nK9HtGa+WKbp9q+KdFZQ=
950-
github.com/hamba/avro/v2 v2.25.1/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0=
949+
github.com/hamba/avro/v2 v2.28.0 h1:E8J5D27biyAulWKNiEBhV85QPc9xRMCUCGJewS0KYCE=
950+
github.com/hamba/avro/v2 v2.28.0/go.mod h1:9TVrlt1cG1kkTUtm9u2eO5Qb7rZXlYzoKqPt8TSH+TA=
951951
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
952952
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
953953
github.com/hashicorp/consul/api v1.25.1 h1:CqrdhYzc8XZuPnhIYZWH45toM0LB9ZeYr/gvpLVI3PE=

pkg/api/grpc/grpc.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import (
2222
"net/http"
2323
"slices"
2424
"strconv"
25+
"strings"
2526
"sync"
2627
"sync/atomic"
2728
"time"
2829

30+
otelbaggage "go.opentelemetry.io/otel/baggage"
2931
otelTrace "go.opentelemetry.io/otel/trace"
3032
"google.golang.org/grpc"
3133
"google.golang.org/grpc/codes"
@@ -47,6 +49,7 @@ import (
4749
stateLoader "github.com/dapr/dapr/pkg/components/state"
4850
"github.com/dapr/dapr/pkg/config"
4951
diag "github.com/dapr/dapr/pkg/diagnostics"
52+
diagConsts "github.com/dapr/dapr/pkg/diagnostics/consts"
5053
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
5154
"github.com/dapr/dapr/pkg/encryption"
5255
"github.com/dapr/dapr/pkg/messages"
@@ -505,23 +508,33 @@ func (a *api) InvokeBinding(ctx context.Context, in *runtimev1pb.InvokeBindingRe
505508
sc := span.SpanContext()
506509
tp := diag.SpanContextToW3CString(sc)
507510
if span != nil {
508-
if _, ok := req.Metadata[diag.TraceparentHeader]; !ok {
509-
req.Metadata[diag.TraceparentHeader] = tp
511+
if _, ok := req.Metadata[diagConsts.TraceparentHeader]; !ok {
512+
req.Metadata[diagConsts.TraceparentHeader] = tp
510513
}
511-
if _, ok := req.Metadata[diag.TracestateHeader]; !ok {
514+
if _, ok := req.Metadata[diagConsts.TracestateHeader]; !ok {
512515
if sc.TraceState().Len() > 0 {
513-
req.Metadata[diag.TracestateHeader] = diag.TraceStateToW3CString(sc)
516+
req.Metadata[diagConsts.TracestateHeader] = diag.TraceStateToW3CString(sc)
514517
}
515518
}
516519
}
517520

518521
// Allow for distributed tracing by passing context metadata.
519522
if incomingMD, ok := metadata.FromIncomingContext(ctx); ok {
523+
if baggageValues := incomingMD[diagConsts.BaggageHeader]; len(baggageValues) > 0 {
524+
baggageString := strings.Join(baggageValues, ",")
525+
baggage, err := otelbaggage.Parse(baggageString)
526+
if err != nil {
527+
return nil, err
528+
}
529+
ctx = otelbaggage.ContextWithBaggage(ctx, baggage)
530+
req.Metadata[diagConsts.BaggageHeader] = baggageString
531+
}
532+
520533
for key, val := range incomingMD {
521534
sanitizedKey := invokev1.ReservedGRPCMetadataToDaprPrefixHeader(key)
522535
// Not to overwrite the existing metadata
523536
// But if the key is traceparent or tracestate, we allow overwrite the existing metadata.
524-
if _, exist := req.Metadata[sanitizedKey]; !exist || (key == diag.TraceparentHeader || key == diag.TracestateHeader) {
537+
if _, exist := req.Metadata[sanitizedKey]; !exist || (key == diagConsts.TraceparentHeader || key == diagConsts.TracestateHeader) {
525538
req.Metadata[sanitizedKey] = val[0]
526539
}
527540
}

pkg/api/grpc/proxy/handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/dapr/dapr/pkg/api/grpc/proxy/codec"
2020
"github.com/dapr/dapr/pkg/diagnostics"
21+
diagConsts "github.com/dapr/dapr/pkg/diagnostics/consts"
2122
"github.com/dapr/dapr/pkg/resiliency"
2223
"github.com/dapr/kit/utils"
2324
)
@@ -96,7 +97,7 @@ func (s *handler) handler(srv any, serverStream grpc.ServerStream) error {
9697
// Fetch the AppId so we can reference it for resiliency.
9798
ctx := serverStream.Context()
9899
md, _ := metadata.FromIncomingContext(ctx)
99-
v := md[diagnostics.GRPCProxyAppIDKey]
100+
v := md[diagConsts.GRPCProxyAppIDKey]
100101

101102
// The app id check is handled in the StreamDirector. If we don't have it here, we just use a NoOp policy since we know the request is impossible.
102103
var policyDef *resiliency.PolicyDefinition

pkg/api/grpc/proxy/handler_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
pb "github.com/dapr/dapr/pkg/api/grpc/proxy/testservice"
4545
"github.com/dapr/dapr/pkg/config"
4646
diag "github.com/dapr/dapr/pkg/diagnostics"
47+
diagConsts "github.com/dapr/dapr/pkg/diagnostics/consts"
4748
"github.com/dapr/dapr/pkg/resiliency"
4849
"github.com/dapr/kit/logger"
4950
"github.com/dapr/kit/retry"
@@ -440,7 +441,7 @@ func (s *proxyTestSuite) TestResiliencyUnary() {
440441

441442
ctx, cancel := s.ctx()
442443
defer cancel()
443-
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(diag.GRPCProxyAppIDKey, testAppID))
444+
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(diagConsts.GRPCProxyAppIDKey, testAppID))
444445

445446
// Reset callCount before this test
446447
s.service.pingCallCount.Store(0)
@@ -476,7 +477,7 @@ func (s *proxyTestSuite) TestResiliencyUnary() {
476477

477478
setupMetrics(s)
478479

479-
ctx := metadata.NewOutgoingContext(t.Context(), metadata.Pairs(diag.GRPCProxyAppIDKey, testAppID))
480+
ctx := metadata.NewOutgoingContext(t.Context(), metadata.Pairs(diagConsts.GRPCProxyAppIDKey, testAppID))
480481

481482
_, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: message})
482483
require.Error(t, err, "Ping should fail due to timeouts")
@@ -515,7 +516,7 @@ func (s *proxyTestSuite) TestResiliencyUnary() {
515516
go func(i int) {
516517
for j := range numOperations {
517518
pingMsg := fmt.Sprintf("%d:%d", i, j)
518-
ctx := metadata.NewOutgoingContext(t.Context(), metadata.Pairs(diag.GRPCProxyAppIDKey, testAppID))
519+
ctx := metadata.NewOutgoingContext(t.Context(), metadata.Pairs(diagConsts.GRPCProxyAppIDKey, testAppID))
519520
res, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: pingMsg})
520521
require.NoErrorf(t, err, "Ping should succeed for operation %d:%d", i, j)
521522
require.NotNilf(t, res, "Response should not be nil for operation %d:%d", i, j)
@@ -584,7 +585,7 @@ func (s *proxyTestSuite) TestResiliencyStreaming() {
584585
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
585586
defer cancel()
586587
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
587-
diag.GRPCProxyAppIDKey, "test",
588+
diagConsts.GRPCProxyAppIDKey, "test",
588589
"dapr-test", t.Name(),
589590
))
590591

@@ -622,7 +623,7 @@ func (s *proxyTestSuite) TestResiliencyStreaming() {
622623
setupMetrics(s)
623624

624625
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
625-
diag.GRPCProxyAppIDKey, testAppID,
626+
diagConsts.GRPCProxyAppIDKey, testAppID,
626627
StreamMetadataKey, "1",
627628
"dapr-test", t.Name(),
628629
))
@@ -673,7 +674,7 @@ func (s *proxyTestSuite) TestResiliencyStreaming() {
673674
setupMetrics(s)
674675

675676
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
676-
diag.GRPCProxyAppIDKey, testAppID,
677+
diagConsts.GRPCProxyAppIDKey, testAppID,
677678
StreamMetadataKey, "1",
678679
"dapr-test", t.Name(),
679680
))

pkg/api/http/http.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/go-chi/chi/v5"
2929
"github.com/mitchellh/mapstructure"
30+
otelBaggage "go.opentelemetry.io/otel/baggage"
3031
"go.opentelemetry.io/otel/trace"
3132

3233
"github.com/dapr/components-contrib/bindings"
@@ -410,7 +411,8 @@ func (a *api) onOutputBindingMessage(w nethttp.ResponseWriter, r *nethttp.Reques
410411
return
411412
}
412413

413-
b, err := json.Marshal(req.Data)
414+
var b []byte
415+
b, err = json.Marshal(req.Data)
414416
if err != nil {
415417
resp := messages.NewAPIErrorHTTP(fmt.Sprintf(messages.ErrMalformedRequestData, err), errorcodes.CommonMalformedRequestData, nethttp.StatusInternalServerError)
416418
respondWithError(w, resp)
@@ -433,6 +435,19 @@ func (a *api) onOutputBindingMessage(w nethttp.ResponseWriter, r *nethttp.Reques
433435
}
434436
}
435437

438+
if baggageHeaders := r.Header.Values(diagConsts.BaggageHeader); len(baggageHeaders) > 0 {
439+
baggageString := strings.Join(baggageHeaders, ",")
440+
if _, err = otelBaggage.Parse(baggageString); err != nil {
441+
resp := messages.NewAPIErrorHTTP(fmt.Sprintf("invalid baggage header: %v", err), errorcodes.CommonMalformedRequest, nethttp.StatusBadRequest)
442+
respondWithError(w, resp)
443+
return
444+
}
445+
if req.Metadata == nil {
446+
req.Metadata = map[string]string{}
447+
}
448+
req.Metadata[diagConsts.BaggageHeader] = baggageString
449+
}
450+
436451
start := time.Now()
437452
resp, err := a.sendToOutputBindingFn(r.Context(), name, &bindings.InvokeRequest{
438453
Metadata: req.Metadata,

pkg/diagnostics/consts/consts.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,21 @@ const (
6161
// Keys used in the context's metadata for streaming calls
6262
// Note: these keys must always be all-lowercase
6363
DaprCallLocalStreamMethodKey = "__dapr_calllocalstream_method"
64+
65+
// We have leveraged the code from opencensus-go plugin to adhere the w3c trace context.
66+
// Reference : https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ochttp/propagation/tracecontext/propagation.go
67+
// Trace context headers
68+
TraceparentHeader = "traceparent"
69+
TracestateHeader = "tracestate"
70+
BaggageHeader = "baggage"
71+
72+
GRPCTraceContextKey = "grpc-trace-bin"
73+
GRPCProxyAppIDKey = "dapr-app-id"
74+
75+
// Trace sampling constants
76+
SupportedVersion = 0
77+
MaxVersion = 254
78+
MaxTracestateLen = 512
6479
)
6580

6681
// GrpcAppendSpanAttributesFn is the interface that applies to gRPC requests that add span attributes.

pkg/diagnostics/grpc_monitoring.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"google.golang.org/protobuf/proto"
2626

2727
"github.com/dapr/dapr/pkg/api/grpc/metadata"
28+
diagConsts "github.com/dapr/dapr/pkg/diagnostics/consts"
2829
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
2930
)
3031

@@ -264,7 +265,7 @@ func (g *grpcMetrics) StreamingServerInterceptor() grpc.StreamServerInterceptor
264265
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
265266
ctx := ss.Context()
266267
md, _ := metadata.FromIncomingContext(ctx)
267-
vals, ok := md[GRPCProxyAppIDKey]
268+
vals, ok := md[diagConsts.GRPCProxyAppIDKey]
268269
if !ok || len(vals) == 0 {
269270
return handler(srv, ss)
270271
}
@@ -285,7 +286,7 @@ func (g *grpcMetrics) StreamingClientInterceptor() grpc.StreamServerInterceptor
285286
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
286287
ctx := ss.Context()
287288
md, _ := metadata.FromIncomingContext(ctx)
288-
vals, ok := md[GRPCProxyAppIDKey]
289+
vals, ok := md[diagConsts.GRPCProxyAppIDKey]
289290
if !ok || len(vals) == 0 {
290291
return handler(srv, ss)
291292
}

pkg/diagnostics/grpc_monitoring_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (f *fakeProxyStream) Context() context.Context {
4141
}
4242

4343
ctx := context.Background()
44-
ctx = grpcMetadata.NewIncomingContext(ctx, grpcMetadata.New(map[string]string{GRPCProxyAppIDKey: f.appID}))
44+
ctx = grpcMetadata.NewIncomingContext(ctx, grpcMetadata.New(map[string]string{"dapr-app-id": f.appID}))
4545
ctx, _ = metadata.SetMetadataInTapHandle(ctx, nil)
4646
return ctx
4747
}

0 commit comments

Comments
 (0)