Skip to content

Commit 7656f35

Browse files
authored
Merge pull request #100 from cschleiden/better-commands
Use command state machine
2 parents fac24a1 + 5ed9438 commit 7656f35

26 files changed

+661
-497
lines changed

backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type Backend interface {
5252
// completed or other workflow instances.
5353
CompleteWorkflowTask(
5454
ctx context.Context, task *task.Workflow, instance *workflow.Instance, state WorkflowState,
55-
executedEvents []history.Event, activityEvents []history.Event, workflowEvents []history.WorkflowEvent) error
55+
executedEvents, activityEvents, timerEvents []history.Event, workflowEvents []history.WorkflowEvent) error
5656

5757
// GetActivityTask returns a pending activity task or nil if there are no pending activities
5858
GetActivityTask(ctx context.Context) (*task.Activity, error)

backend/mock_Backend.go

Lines changed: 23 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
407407
task *task.Workflow,
408408
instance *workflow.Instance,
409409
state backend.WorkflowState,
410-
executedEvents []history.Event,
411-
activityEvents []history.Event,
410+
executedEvents, activityEvents, timerEvents []history.Event,
412411
workflowEvents []history.WorkflowEvent,
413412
) error {
414413
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
@@ -475,6 +474,11 @@ func (b *mysqlBackend) CompleteWorkflowTask(
475474
}
476475
}
477476

477+
// Timer events
478+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, timerEvents); err != nil {
479+
return fmt.Errorf("scheduling timers: %w", err)
480+
}
481+
478482
for _, event := range executedEvents {
479483
switch event.Type {
480484
case history.EventType_TimerCanceled:

backend/redis/workflow.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
163163
task *task.Workflow,
164164
instance *core.WorkflowInstance,
165165
state backend.WorkflowState,
166-
executedEvents []history.Event,
167-
activityEvents []history.Event,
166+
executedEvents, activityEvents, timerEvents []history.Event,
168167
workflowEvents []history.WorkflowEvent,
169168
) error {
170169
instanceState, err := readInstance(ctx, rb.rdb, instance.InstanceID)
@@ -189,6 +188,13 @@ func (rb *redisBackend) CompleteWorkflowTask(
189188
}
190189
}
191190

191+
// Scheduler timers
192+
for _, timerEvent := range timerEvents {
193+
if err := addFutureEventP(ctx, p, instance, &timerEvent); err != nil {
194+
return err
195+
}
196+
}
197+
192198
// Send new workflow events to the respective streams
193199
groupedEvents := eventsByWorkflowInstance(workflowEvents)
194200
for targetInstance, events := range groupedEvents {
@@ -200,27 +206,17 @@ func (rb *redisBackend) CompleteWorkflowTask(
200206
}
201207

202208
// Insert pending events for target instance
203-
msgAdded := false
204-
205209
for _, event := range events {
206210
event := event
207211

208-
if event.VisibleAt != nil {
209-
if err := addFutureEventP(ctx, p, targetInstance, &event); err != nil {
210-
return err
211-
}
212-
} else {
213-
// Add pending event to stream
214-
if err := addEventToStreamP(ctx, p, pendingEventsKey(targetInstance.InstanceID), &event); err != nil {
215-
return err
216-
}
217-
218-
msgAdded = true
212+
// Add pending event to stream
213+
if err := addEventToStreamP(ctx, p, pendingEventsKey(targetInstance.InstanceID), &event); err != nil {
214+
return err
219215
}
220216
}
221217

222218
// If any pending message was added, try to queue workflow task
223-
if targetInstance != instance && msgAdded {
219+
if targetInstance != instance {
224220
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstance.InstanceID, nil); err != nil {
225221
if err != errTaskAlreadyInQueue {
226222
return fmt.Errorf("adding instance to locked instances set: %w", err)

backend/sqlite/events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func scanEvent(row Scanner) (history.Event, error) {
8484
return historyEvent, nil
8585
}
8686

87-
func insertNewEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []history.Event) error {
87+
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []history.Event) error {
8888
return insertEvents(ctx, tx, "pending_events", instanceID, newEvents)
8989
}
9090

backend/sqlite/sqlite.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, m history.W
7676
}
7777

7878
// Initial history is empty, store only new events
79-
if err := insertNewEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
79+
if err := insertPendingEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
8080
return fmt.Errorf("inserting new event: %w", err)
8181
}
8282

@@ -143,7 +143,7 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
143143
return err
144144
}
145145

146-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
146+
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
147147
return fmt.Errorf("inserting cancellation event: %w", err)
148148
}
149149

@@ -200,7 +200,7 @@ func (sb *sqliteBackend) SignalWorkflow(ctx context.Context, instanceID string,
200200
return backend.ErrInstanceNotFound
201201
}
202202

203-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
203+
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
204204
return fmt.Errorf("inserting signal event: %w", err)
205205
}
206206

