Skip to content

Commit 50be4b8

Browse files
authored
Add workflow.GetSpanContext and workflow.WithSpanContext API (#1423)
Detailed Description GetSpanContext API is used for fetch span context from workflow if it exists WithSpanContext() API is used for overriding the existing span context in the workflow so it could be passed to activities and child workflows Impact Analysis Backward Compatibility: Yes, they are new APIs Forward Compatibility: Yes, WithSpanContext would write the existing span context field and it's readable for old client versions Testing Plan Unit Tests: No Persistence Tests: No Integration Tests: Yes Compatibility Tests: No Rollout Plan What is the rollout plan? new release Does the order of deployment matter? no Is it safe to rollback? Does the order of rollback matter? Safe Is there a kill switch to mitigate the impact immediately? Rollback to previous version and remove usages of the new APIs Why? Users can now access baggage items in the workflow code.
1 parent a061890 commit 50be4b8

File tree

7 files changed

+158
-12
lines changed

7 files changed

+158
-12
lines changed

internal/context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,14 +486,14 @@ func (c *valueCtx) Value(key interface{}) interface{} {
486486
return c.Context.Value(key)
487487
}
488488

489-
func spanFromContext(ctx Context) opentracing.SpanContext {
489+
func GetSpanContext(ctx Context) opentracing.SpanContext {
490490
val := ctx.Value(activeSpanContextKey)
491491
if sp, ok := val.(opentracing.SpanContext); ok {
492492
return sp
493493
}
494494
return nil
495495
}
496496

497-
func contextWithSpan(ctx Context, spanContext opentracing.SpanContext) Context {
497+
func WithSpanContext(ctx Context, spanContext opentracing.SpanContext) Context {
498498
return WithValue(ctx, activeSpanContextKey, spanContext)
499499
}

internal/tracer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (t *tracingContextPropagator) InjectFromWorkflow(
103103
hw HeaderWriter,
104104
) error {
105105
// retrieve span from context object
106-
spanContext := spanFromContext(ctx)
106+
spanContext := GetSpanContext(ctx)
107107
if spanContext == nil {
108108
return nil
109109
}
@@ -119,5 +119,5 @@ func (t *tracingContextPropagator) ExtractToWorkflow(
119119
// did not find a tracing span, just return the current context
120120
return ctx, nil
121121
}
122-
return contextWithSpan(ctx, spanContext), nil
122+
return WithSpanContext(ctx, spanContext), nil
123123
}

internal/tracer_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) {
8282

8383
span := tracer.StartSpan("test-operation")
8484
assert.NotNil(t, span.Context())
85-
ctx := contextWithSpan(Background(), span.Context())
85+
ctx := WithSpanContext(Background(), span.Context())
8686
header := &shared.Header{
8787
Fields: map[string][]byte{},
8888
}
@@ -96,9 +96,9 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) {
9696
returnCtx2, err := ctxProp.ExtractToWorkflow(Background(), NewHeaderReader(header))
9797
require.NoError(t, err)
9898

99-
newSpanContext := spanFromContext(returnCtx)
99+
newSpanContext := GetSpanContext(returnCtx)
100100
assert.NotNil(t, newSpanContext)
101-
newSpanContext2 := spanFromContext(returnCtx2)
101+
newSpanContext2 := GetSpanContext(returnCtx2)
102102
assert.NotNil(t, newSpanContext2)
103103
assert.Equal(t, newSpanContext2, newSpanContext)
104104
}
@@ -130,7 +130,7 @@ func TestConsistentInjectionExtraction(t *testing.T) {
130130
var baggageVal = "e30="
131131
span.SetBaggageItem("request-tenancy", baggageVal)
132132
assert.NotNil(t, span.Context())
133-
ctx := contextWithSpan(Background(), span.Context())
133+
ctx := WithSpanContext(Background(), span.Context())
134134
header := &shared.Header{
135135
Fields: map[string][]byte{},
136136
}
@@ -140,7 +140,7 @@ func TestConsistentInjectionExtraction(t *testing.T) {
140140
extractedCtx, err := ctxProp.ExtractToWorkflow(Background(), NewHeaderReader(header))
141141
require.NoError(t, err)
142142

143-
extractedSpanContext := spanFromContext(extractedCtx)
143+
extractedSpanContext := GetSpanContext(extractedCtx)
144144
extractedSpanContext.ForeachBaggageItem(func(k, v string) bool {
145145
if k == "request-tenancy" {
146146
assert.Equal(t, v, baggageVal)

test/activity_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import (
2828
"sync"
2929
"time"
3030

31+
"github.com/opentracing/opentracing-go"
32+
"github.com/opentracing/opentracing-go/mocktracer"
33+
3134
"go.uber.org/cadence"
3235
"go.uber.org/cadence/activity"
3336
"go.uber.org/cadence/worker"
@@ -37,6 +40,7 @@ type Activities struct {
3740
mu sync.Mutex
3841
invocations []string
3942
activities2 *Activities2
43+
tracer opentracing.Tracer
4044
}
4145

4246
type Activities2 struct {
@@ -47,7 +51,7 @@ var errFailOnPurpose = cadence.NewCustomError("failing-on-purpose")
4751

4852
func newActivities() *Activities {
4953
activities2 := &Activities2{}
50-
result := &Activities{activities2: activities2}
54+
result := &Activities{activities2: activities2, tracer: mocktracer.New()}
5155
activities2.impl = result
5256
return result
5357
}
@@ -146,6 +150,13 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, domain, taskList,
146150
return nil
147151
}
148152

153+
func (a *Activities) InspectActivitySpan(ctx context.Context) (map[string]string, error) {
154+
span := opentracing.SpanFromContext(ctx)
155+
carrier := make(map[string]string)
156+
err := a.tracer.Inject(span.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
157+
return carrier, err
158+
}
159+
149160
func (a *Activities) register(worker worker.Worker) {
150161
// Kept to verify backward compatibility of activity registration.
151162
activity.RegisterWithOptions(a, activity.RegisterOptions{Name: "Activities_", DisableAlreadyRegisteredCheck: true})
@@ -155,4 +166,5 @@ func (a *Activities) register(worker worker.Worker) {
155166
worker.RegisterActivityWithOptions(a.activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true})
156167
worker.RegisterActivityWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"})
157168
worker.RegisterActivityWithOptions(a.HeartbeatAndSleep, activity.RegisterOptions{Name: "HeartbeatAndSleep", EnableAutoHeartbeat: true})
169+
worker.RegisterActivityWithOptions(a.InspectActivitySpan, activity.RegisterOptions{Name: "inspectActivitySpan"})
158170
}

test/integration_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
"go.uber.org/goleak"
3939
"go.uber.org/zap/zaptest"
4040

41+
"github.com/opentracing/opentracing-go/mocktracer"
42+
4143
"go.uber.org/cadence"
4244
"go.uber.org/cadence/.gen/go/shared"
4345
"go.uber.org/cadence/client"
@@ -98,7 +100,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
98100
ts.Assertions = require.New(ts.T())
99101
ts.config = newConfig()
100102
ts.activities = newActivities()
101-
ts.workflows = &Workflows{}
103+
ts.workflows = &Workflows{tracer: mocktracer.New()}
102104
ts.Nil(waitForTCP(time.Minute, ts.config.ServiceAddr))
103105
var err error
104106
if ts.config.EnableGrpcAdapter {
@@ -109,6 +111,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
109111
ts.NoError(err)
110112
ts.libClient = client.NewClient(ts.rpcClient.Interface, domainName,
111113
&client.Options{
114+
Tracer: ts.workflows.tracer,
112115
ContextPropagators: []workflow.ContextPropagator{NewStringMapPropagator([]string{testContextKey})},
113116
})
114117
ts.domainClient = client.NewDomainClient(ts.rpcClient.Interface, &client.Options{})
@@ -154,6 +157,7 @@ func (ts *IntegrationTestSuite) SetupTest() {
154157

155158
func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) {
156159
options := worker.Options{
160+
Tracer: ts.workflows.tracer,
157161
DisableStickyExecution: ts.config.IsStickyOff,
158162
Logger: zaptest.NewLogger(ts.T()),
159163
WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptorFactory{ts.tracer},
@@ -545,6 +549,13 @@ func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowQuery() {
545549
ts.NoError(value.Get(&trace))
546550
}
547551

552+
func (ts *IntegrationTestSuite) TestOverrideSpanContext() {
553+
var result map[string]string
554+
err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result)
555+
ts.NoError(err)
556+
ts.Equal("some-value", result["mockpfx-baggage-some-key"])
557+
}
558+
548559
func (ts *IntegrationTestSuite) registerDomain() {
549560
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
550561
defer cancel()
@@ -587,10 +598,13 @@ func (ts *IntegrationTestSuite) executeWorkflowWithOption(
587598
options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error {
588599
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
589600
defer cancel()
590-
run, err := ts.libClient.ExecuteWorkflow(ctx, options, wfFunc, args...)
601+
span := ts.workflows.tracer.StartSpan("test-workflow")
602+
defer span.Finish()
603+
execution, err := ts.libClient.StartWorkflow(ctx, options, wfFunc, args...)
591604
if err != nil {
592605
return err
593606
}
607+
run := ts.libClient.GetWorkflow(ctx, execution.ID, execution.RunID)
594608
err = run.Get(ctx, retValPtr)
595609
logger := zaptest.NewLogger(ts.T())
596610
if ts.config.Debug {

test/workflow_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"math/rand"
2828
"time"
2929

30+
"github.com/opentracing/opentracing-go"
31+
3032
"go.uber.org/cadence"
3133
"go.uber.org/cadence/.gen/go/shared"
3234
"go.uber.org/cadence/client"
@@ -41,6 +43,7 @@ const (
4143

4244
type Workflows struct {
4345
nonDeterminismSimulatorWorkflowCallCount int
46+
tracer opentracing.Tracer
4447
}
4548

4649
func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) {
@@ -617,6 +620,45 @@ func (w *Workflows) NonDeterminismSimulatorWorkflow(ctx workflow.Context) ([]str
617620
return res, nil
618621
}
619622

623+
func (w *Workflows) OverrideSpanContext(ctx workflow.Context) (map[string]string, error) {
624+
type spanActivationResult struct {
625+
Carrier map[string]string
626+
Err error
627+
}
628+
// start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay
629+
resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
630+
wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx)))
631+
defer wSpan.Finish()
632+
wSpan.SetBaggageItem("some-key", "some-value")
633+
carrier := make(map[string]string)
634+
err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
635+
return spanActivationResult{Carrier: carrier, Err: err}
636+
})
637+
var activationResult spanActivationResult
638+
if err := resultValue.Get(&activationResult); err != nil {
639+
return nil, fmt.Errorf("failed to decode span activation result: %v", err)
640+
}
641+
if activationResult.Err != nil {
642+
return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err)
643+
}
644+
spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier))
645+
if err != nil {
646+
return nil, fmt.Errorf("failed to extract span context: %v", err)
647+
}
648+
ctx = workflow.WithSpanContext(ctx, spanContext)
649+
650+
opts := workflow.ActivityOptions{
651+
ScheduleToStartTimeout: time.Second,
652+
ScheduleToCloseTimeout: 10 * time.Second,
653+
StartToCloseTimeout: 10 * time.Second,
654+
HeartbeatTimeout: 2 * time.Second,
655+
}
656+
aCtx := workflow.WithActivityOptions(ctx, opts)
657+
var res map[string]string
658+
err = workflow.ExecuteActivity(aCtx, "inspectActivitySpan", "hello").Get(aCtx, &res)
659+
return res, err
660+
}
661+
620662
func (w *Workflows) register(worker worker.Worker) {
621663
// Kept to verify backward compatibility of workflow registration.
622664
workflow.RegisterWithOptions(w.Basic, workflow.RegisterOptions{DisableAlreadyRegisteredCheck: true})
@@ -646,6 +688,7 @@ func (w *Workflows) register(worker worker.Worker) {
646688
worker.RegisterWorkflow(w.ConsistentQueryWorkflow)
647689
worker.RegisterWorkflow(w.WorkflowWithLocalActivityCtxPropagation)
648690
worker.RegisterWorkflow(w.NonDeterminismSimulatorWorkflow)
691+
worker.RegisterWorkflow(w.OverrideSpanContext)
649692

650693
}
651694

workflow/context.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
package workflow
2222

2323
import (
24+
"github.com/opentracing/opentracing-go"
25+
2426
"go.uber.org/cadence/internal"
2527
)
2628

@@ -76,3 +78,78 @@ func WithValue(parent Context, key interface{}, val interface{}) Context {
7678
func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc) {
7779
return internal.NewDisconnectedContext(parent)
7880
}
81+
82+
// GetSpanContext returns the [opentracing.SpanContext] from [Context].
83+
// Returns nil if tracer is not set in [go.uber.org/cadence/worker.Options].
84+
//
85+
// Note: If tracer is set, we already activate a span for each workflow.
86+
// This SpanContext will be passed to the activities and child workflows to start new spans.
87+
//
88+
// Example Usage:
89+
//
90+
// span := GetSpanContext(ctx)
91+
// if span != nil {
92+
// span.SetTag("foo", "bar")
93+
// }
94+
func GetSpanContext(ctx Context) opentracing.SpanContext {
95+
return internal.GetSpanContext(ctx)
96+
}
97+
98+
// WithSpanContext returns [Context] with override [opentracing.SpanContext].
99+
// This is useful to modify baggage items of current workflow and pass it to activities and child workflows.
100+
//
101+
// Example Usage:
102+
//
103+
// func goodWorkflow(ctx Context) (string, error) {
104+
// // start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay
105+
// type spanActivationResult struct {
106+
// Carrier map[string]string // exported field so it's json encoded
107+
// Err error
108+
// }
109+
// resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
110+
// wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx)))
111+
// defer wSpan.Finish()
112+
// wSpan.SetBaggageItem("some-key", "some-value")
113+
// carrier := make(map[string]string)
114+
// err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
115+
// return spanActivationResult{Carrier: carrier, Err: err}
116+
// })
117+
// var activationResult spanActivationResult
118+
// if err := resultValue.Get(&activationResult); err != nil {
119+
// return nil, fmt.Errorf("failed to decode span activation result: %v", err)
120+
// }
121+
// if activationResult.Err != nil {
122+
// return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err)
123+
// }
124+
// spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier))
125+
// if err != nil {
126+
// return nil, fmt.Errorf("failed to extract span context: %v", err)
127+
// }
128+
// ctx = workflow.WithSpanContext(ctx, spanContext)
129+
// var activityFooResult string
130+
// aCtx := workflow.WithActivityOptions(ctx, opts)
131+
// err = workflow.ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult)
132+
// return activityFooResult, err
133+
// }
134+
//
135+
// Bad Example:
136+
//
137+
// func badWorkflow(ctx Context) (string, error) {
138+
// // start a new workflow span for EVERY REPLAY
139+
// wSpan := opentracing.StartSpan("workflow-operation", opentracing.ChildOf(GetSpanContext(ctx)))
140+
// wSpan.SetBaggageItem("some-key", "some-value")
141+
// // pass the new span context to activity
142+
// ctx = WithSpanContext(ctx, wSpan.Context())
143+
// aCtx := workflow.WithActivityOptions(ctx, opts)
144+
// var activityFooResult string
145+
// err := ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult)
146+
// wSpan.Finish()
147+
// return activityFooResult, err
148+
// }
149+
//
150+
// func activityFoo(ctx Context) (string, error) {
151+
// return "activity-foo-result", nil
152+
// }
153+
func WithSpanContext(ctx Context, spanContext opentracing.SpanContext) Context {
154+
return internal.WithSpanContext(ctx, spanContext)
155+
}

0 commit comments

Comments
 (0)