Skip to content

Commit 1789a1c

Browse files
committed
Correctly trace timers and adjust subworkflows
1 parent 38feda9 commit 1789a1c

File tree

7 files changed

+67
-32
lines changed

7 files changed

+67
-32
lines changed

backend/history/timer_fired.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ package history
33
import "time"
44

55
type TimerFiredAttributes struct {
6-
At time.Time `json:"at,omitempty"`
6+
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
7+
At time.Time `json:"at,omitempty"`
78
}

backend/test/e2e_tracing.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,43 @@ var e2eTracingTests = []backendTest{
5858
)
5959
},
6060
},
61+
{
62+
name: "Tracing/TimersHaveSpans",
63+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
64+
exporter := setupTracing(b)
65+
66+
wf := func(ctx workflow.Context) error {
67+
workflow.ScheduleTimer(ctx, time.Millisecond*20).Get(ctx)
68+
69+
return nil
70+
}
71+
register(t, ctx, w, []interface{}{wf}, nil)
72+
73+
instance := runWorkflow(t, ctx, c, wf)
74+
_, err := client.GetWorkflowResult[any](ctx, c, instance, time.Second*5)
75+
require.NoError(t, err)
76+
77+
spans := exporter.GetSpans().Snapshots()
78+
79+
workflow1Span := findSpan(spans, func(span trace.ReadOnlySpan) bool {
80+
return strings.Contains(span.Name(), "Workflow: 1")
81+
})
82+
require.NotNil(t, workflow1Span)
83+
84+
timerSpan := findSpan(spans, func(span trace.ReadOnlySpan) bool {
85+
return strings.Contains(span.Name(), "Timer")
86+
})
87+
require.NotNil(t, workflow1Span)
88+
require.InEpsilon(t, time.Duration(20*time.Millisecond),
89+
timerSpan.EndTime().Sub(timerSpan.StartTime())/time.Millisecond,
90+
float64(5*time.Millisecond))
91+
92+
require.Equal(t,
93+
workflow1Span.SpanContext().SpanID().String(),
94+
timerSpan.Parent().SpanID().String(),
95+
)
96+
},
97+
},
6198
{
6299
name: "Tracing/SubWorkflowsHaveSpansWithCorrectParent",
63100
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
@@ -96,13 +133,18 @@ var e2eTracingTests = []backendTest{
96133
})
97134
require.NotNil(t, workflow1Span)
98135

136+
createSWFSpan := findSpan(spans, func(span trace.ReadOnlySpan) bool {
137+
return strings.Contains(span.Name(), "CreateSubworkflowInstance: swf")
138+
})
139+
require.NotNil(t, createSWFSpan)
140+
99141
workflow2Span := findSpan(spans, func(span trace.ReadOnlySpan) bool {
100142
return strings.Contains(span.Name(), "Workflow: swf")
101143
})
102144
require.NotNil(t, workflow1Span)
103145

104146
require.Equal(t,
105-
workflow1Span.SpanContext().SpanID().String(),
147+
createSWFSpan.SpanContext().SpanID().String(),
106148
workflow2Span.Parent().SpanID().String(),
107149
)
108150
},

