Skip to content

Commit 010f65e

Browse files
committed
- Trace continued as new
- Trace timers with name
1 parent 1569c4b commit 010f65e

File tree

16 files changed

+146
-81
lines changed

16 files changed

+146
-81
lines changed

backend/history/timer_fired.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package history
33
import (
44
"time"
55

6-
"github.com/cschleiden/go-workflows/backend/metadata"
6+
"github.com/cschleiden/go-workflows/internal/tracing"
77
)
88

99
type TimerFiredAttributes struct {
10-
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
11-
At time.Time `json:"at,omitempty"`
12-
Name string `json:"name,omitempty"`
13-
SpanMetadata metadata.WorkflowMetadata `json:"span_metadata,omitempty"`
10+
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
11+
At time.Time `json:"at,omitempty"`
12+
Name string `json:"name,omitempty"`
13+
TraceContext tracing.Context `json:"span_metadata,omitempty"`
1414
}

client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (c *Client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
8787
attribute.String(log.InstanceIDKey, wfi.InstanceID),
8888
attribute.String(log.ExecutionIDKey, wfi.ExecutionID),
8989
attribute.String(log.WorkflowNameKey, workflowName),
90-
))
90+
), trace.WithSpanKind(trace.SpanKindProducer))
9191
defer span.End()
9292

9393
// Inject state from any propagators

internal/activity/executor.go

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cschleiden/go-workflows/backend/payload"
1414
"github.com/cschleiden/go-workflows/internal/args"
1515
"github.com/cschleiden/go-workflows/internal/log"
16+
"github.com/cschleiden/go-workflows/internal/tracing"
1617
"github.com/cschleiden/go-workflows/internal/workflowerrors"
1718
"github.com/cschleiden/go-workflows/registry"
1819
wf "github.com/cschleiden/go-workflows/workflow"
@@ -47,21 +48,6 @@ func NewExecutor(
4748
func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTask) (payload.Payload, error) {
4849
a := task.Event.Attributes.(*history.ActivityScheduledAttributes)
4950

50-
activity, err := e.r.GetActivity(a.Name)
51-
if err != nil {
52-
return nil, workflowerrors.NewPermanentError(fmt.Errorf("activity not found: %w", err))
53-
}
54-
55-
activityFn := reflect.ValueOf(activity)
56-
if activityFn.Type().Kind() != reflect.Func {
57-
return nil, workflowerrors.NewPermanentError(errors.New("activity not a function"))
58-
}
59-
60-
args, addContext, err := args.InputsToArgs(e.converter, activityFn, a.Inputs)
61-
if err != nil {
62-
return nil, workflowerrors.NewPermanentError(fmt.Errorf("converting activity inputs: %w", err))
63-
}
64-
6551
// Add activity state to context
6652
as := NewActivityState(
6753
task.Event.ID,
@@ -71,6 +57,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
7157
activityCtx := WithActivityState(ctx, as)
7258

7359
for _, propagator := range e.propagators {
60+
var err error
7461
activityCtx, err = propagator.Extract(activityCtx, a.Metadata)
7562
if err != nil {
7663
return nil, workflowerrors.NewPermanentError(fmt.Errorf("extracting context from propagator: %w", err))
@@ -83,6 +70,22 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
8370
attribute.String(log.ActivityIDKey, task.ID),
8471
attribute.Int(log.AttemptKey, a.Attempt),
8572
))
73+
74+
activity, err := e.r.GetActivity(a.Name)
75+
if err != nil {
76+
return nil, workflowerrors.NewPermanentError(tracing.WithSpanError(span, fmt.Errorf("activity not found: %w", err)))
77+
}
78+
79+
activityFn := reflect.ValueOf(activity)
80+
if activityFn.Type().Kind() != reflect.Func {
81+
return nil, workflowerrors.NewPermanentError(tracing.WithSpanError(span, errors.New("activity not a function")))
82+
}
83+
84+
args, addContext, err := args.InputsToArgs(e.converter, activityFn, a.Inputs)
85+
if err != nil {
86+
return nil, workflowerrors.NewPermanentError(tracing.WithSpanError(span, fmt.Errorf("converting activity inputs: %w", err)))
87+
}
88+
8689
defer span.End()
8790

8891
// Execute activity
@@ -97,7 +100,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
97100
// Recover any panic encountered during activity execution
98101
defer func() {
99102
if r := recover(); r != nil {
100-
err = workflowerrors.NewPanicError(fmt.Sprintf("panic: %v", r))
103+
err := workflowerrors.NewPanicError(fmt.Sprintf("panic: %v", r))
101104
rv = []reflect.Value{reflect.ValueOf(err)}
102105
}
103106

@@ -110,7 +113,8 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
110113
<-done
111114

112115
if len(rv) < 1 || len(rv) > 2 {
113-
return nil, workflowerrors.NewPermanentError(errors.New("activity has to return either (error) or (<result>, error)"))
116+
return nil, workflowerrors.NewPermanentError(
117+
tracing.WithSpanError(span, errors.New("activity has to return either (error) or (<result>, error)")))
114118
}
115119

116120
var result payload.Payload
@@ -120,7 +124,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
120124
var err error
121125
result, err = e.converter.To(rv[0].Interface())
122126
if err != nil {
123-
return nil, workflowerrors.NewPermanentError(fmt.Errorf("converting activity result: %w", err))
127+
return nil, workflowerrors.NewPermanentError(tracing.WithSpanError(span, fmt.Errorf("converting activity result: %w", err)))
124128
}
125129
}
126130

@@ -133,8 +137,9 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
133137

134138
err, ok := errResult.Interface().(error)
135139
if !ok {
136-
return nil, workflowerrors.NewPermanentError(fmt.Errorf("activity error result does not satisfy error interface (%T): %v", errResult, errResult))
140+
return nil, workflowerrors.NewPermanentError(
141+
tracing.WithSpanError(span, fmt.Errorf("activity error result does not satisfy error interface (%T): %v", errResult, errResult)))
137142
}
138143

139-
return result, workflowerrors.FromError(err)
144+
return result, workflowerrors.FromError(tracing.WithSpanError(span, err))
140145
}

internal/command/continueasnew.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ type ContinueAsNewCommand struct {
1717
Metadata *metadata.WorkflowMetadata
1818
Inputs []payload.Payload
1919
Result payload.Payload
20+
21+
ContinuedExecutionID string
2022
}
2123

2224
var _ Command = (*ContinueAsNewCommand)(nil)
@@ -28,27 +30,26 @@ func NewContinueAsNewCommand(id int64, instance *core.WorkflowInstance, result p
2830
name: "ContinueAsNew",
2931
state: CommandState_Pending,
3032
},
31-
Instance: instance,
32-
Name: name,
33-
Metadata: metadata,
34-
Inputs: inputs,
35-
Result: result,
33+
Instance: instance,
34+
Name: name,
35+
Metadata: metadata,
36+
Inputs: inputs,
37+
Result: result,
38+
ContinuedExecutionID: uuid.NewString(),
3639
}
3740
}
3841

