Skip to content

Commit 1569c4b

Browse files
committed
Fix issues
1 parent 1789a1c commit 1569c4b

File tree

12 files changed

+145
-68
lines changed

12 files changed

+145
-68
lines changed

backend/history/timer_fired.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package history
22

3-
import "time"
3+
import (
4+
"time"
5+
6+
"github.com/cschleiden/go-workflows/backend/metadata"
7+
)
48

59
type TimerFiredAttributes struct {
6-
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
7-
At time.Time `json:"at,omitempty"`
10+
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
11+
At time.Time `json:"at,omitempty"`
12+
Name string `json:"name,omitempty"`
13+
SpanMetadata metadata.WorkflowMetadata `json:"span_metadata,omitempty"`
814
}

backend/history/timer_scheduled.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ package history
33
import "time"
44

55
type TimerScheduledAttributes struct {
6-
At time.Time `json:"at,omitempty"`
6+
At time.Time `json:"at,omitempty"`
7+
Name string `json:"name,omitempty"`
78
}

backend/test/e2e_tracing.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ var e2eTracingTests = []backendTest{
6666
wf := func(ctx workflow.Context) error {
6767
workflow.ScheduleTimer(ctx, time.Millisecond*20).Get(ctx)
6868

69+
workflow.Sleep(ctx, time.Millisecond*10)
70+
6971
return nil
7072
}
7173
register(t, ctx, w, []interface{}{wf}, nil)
@@ -88,13 +90,64 @@ var e2eTracingTests = []backendTest{
8890
require.InEpsilon(t, time.Duration(20*time.Millisecond),
8991
timerSpan.EndTime().Sub(timerSpan.StartTime())/time.Millisecond,
9092
float64(5*time.Millisecond))
93+
require.Equal(t, "Timer", timerSpan.Name())
94+
95+
sleepSpan := findSpan(spans, func(span trace.ReadOnlySpan) bool {
96+
return strings.Contains(span.Name(), "Timer: Sleep")
97+
})
98+
require.NotNil(t, sleepSpan)
9199

92100
require.Equal(t,
93101
workflow1Span.SpanContext().SpanID().String(),
94102
timerSpan.Parent().SpanID().String(),
95103
)
96104
},
97105
},
106+
{
107+
name: "Tracing/TimersWithinCustomSpans",
108+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
109+
exporter := setupTracing(b)
110+
111+
wf := func(ctx workflow.Context) error {
112+
ctx, span := workflow.Tracer(ctx).Start(ctx, "custom-span")
113+
defer span.End()
114+
115+
workflow.Sleep(ctx, time.Millisecond*10)
116+
117+
return nil
118+
}
119+
register(t, ctx, w, []interface{}{wf}, nil)
120+
121+
instance := runWorkflow(t, ctx, c, wf)
122+
_, err := client.GetWorkflowResult[any](ctx, c, instance, time.Second*5)
123+
require.NoError(t, err)
124+
125+
spans := exporter.GetSpans().Snapshots()
126+
127+
workflow1Span := findSpan(spans, func(span trace.ReadOnlySpan) bool {
128+
return strings.Contains(span.Name(), "Workflow: 1")
129+
})
130+
require.NotNil(t, workflow1Span)
131+
132+
customSpan := findSpan(spans, func(span trace.ReadOnlySpan) bool {
133+
return strings.Contains(span.Name(), "custom-span")
134+
})
135+
require.NotNil(t, workflow1Span)
136+
require.Equal(t,
137+
workflow1Span.SpanContext().SpanID().String(),
138+
customSpan.Parent().SpanID().String(),
139+
)
140+
141+
sleepSpan := findSpan(spans, func(span trace.ReadOnlySpan) bool {
142+
return strings.Contains(span.Name(), "Timer: Sleep")
143+
})
144+
require.NotNil(t, sleepSpan)
145+
require.Equal(t,
146+
customSpan.SpanContext().SpanID().String(),
147+
sleepSpan.Parent().SpanID().String(),
148+
)
149+
},
150+
},
98151
{
99152
name: "Tracing/SubWorkflowsHaveSpansWithCorrectParent",
100153
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
@@ -133,19 +186,14 @@ var e2eTracingTests = []backendTest{
133186
})
134187
require.NotNil(t, workflow1Span)
135188