internal/command/schedule_timer.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ func (c *ScheduleTimerCommand) Execute(clock clock.Clock) *CommandResult {
3333
case CommandState_Pending:
3434
c.state = CommandState_Committed
3535

36+
now := clock.Now()
37+
3638
return &CommandResult{
3739
Events: []*history.Event{
3840
history.NewPendingEvent(
39-
clock.Now(),
41+
now,
4042
history.EventType_TimerScheduled,
4143
&history.TimerScheduledAttributes{
4244
At: c.at,
@@ -50,7 +52,8 @@ func (c *ScheduleTimerCommand) Execute(clock clock.Clock) *CommandResult {
5052
clock.Now(),
5153
history.EventType_TimerFired,
5254
&history.TimerFiredAttributes{
53-
At: c.at,
55+
ScheduledAt: now,
56+
At: c.at,
5457
},
5558
history.ScheduleEventID(c.id),
5659
history.VisibleAt(c.at),

workflow/executor/executor.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"reflect"
99
"slices"
1010
"testing"
11+
"time"
1112

1213
"github.com/benbjohnson/clock"
1314
"github.com/cschleiden/go-workflows/backend"
@@ -26,6 +27,7 @@ import (
2627
"github.com/cschleiden/go-workflows/internal/workflowstate"
2728
"github.com/cschleiden/go-workflows/registry"
2829
wf "github.com/cschleiden/go-workflows/workflow"
30+
"go.opentelemetry.io/otel/attribute"
2931
"go.opentelemetry.io/otel/trace"
3032
)
3133

@@ -530,6 +532,18 @@ func (e *executor) handleTimerFired(event *history.Event, a *history.TimerFiredA
530532
return nil
531533
}
532534

535+
if !e.workflowState.Replaying() {
536+
// Trace timer
537+
parentSpan := tracing.SpanFromContext(e.workflowCtx)
538+
ctx := trace.ContextWithSpan(context.Background(), parentSpan)
539+
_, span := e.tracer.Start(ctx, "Timer", trace.WithAttributes(
540+
attribute.Int64(log.DurationKey, int64(a.At.Sub(a.ScheduledAt)/time.Millisecond)),
541+
attribute.String(log.NowKey, a.ScheduledAt.String()),
542+
attribute.String(log.AtKey, a.At.String()),
543+
), trace.WithTimestamp(a.ScheduledAt))
544+
span.End()
545+
}
546+
533547
if err := f.Set(nil, nil); err != nil {
534548
return fmt.Errorf("setting timer fired result: %w", err)
535549
}

workflow/subworkflow.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,7 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
9090
attribute.Int64(log.ScheduleEventIDKey, scheduleEventID),
9191
attribute.Int(log.AttemptKey, attempt),
9292
))
93-
94-
tf := sync.NewFuture[TResult]()
95-
Go(ctx, func(ctx Context) {
96-
r, err := f.Get(ctx)
97-
span.End()
98-
tf.Set(r, err)
99-
})
93+
defer span.End()
10094

10195
// Capture context
10296
propagators := propagators(ctx)
@@ -146,5 +140,5 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
146140
})
147141
}
148142

149-
return tf
143+
return f
150144
}

workflow/timer.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,8 @@ import (
66

77
"github.com/cschleiden/go-workflows/internal/command"
88
"github.com/cschleiden/go-workflows/internal/contextvalue"
9-
"github.com/cschleiden/go-workflows/internal/log"
109
"github.com/cschleiden/go-workflows/internal/sync"
1110
"github.com/cschleiden/go-workflows/internal/workflowstate"
12-
"go.opentelemetry.io/otel/attribute"
13-
"go.opentelemetry.io/otel/trace"
1411
)
1512

1613
// ScheduleTimer schedules a timer to fire after the given delay.
@@ -48,19 +45,6 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[any] {
4845
},
4946
}
5047

51-
ctx, span := Tracer(ctx).Start(ctx, "ScheduleTimer",
52-
trace.WithAttributes(
53-
attribute.Int64(log.DurationKey, int64(delay/time.Millisecond)),
54-
attribute.String(log.NowKey, Now(ctx).String()),
55-
attribute.String(log.AtKey, at.String()),
56-
))
57-
tf := sync.NewFuture[any]()
58-
Go(ctx, func(ctx Context) {
59-
r, err := f.Get(ctx)
60-
span.End()
61-
tf.Set(r, err)
62-
})
63-
6448
// Check if the context is cancelable
6549
if c, cancelable := ctx.Done().(sync.CancelChannel); cancelable {
6650
// Register a callback for when it's canceled. The only operation on the `Done` channel
@@ -72,5 +56,5 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[any] {
7256
})
7357
}
7458

75-
return tf
59+
return f
7660
}

workflow/tracer.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package workflow
22

33
import (
44
"context"
5-
"log"
65

76
"github.com/cschleiden/go-workflows/internal/command"
87
"github.com/cschleiden/go-workflows/internal/contextvalue"
@@ -21,8 +20,6 @@ func (s *wfSpan) End() {
2120
if !s.state.Replaying() {
2221
// Only end the trace when we are not replaying
2322
s.span.End()
24-
} else {
25-
log.Println("Not ending span as we are replaying")
2623
}
2724
}
2825

0 commit comments

Comments
 (0)