3942
func (c *ContinueAsNewCommand) Execute(clock clock.Clock) *CommandResult {
4043
switch c.state {
4144
case CommandState_Pending:
42-
continuedExecutionID := uuid.NewString()
43-
4445
var continuedInstance *core.WorkflowInstance
4546
if c.Instance.SubWorkflow() {
4647
// If the current workflow execution was a sub-workflow, ensure the new workflow execution is also a sub-workflow.
4748
// This will guarantee that the finished event for the new execution will be delivered to the right parent instance
4849
continuedInstance = core.NewSubWorkflowInstance(
49-
c.Instance.InstanceID, continuedExecutionID, c.Instance.Parent, c.Instance.ParentEventID)
50+
c.Instance.InstanceID, c.ContinuedExecutionID, c.Instance.Parent, c.Instance.ParentEventID)
5051
} else {
51-
continuedInstance = core.NewWorkflowInstance(c.Instance.InstanceID, continuedExecutionID)
52+
continuedInstance = core.NewWorkflowInstance(c.Instance.InstanceID, c.ContinuedExecutionID)
5253
}
5354

5455
c.state = CommandState_Committed
@@ -61,7 +62,7 @@ func (c *ContinueAsNewCommand) Execute(clock clock.Clock) *CommandResult {
6162
history.EventType_WorkflowExecutionContinuedAsNew,
6263
&history.ExecutionContinuedAsNewAttributes{
6364
Result: c.Result,
64-
ContinuedExecutionID: continuedExecutionID,
65+
ContinuedExecutionID: c.ContinuedExecutionID,
6566
},
6667
),
6768
},

internal/command/schedule_timer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ import (
55

66
"github.com/benbjohnson/clock"
77
"github.com/cschleiden/go-workflows/backend/history"
8-
"github.com/cschleiden/go-workflows/backend/metadata"
8+
"github.com/cschleiden/go-workflows/internal/tracing"
99
)
1010

1111
type ScheduleTimerCommand struct {
1212
cancelableCommand
1313

1414
at time.Time
1515
name string
16-
spanMetadata metadata.WorkflowMetadata
16+
traceContext tracing.Context
1717
}
1818

1919
var _ CancelableCommand = (*ScheduleTimerCommand)(nil)
2020

