diff --git a/internal/context.go b/internal/context.go index 4248fcf7d..99f7bc744 100644 --- a/internal/context.go +++ b/internal/context.go @@ -486,7 +486,7 @@ func (c *valueCtx) Value(key interface{}) interface{} { return c.Context.Value(key) } -func spanFromContext(ctx Context) opentracing.SpanContext { +func GetSpanContext(ctx Context) opentracing.SpanContext { val := ctx.Value(activeSpanContextKey) if sp, ok := val.(opentracing.SpanContext); ok { return sp @@ -494,6 +494,6 @@ func spanFromContext(ctx Context) opentracing.SpanContext { return nil } -func contextWithSpan(ctx Context, spanContext opentracing.SpanContext) Context { +func WithSpanContext(ctx Context, spanContext opentracing.SpanContext) Context { return WithValue(ctx, activeSpanContextKey, spanContext) } diff --git a/internal/tracer.go b/internal/tracer.go index 3732d4250..3267e6d30 100644 --- a/internal/tracer.go +++ b/internal/tracer.go @@ -103,7 +103,7 @@ func (t *tracingContextPropagator) InjectFromWorkflow( hw HeaderWriter, ) error { // retrieve span from context object - spanContext := spanFromContext(ctx) + spanContext := GetSpanContext(ctx) if spanContext == nil { return nil } @@ -119,5 +119,5 @@ func (t *tracingContextPropagator) ExtractToWorkflow( // did not find a tracing span, just return the current context return ctx, nil } - return contextWithSpan(ctx, spanContext), nil + return WithSpanContext(ctx, spanContext), nil } diff --git a/internal/tracer_test.go b/internal/tracer_test.go index 7c0432b67..bcc921e7d 100644 --- a/internal/tracer_test.go +++ b/internal/tracer_test.go @@ -82,7 +82,7 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) { span := tracer.StartSpan("test-operation") assert.NotNil(t, span.Context()) - ctx := contextWithSpan(Background(), span.Context()) + ctx := WithSpanContext(Background(), span.Context()) header := &shared.Header{ Fields: map[string][]byte{}, } @@ -96,9 +96,9 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) { returnCtx2, err := ctxProp.ExtractToWorkflow(Background(), NewHeaderReader(header)) require.NoError(t, err) - newSpanContext := spanFromContext(returnCtx) + newSpanContext := GetSpanContext(returnCtx) assert.NotNil(t, newSpanContext) - newSpanContext2 := spanFromContext(returnCtx2) + newSpanContext2 := GetSpanContext(returnCtx2) assert.NotNil(t, newSpanContext2) assert.Equal(t, newSpanContext2, newSpanContext) } @@ -130,7 +130,7 @@ func TestConsistentInjectionExtraction(t *testing.T) { var baggageVal = "e30=" span.SetBaggageItem("request-tenancy", baggageVal) assert.NotNil(t, span.Context()) - ctx := contextWithSpan(Background(), span.Context()) + ctx := WithSpanContext(Background(), span.Context()) header := &shared.Header{ Fields: map[string][]byte{}, } @@ -140,7 +140,7 @@ func TestConsistentInjectionExtraction(t *testing.T) { extractedCtx, err := ctxProp.ExtractToWorkflow(Background(), NewHeaderReader(header)) require.NoError(t, err) - extractedSpanContext := spanFromContext(extractedCtx) + extractedSpanContext := GetSpanContext(extractedCtx) extractedSpanContext.ForeachBaggageItem(func(k, v string) bool { if k == "request-tenancy" { assert.Equal(t, v, baggageVal) diff --git a/test/activity_test.go b/test/activity_test.go index d27d00a64..3602f08e6 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -28,6 +28,9 @@ import ( "sync" "time" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "go.uber.org/cadence" "go.uber.org/cadence/activity" "go.uber.org/cadence/worker" @@ -37,6 +40,7 @@ type Activities struct { mu sync.Mutex invocations []string activities2 *Activities2 + tracer opentracing.Tracer } type Activities2 struct { @@ -47,7 +51,7 @@ var errFailOnPurpose = cadence.NewCustomError("failing-on-purpose") func newActivities() *Activities { activities2 := &Activities2{} - result := &Activities{activities2: activities2} + result := &Activities{activities2: activities2, tracer: mocktracer.New()} activities2.impl = result return result } @@ -146,6 +150,13 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, domain, taskList, return nil } +func (a *Activities) InspectActivitySpan(ctx context.Context) (map[string]string, error) { + span := opentracing.SpanFromContext(ctx) + carrier := make(map[string]string) + err := a.tracer.Inject(span.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier)) + return carrier, err +} + func (a *Activities) register(worker worker.Worker) { // Kept to verify backward compatibility of activity registration. activity.RegisterWithOptions(a, activity.RegisterOptions{Name: "Activities_", DisableAlreadyRegisteredCheck: true}) @@ -155,4 +166,5 @@ func (a *Activities) register(worker worker.Worker) { worker.RegisterActivityWithOptions(a.activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true}) worker.RegisterActivityWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"}) worker.RegisterActivityWithOptions(a.HeartbeatAndSleep, activity.RegisterOptions{Name: "HeartbeatAndSleep", EnableAutoHeartbeat: true}) + worker.RegisterActivityWithOptions(a.InspectActivitySpan, activity.RegisterOptions{Name: "inspectActivitySpan"}) } diff --git a/test/integration_test.go b/test/integration_test.go index 1387781f4..d201cd0fa 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -38,6 +38,8 @@ import ( "go.uber.org/goleak" "go.uber.org/zap/zaptest" + "github.com/opentracing/opentracing-go/mocktracer" + "go.uber.org/cadence" "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/client" @@ -98,7 +100,7 @@ func (ts *IntegrationTestSuite) SetupSuite() { ts.Assertions = require.New(ts.T()) ts.config = newConfig() ts.activities = newActivities() - ts.workflows = &Workflows{} + ts.workflows = &Workflows{tracer: mocktracer.New()} ts.Nil(waitForTCP(time.Minute, ts.config.ServiceAddr)) var err error if ts.config.EnableGrpcAdapter { @@ -109,6 +111,7 @@ func (ts *IntegrationTestSuite) SetupSuite() { ts.NoError(err) ts.libClient = client.NewClient(ts.rpcClient.Interface, domainName, &client.Options{ + Tracer: ts.workflows.tracer, ContextPropagators: []workflow.ContextPropagator{NewStringMapPropagator([]string{testContextKey})}, }) ts.domainClient = client.NewDomainClient(ts.rpcClient.Interface, &client.Options{}) @@ -154,6 +157,7 @@ func (ts *IntegrationTestSuite) SetupTest() { func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) { options := worker.Options{ + Tracer: ts.workflows.tracer, DisableStickyExecution: ts.config.IsStickyOff, Logger: zaptest.NewLogger(ts.T()), WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptorFactory{ts.tracer}, @@ -545,6 +549,13 @@ func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowQuery() { ts.NoError(value.Get(&trace)) } +func (ts *IntegrationTestSuite) TestOverrideSpanContext() { + var result map[string]string + err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result) + ts.NoError(err) + ts.Equal("some-value", result["mockpfx-baggage-some-key"]) +} + func (ts *IntegrationTestSuite) registerDomain() { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() @@ -587,10 +598,13 @@ func (ts *IntegrationTestSuite) executeWorkflowWithOption( options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() - run, err := ts.libClient.ExecuteWorkflow(ctx, options, wfFunc, args...) + span := ts.workflows.tracer.StartSpan("test-workflow") + defer span.Finish() + execution, err := ts.libClient.StartWorkflow(ctx, options, wfFunc, args...) if err != nil { return err } + run := ts.libClient.GetWorkflow(ctx, execution.ID, execution.RunID) err = run.Get(ctx, retValPtr) logger := zaptest.NewLogger(ts.T()) if ts.config.Debug { diff --git a/test/workflow_test.go b/test/workflow_test.go index 0f60fc025..e53403477 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -27,6 +27,8 @@ import ( "math/rand" "time" + "github.com/opentracing/opentracing-go" + "go.uber.org/cadence" "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/client" @@ -41,6 +43,7 @@ const ( type Workflows struct { nonDeterminismSimulatorWorkflowCallCount int + tracer opentracing.Tracer } func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) { @@ -617,6 +620,45 @@ func (w *Workflows) NonDeterminismSimulatorWorkflow(ctx workflow.Context) ([]str return res, nil } +func (w *Workflows) OverrideSpanContext(ctx workflow.Context) (map[string]string, error) { + type spanActivationResult struct { + Carrier map[string]string + Err error + } + // start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay + resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx))) + defer wSpan.Finish() + wSpan.SetBaggageItem("some-key", "some-value") + carrier := make(map[string]string) + err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier)) + return spanActivationResult{Carrier: carrier, Err: err} + }) + var activationResult spanActivationResult + if err := resultValue.Get(&activationResult); err != nil { + return nil, fmt.Errorf("failed to decode span activation result: %v", err) + } + if activationResult.Err != nil { + return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err) + } + spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier)) + if err != nil { + return nil, fmt.Errorf("failed to extract span context: %v", err) + } + ctx = workflow.WithSpanContext(ctx, spanContext) + + opts := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 10 * time.Second, + HeartbeatTimeout: 2 * time.Second, + } + aCtx := workflow.WithActivityOptions(ctx, opts) + var res map[string]string + err = workflow.ExecuteActivity(aCtx, "inspectActivitySpan", "hello").Get(aCtx, &res) + return res, err +} + func (w *Workflows) register(worker worker.Worker) { // Kept to verify backward compatibility of workflow registration. workflow.RegisterWithOptions(w.Basic, workflow.RegisterOptions{DisableAlreadyRegisteredCheck: true}) @@ -646,6 +688,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ConsistentQueryWorkflow) worker.RegisterWorkflow(w.WorkflowWithLocalActivityCtxPropagation) worker.RegisterWorkflow(w.NonDeterminismSimulatorWorkflow) + worker.RegisterWorkflow(w.OverrideSpanContext) } diff --git a/workflow/context.go b/workflow/context.go index 8a8daa444..691f8db05 100644 --- a/workflow/context.go +++ b/workflow/context.go @@ -21,6 +21,8 @@ package workflow import ( + "github.com/opentracing/opentracing-go" + "go.uber.org/cadence/internal" ) @@ -76,3 +78,78 @@ func WithValue(parent Context, key interface{}, val interface{}) Context { func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc) { return internal.NewDisconnectedContext(parent) } + +// GetSpanContext returns the [opentracing.SpanContext] from [Context]. +// Returns nil if tracer is not set in [go.uber.org/cadence/worker.Options]. +// +// Note: If tracer is set, we already activate a span for each workflow. +// This SpanContext will be passed to the activities and child workflows to start new spans. +// +// Example Usage: +// +// span := GetSpanContext(ctx) +// if span != nil { +// span.SetTag("foo", "bar") +// } +func GetSpanContext(ctx Context) opentracing.SpanContext { + return internal.GetSpanContext(ctx) +} + +// WithSpanContext returns [Context] with override [opentracing.SpanContext]. +// This is useful to modify baggage items of current workflow and pass it to activities and child workflows. +// +// Example Usage: +// +// func goodWorkflow(ctx Context) (string, error) { +// // start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay +// type spanActivationResult struct { +// Carrier map[string]string // exported field so it's json encoded +// Err error +// } +// resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { +// wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx))) +// defer wSpan.Finish() +// wSpan.SetBaggageItem("some-key", "some-value") +// carrier := make(map[string]string) +// err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier)) +// return spanActivationResult{Carrier: carrier, Err: err} +// }) +// var activationResult spanActivationResult +// if err := resultValue.Get(&activationResult); err != nil { +// return nil, fmt.Errorf("failed to decode span activation result: %v", err) +// } +// if activationResult.Err != nil { +// return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err) +// } +// spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier)) +// if err != nil { +// return nil, fmt.Errorf("failed to extract span context: %v", err) +// } +// ctx = workflow.WithSpanContext(ctx, spanContext) +// var activityFooResult string +// aCtx := workflow.WithActivityOptions(ctx, opts) +// err = workflow.ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult) +// return activityFooResult, err +// } +// +// Bad Example: +// +// func badWorkflow(ctx Context) (string, error) { +// // start a new workflow span for EVERY REPLAY +// wSpan := opentracing.StartSpan("workflow-operation", opentracing.ChildOf(GetSpanContext(ctx))) +// wSpan.SetBaggageItem("some-key", "some-value") +// // pass the new span context to activity +// ctx = WithSpanContext(ctx, wSpan.Context()) +// aCtx := workflow.WithActivityOptions(ctx, opts) +// var activityFooResult string +// err := ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult) +// wSpan.Finish() +// return activityFooResult, err +// } +// +// func activityFoo(ctx Context) (string, error) { +// return "activity-foo-result", nil +// } +func WithSpanContext(ctx Context, spanContext opentracing.SpanContext) Context { + return internal.WithSpanContext(ctx, spanContext) +}