@@ -303,8 +303,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
303303
task *task.Workflow,
304304
instance *workflow.Instance,
305305
state backend.WorkflowState,
306-
executedEvents []history.Event,
307-
activityEvents []history.Event,
306+
executedEvents, activityEvents, timerEvents []history.Event,
308307
workflowEvents []history.WorkflowEvent,
309308
) error {
310309
tx, err := sb.db.BeginTx(ctx, nil)
@@ -365,6 +364,11 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
365364
}
366365
}
367366

367+
// Timer events
368+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, timerEvents); err != nil {
369+
return fmt.Errorf("scheduling timers: %w", err)
370+
}
371+
368372
for _, event := range executedEvents {
369373
switch event.Type {
370374
case history.EventType_TimerCanceled:
@@ -393,7 +397,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
393397
}
394398

395399
// Insert pending events for target instance
396-
if err := insertNewEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
400+
if err := insertPendingEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
397401
return fmt.Errorf("inserting messages: %w", err)
398402
}
399403
}
@@ -511,7 +515,7 @@ func (sb *sqliteBackend) CompleteActivityTask(ctx context.Context, instance *wor
511515
}
512516

513517
// Insert new event generated during this workflow execution
514-
if err := insertNewEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
518+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
515519
return fmt.Errorf("inserting new events for completed activity: %w", err)
516520
}
517521

backend/test/backendtest.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,11 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
128128
require.NotNil(t, tk)
129129

130130
// Complete workflow task
131-
err = b.CompleteWorkflowTask(ctx, tk, wfi, backend.WorkflowStateActive, tk.NewEvents, []history.Event{}, []history.WorkflowEvent{})
131+
err = b.CompleteWorkflowTask(ctx, tk, wfi, backend.WorkflowStateActive, tk.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
132132
require.NoError(t, err)
133133

134134
// Task is already completed, this should error
135-
err = b.CompleteWorkflowTask(ctx, tk, wfi, backend.WorkflowStateActive, tk.NewEvents, []history.Event{}, []history.WorkflowEvent{})
135+
err = b.CompleteWorkflowTask(ctx, tk, wfi, backend.WorkflowStateActive, tk.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
136136
require.Error(t, err)
137137
},
138138
},
@@ -171,7 +171,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
171171

172172
workflowEvents := []history.WorkflowEvent{}
173173

174-
err = b.CompleteWorkflowTask(ctx, task, wfi, backend.WorkflowStateActive, events, activityEvents, workflowEvents)
174+
err = b.CompleteWorkflowTask(ctx, task, wfi, backend.WorkflowStateActive, events, activityEvents, []history.Event{}, workflowEvents)
175175
require.NoError(t, err)
176176

177177
time.Sleep(time.Second)
@@ -213,7 +213,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
213213
events[i].SequenceID = sequenceID
214214
}
215215