21-
func NewScheduleTimerCommand(id int64, at time.Time, name string, carrier metadata.WorkflowMetadata) *ScheduleTimerCommand {
21+
func NewScheduleTimerCommand(id int64, at time.Time, name string, traceContext tracing.Context) *ScheduleTimerCommand {
2222
return &ScheduleTimerCommand{
2323
cancelableCommand: cancelableCommand{
2424
command: command{
@@ -29,7 +29,7 @@ func NewScheduleTimerCommand(id int64, at time.Time, name string, carrier metada
2929
},
3030
at: at,
3131
name: name,
32-
spanMetadata: carrier,
32+
traceContext: traceContext,
3333
}
3434
}
3535

@@ -61,7 +61,7 @@ func (c *ScheduleTimerCommand) Execute(clock clock.Clock) *CommandResult {
6161
ScheduledAt: now,
6262
At: c.at,
6363
Name: c.name,
64-
SpanMetadata: c.spanMetadata,
64+
TraceContext: c.traceContext,
6565
},
6666
history.ScheduleEventID(c.id),
6767
history.VisibleAt(c.at),

internal/log/fields.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ const (
1010
InstanceIDKey = NamespaceKey + ".instance.id"
1111
ExecutionIDKey = NamespaceKey + ".execution.id"
1212

13+
ContinuedExecutionIDKey = NamespaceKey + ".continued_execution.id"
14+
1315
WorkflowNameKey = NamespaceKey + ".workflow.name"
1416

1517
SignalNameKey = NamespaceKey + ".signal.name"

internal/tracing/spanerror.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package tracing
2+
3+
import (
4+
"go.opentelemetry.io/otel/codes"
5+
"go.opentelemetry.io/otel/trace"
6+
)
7+
8+
func WithSpanError(span trace.Span, err error) error {
9+
if err != nil {
10+
span.SetStatus(codes.Error, err.Error())
11+
}
12+
13+
return err
14+
}

internal/tracing/tracecontext.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package tracing
2+
3+
import (
4+
"context"
5+
6+
"github.com/cschleiden/go-workflows/internal/sync"
7+
"go.opentelemetry.io/otel/propagation"
8+
"go.opentelemetry.io/otel/trace"
9+
)
10+
11+
type Context map[string]string
12+
13+
func (wim Context) Get(key string) string {
14+
return wim[key]
15+
}
16+
17+
func (wim Context) Set(key string, value string) {
18+
wim[key] = value
19+
}
20+
21+
func (wim Context) Keys() []string {
22+
r := make([]string, 0, len(wim))
23+
24+
for k := range wim {
25+
r = append(r, k)
26+
}
27+
28+
return r
29+
}
30+
31+
var propagator propagation.TraceContext
32+
33+
func ContextFromWfCtx(ctx sync.Context) Context {
34+
span := SpanFromContext(ctx)
35+
spanCtx := trace.ContextWithSpan(context.Background(), span)
36+
carrier := make(Context)
37+
propagator.Inject(spanCtx, carrier)
38+
return carrier
39+
}
40+
41+
func SpanContextFromContext(ctx context.Context, tctx Context) context.Context {
42+
return propagator.Extract(ctx, tctx)
43+
}

internal/tracing/workflowspan.go renamed to internal/tracing/tracing.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ import (
66
"time"
77
"unsafe"
88

9-
"github.com/cschleiden/go-workflows/internal/sync"
10-
"github.com/cschleiden/go-workflows/internal/workflowstate"
119
"go.opentelemetry.io/otel/trace"
1210
)
1311

14-
func SpanWithStartTime(ctx context.Context, tracer trace.Tracer, name string, spanID trace.SpanID, startTime time.Time) trace.Span {
12+
func SpanWithStartTime(
13+
ctx context.Context, tracer trace.Tracer, name string, spanID trace.SpanID, startTime time.Time, opts ...trace.SpanStartOption) trace.Span {
14+
15+
opts = append(opts, trace.WithTimestamp(startTime), trace.WithSpanKind(trace.SpanKindConsumer))
1516
_, span := tracer.Start(ctx,
1617
name,
17-
trace.WithTimestamp(startTime),
18+
opts...,
1819
)
1920

2021
SetSpanID(span, spanID)
@@ -33,11 +34,6 @@ func GetNewSpanID(tracer trace.Tracer) trace.SpanID {
3334
return span.SpanContext().SpanID()
3435
}
3536

36-
func GetNewSpanIDWF(ctx sync.Context) trace.SpanID {
37-
tracer := workflowstate.WorkflowState(ctx).Tracer()
38-
return GetNewSpanID(tracer)
39-
}
40-
4137
func SetSpanID(span trace.Span, sid trace.SpanID) {
4238
sc := span.SpanContext()
4339
sc = sc.WithSpanID(sid)

samples/tracing/tracing.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func RunWorker(ctx context.Context, mb backend.Backend) *worker.Worker {
139139
w.RegisterWorkflow(Subworkflow)
140140

141141
w.RegisterActivity(Activity1)
142+
w.RegisterActivity(RetriedActivity)
142143

143144
if err := w.Start(ctx); err != nil {
144145
panic("could not start worker")

0 commit comments

Comments
 (0)