Skip to content

Commit 644f78e

Browse files
authored
Basic tracing for redis backend
1 parent 3bff0dd commit 644f78e

File tree

23 files changed

+549
-136
lines changed

23 files changed

+549
-136
lines changed

backend/backend.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/cschleiden/go-workflows/internal/task"
1010
"github.com/cschleiden/go-workflows/log"
1111
"github.com/cschleiden/go-workflows/workflow"
12+
"go.opentelemetry.io/otel/trace"
1213
)
1314

1415
var ErrInstanceNotFound = errors.New("workflow instance not found")
@@ -21,13 +22,15 @@ const (
2122
WorkflowStateFinished
2223
)
2324

25+
const TracerName = "go-workflow"
26+
2427
//go:generate mockery --name=Backend --inpackage
2528
type Backend interface {
2629
// CreateWorkflowInstance creates a new workflow instance
27-
CreateWorkflowInstance(ctx context.Context, event history.WorkflowEvent) error
30+
CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, metadata *workflow.Metadata, event history.Event) error
2831

2932
// CancelWorkflowInstance cancels a running workflow instance
30-
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error
33+
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error
3134

3235
// GetWorkflowInstanceState returns the state of the given workflow instance
3336
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (WorkflowState, error)
@@ -65,4 +68,7 @@ type Backend interface {
6568

6669
// Logger returns the configured logger for the backend
6770
Logger() log.Logger
71+
72+
// Tracer returns th configured trace provider for the backend
73+
Tracer() trace.Tracer
6874
}

backend/mock_Backend.go

Lines changed: 27 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cschleiden/go-workflows/workflow"
1818
_ "github.com/go-sql-driver/mysql"
1919
"github.com/google/uuid"
20+
"go.opentelemetry.io/otel/trace"
2021
)
2122

2223
//go:embed schema.sql
@@ -58,7 +59,7 @@ type mysqlBackend struct {
5859
}
5960

6061
// CreateWorkflowInstance creates a new workflow instance
61-
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.WorkflowEvent) error {
62+
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, metadata *workflow.Metadata, event history.Event) error {
6263
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
6364
Isolation: sql.LevelReadCommitted,
6465
})
@@ -68,12 +69,12 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
6869
defer tx.Rollback()
6970

7071
// Create workflow instance
71-
if err := createInstance(ctx, tx, m.WorkflowInstance, false); err != nil {
72+
if err := createInstance(ctx, tx, instance, false); err != nil {
7273
return err
7374
}
7475

7576
// Initial history is empty, store only new events
76-
if err := insertPendingEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
77+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
7778
return fmt.Errorf("inserting new event: %w", err)
7879
}
7980

@@ -88,6 +89,10 @@ func (b *mysqlBackend) Logger() log.Logger {
8889
return b.options.Logger
8990
}
9091

92+
func (b *mysqlBackend) Tracer() trace.Tracer {
93+
return b.options.TracerProvider.Tracer(backend.TracerName)
94+
}
95+
9196
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
9297
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
9398
Isolation: sql.LevelReadCommitted,

backend/options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import (
55

66
"github.com/cschleiden/go-workflows/internal/logger"
77
"github.com/cschleiden/go-workflows/log"
8+
"go.opentelemetry.io/otel/trace"
89
)
910

1011
type Options struct {
1112
Logger log.Logger
1213

14+
TracerProvider trace.TracerProvider
15+
1316
StickyTimeout time.Duration
1417

1518
WorkflowLockTimeout time.Duration
@@ -21,6 +24,9 @@ var DefaultOptions Options = Options{
2124
StickyTimeout: 30 * time.Second,
2225
WorkflowLockTimeout: time.Minute,
2326
ActivityLockTimeout: time.Minute * 2,
27+
28+
Logger: logger.NewDefaultLogger(),
29+
TracerProvider: trace.NewNoopTracerProvider(),
2430
}
2531

2632
type BackendOption func(*Options)
@@ -37,6 +43,12 @@ func WithLogger(logger log.Logger) BackendOption {
3743
}
3844
}
3945