136-
createSWFSpan := findSpan(spans, func(span trace.ReadOnlySpan) bool {
137-
return strings.Contains(span.Name(), "CreateSubworkflowInstance: swf")
138-
})
139-
require.NotNil(t, createSWFSpan)
140-
141-
workflow2Span := findSpan(spans, func(span trace.ReadOnlySpan) bool {
189+
swfSpan := findSpan(spans, func(span trace.ReadOnlySpan) bool {
142190
return strings.Contains(span.Name(), "Workflow: swf")
143191
})
144192
require.NotNil(t, workflow1Span)
145193

146194
require.Equal(t,
147-
createSWFSpan.SpanContext().SpanID().String(),
148-
workflow2Span.Parent().SpanID().String(),
195+
workflow1Span.SpanContext().SpanID().String(),
196+
swfSpan.Parent().SpanID().String(),
149197
)
150198
},
151199
},

internal/command/schedule_timer.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@ import (
55

66
"github.com/benbjohnson/clock"
77
"github.com/cschleiden/go-workflows/backend/history"
8+
"github.com/cschleiden/go-workflows/backend/metadata"
89
)
910

1011
type ScheduleTimerCommand struct {
1112
cancelableCommand
1213

13-
at time.Time
14+
at time.Time
15+
name string
16+
spanMetadata metadata.WorkflowMetadata
1417
}
1518

1619
var _ CancelableCommand = (*ScheduleTimerCommand)(nil)
1720

