Skip to content

Commit 86e934b

Browse files
committed
add integration test
1 parent b5f4223 commit 86e934b

File tree

4 files changed

+96
-14
lines changed

4 files changed

+96
-14
lines changed

test/activity_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import (
2828
"sync"
2929
"time"
3030

31+
"github.com/opentracing/opentracing-go"
32+
"github.com/opentracing/opentracing-go/mocktracer"
33+
3134
"go.uber.org/cadence"
3235
"go.uber.org/cadence/activity"
3336
"go.uber.org/cadence/worker"
@@ -37,6 +40,7 @@ type Activities struct {
3740
mu sync.Mutex
3841
invocations []string
3942
activities2 *Activities2
43+
tracer opentracing.Tracer
4044
}
4145

4246
type Activities2 struct {
@@ -47,7 +51,7 @@ var errFailOnPurpose = cadence.NewCustomError("failing-on-purpose")
4751

4852
func newActivities() *Activities {
4953
activities2 := &Activities2{}
50-
result := &Activities{activities2: activities2}
54+
result := &Activities{activities2: activities2, tracer: mocktracer.New()}
5155
activities2.impl = result
5256
return result
5357
}
@@ -146,6 +150,13 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, domain, taskList,
146150
return nil
147151
}
148152

153+
func (a *Activities) InspectActivitySpan(ctx context.Context) (map[string]string, error) {
154+
span := opentracing.SpanFromContext(ctx)
155+
carrier := make(map[string]string)
156+
err := a.tracer.Inject(span.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
157+
return carrier, err
158+
}
159+
149160
func (a *Activities) register(worker worker.Worker) {
150161
// Kept to verify backward compatibility of activity registration.
151162
activity.RegisterWithOptions(a, activity.RegisterOptions{Name: "Activities_", DisableAlreadyRegisteredCheck: true})
@@ -155,4 +166,5 @@ func (a *Activities) register(worker worker.Worker) {
155166
worker.RegisterActivityWithOptions(a.activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true})
156167
worker.RegisterActivityWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"})
157168
worker.RegisterActivityWithOptions(a.HeartbeatAndSleep, activity.RegisterOptions{Name: "HeartbeatAndSleep", EnableAutoHeartbeat: true})
169+
worker.RegisterActivityWithOptions(a.InspectActivitySpan, activity.RegisterOptions{Name: "inspectActivitySpan"})
158170
}

test/integration_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
"go.uber.org/goleak"
3939
"go.uber.org/zap/zaptest"
4040

41+
"github.com/opentracing/opentracing-go/mocktracer"
42+
4143
"go.uber.org/cadence"
4244
"go.uber.org/cadence/.gen/go/shared"
4345
"go.uber.org/cadence/client"
@@ -98,7 +100,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
98100
ts.Assertions = require.New(ts.T())
99101
ts.config = newConfig()
100102
ts.activities = newActivities()
101-
ts.workflows = &Workflows{}
103+
ts.workflows = &Workflows{tracer: mocktracer.New()}
102104
ts.Nil(waitForTCP(time.Minute, ts.config.ServiceAddr))
103105
var err error
104106
if ts.config.EnableGrpcAdapter {
@@ -109,6 +111,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
109111
ts.NoError(err)
110112
ts.libClient = client.NewClient(ts.rpcClient.Interface, domainName,
111113
&client.Options{
114+
Tracer: ts.workflows.tracer,
112115
ContextPropagators: []workflow.ContextPropagator{NewStringMapPropagator([]string{testContextKey})},
113116
})
114117
ts.domainClient = client.NewDomainClient(ts.rpcClient.Interface, &client.Options{})
@@ -154,6 +157,7 @@ func (ts *IntegrationTestSuite) SetupTest() {
154157

155158
func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) {
156159
options := worker.Options{
160+
Tracer: ts.workflows.tracer,
157161
DisableStickyExecution: ts.config.IsStickyOff,
158162
Logger: zaptest.NewLogger(ts.T()),
159163
WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptorFactory{ts.tracer},
@@ -545,6 +549,13 @@ func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowQuery() {
545549
ts.NoError(value.Get(&trace))
546550
}
547551

552+
func (ts *IntegrationTestSuite) TestOverrideSpanContext() {
553+
var result map[string]string
554+
err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result)
555+
ts.NoError(err)
556+
ts.Equal("some-value", result["mockpfx-baggage-some-key"])
557+
}
558+
548559
func (ts *IntegrationTestSuite) registerDomain() {
549560
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
550561
defer cancel()
@@ -587,10 +598,13 @@ func (ts *IntegrationTestSuite) executeWorkflowWithOption(
587598
options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error {
588599
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
589600
defer cancel()
590-
run, err := ts.libClient.ExecuteWorkflow(ctx, options, wfFunc, args...)
601+
span := ts.workflows.tracer.StartSpan("test-workflow")
602+
defer span.Finish()
603+
execution, err := ts.libClient.StartWorkflow(ctx, options, wfFunc, args...)
591604
if err != nil {
592605
return err
593606
}
607+
run := ts.libClient.GetWorkflow(ctx, execution.ID, execution.RunID)
594608
err = run.Get(ctx, retValPtr)
595609
logger := zaptest.NewLogger(ts.T())
596610
if ts.config.Debug {

test/workflow_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"math/rand"
2828
"time"
2929

30+
"github.com/opentracing/opentracing-go"
31+
3032
"go.uber.org/cadence"
3133
"go.uber.org/cadence/.gen/go/shared"
3234
"go.uber.org/cadence/client"
@@ -41,6 +43,7 @@ const (
4143

4244
type Workflows struct {
4345
nonDeterminismSimulatorWorkflowCallCount int
46+
tracer opentracing.Tracer
4447
}
4548

4649
func (w *Workflows) Basic(ctx workflow.Context) ([]string, error) {
@@ -617,6 +620,45 @@ func (w *Workflows) NonDeterminismSimulatorWorkflow(ctx workflow.Context) ([]str
617620
return res, nil
618621
}
619622

623+
func (w *Workflows) OverrideSpanContext(ctx workflow.Context) (map[string]string, error) {
624+
type spanActivationResult struct {
625+
Carrier map[string]string
626+
Err error
627+
}
628+
// start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay
629+
resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
630+
wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx)))
631+
defer wSpan.Finish()
632+
wSpan.SetBaggageItem("some-key", "some-value")
633+
carrier := make(map[string]string)
634+
err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
635+
return spanActivationResult{Carrier: carrier, Err: err}
636+
})
637+
var activationResult spanActivationResult
638+
if err := resultValue.Get(&activationResult); err != nil {
639+
return nil, fmt.Errorf("failed to decode span activation result: %v", err)
640+
}
641+
if activationResult.Err != nil {
642+
return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err)
643+
}
644+
spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier))
645+
if err != nil {
646+
return nil, fmt.Errorf("failed to extract span context: %v", err)
647+
}
648+
ctx = workflow.WithSpanContext(ctx, spanContext)
649+
650+
opts := workflow.ActivityOptions{
651+
ScheduleToStartTimeout: time.Second,
652+
ScheduleToCloseTimeout: 10 * time.Second,
653+
StartToCloseTimeout: 10 * time.Second,
654+
HeartbeatTimeout: 2 * time.Second,
655+
}
656+
aCtx := workflow.WithActivityOptions(ctx, opts)
657+
var res map[string]string
658+
err = workflow.ExecuteActivity(aCtx, "inspectActivitySpan", "hello").Get(aCtx, &res)
659+
return res, err
660+
}
661+
620662
func (w *Workflows) register(worker worker.Worker) {
621663
// Kept to verify backward compatibility of workflow registration.
622664
workflow.RegisterWithOptions(w.Basic, workflow.RegisterOptions{DisableAlreadyRegisteredCheck: true})
@@ -646,6 +688,7 @@ func (w *Workflows) register(worker worker.Worker) {
646688
worker.RegisterWorkflow(w.ConsistentQueryWorkflow)
647689
worker.RegisterWorkflow(w.WorkflowWithLocalActivityCtxPropagation)
648690
worker.RegisterWorkflow(w.NonDeterminismSimulatorWorkflow)
691+
worker.RegisterWorkflow(w.OverrideSpanContext)
649692

650693
}
651694

workflow/context.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,21 +102,33 @@ func GetSpanContext(ctx Context) opentracing.SpanContext {
102102
//
103103
// func goodWorkflow(ctx Context) (string, error) {
104104
// // start a short lived new workflow span within SideEffect to avoid duplicate span creation during replay
105-
// spanContextValue := SideEffect(ctx, func(ctx Context) interface{} {
106-
// wSpan := opentracing.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(GetSpanContext(ctx)))
105+
// type spanActivationResult struct {
106+
// Carrier map[string]string // exported field so it's json encoded
107+
// Err error
108+
// }
109+
// resultValue := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
110+
// wSpan := w.tracer.StartSpan("workflow-operation-with-new-span", opentracing.ChildOf(workflow.GetSpanContext(ctx)))
107111
// defer wSpan.Finish()
108-
// wSpan.SetTag("some-key", "some-value")
109-
// return wSpan.Context()
112+
// wSpan.SetBaggageItem("some-key", "some-value")
113+
// carrier := make(map[string]string)
114+
// err := w.tracer.Inject(wSpan.Context(), opentracing.TextMap, opentracing.TextMapCarrier(carrier))
115+
// return spanActivationResult{Carrier: carrier, Err: err}
110116
// })
111-
// var spanContext opentracing.SpanContext
112-
// err := spanContextValue.Get(&spanContext)
117+
// var activationResult spanActivationResult
118+
// if err := resultValue.Get(&activationResult); err != nil {
119+
// return nil, fmt.Errorf("failed to decode span activation result: %v", err)
120+
// }
121+
// if activationResult.Err != nil {
122+
// return nil, fmt.Errorf("failed to activate new span: %v", activationResult.Err)
123+
// }
124+
// spanContext, err := w.tracer.Extract(opentracing.TextMap, opentracing.TextMapCarrier(activationResult.Carrier))
113125
// if err != nil {
114-
// return "",fmt.Errorf("failed to get span context: %w", err)
126+
// return nil, fmt.Errorf("failed to extract span context: %v", err)
115127
// }
116-
//
117-
// aCtx := WithSpanContext(ctx, spanContext)
128+
// ctx = workflow.WithSpanContext(ctx, spanContext)
118129
// var activityFooResult string
119-
// err = ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult)
130+
// aCtx := workflow.WithActivityOptions(ctx, opts)
131+
// err = workflow.ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult)
120132
// return activityFooResult, err
121133
// }
122134
//
@@ -127,7 +139,8 @@ func GetSpanContext(ctx Context) opentracing.SpanContext {
127139
// wSpan := opentracing.StartSpan("workflow-operation", opentracing.ChildOf(GetSpanContext(ctx)))
128140
// wSpan.SetBaggageItem("some-key", "some-value")
129141
// // pass the new span context to activity
130-
// aCtx := WithSpanContext(ctx, wSpan.Context())
142+
// ctx = WithSpanContext(ctx, wSpan.Context())
143+
// aCtx := workflow.WithActivityOptions(ctx, opts)
131144
// var activityFooResult string
132145
// err := ExecuteActivity(aCtx, activityFoo).Get(aCtx, &activityFooResult)
133146
// wSpan.Finish()

0 commit comments

Comments
 (0)