Skip to content

Commit da2360f

Browse files
authored
Beginning of replay-aware workflow tracer
1 parent 4fee3f7 commit da2360f

File tree

7 files changed

+30
-18
lines changed

7 files changed

+30
-18
lines changed

internal/workflow/executor.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cschleiden/go-workflows/internal/task"
1616
"github.com/cschleiden/go-workflows/internal/tracing"
1717
"github.com/cschleiden/go-workflows/internal/workflowstate"
18+
"github.com/cschleiden/go-workflows/internal/workflowtracer"
1819
"github.com/cschleiden/go-workflows/log"
1920
"go.opentelemetry.io/otel/attribute"
2021
"go.opentelemetry.io/otel/trace"
@@ -42,7 +43,7 @@ type executor struct {
4243
registry *Registry
4344
historyProvider WorkflowHistoryProvider
4445
workflow *workflow
45-
workflowTracer *tracing.WorkflowTracer
46+
workflowTracer *workflowtracer.WorkflowTracer
4647
workflowState *workflowstate.WfState
4748
workflowCtx sync.Context
4849
workflowCtxCancel sync.CancelFunc
@@ -55,11 +56,11 @@ type executor struct {
5556
func NewExecutor(logger log.Logger, tracer trace.Tracer, registry *Registry, historyProvider WorkflowHistoryProvider, instance *core.WorkflowInstance, clock clock.Clock) (WorkflowExecutor, error) {
5657
s := workflowstate.NewWorkflowState(instance, logger, clock)
5758

58-
wfTracer := tracing.NewWorkflowTracer(tracer)
59+
wfTracer := workflowtracer.New(tracer)
5960

6061
wfCtx, cancel := sync.WithCancel(
6162
workflowstate.WithWorkflowState(
62-
tracing.WithWorkflowTracer(
63+
workflowtracer.WithWorkflowTracer(
6364
sync.Background(),
6465
wfTracer,
6566
),

internal/tracing/tracer.go renamed to internal/workflowtracer/tracer.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
package tracing
1+
package workflowtracer
22

33
import (
44
"context"
55

66
"github.com/cschleiden/go-workflows/internal/sync"
7+
"github.com/cschleiden/go-workflows/internal/workflowstate"
78
"go.opentelemetry.io/otel/trace"
89
)
910

@@ -28,7 +29,7 @@ type WorkflowTracer struct {
2829
tracer trace.Tracer
2930
}
3031

31-
func NewWorkflowTracer(tracer trace.Tracer) *WorkflowTracer {
32+
func New(tracer trace.Tracer) *WorkflowTracer {
3233
return &WorkflowTracer{
3334
tracer: tracer,
3435
}
@@ -38,8 +39,15 @@ func (wt *WorkflowTracer) UpdateExecution(span trace.Span) {
3839
wt.parentSpan = span
3940
}
4041

41-
func (wt *WorkflowTracer) Start(name string, opts ...trace.SpanStartOption) trace.Span {
42-
ctx := trace.ContextWithSpan(context.Background(), wt.parentSpan)
43-
_, span := wt.tracer.Start(ctx, name, opts...)
42+
func (wt *WorkflowTracer) Start(ctx sync.Context, name string, opts ...trace.SpanStartOption) trace.Span {
43+
sctx := trace.ContextWithSpan(context.Background(), wt.parentSpan)
44+
sctx, span := wt.tracer.Start(sctx, name, opts...)
45+
46+
state := workflowstate.WorkflowState(ctx)
47+
if state.Replaying() {
48+
sctx = trace.ContextWithSpanContext(sctx, span.SpanContext())
49+
return trace.SpanFromContext(sctx)
50+
}
51+
4452
return span
4553
}

workflow/activity.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/cschleiden/go-workflows/internal/sync"
1111
"github.com/cschleiden/go-workflows/internal/tracing"
1212
"github.com/cschleiden/go-workflows/internal/workflowstate"
13+
"github.com/cschleiden/go-workflows/internal/workflowtracer"
1314
"go.opentelemetry.io/otel/attribute"
1415
"go.opentelemetry.io/otel/trace"
1516
)
@@ -51,7 +52,8 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
5152
wfState.AddCommand(cmd)
5253
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
5354

54-
span := tracing.Tracer(ctx).Start(
55+
span := workflowtracer.Tracer(ctx).Start(
56+
ctx,
5557
"ExecuteActivity", trace.WithAttributes(
5658
attribute.String("name", name),
5759
attribute.Int64(tracing.ScheduleEventID, scheduleEventID),

workflow/sideeffect.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import (
44
"github.com/cschleiden/go-workflows/internal/command"
55
"github.com/cschleiden/go-workflows/internal/converter"
66
"github.com/cschleiden/go-workflows/internal/sync"
7-
"github.com/cschleiden/go-workflows/internal/tracing"
87
"github.com/cschleiden/go-workflows/internal/workflowstate"
8+
"github.com/cschleiden/go-workflows/internal/workflowtracer"
99
)
1010

1111
func SideEffect[TResult any](ctx Context, f func(ctx Context) TResult) Future[TResult] {
12-
span := tracing.Tracer(ctx).Start("SideEffect")
12+
span := workflowtracer.Tracer(ctx).Start(ctx, "SideEffect")
1313
defer span.End()
1414

1515
future := sync.NewFuture[TResult]()

workflow/sleep.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import (
44
"time"
55

66
"github.com/cschleiden/go-workflows/internal/sync"
7-
"github.com/cschleiden/go-workflows/internal/tracing"
7+
"github.com/cschleiden/go-workflows/internal/workflowtracer"
88
"go.opentelemetry.io/otel/attribute"
99
"go.opentelemetry.io/otel/trace"
1010
)
1111

1212
func Sleep(ctx sync.Context, d time.Duration) error {
13-
span := tracing.Tracer(ctx).Start("Sleep",
13+
span := workflowtracer.Tracer(ctx).Start(ctx, "Sleep",
1414
trace.WithAttributes(attribute.Int64("duration_s", int64(d/time.Second))))
1515
defer span.End()
1616

workflow/subworkflow.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/cschleiden/go-workflows/internal/sync"
1111
"github.com/cschleiden/go-workflows/internal/tracing"
1212
"github.com/cschleiden/go-workflows/internal/workflowstate"
13+
"github.com/cschleiden/go-workflows/internal/workflowtracer"
1314
"go.opentelemetry.io/otel/attribute"
1415
"go.opentelemetry.io/otel/trace"
1516
)
@@ -30,7 +31,7 @@ func CreateSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflo
3031
})
3132
}
3233

33-
func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflowOptions, attempt int, workflow interface{}, args ...interface{}) Future[TResult] {
34+
func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflowOptions, attempt int, wf interface{}, args ...interface{}) Future[TResult] {
3435
f := sync.NewFuture[TResult]()
3536

3637
// If the context is already canceled, return immediately.
@@ -39,7 +40,7 @@ func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflo
3940
return f
4041
}
4142

42-
name := fn.Name(workflow)
43+
name := fn.Name(wf)
4344

4445
inputs, err := a.ArgsToInputs(converter.DefaultConverter, args...)
4546
if err != nil {
@@ -54,7 +55,7 @@ func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflo
5455

5556
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
5657

57-
span := tracing.Tracer(ctx).Start(
58+
span := workflowtracer.Tracer(ctx).Start(ctx,
5859
"CreateSubworkflowInstance", trace.WithAttributes(
5960
attribute.String("name", name),
6061
attribute.Int64(tracing.ScheduleEventID, scheduleEventID),

workflow/timer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import (
55

66
"github.com/cschleiden/go-workflows/internal/command"
77
"github.com/cschleiden/go-workflows/internal/sync"
8-
"github.com/cschleiden/go-workflows/internal/tracing"
98
"github.com/cschleiden/go-workflows/internal/workflowstate"
9+
"github.com/cschleiden/go-workflows/internal/workflowtracer"
1010
"go.opentelemetry.io/otel/attribute"
1111
"go.opentelemetry.io/otel/trace"
1212
)
@@ -28,7 +28,7 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[struct{}] {
2828

2929
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
3030

31-
span := tracing.Tracer(ctx).Start("ScheduleTimer",
31+
span := workflowtracer.Tracer(ctx).Start(ctx, "ScheduleTimer",
3232
trace.WithAttributes(attribute.Int64("duration_s", int64(delay/time.Second))))
3333
defer span.End()
3434

0 commit comments

Comments
 (0)