216-
err = b.CompleteWorkflowTask(ctx, task, wfi, backend.WorkflowStateFinished, events, []history.Event{}, []history.WorkflowEvent{})
216+
err = b.CompleteWorkflowTask(ctx, task, wfi, backend.WorkflowStateFinished, events, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
217217
require.NoError(t, err)
218218

219219
time.Sleep(time.Second)
@@ -278,7 +278,7 @@ func BackendTest(t *testing.T, setup func() TestBackend, teardown func(b TestBac
278278
// Simulate context and sub-workflow cancellation
279279
task, err := b.GetWorkflowTask(ctx)
280280
require.NoError(t, err)
281-
err = b.CompleteWorkflowTask(ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.WorkflowEvent{
281+
err = b.CompleteWorkflowTask(ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{
282282
{
283283
WorkflowInstance: subInstance1,
284284
HistoryEvent: history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionCanceled, &history.SubWorkflowCancellationRequestedAttributes{
@@ -323,6 +323,6 @@ func startWorkflow(t *testing.T, ctx context.Context, b backend.Backend, c clien
323323
task, err := b.GetWorkflowTask(ctx)
324324
require.NoError(t, err)
325325

326-
err = b.CompleteWorkflowTask(ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.WorkflowEvent{})
326+
err = b.CompleteWorkflowTask(ctx, task, instance, backend.WorkflowStateActive, task.NewEvents, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
327327
require.NoError(t, err)
328328
}

backend/test/e2e.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
9191
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
9292
a := func(context.Context) error { return nil }
9393
wf := func(ctx workflow.Context) (int, error) {
94-
return workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a).Get(ctx)
94+
return workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
95+
RetryOptions: workflow.RetryOptions{
96+
MaxAttempts: 1,
97+
},
98+
}, a).Get(ctx)
9599
}
96100
register(t, ctx, w, []interface{}{wf}, nil)
97101

@@ -106,7 +110,11 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
106110
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
107111
a := func(context.Context, int, int) error { return nil }
108112
wf := func(ctx workflow.Context) (int, error) {
109-
return workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a, 42).Get(ctx)
113+
return workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
114+
RetryOptions: workflow.RetryOptions{
115+
MaxAttempts: 1,
116+
},
117+
}, a, 42).Get(ctx)
110118
}
111119
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
112120

client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func Test_Client_GetWorkflowResultSuccess(t *testing.T) {
5151
mockClock.Add(time.Second)
5252
})
5353
b.On("GetWorkflowInstanceState", mock.Anything, instance).Return(backend.WorkflowStateFinished, nil)
54-
b.On("GetWorkflowInstanceHistory", mock.Anything, instance).Return([]history.Event{
54+
b.On("GetWorkflowInstanceHistory", mock.Anything, instance, (*int64)(nil)).Return([]history.Event{
5555
history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
5656
history.NewHistoryEvent(2, time.Now(), history.EventType_WorkflowExecutionFinished, &history.ExecutionCompletedAttributes{
5757
Result: r,
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package command
2+
3+
import (
4+
"github.com/benbjohnson/clock"
5+
"github.com/cschleiden/go-workflows/internal/core"
6+
"github.com/cschleiden/go-workflows/internal/history"
7+
)
8+
9+
type CancelSubWorkflowCommand struct {
10+
command
11+
12+
SubWorkflowInstance *core.WorkflowInstance
13+
}
14+
15+
var _ Command = (*CancelSubWorkflowCommand)(nil)
16+
17+
func NewCancelSubWorkflowCommand(id int64, subWorkflowInstance *core.WorkflowInstance) *CancelSubWorkflowCommand {
18+
return &CancelSubWorkflowCommand{
19+
command: command{
20+
state: CommandState_Pending,
21+
id: id,
22+
},
23+
SubWorkflowInstance: subWorkflowInstance,
24+
}
25+
}
26+
27+
func (*CancelSubWorkflowCommand) Type() string {
28+
return "CancelSubworkflow"
29+
}
30+
31+
func (c *CancelSubWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
32+
c.commit()
33+
34+
return &CommandResult{
35+
// Record that cancellation was requested
36+
Events: []history.Event{
37+
history.NewPendingEvent(
38+
clock.Now(),
39+
history.EventType_SubWorkflowCancellationRequested,
40+
&history.SubWorkflowCancellationRequestedAttributes{
41+
SubWorkflowInstance: c.SubWorkflowInstance,
42+
},
43+
),
44+
},
45+
46+
// Send cancellation event to sub-workflow
47+
WorkflowEvents: []history.WorkflowEvent{
48+
{
49+
WorkflowInstance: c.SubWorkflowInstance,
50+
HistoryEvent: history.NewWorkflowCancellationEvent(clock.Now()),
51+
},
52+
},
53+
}
54+
}

0 commit comments

Comments
 (0)