46+
func WithTracerProvider(tp trace.TracerProvider) BackendOption {
47+
return func(o *Options) {
48+
o.TracerProvider = tp
49+
}
50+
}
51+
4052
func ApplyOptions(opts ...BackendOption) Options {
4153
options := DefaultOptions
4254

backend/redis/activity.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redis
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/cschleiden/go-workflows/internal/core"
78
"github.com/cschleiden/go-workflows/internal/history"
@@ -18,8 +19,14 @@ func (rb *redisBackend) GetActivityTask(ctx context.Context) (*task.Activity, er
1819
return nil, nil
1920
}
2021

22+
instanceState, err := readInstance(ctx, rb.rdb, activityTask.Data.Instance.InstanceID)
23+
if err != nil {
24+
return nil, fmt.Errorf("reading workflow instance for activity task: %w", err)
25+
}
26+
2127
return &task.Activity{
2228
WorkflowInstance: activityTask.Data.Instance,
29+
WorkflowMetadata: instanceState.Metadata,
2330
ID: activityTask.TaskID, // Use the queue generated ID here
2431
Event: activityTask.Data.Event,
2532
}, nil

backend/redis/instance.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ import (
99
"github.com/cschleiden/go-workflows/backend"
1010
"github.com/cschleiden/go-workflows/internal/core"
1111
"github.com/cschleiden/go-workflows/internal/history"
12+
"github.com/cschleiden/go-workflows/workflow"
1213
"github.com/go-redis/redis/v8"
1314
)
1415

15-
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event history.WorkflowEvent) error {
16-
state, err := readInstance(ctx, rb.rdb, event.WorkflowInstance.InstanceID)
16+
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, metadata *workflow.Metadata, event history.Event) error {
17+
state, err := readInstance(ctx, rb.rdb, instance.InstanceID)
1718
if err != nil && err != backend.ErrInstanceNotFound {
1819
return err
1920
}
@@ -24,29 +25,27 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event histor
2425

2526
p := rb.rdb.TxPipeline()
2627

27-
if err := createInstanceP(ctx, p, event.WorkflowInstance, false); err != nil {
28+
if err := createInstanceP(ctx, p, instance, metadata, false); err != nil {
2829
return err
2930
}
3031

3132
// Create event stream
32-
eventData, err := json.Marshal(event.HistoryEvent)
33+
eventData, err := json.Marshal(event)
3334
if err != nil {
3435
return err
3536
}
3637

3738
p.XAdd(ctx, &redis.XAddArgs{
38-
Stream: pendingEventsKey(event.WorkflowInstance.InstanceID),
39+
Stream: pendingEventsKey(instance.InstanceID),
3940
ID: "*",
4041
Values: map[string]interface{}{
4142
"event": string(eventData),
4243
},
4344
})
4445

4546
// Queue workflow instance task
46-
if err := rb.workflowQueue.Enqueue(ctx, p, event.WorkflowInstance.InstanceID, nil); err != nil {
47-
if err != errTaskAlreadyInQueue {
48-
return fmt.Errorf("queueing workflow task: %w", err)
49-
}
47+
if err := rb.workflowQueue.Enqueue(ctx, p, instance.InstanceID, nil); err != nil {
48+
return fmt.Errorf("queueing workflow task: %w", err)
5049
}
5150

5251
if _, err := p.Exec(ctx); err != nil {
@@ -105,21 +104,26 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
105104
}
106105

107106
type instanceState struct {
108-
Instance *core.WorkflowInstance `json:"instance,omitempty"`
109-
State backend.WorkflowState `json:"state,omitempty"`
110-
CreatedAt time.Time `json:"created_at,omitempty"`
111-
CompletedAt *time.Time `json:"completed_at,omitempty"`
112-
LastSequenceID int64 `json:"last_sequence_id,omitempty"`
107+
Instance *core.WorkflowInstance `json:"instance,omitempty"`
108+
State backend.WorkflowState `json:"state,omitempty"`
109+
110+
Metadata *core.WorkflowInstanceMetadata `json:"metadata,omitempty"`
111+
112+
CreatedAt time.Time `json:"created_at,omitempty"`
113+
CompletedAt *time.Time `json:"completed_at,omitempty"`
114+
115+
LastSequenceID int64 `json:"last_sequence_id,omitempty"`
113116
}
114117

115-
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, ignoreDuplicate bool) error {
118+
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *core.WorkflowInstanceMetadata, ignoreDuplicate bool) error {
116119
key := instanceKey(instance.InstanceID)
117120

118121
createdAt := time.Now()
119122

120123
b, err := json.Marshal(&instanceState{
121124
Instance: instance,
122125
State: backend.WorkflowStateActive,
126+
Metadata: metadata,
123127
CreatedAt: createdAt,
124128
})
125129
if err != nil {

backend/redis/redis.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/cschleiden/go-workflows/internal/history"
1111
"github.com/cschleiden/go-workflows/log"
1212
"github.com/go-redis/redis/v8"
13+
"go.opentelemetry.io/otel/trace"
1314
)
1415

1516
type RedisOptions struct {
@@ -50,7 +51,7 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
5051
// Default options
5152
options := &RedisOptions{
5253
Options: backend.ApplyOptions(),
53-
BlockTimeout: time.Second * 5,
54+
BlockTimeout: time.Second * 2,
5455
}
5556

5657
for _, opt := range opts {
@@ -105,6 +106,10 @@ func (rb *redisBackend) Logger() log.Logger {
105106
return rb.options.Logger
106107
}
107108

109+
func (rb *redisBackend) Tracer() trace.Tracer {
110+
return rb.options.TracerProvider.Tracer(backend.TracerName)
111+
}
112+
108113
func (rb *redisBackend) Close() error {
109114
return rb.rdb.Close()
110115
}

backend/redis/signal.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,25 @@ import (
55
"fmt"
66

77
"github.com/cschleiden/go-workflows/internal/history"
8+
"github.com/cschleiden/go-workflows/internal/tracing"
89
"github.com/go-redis/redis/v8"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/trace"
912
)
1013

1114
func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
12-
_, err := readInstance(ctx, rb.rdb, instanceID)
15+
instanceState, err := readInstance(ctx, rb.rdb, instanceID)
1316
if err != nil {
1417
return err
1518
}
1619

20+
ctx = tracing.UnmarshalSpan(ctx, instanceState.Metadata)
21+
_, span := rb.Tracer().Start(ctx, "SignalWorkflow", trace.WithAttributes(
22+
attribute.String(tracing.WorkflowInstanceID, instanceID),
23+
attribute.String("signal.name", event.Attributes.(*history.SignalReceivedAttributes).Name),
24+
))
25+
defer span.End()
26+
1727
if _, err = rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
1828
if err := addEventToStreamP(ctx, p, pendingEventsKey(instanceID), &event); err != nil {
1929
return fmt.Errorf("adding event to stream: %w", err)

0 commit comments

Comments
 (0)