Skip to content

Commit feba6db

Browse files
authored
Instrument activity execution
1 parent 44e3bdd commit feba6db

File tree

1 file changed

+18
-2
lines changed

1 file changed

+18
-2
lines changed

internal/activity/executor.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,27 @@ import (
1111
"github.com/cschleiden/go-workflows/internal/history"
1212
"github.com/cschleiden/go-workflows/internal/payload"
1313
"github.com/cschleiden/go-workflows/internal/task"
14+
"github.com/cschleiden/go-workflows/internal/tracing"
1415
"github.com/cschleiden/go-workflows/internal/workflow"
1516
"github.com/cschleiden/go-workflows/log"
17+
"go.opentelemetry.io/otel/attribute"
18+
"go.opentelemetry.io/otel/trace"
1619
)
1720

1821
type Executor struct {
1922
logger log.Logger
23+
tracer trace.Tracer
2024
r *workflow.Registry
2125
}
2226

23-
func NewExecutor(logger log.Logger, r *workflow.Registry) Executor {
27+
func NewExecutor(logger log.Logger, tracer trace.Tracer, r *workflow.Registry) Executor {
2428
return Executor{
2529
logger: logger,
30+
tracer: tracer,
2631
r: r,
2732
}
2833
}
34+
2935
func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (payload.Payload, error) {
3036
a := task.Event.Attributes.(*history.ActivityScheduledAttributes)
3137

@@ -44,18 +50,28 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
4450
return nil, fmt.Errorf("converting activity inputs: %w", err)
4551
}
4652

53+
// Add activity state to context
4754
as := NewActivityState(
4855
task.Event.ID,
4956
task.WorkflowInstance,
5057
e.logger)
5158
activityCtx := WithActivityState(ctx, as)
5259

60+
activityCtx = tracing.UnmarshalSpan(activityCtx, task.WorkflowMetadata)
61+
activityCtx, span := e.tracer.Start(activityCtx, "ActivityTaskExecution", trace.WithAttributes(
62+
attribute.String("activity", a.Name),
63+
attribute.String(tracing.WorkflowInstanceID, task.WorkflowInstance.InstanceID),
64+
attribute.String(tracing.ActivityTaskID, task.ID),
65+
))
66+
67+
// Execute activity
5368
if addContext {
5469
args[0] = reflect.ValueOf(activityCtx)
5570
}
56-
5771
r := activityFn.Call(args)
5872

73+
defer span.End()
74+
5975
if len(r) < 1 || len(r) > 2 {
6076
return nil, errors.New("activity has to return either (error) or (<result>, error)")
6177
}

0 commit comments

Comments
 (0)