Skip to content

Commit 63af8ef

Browse files
authored
Support cancellation for redis backend
1 parent 8ac1c15 commit 63af8ef

File tree

13 files changed

+105
-25
lines changed

13 files changed

+105
-25
lines changed

backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type Backend interface {
2222
CreateWorkflowInstance(ctx context.Context, event history.WorkflowEvent) error
2323

2424
// CancelWorkflowInstance cancels a running workflow instance
25-
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
25+
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error
2626

2727
// GetWorkflowInstanceState returns the state of the given workflow instance
2828
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (WorkflowState, error)

backend/mock_Backend.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
8181
return nil
8282
}
8383

84-
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance) error {
84+
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
8585
tx, err := b.db.BeginTx(ctx, nil)
8686
if err != nil {
8787
return err
@@ -91,7 +91,7 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
9191
instanceID := instance.InstanceID
9292

9393
// Cancel workflow instance
94-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{history.NewWorkflowCancellationEvent(time.Now())}); err != nil {
94+
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
9595
return errors.Wrap(err, "could not insert cancellation event")
9696
}
9797

@@ -110,7 +110,7 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
110110
}
111111

112112
// Cancel sub-workflow instance
113-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{history.NewWorkflowCancellationEvent(time.Now())}); err != nil {
113+
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
114114
return errors.Wrap(err, "could not insert cancellation event")
115115
}
116116

backend/redis/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (rb *redisBackend) ExtendActivityTask(ctx context.Context, activityID strin
3030
}
3131

