Skip to content

Commit 05e28b7

Browse files
committed
Improve trace propagation
1 parent 5219384 commit 05e28b7

File tree

10 files changed

+107
-59
lines changed

10 files changed

+107
-59
lines changed

backend/redis/signal.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
1818
return err
1919
}
2020

21-
ctx = tracing.ExtractSpan(ctx, instanceState.Metadata)
21+
ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata)
22+
if err != nil {
23+
rb.Logger().Error("extracting tracing context", log.ErrorKey, err)
24+
}
25+
2226
a := event.Attributes.(*history.SignalReceivedAttributes)
23-
_, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
27+
ctx, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
2428
attribute.String(log.InstanceIDKey, instanceID),
2529
attribute.String(log.SignalNameKey, event.Attributes.(*history.SignalReceivedAttributes).Name),
2630
))

backend/redis/workflow.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,11 @@ func (rb *redisBackend) CompleteWorkflowTask(
275275

276276
if state == core.WorkflowInstanceStateFinished {
277277
// Trace workflow completion
278-
ctx = tracing.ExtractSpan(ctx, instanceState.Metadata)
278+
ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata)
279+
if err != nil {
280+
rb.Logger().Error("extracting tracing context", log.ErrorKey, err)
281+
}
282+
279283
_, span := rb.Tracer().Start(ctx, "WorkflowComplete",
280284
trace.WithAttributes(
281285
attribute.String(log.NamespaceKey+log.InstanceIDKey, instanceState.Instance.InstanceID),

go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ require (
1212
github.com/redis/go-redis/v9 v9.0.2
1313
github.com/stretchr/testify v1.8.3
1414
go.opentelemetry.io/otel v1.16.0
15+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0
16+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0
1517
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
1618
go.opentelemetry.io/otel/trace v1.16.0
1719
golang.org/x/tools v0.6.0
@@ -24,6 +26,7 @@ require (
2426
github.com/breml/bidichk v0.2.3 // indirect
2527
github.com/curioswitch/go-reassign v0.2.0 // indirect
2628
github.com/firefart/nonamedreturns v1.0.4 // indirect
29+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
2730
github.com/kkHAIKE/contextcheck v1.1.2 // indirect
2831
github.com/lufeee/execinquery v1.2.1 // indirect
2932
github.com/maratori/testableexamples v1.0.0 // indirect
@@ -34,12 +37,16 @@ require (
3437
github.com/sivchari/nosnakecase v1.7.0 // indirect
3538
github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect
3639
github.com/timonwong/loggercheck v0.9.3 // indirect
40+
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
3741
go.opentelemetry.io/otel/metric v1.16.0 // indirect
42+
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
3843
go.uber.org/atomic v1.7.0 // indirect
3944
go.uber.org/multierr v1.6.0 // indirect
4045
go.uber.org/zap v1.17.0 // indirect
4146
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
4247
golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect
48+
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
49+
google.golang.org/grpc v1.55.0 // indirect
4350
)
4451

4552
require (

go.sum

Lines changed: 42 additions & 0 deletions
Large diffs are not rendered by default.

internal/tracing/tracing.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/cschleiden/go-workflows/internal/contextpropagation"
77
"github.com/cschleiden/go-workflows/internal/core"
88
"github.com/cschleiden/go-workflows/internal/sync"
9+
"github.com/cschleiden/go-workflows/internal/workflowtracer"
910
"go.opentelemetry.io/otel/propagation"
1011
"go.opentelemetry.io/otel/trace"
1112
)
@@ -15,52 +16,40 @@ var propagator propagation.TextMapPropagator = propagation.NewCompositeTextMapPr
1516
propagation.Baggage{},
1617
)
1718

18-
func InjectSpan(ctx context.Context, metadata *core.WorkflowMetadata) {
19+
func injectSpan(ctx context.Context, metadata *core.WorkflowMetadata) {
1920
propagator.Inject(ctx, metadata)
2021
}
2122

22-
func ExtractSpan(ctx context.Context, metadata *core.WorkflowMetadata) context.Context {
23+
func extractSpan(ctx context.Context, metadata *core.WorkflowMetadata) context.Context {
2324
return propagator.Extract(ctx, metadata)
2425
}
2526

26-
type traceContextKeyType int
27-
28-
const currentSpanKey traceContextKeyType = iota
29-
30-
func WorkflowContextWithSpan(ctx sync.Context, span trace.Span) sync.Context {
31-
return sync.WithValue(ctx, currentSpanKey, span)
32-
}
33-
34-
func SpanFromWorkflowContext(ctx sync.Context) trace.Span {
35-
if span, ok := ctx.Value(currentSpanKey).(trace.Span); ok {
36-
return span
37-
}
38-
39-
panic("no span in context")
40-
}
41-
4227
type TracingContextPropagator struct {
4328
}
4429

4530
var _ contextpropagation.ContextPropagator = &TracingContextPropagator{}
4631

4732
func (*TracingContextPropagator) Inject(ctx context.Context, metadata *core.WorkflowMetadata) error {
48-
InjectSpan(ctx, metadata)
33+
injectSpan(ctx, metadata)
4934
return nil
5035
}
5136

5237
func (*TracingContextPropagator) Extract(ctx context.Context, metadata *core.WorkflowMetadata) (context.Context, error) {
53-
return ExtractSpan(ctx, metadata), nil
38+
return extractSpan(ctx, metadata), nil
5439
}
5540

5641
func (*TracingContextPropagator) InjectFromWorkflow(ctx sync.Context, metadata *core.WorkflowMetadata) error {
57-
// Ignore
42+
span := workflowtracer.SpanFromContext(ctx)
43+
sctx := trace.ContextWithSpan(context.Background(), span)
44+
45+
injectSpan(sctx, metadata)
5846

5947
return nil
6048
}
6149

6250
func (*TracingContextPropagator) ExtractToWorkflow(ctx sync.Context, metadata *core.WorkflowMetadata) (sync.Context, error) {
63-
// Ignore
51+
sctx := extractSpan(context.Background(), metadata)
52+
span := trace.SpanFromContext(sctx)
6453

65-
return ctx, nil
54+
return workflowtracer.ContextWithSpan(ctx, span), nil
6655
}

internal/workflow/executor.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@ import (
1515
"github.com/cschleiden/go-workflows/internal/payload"
1616
"github.com/cschleiden/go-workflows/internal/sync"
1717
"github.com/cschleiden/go-workflows/internal/task"
18-
"github.com/cschleiden/go-workflows/internal/tracing"
1918
"github.com/cschleiden/go-workflows/internal/workflowstate"
2019
"github.com/cschleiden/go-workflows/internal/workflowtracer"
2120
"github.com/cschleiden/go-workflows/log"
22-
"go.opentelemetry.io/otel/attribute"
2321
"go.opentelemetry.io/otel/trace"
2422
)
2523

@@ -53,6 +51,7 @@ type executor struct {
5351
logger log.Logger
5452
tracer trace.Tracer
5553
lastSequenceID int64
54+
parentSpan trace.Span
5655
}
5756

5857
func NewExecutor(
@@ -85,6 +84,9 @@ func NewExecutor(
8584
}
8685
}
8786

87+
// Get span from the workflow context, set by the default context propagator
88+
parentSpan := workflowtracer.SpanFromContext(wfCtx)
89+
8890
return &executor{
8991
registry: registry,
9092
historyProvider: historyProvider,
@@ -95,23 +97,11 @@ func NewExecutor(
9597
clock: clock,
9698
logger: logger,
9799
tracer: tracer,
100+
parentSpan: parentSpan,
98101
}, nil
99102
}
100103

101104
func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*ExecutionResult, error) {
102-
ctx = tracing.ExtractSpan(ctx, t.Metadata)
103-
ctx, span := e.tracer.Start(ctx, "WorkflowTaskExecution", trace.WithAttributes(
104-
attribute.String(log.InstanceIDKey, t.WorkflowInstance.InstanceID),
105-
attribute.String(log.TaskIDKey, t.ID),
106-
attribute.Int(log.NewEventsKey, len(t.NewEvents)),
107-
))
108-
defer span.End()
109-
110-
// Make the current span available to the tracer that we pass into the workflow execution. With caching
111-
// the executor instance might be used for multiple workflow tasks and we want calls made in each task
112-
// execution to be associated with the new span for the WorkflowTaskExecution.
113-
e.workflowTracer.UpdateExecution(span)
114-
115105
logger := e.logger.With(
116106
log.TaskIDKey, t.ID,
117107
log.InstanceIDKey, t.WorkflowInstance.InstanceID)

internal/workflowtracer/tracer.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ func ContextWithSpan(ctx sync.Context, span trace.Span) sync.Context {
3232
return sync.WithValue(ctx, spanKey, span)
3333
}
3434

35-
func SpanFromContext(ctx sync.Context) *trace.Span {
35+
func SpanFromContext(ctx sync.Context) trace.Span {
3636
if span, ok := ctx.Value(spanKey).(trace.Span); ok {
37-
return &span
37+
return span
3838
}
3939

4040
return nil
@@ -51,21 +51,10 @@ func New(tracer trace.Tracer) *WorkflowTracer {
5151
}
5252
}
5353

54-
func (wt *WorkflowTracer) UpdateExecution(span trace.Span) {
55-
wt.parentSpan = span
56-
}
57-
5854
func (wt *WorkflowTracer) Start(ctx sync.Context, name string, opts ...trace.SpanStartOption) (sync.Context, Span) {
59-
parentSpan := wt.parentSpan
60-
61-
// Try to get parent span from ctx, otherwise use the one from the tracer instance
62-
if span := SpanFromContext(ctx); span != nil {
63-
parentSpan = *span
64-
}
65-
6655
state := workflowstate.WorkflowState(ctx)
6756

68-
sctx := trace.ContextWithSpan(context.Background(), parentSpan)
57+
sctx := trace.ContextWithSpan(context.Background(), SpanFromContext(ctx))
6958
opts = append(opts, trace.WithTimestamp(state.Time()))
7059
sctx, span := wt.tracer.Start(sctx, name, opts...)
7160

log/fields.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package log
33
const (
44
NamespaceKey = "workflows"
55

6+
ErrorKey = "error"
7+
68
ActivityIDKey = NamespaceKey + ".activity.id"
79
ActivityNameKey = NamespaceKey + ".activity.name"
810
InstanceIDKey = NamespaceKey + ".instance.id"

samples/tracing/tracing.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"go.opentelemetry.io/otel/sdk/trace"
1515
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
1616

17+
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
18+
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
1719
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
1820

1921
"github.com/cschleiden/go-workflows/worker"
@@ -31,14 +33,25 @@ func main() {
3133
attribute.String("environment", "sample"),
3234
)
3335

36+
oclient := otlptracehttp.NewClient(
37+
// otlptracehttp.WithEndpoint("localhost:8360"),
38+
// otlptracehttp.WithURLPath("/traces/otlp/v0.9"),
39+
otlptracehttp.WithEndpoint("localhost:4318"),
40+
otlptracehttp.WithInsecure(),
41+
)
42+
exp, err := otlptrace.New(ctx, oclient)
43+
if err != nil {
44+
panic(err)
45+
}
46+
3447
stdoutexp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
3548
if err != nil {
3649
panic(err)
3750
}
3851

3952
tp := trace.NewTracerProvider(
4053
trace.WithSyncer(stdoutexp),
41-
// trace.WithBatcher(exp),
54+
trace.WithBatcher(exp),
4255
trace.WithResource(r),
4356
)
4457

@@ -75,7 +88,7 @@ func runWorkflow(ctx context.Context, c client.Client) {
7588
}
7689

7790
// Test signal
78-
time.Sleep(time.Millisecond * 500)
91+
time.Sleep(time.Second * 5)
7992
c.SignalWorkflow(ctx, wf.InstanceID, "test-signal", "")
8093

8194
result, err := client.GetWorkflowResult[int](ctx, c, wf, time.Second*120)

samples/tracing/workflow.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@ func Workflow1(ctx workflow.Context, msg string, times int, inputs Inputs) (int,
2121
logger.Debug("Entering Workflow1", "msg", msg, "times", times, "inputs", inputs)
2222
defer logger.Debug("Leaving Workflow1")
2323

24-
ctx, span := workflow.Tracer(ctx).Start(ctx, "Workflow1 span", trace.WithAttributes(
24+
tracer := workflow.Tracer(ctx)
25+
ctx, span := tracer.Start(ctx, "Workflow1")
26+
defer span.End()
27+
28+
_, customSpan := tracer.Start(ctx, "Workflow1 span", trace.WithAttributes(
2529
// Add additional
2630
attribute.String("msg", "hello world"),
2731
))
2832

2933
// Do something
3034

31-
span.End()
35+
customSpan.End()
3236

3337
workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
3438

@@ -38,6 +42,10 @@ func Workflow1(ctx workflow.Context, msg string, times int, inputs Inputs) (int,
3842

3943
workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
4044

45+
_, span = workflow.Tracer(ctx).Start(ctx, "Wait-for-signal")
46+
workflow.NewSignalChannel[string](ctx, "test-signal").Receive(ctx)
47+
span.End()
48+
4149
r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
4250

4351
return r1, nil

0 commit comments

Comments
 (0)