18-
func NewScheduleTimerCommand(id int64, at time.Time) *ScheduleTimerCommand {
21+
func NewScheduleTimerCommand(id int64, at time.Time, name string, carrier metadata.WorkflowMetadata) *ScheduleTimerCommand {
1922
return &ScheduleTimerCommand{
2023
cancelableCommand: cancelableCommand{
2124
command: command{
@@ -24,7 +27,9 @@ func NewScheduleTimerCommand(id int64, at time.Time) *ScheduleTimerCommand {
2427
state: CommandState_Pending,
2528
},
2629
},
27-
at: at,
30+
at: at,
31+
name: name,
32+
spanMetadata: carrier,
2833
}
2934
}
3035

@@ -41,7 +46,8 @@ func (c *ScheduleTimerCommand) Execute(clock clock.Clock) *CommandResult {
4146
now,
4247
history.EventType_TimerScheduled,
4348
&history.TimerScheduledAttributes{
44-
At: c.at,
49+
At: c.at,
50+
Name: c.name,
4551
},
4652
history.ScheduleEventID(c.id),
4753
),
@@ -52,8 +58,10 @@ func (c *ScheduleTimerCommand) Execute(clock clock.Clock) *CommandResult {
5258
clock.Now(),
5359
history.EventType_TimerFired,
5460
&history.TimerFiredAttributes{
55-
ScheduledAt: now,
56-
At: c.at,
61+
ScheduledAt: now,
62+
At: c.at,
63+
Name: c.name,
64+
SpanMetadata: c.spanMetadata,
5765
},
5866
history.ScheduleEventID(c.id),
5967
history.VisibleAt(c.at),

internal/command/schedule_timer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestScheduleTimerCommand_StateTransitions(t *testing.T) {
8989
for _, tt := range tests {
9090
t.Run(tt.name, func(t *testing.T) {
9191
clock := clock.NewMock()
92-
cmd := NewScheduleTimerCommand(1, clock.Now().Add(time.Second))
92+
cmd := NewScheduleTimerCommand(1, clock.Now().Add(time.Second), "", nil)
9393

9494
tt.f(t, cmd, clock)
9595
})

samples/tracing/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ func Workflow1(ctx workflow.Context, msg string, times int, inputs Inputs) (int,
2222
defer logger.Debug("Leaving Workflow1")
2323

2424
tracer := workflow.Tracer(ctx)
25-
ctx, span := tracer.Start(ctx, "Workflow1")
25+
ctx, span := tracer.Start(ctx, "Workflow1 custom span")
2626
defer span.End()
2727

28-
_, customSpan := tracer.Start(ctx, "Workflow1 span", trace.WithAttributes(
28+
_, customSpan := tracer.Start(ctx, "Workflow1 custom inner span", trace.WithAttributes(
2929
// Add additional
3030
attribute.String("msg", "hello world"),
3131
))

workflow/activity.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,8 @@ import (
88
"github.com/cschleiden/go-workflows/internal/command"
99
"github.com/cschleiden/go-workflows/internal/contextvalue"
1010
"github.com/cschleiden/go-workflows/internal/fn"
11-
"github.com/cschleiden/go-workflows/internal/log"
1211
"github.com/cschleiden/go-workflows/internal/sync"
1312
"github.com/cschleiden/go-workflows/internal/workflowstate"
14-
"go.opentelemetry.io/otel/attribute"
15-
"go.opentelemetry.io/otel/trace"
1613
)
1714

1815
type ActivityOptions struct {
@@ -79,21 +76,6 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
7976
wfState.AddCommand(cmd)
8077
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, fmt.Sprintf("activity: %s", name), f))
8178

82-
ctx, span := Tracer(ctx).Start(ctx,
83-
fmt.Sprintf("ExecuteActivity: %s", name),
84-
trace.WithAttributes(
85-
attribute.String(log.ActivityNameKey, name),
86-
attribute.Int64(log.ScheduleEventIDKey, scheduleEventID),
87-
attribute.Int(log.AttemptKey, attempt),
88-
))
89-
90-
tf := sync.NewFuture[TResult]()
91-
Go(ctx, func(ctx Context) {
92-
r, err := f.Get(ctx)
93-
span.End()
94-
tf.Set(r, err)
95-
})
96-
9779
// Handle cancellation
9880
if d := ctx.Done(); d != nil {
9981
if c, ok := d.(sync.ChannelInternal[struct{}]); ok {
@@ -108,5 +90,5 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
10890
}
10991
}
11092

111-
return tf
93+
return f
11294
}

workflow/executor/executor.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cschleiden/go-workflows/registry"
2929
wf "github.com/cschleiden/go-workflows/workflow"
3030
"go.opentelemetry.io/otel/attribute"
31+
"go.opentelemetry.io/otel/propagation"
3132
"go.opentelemetry.io/otel/trace"
3233
)
3334

@@ -449,7 +450,7 @@ func (e *executor) handleActivityScheduled(event *history.Event, a *history.Acti
449450
return fmt.Errorf("previous workflow execution scheduled different type of activity: %s, %s", a.Name, sac.Name)
450451
}
451452

452-
c.Commit()
453+
sac.Commit()
453454

454455
return nil
455456
}
@@ -534,9 +535,16 @@ func (e *executor) handleTimerFired(event *history.Event, a *history.TimerFiredA
534535

535536
if !e.workflowState.Replaying() {
536537
// Trace timer
537-
parentSpan := tracing.SpanFromContext(e.workflowCtx)
538-
ctx := trace.ContextWithSpan(context.Background(), parentSpan)
539-
_, span := e.tracer.Start(ctx, "Timer", trace.WithAttributes(
538+
spanName := "Timer"
539+
if a.Name != "" {
540+
spanName = spanName + ": " + a.Name
541+
}
542+
543+
sctx := context.Background()
544+
if a.SpanMetadata != nil {
545+
sctx = propagation.TraceContext{}.Extract(sctx, a.SpanMetadata)
546+
}
547+
_, span := e.tracer.Start(sctx, spanName, trace.WithAttributes(
540548
attribute.Int64(log.DurationKey, int64(a.At.Sub(a.ScheduledAt)/time.Millisecond)),
541549
attribute.String(log.NowKey, a.ScheduledAt.String()),
542550
attribute.String(log.AtKey, a.At.String()),

workflow/retries.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func WithRetries[T any](ctx Context, retryOptions RetryOptions, fn func(ctx Cont
8787
break
8888
}
8989

90+
// TODO: Not trace this?
9091
if err := Sleep(ctx, backoffDuration); err != nil {
9192
r.Set(*new(T), err)
9293
return

workflow/sleep.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,11 @@ package workflow
22

33
import (
44
"time"
5-
6-
"github.com/cschleiden/go-workflows/internal/log"
7-
"go.opentelemetry.io/otel/attribute"
8-
"go.opentelemetry.io/otel/trace"
95
)
106

117
// Sleep sleeps for the given duration.
128
func Sleep(ctx Context, d time.Duration) error {
13-
ctx, span := Tracer(ctx).Start(ctx, "Sleep",
14-
trace.WithAttributes(attribute.Int64(log.DurationKey, int64(d/time.Millisecond))))
15-
defer span.End()
16-
17-
_, err := ScheduleTimer(ctx, d).Get(ctx)
9+
_, err := ScheduleTimer(ctx, d, WithTimerName("Sleep")).Get(ctx)
1810

1911
return err
2012
}

0 commit comments

Comments
 (0)