3232
func (rb *redisBackend) CompleteActivityTask(ctx context.Context, instance *core.WorkflowInstance, activityID string, event history.Event) error {
33-
if err := rb.addWorkflowInstanceEvent(ctx, instance, event); err != nil {
33+
if err := rb.addWorkflowInstanceEvent(ctx, instance, &event); err != nil {
3434
return err
3535
}
3636

backend/redis/instance.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,29 @@ func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance *
7373
return instanceState.State, nil
7474
}
7575

76-
func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error {
77-
panic("unimplemented")
76+
func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error {
77+
// Recursively, find any sub-workflow instance to cancel
78+
toCancel := make([]*core.WorkflowInstance, 0)
79+
toCancel = append(toCancel, instance)
80+
for len(toCancel) > 0 {
81+
instance := toCancel[0]
82+
toCancel = toCancel[1:]
83+
84+
// Cancel instance
85+
if err := rb.addWorkflowInstanceEvent(ctx, instance, event); err != nil {
86+
return errors.Wrap(err, "could not add cancellation event to workflow instance")
87+
}
88+
89+
// Find sub-workflows
90+
subInstances, err := subWorkflowInstances(ctx, rb.rdb, instance)
91+
if err != nil {
92+
return err
93+
}
94+
95+
toCancel = append(toCancel, subInstances...)
96+
}
97+
98+
return nil
7899
}
79100

80101
type instanceState struct {
@@ -105,6 +126,22 @@ func createInstance(ctx context.Context, rdb redis.UniversalClient, instance *co
105126
return errors.New("workflow instance already exists")
106127
}
107128

129+
if instance.SubWorkflow() {
130+
instanceStr, err := json.Marshal(instance)
131+
if err != nil {
132+
return err
133+
}
134+
135+
c, err := rdb.RPush(ctx, subInstanceKey(instance.ParentInstanceID), instanceStr).Result()
136+
if err != nil {
137+
return errors.Wrap(err, "could not track sub-workflow")
138+
}
139+
140+
if c != 1 {
141+
return errors.New("could not track sub-workflow")
142+
}
143+
}
144+
108145
return nil
109146
}
110147

@@ -137,5 +174,37 @@ func readInstance(ctx context.Context, rdb redis.UniversalClient, instanceID str
137174
return nil, errors.Wrap(err, "could not unmarshal instance state")
138175
}
139176

177+
if state.Instance.SubWorkflow() && state.State == backend.WorkflowStateFinished {
178+
instanceStr, err := json.Marshal(state.Instance)
179+
if err != nil {
180+
return nil, err
181+
}
182+
183+
if err := rdb.LRem(ctx, subInstanceKey(state.Instance.ParentInstanceID), 1, instanceStr).Err(); err != nil {
184+
return nil, errors.Wrap(err, "could not remove sub-workflow from parent list")
185+
}
186+
}
187+
140188
return &state, nil
141189
}
190+
191+
func subWorkflowInstances(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance) ([]*core.WorkflowInstance, error) {
192+
key := subInstanceKey(instance.InstanceID)
193+
res, err := rdb.LRange(ctx, key, 0, -1).Result()
194+
if err != nil {
195+
return nil, errors.Wrap(err, "could not read sub-workflow instances")
196+
}
197+
198+
var instances []*core.WorkflowInstance
199+
200+
for _, instanceStr := range res {
201+
var instance core.WorkflowInstance
202+
if err := json.Unmarshal([]byte(instanceStr), &instance); err != nil {
203+
return nil, errors.Wrap(err, "could not unmarshal sub-workflow instance")
204+
}
205+
206+
instances = append(instances, &instance)
207+
}
208+
209+
return instances, nil
210+
}

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ func instanceKey(instanceID string) string {
88
return fmt.Sprintf("instance:%v", instanceID)
99
}
1010

11+
func subInstanceKey(instanceID string) string {
12+
return fmt.Sprintf("sub-instance:%v", instanceID)
13+
}
14+
1115
func pendingEventsKey(instanceID string) string {
1216
return fmt.Sprintf("pending-events:%v", instanceID)
1317
}

backend/redis/redis_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redis
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/cschleiden/go-workflows/backend"
78
"github.com/cschleiden/go-workflows/backend/test"
@@ -12,7 +13,8 @@ func Test_RedisBackend(t *testing.T) {
1213
test.TestBackend(t, test.Tester{
1314
New: func() backend.Backend {
1415
// Disable sticky workflow behavior for the test execution
15-
b, err := NewRedisBackend("localhost:6379", "", "RedisPassw0rd", 0)
16+
b, err := NewRedisBackend("localhost:6379", "", "RedisPassw0rd", 0,
17+
WithBlockTimeout(time.Millisecond*2))
1618
require.NoError(t, err)
1719
return b
1820
},

backend/redis/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,9 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
172172
return nil
173173
}
174174

175-
func (rb *redisBackend) addWorkflowInstanceEvent(ctx context.Context, instance *core.WorkflowInstance, event history.Event) error {
175+
func (rb *redisBackend) addWorkflowInstanceEvent(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error {
176176
// Add event to pending events for instance
177-
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instance.InstanceID), &event); err != nil {
177+
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instance.InstanceID), event); err != nil {
178178
return err
179179
}
180180

backend/sqlite/sqlite.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ign
119119
return nil
120120
}
121121

122-
func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance) error {
122+
func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
123123
tx, err := sb.db.BeginTx(ctx, nil)
124124
if err != nil {
125125
return err
@@ -129,7 +129,7 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
129129
instanceID := instance.InstanceID
130130

131131
// Cancel workflow instance
132-
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{history.NewWorkflowCancellationEvent(time.Now())}); err != nil {
132+
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
133133
return errors.Wrap(err, "could not insert cancellation event")
134134
}
135135

@@ -148,7 +148,7 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance *w
148148
}
149149

150150
// Cancel sub-workflow instance
151-
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{history.NewWorkflowCancellationEvent(time.Now())}); err != nil {
151+
if err := insertNewEvents(ctx, tx, subWorkflowInstanceID, []history.Event{*event}); err != nil {
152152
return errors.Wrap(err, "could not insert cancellation event")
153153
}
154154

backend/test/backend.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (s *BackendTestSuite) Test_GetWorkflowTask_LocksTask() {
131131
s.NotNil(t)
132132

133133
// First task is locked, second call should return nil
134-
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*10)
134+
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
135135
defer cancel()
136136

137137
t, err = s.b.GetWorkflowTask(ctx)
@@ -169,7 +169,7 @@ func (s *BackendTestSuite) Test_CompleteWorkflowTask_AddsNewEventsToHistory() {
169169
})
170170
s.NoError(err)
171171

172-
_, err = s.b.GetWorkflowTask(ctx)
172+
t, err := s.b.GetWorkflowTask(ctx)
173173
s.NoError(err)
174174

175175
taskStartedEvent := history.NewHistoryEvent(time.Now(), history.EventType_WorkflowTaskStarted, &history.WorkflowTaskStartedAttributes{})
@@ -192,12 +192,12 @@ func (s *BackendTestSuite) Test_CompleteWorkflowTask_AddsNewEventsToHistory() {
192192
},
193193
}
194194

195-
err = s.b.CompleteWorkflowTask(ctx, "taskID", wfi, backend.WorkflowStateActive, events, activityEvents, workflowEvents)
195+
err = s.b.CompleteWorkflowTask(ctx, t.ID, wfi, backend.WorkflowStateActive, events, activityEvents, workflowEvents)
196196
s.NoError(err)
197197

198198
time.Sleep(time.Second)
199199

200-
t, err := s.b.GetWorkflowTask(ctx)
200+
t, err = s.b.GetWorkflowTask(ctx)
201201
s.NotEqual(task.Continuation, t.Kind, "Expect full task")
202202
s.NoError(err)
203203
s.NotNil(t)

0 commit comments

Comments
 (0)