Skip to content

Commit 3bff0dd

Browse files
authored
Instrument common workflow operations
1 parent feba6db commit 3bff0dd

File tree

6 files changed

+52
-11
lines changed

6 files changed

+52
-11
lines changed

workflow/activity.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import (
88
"github.com/cschleiden/go-workflows/internal/converter"
99
"github.com/cschleiden/go-workflows/internal/fn"
1010
"github.com/cschleiden/go-workflows/internal/sync"
11+
"github.com/cschleiden/go-workflows/internal/tracing"
1112
"github.com/cschleiden/go-workflows/internal/workflowstate"
13+
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/trace"
1215
)
1316

1417
type ActivityOptions struct {
@@ -20,13 +23,13 @@ var DefaultActivityOptions = ActivityOptions{
2023
}
2124

2225
// ExecuteActivity schedules the given activity to be executed
23-
func ExecuteActivity[TResult any](ctx sync.Context, options ActivityOptions, activity interface{}, args ...interface{}) Future[TResult] {
24-
return withRetries(ctx, options.RetryOptions, func(ctx sync.Context) Future[TResult] {
25-
return executeActivity[TResult](ctx, options, activity, args...)
26+
func ExecuteActivity[TResult any](ctx Context, options ActivityOptions, activity interface{}, args ...interface{}) Future[TResult] {
27+
return withRetries(ctx, options.RetryOptions, func(ctx sync.Context, attempt int) Future[TResult] {
28+
return executeActivity[TResult](ctx, options, attempt, activity, args...)
2629
})
2730
}
2831

29-
func executeActivity[TResult any](ctx sync.Context, options ActivityOptions, activity interface{}, args ...interface{}) Future[TResult] {
32+
func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt int, activity interface{}, args ...interface{}) Future[TResult] {
3033
f := sync.NewFuture[TResult]()
3134

3235
if ctx.Err() != nil {
@@ -48,10 +51,18 @@ func executeActivity[TResult any](ctx sync.Context, options ActivityOptions, act
4851
wfState.AddCommand(cmd)
4952
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
5053

54+
span := tracing.Tracer(ctx).Start(
55+
"ExecuteActivity", trace.WithAttributes(
56+
attribute.String("name", name),
57+
attribute.Int64(tracing.ScheduleEventID, scheduleEventID),
58+
attribute.Int("attempt", attempt),
59+
))
60+
defer span.End()
61+
5162
// Handle cancellation
5263
if d := ctx.Done(); d != nil {
5364
if c, ok := d.(sync.ChannelInternal[struct{}]); ok {
54-
if _, ok := c.ReceiveNonBlocking(ctx); ok {
65+
if _, ok := c.ReceiveNonBlocking(); ok {
5566
// Workflow has been canceled, check if the activity has already been scheduled, no need to schedule otherwise
5667
if cmd.State() != command.CommandState_Committed {
5768
cmd.Done()

workflow/retries.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ var DefaultRetryOptions = RetryOptions{
2929
BackoffCoefficient: 1,
3030
}
3131

32-
func withRetries[T any](ctx sync.Context, retryOptions RetryOptions, fn func(ctx sync.Context) Future[T]) Future[T] {
32+
func withRetries[T any](ctx sync.Context, retryOptions RetryOptions, fn func(ctx sync.Context, attempt int) Future[T]) Future[T] {
3333
if retryOptions.MaxAttempts <= 1 {
3434
// Short-circuit if we don't need to retry
35-
return fn(ctx)
35+
return fn(ctx, 0)
3636
}
3737

3838
r := sync.NewFuture[T]()
@@ -54,7 +54,7 @@ func withRetries[T any](ctx sync.Context, retryOptions RetryOptions, fn func(ctx
5454
break
5555
}
5656

57-
result, err = fn(ctx).Get(ctx)
57+
result, err = fn(ctx, attempt).Get(ctx)
5858
if err != nil {
5959
if err == sync.Canceled {
6060
break

workflow/sideeffect.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ import (
44
"github.com/cschleiden/go-workflows/internal/command"
55
"github.com/cschleiden/go-workflows/internal/converter"
66
"github.com/cschleiden/go-workflows/internal/sync"
7+
"github.com/cschleiden/go-workflows/internal/tracing"
78
"github.com/cschleiden/go-workflows/internal/workflowstate"
89
)
910

1011
func SideEffect[TResult any](ctx Context, f func(ctx Context) TResult) Future[TResult] {
12+
span := tracing.Tracer(ctx).Start("SideEffect")
13+
defer span.End()
14+
1115
future := sync.NewFuture[TResult]()
1216

1317
if ctx.Err() != nil {

workflow/sleep.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,17 @@ import (
44
"time"
55

66
"github.com/cschleiden/go-workflows/internal/sync"
7+
"github.com/cschleiden/go-workflows/internal/tracing"
8+
"go.opentelemetry.io/otel/attribute"
9+
"go.opentelemetry.io/otel/trace"
710
)
811

912
func Sleep(ctx sync.Context, d time.Duration) error {
13+
span := tracing.Tracer(ctx).Start("Sleep",
14+
trace.WithAttributes(attribute.Int64("duration_s", int64(d/time.Second))))
15+
defer span.End()
16+
1017
_, err := ScheduleTimer(ctx, d).Get(ctx)
18+
1119
return err
1220
}

workflow/subworkflow.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import (
88
"github.com/cschleiden/go-workflows/internal/converter"
99
"github.com/cschleiden/go-workflows/internal/fn"
1010
"github.com/cschleiden/go-workflows/internal/sync"
11+
"github.com/cschleiden/go-workflows/internal/tracing"
1112
"github.com/cschleiden/go-workflows/internal/workflowstate"
13+
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/trace"
1215
)
1316

1417
type SubWorkflowOptions struct {
@@ -22,12 +25,12 @@ var DefaultSubWorkflowOptions = SubWorkflowOptions{
2225
}
2326

2427
func CreateSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflowOptions, workflow interface{}, args ...interface{}) Future[TResult] {
25-
return withRetries(ctx, options.RetryOptions, func(ctx sync.Context) Future[TResult] {
26-
return createSubWorkflowInstance[TResult](ctx, options, workflow, args...)
28+
return withRetries(ctx, options.RetryOptions, func(ctx sync.Context, attempt int) Future[TResult] {
29+
return createSubWorkflowInstance[TResult](ctx, options, attempt, workflow, args...)
2730
})
2831
}
2932

30-
func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflowOptions, workflow interface{}, args ...interface{}) Future[TResult] {
33+
func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflowOptions, attempt int, workflow interface{}, args ...interface{}) Future[TResult] {
3134
f := sync.NewFuture[TResult]()
3235

3336
// If the context is already canceled, return immediately.
@@ -51,6 +54,14 @@ func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflo
5154

5255
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
5356

57+
span := tracing.Tracer(ctx).Start(
58+
"CreateSubworkflowInstance", trace.WithAttributes(
59+
attribute.String("name", name),
60+
attribute.Int64(tracing.ScheduleEventID, scheduleEventID),
61+
attribute.Int("attempt", attempt),
62+
))
63+
defer span.End()
64+
5465
// Check if the channel is cancelable
5566
if c, cancelable := ctx.Done().(sync.CancelChannel); cancelable {
5667
c.AddReceiveCallback(func(v struct{}, ok bool) {

workflow/timer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import (
55

66
"github.com/cschleiden/go-workflows/internal/command"
77
"github.com/cschleiden/go-workflows/internal/sync"
8+
"github.com/cschleiden/go-workflows/internal/tracing"
89
"github.com/cschleiden/go-workflows/internal/workflowstate"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/trace"
912
)
1013

1114
func ScheduleTimer(ctx Context, delay time.Duration) Future[struct{}] {
@@ -25,6 +28,10 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[struct{}] {
2528

2629
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
2730

31+
span := tracing.Tracer(ctx).Start("ScheduleTimer",
32+
trace.WithAttributes(attribute.Int64("duration_s", int64(delay/time.Second))))
33+
defer span.End()
34+
2835
// Check if the context is cancelable
2936
if c, cancelable := ctx.Done().(sync.CancelChannel); cancelable {
3037
// Register a callback for when it's canceled. The only operation on the `Done` channel

0 commit comments

Comments
 (0)