Skip to content

Commit 9e187d5

Browse files
authored
Fix tests for tracing
1 parent 0129f41 commit 9e187d5

File tree

8 files changed

+50
-42
lines changed

8 files changed

+50
-42
lines changed

backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const TracerName = "go-workflow"
2727
//go:generate mockery --name=Backend --inpackage
2828
type Backend interface {
2929
// CreateWorkflowInstance creates a new workflow instance
30-
CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, metadata *workflow.Metadata, event history.Event) error
30+
CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error
3131

3232
// CancelWorkflowInstance cancels a running workflow instance
3333
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error

backend/mock_Backend.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type mysqlBackend struct {
6060
}
6161

6262
// CreateWorkflowInstance creates a new workflow instance
63-
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, metadata *workflow.Metadata, event history.Event) error {
63+
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error {
6464
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
6565
Isolation: sql.LevelReadCommitted,
6666
})
@@ -70,7 +70,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor
7070
defer tx.Rollback()
7171

7272
// Create workflow instance
73-
if err := createInstance(ctx, tx, instance, metadata, false); err != nil {
73+
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
7474
return err
7575
}
7676

@@ -580,10 +580,11 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
580580
now := time.Now()
581581
res := tx.QueryRowContext(
582582
ctx,
583-
`SELECT id, activity_id, instance_id, execution_id, instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at
583+
`SELECT activities.id, activity_id, activities.instance_id, activities.execution_id,
584+
instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at
584585
FROM activities
585586
INNER JOIN instances ON activities.instance_id = instances.instance_id
586-
WHERE locked_until IS NULL OR locked_until < ?
587+
WHERE activities.locked_until IS NULL OR activities.locked_until < ?
587588
LIMIT 1
588589
FOR UPDATE SKIP LOCKED`,
589590
now,

backend/redis/instance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/go-redis/redis/v8"
1414
)
1515

16-
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, metadata *workflow.Metadata, event history.Event) error {
16+
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error {
1717
state, err := readInstance(ctx, rb.rdb, instance.InstanceID)
1818
if err != nil && err != backend.ErrInstanceNotFound {
1919
return err
@@ -25,7 +25,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
2525

2626
p := rb.rdb.TxPipeline()
2727

28-
if err := createInstanceP(ctx, p, instance, metadata, false); err != nil {
28+
if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
2929
return err
3030
}
3131

backend/sqlite/sqlite.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,18 @@ func (sb *sqliteBackend) Tracer() trace.Tracer {
6969
return sb.options.TracerProvider.Tracer(backend.TracerName)
7070
}
7171

72-
func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, metadata *workflow.Metadata, event history.Event) error {
72+
func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error {
7373
tx, err := sb.db.BeginTx(ctx, nil)
7474
if err != nil {
7575
return fmt.Errorf("starting transaction: %w", err)
7676
}
7777
defer tx.Rollback()
7878

7979
// Create workflow instance
80-
if err := createInstance(ctx, tx, instance, metadata, false); err != nil {
80+
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
8181
return err
8282
}
8383

84-
// Initial history is empty, store only new events
8584
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
8685
return fmt.Errorf("inserting new event: %w", err)
8786
}

backend/test/backendtest.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cschleiden/go-workflows/diag"
1212
"github.com/cschleiden/go-workflows/internal/core"
1313
"github.com/cschleiden/go-workflows/internal/history"
14+
"github.com/cschleiden/go-workflows/internal/payload"
1415
"github.com/cschleiden/go-workflows/workflow"
1516
"github.com/google/uuid"
1617
"github.com/stretchr/testify/require"
@@ -29,7 +30,6 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
2930
err := b.CreateWorkflowInstance(
3031
ctx,
3132
core.NewWorkflowInstance(instanceID, uuid.NewString()),
32-
nil,
3333
history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
3434
)
3535
require.NoError(t, err)
@@ -43,15 +43,13 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
4343

4444
err := b.CreateWorkflowInstance(ctx,
4545
core.NewWorkflowInstance(instanceID, executionID),
46-
nil,
4746
history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
4847
)
4948
require.NoError(t, err)
5049

5150
err = b.CreateWorkflowInstance(
5251
ctx,
5352
core.NewWorkflowInstance(instanceID, executionID),
54-
nil,
5553
history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
5654
)
5755
require.Error(t, err)
@@ -70,8 +68,9 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
7068
err := b.CreateWorkflowInstance(
7169
ctx,
7270
wfi,
73-
metadata,
74-
history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
71+
history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{
72+
Metadata: metadata,
73+
}),
7574
)
7675
require.NoError(t, err)
7776

@@ -100,7 +99,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
10099
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
101100
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
102101
err := b.CreateWorkflowInstance(
103-
ctx, wfi, nil, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
102+
ctx, wfi, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
104103
)
105104
require.NoError(t, err)
106105

@@ -116,7 +115,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
116115
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
117116
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
118117
err := b.CreateWorkflowInstance(
119-
ctx, wfi, nil, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
118+
ctx, wfi, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
120119
)
121120
require.Nil(t, err)
122121

@@ -138,7 +137,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
138137
name: "CompleteWorkflowTask_ReturnsErrorIfNotLocked",
139138
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
140139
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
141-
err := b.CreateWorkflowInstance(ctx, wfi, nil, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}))
140+
err := b.CreateWorkflowInstance(ctx, wfi, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}))
142141
require.NoError(t, err)
143142

144143
tk, err := b.GetWorkflowTask(ctx)
@@ -161,7 +160,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
161160
activityScheduledEvent := history.NewPendingEvent(time.Now(), history.EventType_ActivityScheduled, &history.ActivityScheduledAttributes{}, history.ScheduleEventID(1))
162161

163162
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
164-
err := b.CreateWorkflowInstance(ctx, wfi, nil, startedEvent)
163+
err := b.CreateWorkflowInstance(ctx, wfi, startedEvent)
165164
require.NoError(t, err)
166165

167166
task, err := b.GetWorkflowTask(ctx)
@@ -204,10 +203,14 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
204203
{
205204
name: "CompleteWorkflowTask_SetsCompletedAtWhenFinished",
206205
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
207-
startedEvent := history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{})
206+
startedEvent := history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{
207+
Name: "some-workflow",
208+
Inputs: []payload.Payload{},
209+
Metadata: &core.WorkflowMetadata{},
210+
})
208211

209212
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
210-
err := b.CreateWorkflowInstance(ctx, wfi, nil, startedEvent)
213+
err := b.CreateWorkflowInstance(ctx, wfi, startedEvent)
211214
require.NoError(t, err)
212215

213216
task, err := b.GetWorkflowTask(ctx)
@@ -281,7 +284,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
281284
startWorkflow(t, ctx, b, c, subInstance1)
282285

283286
// Create parent instance
284-
err := b.CreateWorkflowInstance(ctx, instance, nil, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}))
287+
err := b.CreateWorkflowInstance(ctx, instance, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}))
285288
require.NoError(t, err)
286289

287290
// Simulate context and sub-workflow cancellation
@@ -332,13 +335,15 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
332335
}
333336

334337
func startWorkflow(t *testing.T, ctx context.Context, b backend.Backend, c client.Client, instance *core.WorkflowInstance) {
335-
err := b.CreateWorkflowInstance(ctx, instance, nil, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}))
338+
err := b.CreateWorkflowInstance(
339+
ctx, instance, history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}))
336340
require.NoError(t, err)
337341

338342
// Get task to clear initial event
339343
task, err := b.GetWorkflowTask(ctx)
340344
require.NoError(t, err)
341345

342-
err = b.CompleteWorkflowTask(ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
346+
err = b.CompleteWorkflowTask(
347+
ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
343348
require.NoError(t, err)
344349
}

client/client.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,10 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
5959
}
6060

6161
wfi := core.NewWorkflowInstance(options.InstanceID, uuid.NewString())
62+
metadata := &workflow.Metadata{}
6263

6364
workflowName := fn.Name(wf)
6465

65-
startedEvent := history.NewPendingEvent(
66-
c.clock.Now(),
67-
history.EventType_WorkflowExecutionStarted,
68-
&history.ExecutionStartedAttributes{
69-
Name: workflowName,
70-
Inputs: inputs,
71-
})
72-
73-
metadata := &workflow.Metadata{}
74-
7566
// Start new span and add to metadata
7667
sctx, span := c.backend.Tracer().Start(ctx, fmt.Sprintf("CreateWorkflowInstance: %s", workflowName), trace.WithAttributes(
7768
attribute.String(tracing.WorkflowInstanceID, wfi.InstanceID),
@@ -81,7 +72,16 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
8172

8273
tracing.MarshalSpan(sctx, metadata)
8374

84-
if err := c.backend.CreateWorkflowInstance(ctx, wfi, metadata, startedEvent); err != nil {
75+
startedEvent := history.NewPendingEvent(
76+
c.clock.Now(),
77+
history.EventType_WorkflowExecutionStarted,
78+
&history.ExecutionStartedAttributes{
79+
Metadata: metadata,
80+
Name: workflowName,
81+
Inputs: inputs,
82+
})
83+
84+
if err := c.backend.CreateWorkflowInstance(ctx, wfi, startedEvent); err != nil {
8585
return nil, fmt.Errorf("creating workflow instance: %w", err)
8686
}
8787

tester/tester.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,7 @@ func (wt *workflowTester[TResult]) scheduleActivity(wfi *core.WorkflowInstance,
483483
executor := activity.NewExecutor(wt.logger, wt.tracer, wt.registry)
484484
activityResult, activityErr = executor.ExecuteActivity(context.Background(), &task.Activity{
485485
ID: uuid.NewString(),
486+
Metadata: &core.WorkflowMetadata{},
486487
WorkflowInstance: wfi,
487488
Event: event,
488489
})
@@ -617,8 +618,9 @@ func (wt *workflowTester[TResult]) getInitialEvent(wf interface{}, args []interf
617618
wt.clock.Now(),
618619
history.EventType_WorkflowExecutionStarted,
619620
&history.ExecutionStartedAttributes{
620-
Name: name,
621-
Inputs: inputs,
621+
Name: name,
622+
Metadata: &core.WorkflowMetadata{},
623+
Inputs: inputs,
622624
},
623625
)
624626
}
@@ -631,6 +633,7 @@ func getNextWorkflowTask(wfi *core.WorkflowInstance, history []history.Event, ne
631633

632634
return &task.Workflow{
633635
WorkflowInstance: wfi,
636+
Metadata: &core.WorkflowMetadata{},
634637
LastSequenceID: lastSequenceID,
635638
NewEvents: newEvents,
636639
}

0 commit comments

Comments
 (0)