Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,14 @@ func (c *valueCtx) Value(key interface{}) interface{} {
return c.Context.Value(key)
}

func spanFromContext(ctx Context) opentracing.SpanContext {
func GetSpanContext(ctx Context) opentracing.SpanContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also expose contextWithSpan or is there another way for users to add their existing context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I'm a bit concerned about the determinism if users start a new span inside the workflow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't have a clear story around replay traces. It wouldn't cause non-determinism though. Just weird/duplicate traces potentially.
Are you suggesting to expose the span via GetSpanContext but don't let user set/override the span in the context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. It's still deterministics. Replay is going to cause issues. We will see multiple child spans if we allow users to set them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Sideeffect would work. But is span serializable or recoverable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. We can try and see. It sounds like we need to start a new span even for replays but let's discuss that separately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SpanContext is serializable and should be passed. With that said, we'll require users to start a short-lived new span just to modify spancontext. Please refer the examples in the comments

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added an integration test which works.

val := ctx.Value(activeSpanContextKey)
if sp, ok := val.(opentracing.SpanContext); ok {
return sp
}
return nil
}

func contextWithSpan(ctx Context, spanContext opentracing.SpanContext) Context {
func WithSpanContext(ctx Context, spanContext opentracing.SpanContext) Context {
return WithValue(ctx, activeSpanContextKey, spanContext)
}
4 changes: 2 additions & 2 deletions internal/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (t *tracingContextPropagator) InjectFromWorkflow(
hw HeaderWriter,
) error {
// retrieve span from context object
spanContext := spanFromContext(ctx)
spanContext := GetSpanContext(ctx)
if spanContext == nil {
return nil
}
Expand All @@ -119,5 +119,5 @@ func (t *tracingContextPropagator) ExtractToWorkflow(
// did not find a tracing span, just return the current context
return ctx, nil
}
return contextWithSpan(ctx, spanContext), nil
return WithSpanContext(ctx, spanContext), nil
}
10 changes: 5 additions & 5 deletions internal/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) {

span := tracer.StartSpan("test-operation")
assert.NotNil(t, span.Context())
ctx := contextWithSpan(Background(), span.Context())
ctx := WithSpanContext(Background(), span.Context())
header := &shared.Header{
Fields: map[string][]byte{},
}
Expand All @@ -96,9 +96,9 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) {
returnCtx2, err := ctxProp.ExtractToWorkflow(Background(), NewHeaderReader(header))
require.NoError(t, err)

newSpanContext := spanFromContext(returnCtx)
newSpanContext := GetSpanContext(returnCtx)
assert.NotNil(t, newSpanContext)
newSpanContext2 := spanFromContext(returnCtx2)
newSpanContext2 := GetSpanContext(returnCtx2)
assert.NotNil(t, newSpanContext2)
assert.Equal(t, newSpanContext2, newSpanContext)
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestConsistentInjectionExtraction(t *testing.T) {
var baggageVal = "e30="
span.SetBaggageItem("request-tenancy", baggageVal)
assert.NotNil(t, span.Context())
ctx := contextWithSpan(Background(), span.Context())
ctx := WithSpanContext(Background(), span.Context())
header := &shared.Header{
Fields: map[string][]byte{},
}
Expand All @@ -140,7 +140,7 @@ func TestConsistentInjectionExtraction(t *testing.T) {
extractedCtx, err := ctxProp.ExtractToWorkflow(Background(), NewHeaderReader(header))
require.NoError(t, err)

extractedSpanContext := spanFromContext(extractedCtx)
extractedSpanContext := GetSpanContext(extractedCtx)
extractedSpanContext.ForeachBaggageItem(func(k, v string) bool {
if k == "request-tenancy" {
assert.Equal(t, v, baggageVal)
Expand Down
14 changes: 13 additions & 1 deletion test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"

"go.uber.org/cadence"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/worker"
Expand All @@ -37,6 +40,7 @@ type Activities struct {
mu sync.Mutex
invocations []string
activities2 *Activities2
tracer opentracing.Tracer
}

type Activities2 struct {
Expand All @@ -47,7 +51,7 @@ var errFailOnPurpose = cadence.NewCustomError("failing-on-purpose")

func newActivities() *Activities {
activities2 := &Activities2{}
result := &Activities{activities2: activities2}
result := &Activities{activities2: activities2, tracer: mocktracer.New()}
activities2.impl = result
return result
}
Expand Down Expand Up @@ -146,6 +150,13 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, domain, taskList,
return nil
}

func (a *Activities) InspectActivitySpan(ctx context.Context) (map[string]string, error) {
span := opentracing.SpanFromContext(ctx)
carrier := make(map[string]string)
err := a.tracer.Inject(span.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
return carrier, err
}

func (a *Activities) register(worker worker.Worker) {
// Kept to verify backward compatibility of activity registration.
activity.RegisterWithOptions(a, activity.RegisterOptions{Name: "Activities_", DisableAlreadyRegisteredCheck: true})
Expand All @@ -155,4 +166,5 @@ func (a *Activities) register(worker worker.Worker) {
worker.RegisterActivityWithOptions(a.activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true})
worker.RegisterActivityWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"})
worker.RegisterActivityWithOptions(a.HeartbeatAndSleep, activity.RegisterOptions{Name: "HeartbeatAndSleep", EnableAutoHeartbeat: true})
worker.RegisterActivityWithOptions(a.InspectActivitySpan, activity.RegisterOptions{Name: "inspectActivitySpan"})
}
18 changes: 16 additions & 2 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.uber.org/goleak"
"go.uber.org/zap/zaptest"

"github.com/opentracing/opentracing-go/mocktracer"

"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/client"
Expand Down Expand Up @@ -98,7 +100,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
ts.Assertions = require.New(ts.T())
ts.config = newConfig()
ts.activities = newActivities()
ts.workflows = &Workflows{}
ts.workflows = &Workflows{tracer: mocktracer.New()}
ts.Nil(waitForTCP(time.Minute, ts.config.ServiceAddr))
var err error
if ts.config.EnableGrpcAdapter {
Expand All @@ -109,6 +111,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
ts.NoError(err)
ts.libClient = client.NewClient(ts.rpcClient.Interface, domainName,
&client.Options{
Tracer: ts.workflows.tracer,
ContextPropagators: []workflow.ContextPropagator{NewStringMapPropagator([]string{testContextKey})},
})
ts.domainClient = client.NewDomainClient(ts.rpcClient.Interface, &client.Options{})
Expand Down Expand Up @@ -154,6 +157,7 @@ func (ts *IntegrationTestSuite) SetupTest() {

func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) {
options := worker.Options{
Tracer: ts.workflows.tracer,
DisableStickyExecution: ts.config.IsStickyOff,
Logger: zaptest.NewLogger(ts.T()),
WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptorFactory{ts.tracer},
Expand Down Expand Up @@ -545,6 +549,13 @@ func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowQuery() {
ts.NoError(value.Get(&trace))
}

func (ts *IntegrationTestSuite) TestOverrideSpanContext() {
var result map[string]string
err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result)
ts.NoError(err)
ts.Equal("some-value", result["mockpfx-baggage-some-key"])
}

func (ts *IntegrationTestSuite) registerDomain() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
Expand Down Expand Up @@ -587,10 +598,13 @@ func (ts *IntegrationTestSuite) executeWorkflowWithOption(
options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
run, err := ts.libClient.ExecuteWorkflow(ctx, options, wfFunc, args...)
span := ts.workflows.tracer.StartSpan("test-workflow")
defer span.Finish()
execution, err := ts.libClient.StartWorkflow(ctx, options, wfFunc, args...)
if err != nil {
return err
}
run := ts.libClient.GetWorkflow(ctx, execution.ID, execution.RunID)
err = run.Get(ctx, retValPtr)
logger := zaptest.NewLogger(ts.T())
if ts.config.Debug {
Expand Down
43 changes: 43 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"math/rand"
"time"

"github.com/opentracing/opentracing-go"

"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/client"
Expand All @@ -41,6 +43,7 @@ const (

type Workflows struct {
nonDeterminismSimulatorWorkflowCallCount int
tracer opentracing.Tracer
}

func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) {
Expand Down Expand Up @@ -617,6 +620,45 @@ func (w *Workflows) NonDeterminismSimulatorWorkflow(ctx workflow.Context) ([]str
return res, nil
}

func (w *Workflows) OverrideSpanContext(ctx workflow.Context) (map[string]string, error) {
type spanActivationResult struct {
Carrier map[string]string
Err error
}
// start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay
resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx)))
defer wSpan.Finish()
wSpan.SetBaggageItem("some-key", "some-value")
carrier := make(map[string]string)
err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
return spanActivationResult{Carrier: carrier, Err: err}
})
var activationResult spanActivationResult
if err := resultValue.Get(&activationResult); err != nil {
return nil, fmt.Errorf("failed to decode span activation result: %v", err)
}
if activationResult.Err != nil {
return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err)
}
spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier))
if err != nil {
return nil, fmt.Errorf("failed to extract span context: %v", err)
}
ctx = workflow.WithSpanContext(ctx, spanContext)

opts := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Second,
ScheduleToCloseTimeout: 10 * time.Second,
StartToCloseTimeout: 10 * time.Second,
HeartbeatTimeout: 2 * time.Second,
}
aCtx := workflow.WithActivityOptions(ctx, opts)
var res map[string]string
err = workflow.ExecuteActivity(aCtx, "inspectActivitySpan", "hello").Get(aCtx, &res)
return res, err
}

func (w *Workflows) register(worker worker.Worker) {
// Kept to verify backward compatibility of workflow registration.
workflow.RegisterWithOptions(w.Basic, workflow.RegisterOptions{DisableAlreadyRegisteredCheck: true})
Expand Down Expand Up @@ -646,6 +688,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ConsistentQueryWorkflow)
worker.RegisterWorkflow(w.WorkflowWithLocalActivityCtxPropagation)
worker.RegisterWorkflow(w.NonDeterminismSimulatorWorkflow)
worker.RegisterWorkflow(w.OverrideSpanContext)

}

Expand Down
77 changes: 77 additions & 0 deletions workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package workflow

import (
"github.com/opentracing/opentracing-go"

"go.uber.org/cadence/internal"
)

Expand Down Expand Up @@ -76,3 +78,78 @@ func WithValue(parent Context, key interface{}, val interface{}) Context {
func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc) {
return internal.NewDisconnectedContext(parent)
}

// GetSpanContext returns the [opentracing.SpanContext] from [Context].
// Returns nil if tracer is not set in [go.uber.org/cadence/worker.Options].
//
// Note: If tracer is set, we already activate a span for each workflow.
// This SpanContext will be passed to the activities and child workflows to start new spans.
//
// Example Usage:
//
// span := GetSpanContext(ctx)
// if span != nil {
// span.SetTag("foo", "bar")
// }
func GetSpanContext(ctx Context) opentracing.SpanContext {
return internal.GetSpanContext(ctx)
}

// WithSpanContext returns [Context] with override [opentracing.SpanContext].
// This is useful to modify baggage items of current workflow and pass it to activities and child workflows.
//
// Example Usage:
//
// func goodWorkflow(ctx Context) (string, error) {
// // start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay
// type spanActivationResult struct {
// Carrier map[string]string // exported field so it's json encoded
// Err error
// }
// resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
// wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx)))
// defer wSpan.Finish()
// wSpan.SetBaggageItem("some-key", "some-value")
// carrier := make(map[string]string)
// err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
// return spanActivationResult{Carrier: carrier, Err: err}
// })
// var activationResult spanActivationResult
// if err := resultValue.Get(&activationResult); err != nil {
// return nil, fmt.Errorf("failed to decode span activation result: %v", err)
// }
// if activationResult.Err != nil {
// return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err)
// }
// spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier))
// if err != nil {
// return nil, fmt.Errorf("failed to extract span context: %v", err)
// }
// ctx = workflow.WithSpanContext(ctx, spanContext)
// var activityFooResult string
// aCtx := workflow.WithActivityOptions(ctx, opts)
// err = workflow.ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult)
// return activityFooResult, err
// }
//
// Bad Example:
//
// func badWorkflow(ctx Context) (string, error) {
// // start a new workflow span for EVERY REPLAY
// wSpan := opentracing.StartSpan("workflow-operation", opentracing.ChildOf(GetSpanContext(ctx)))
// wSpan.SetBaggageItem("some-key", "some-value")
// // pass the new span context to activity
// ctx = WithSpanContext(ctx, wSpan.Context())
// aCtx := workflow.WithActivityOptions(ctx, opts)
// var activityFooResult string
// err := ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult)
// wSpan.Finish()
// return activityFooResult, err
// }
//
// func activityFoo(ctx Context) (string, error) {
// return "activity-foo-result", nil
// }
func WithSpanContext(ctx Context, spanContext opentracing.SpanContext) Context {
return internal.WithSpanContext(ctx, spanContext)
}