Skip to content

Commit 61d80aa

Browse files
committed
Split out activity messages and set TX isolation level
1 parent e9a4dcf commit 61d80aa

File tree

9 files changed

+124
-99
lines changed

9 files changed

+124
-99
lines changed

backend/backend.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ type Backend interface {
4343
// This checkpoints the execution. events are new events from the last workflow execution
4444
// which will be added to the workflow instance history. workflowEvents are new events for the
4545
// completed or other workflow instances.
46-
CompleteWorkflowTask(ctx context.Context, instance workflow.Instance, executedEvents []history.Event, workflowEvents []history.WorkflowEvent) error
46+
CompleteWorkflowTask(
47+
ctx context.Context, instance workflow.Instance, state WorkflowState,
48+
executedEvents []history.Event, activityEvents []history.Event, workflowEvents []history.WorkflowEvent) error
4749

4850
// GetActivityTask returns a pending activity task or nil if there are no pending activities
4951
GetActivityTask(ctx context.Context) (*task.Activity, error)

backend/mock_Backend.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi workflow.Instance) erro
218218

219219
// SignalWorkflow signals a running workflow instance
220220
func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
221-
tx, err := b.db.BeginTx(ctx, nil)
221+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
222+
Isolation: sql.LevelReadCommitted,
223+
})
222224
if err != nil {
223225
return err
224226
}
@@ -233,7 +235,9 @@ func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, ev
233235

234236
// GetWorkflowInstance returns a pending workflow task or nil if there are no pending worflow executions
235237
func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
236-
tx, err := b.db.BeginTx(ctx, nil)
238+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
239+
Isolation: sql.LevelReadCommitted,
240+
})
237241
if err != nil {
238242
return nil, err
239243
}
@@ -243,19 +247,20 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
243247
now := time.Now()
244248
row := tx.QueryRowContext(
245249
ctx,
246-
`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.sticky_until FROM instances i
250+
`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.sticky_until
251+
FROM instances i
247252
INNER JOIN pending_events pe ON i.instance_id = pe.instance_id
248253
WHERE
249-
(i.locked_until IS NULL OR i.locked_until < ?)
250-
AND (i.sticky_until IS NULL OR i.sticky_until < ? OR i.worker = ?)
251-
AND i.completed_at IS NULL
254+
i.completed_at IS NULL
252255
AND (pe.visible_at IS NULL OR pe.visible_at <= ?)
256+
AND (i.locked_until IS NULL OR i.locked_until < ?)
257+
AND (i.sticky_until IS NULL OR i.sticky_until < ? OR i.worker = ?)
253258
LIMIT 1
254-
FOR UPDATE SKIP LOCKED`,
259+
FOR UPDATE OF i SKIP LOCKED`,
260+
now, // event.visible_at
255261
now, // locked_until
256262
now, // sticky_until
257263
b.workerName, // worker
258-
now, // event.visible_at
259264
)
260265

261266
var id int
@@ -271,6 +276,8 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
271276
return nil, errors.Wrap(err, "could not scan workflow instance")
272277
}
273278

279+
// log.Println("Acquired workflow instance", instanceID)
280+
274281
res, err := tx.ExecContext(
275282
ctx,
276283
`UPDATE instances i
@@ -431,7 +438,9 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
431438
func (b *mysqlBackend) CompleteWorkflowTask(
432439
ctx context.Context,
433440
instance workflow.Instance,
441+
state backend.WorkflowState,
434442
executedEvents []history.Event,
443+
activityEvents []history.Event,
435444
workflowEvents []history.WorkflowEvent,
436445
) error {
437446
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
@@ -443,10 +452,17 @@ func (b *mysqlBackend) CompleteWorkflowTask(
443452
defer tx.Rollback()
444453

445454
// Unlock instance, but keep it sticky to the current worker
455+
var completedAt *time.Time
456+
if state == backend.WorkflowStateFinished {
457+
t := time.Now()
458+
completedAt = &t
459+
}
460+
446461
res, err := tx.ExecContext(
447462
ctx,
448-
`UPDATE instances SET locked_until = NULL, sticky_until = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
463+
`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
449464
time.Now().Add(b.options.StickyTimeout),
465+
completedAt,
450466
instance.GetInstanceID(),
451467
instance.GetExecutionID(),
452468
b.workerName,
@@ -484,18 +500,10 @@ func (b *mysqlBackend) CompleteWorkflowTask(
484500
return errors.Wrap(err, "could not insert new history events")
485501
}
486502

487-
workflowCompleted := false
488-
489503
// Schedule activities
490-
for _, e := range executedEvents {
491-
switch e.Type {
492-
case history.EventType_ActivityScheduled:
493-
if err := scheduleActivity(ctx, tx, instance.GetInstanceID(), instance.GetExecutionID(), e); err != nil {
494-
return errors.Wrap(err, "could not schedule activity")
495-
}
496-
497-
case history.EventType_WorkflowExecutionFinished:
498-
workflowCompleted = true
504+
for _, e := range activityEvents {
505+
if err := scheduleActivity(ctx, tx, instance.GetInstanceID(), instance.GetExecutionID(), e); err != nil {
506+
return errors.Wrap(err, "could not schedule activity")
499507
}
500508
}
501509

@@ -522,20 +530,8 @@ func (b *mysqlBackend) CompleteWorkflowTask(
522530
}
523531
}
524532

525-
if workflowCompleted {
526-
if _, err := tx.ExecContext(
527-
ctx,
528-
"UPDATE instances SET completed_at = ? WHERE instance_id = ? AND execution_id = ?",
529-
time.Now(),
530-
instance.GetInstanceID(),
531-
instance.GetExecutionID(),
532-
); err != nil {
533-
return errors.Wrap(err, "could not mark instance as completed")
534-
}
535-
}
536-
537533
if err := tx.Commit(); err != nil {
538-
return err
534+
return errors.Wrap(err, "could not commit complete workflow transaction")
539535
}
540536

541537
return nil
@@ -572,7 +568,9 @@ func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, instance workflow
572568

573569
// GetActivityTask returns a pending activity task or nil if there are no pending activities
574570
func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, error) {
575-
tx, err := b.db.BeginTx(ctx, nil)
571+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
572+
Isolation: sql.LevelReadCommitted,
573+
})
576574
if err != nil {
577575
return nil, err
578576
}
@@ -635,7 +633,9 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
635633

636634
// CompleteActivityTask completes a activity task retrieved using GetActivityTask
637635
func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance workflow.Instance, id string, event history.Event) error {
638-
tx, err := b.db.BeginTx(ctx, nil)
636+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
637+
Isolation: sql.LevelReadCommitted,
638+
})
639639
if err != nil {
640640
return err
641641
}

backend/sqlite/sqlite.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,9 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
310310
func (sb *sqliteBackend) CompleteWorkflowTask(
311311
ctx context.Context,
312312
instance workflow.Instance,
313+
state backend.WorkflowState,
313314
executedEvents []history.Event,
315+
activityEvents []history.Event,
314316
workflowEvents []history.WorkflowEvent,
315317
) error {
316318
tx, err := sb.db.BeginTx(ctx, nil)
@@ -319,11 +321,18 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
319321
}
320322
defer tx.Rollback()
321323

324+
var completedAt *time.Time
325+
if state == backend.WorkflowStateFinished {
326+
t := time.Now()
327+
completedAt = &t
328+
}
329+
322330
// Unlock instance, but keep it sticky to the current worker
323331
if res, err := tx.ExecContext(
324332
ctx,
325-
`UPDATE instances SET locked_until = NULL, sticky_until = ? WHERE id = ? AND execution_id = ? AND worker = ?`,
333+
`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ? WHERE id = ? AND execution_id = ? AND worker = ?`,
326334
time.Now().Add(sb.options.StickyTimeout),
335+
completedAt,
327336
instance.GetInstanceID(),
328337
instance.GetExecutionID(),
329338
sb.workerName,
@@ -357,18 +366,10 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
357366
return errors.Wrap(err, "could not insert new history events")
358367
}
359368

360-
workflowCompleted := false
361-
362369
// Schedule activities
363-
for _, event := range executedEvents {
364-
switch event.Type {
365-
case history.EventType_ActivityScheduled:
366-
if err := scheduleActivity(ctx, tx, instance.GetInstanceID(), instance.GetExecutionID(), event); err != nil {
367-
return errors.Wrap(err, "could not schedule activity")
368-
}
369-
370-
case history.EventType_WorkflowExecutionFinished:
371-
workflowCompleted = true
370+
for _, event := range activityEvents {
371+
if err := scheduleActivity(ctx, tx, instance.GetInstanceID(), instance.GetExecutionID(), event); err != nil {
372+
return errors.Wrap(err, "could not schedule activity")
372373
}
373374
}
374375

@@ -396,18 +397,6 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
396397
}
397398
}
398399

399-
if workflowCompleted {
400-
if _, err := tx.ExecContext(
401-
ctx,
402-
"UPDATE instances SET completed_at = ? WHERE id = ? AND execution_id = ?",
403-
time.Now(),
404-
instance.GetInstanceID(),
405-
instance.GetExecutionID(),
406-
); err != nil {
407-
return errors.Wrap(err, "could not mark instance as completed")
408-
}
409-
}
410-
411400
return tx.Commit()
412401
}
413402

backend/test/backend.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func (s *BackendTestSuite) Test_CompleteWorkflowTask_ReturnsErrorIfNotLocked() {
129129
})
130130
s.NoError(err)
131131

132-
err = s.b.CompleteWorkflowTask(ctx, wfi, []history.Event{}, []history.WorkflowEvent{})
132+
err = s.b.CompleteWorkflowTask(ctx, wfi, backend.WorkflowStateActive, []history.Event{}, []history.Event{}, []history.WorkflowEvent{})
133133

134134
s.Error(err)
135135
}
@@ -160,14 +160,18 @@ func (s *BackendTestSuite) Test_CompleteWorkflowTask_AddsNewEventsToHistory() {
160160
taskFinishedEvent,
161161
}
162162

163+
activityEvents := []history.Event{
164+
activityScheduledEvent,
165+
}
166+
163167
workflowEvents := []history.WorkflowEvent{
164168
{
165169
WorkflowInstance: wfi,
166170
HistoryEvent: activityCompletedEvent,
167171
},
168172
}
169173

170-
err = s.b.CompleteWorkflowTask(ctx, wfi, events, workflowEvents)
174+
err = s.b.CompleteWorkflowTask(ctx, wfi, backend.WorkflowStateActive, events, activityEvents, workflowEvents)
171175
s.NoError(err)
172176

173177
time.Sleep(time.Second)

internal/tester/tester.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,17 +210,17 @@ func (wt *workflowTester) Execute(args ...interface{}) {
210210
panic("could not create workflow executor" + err.Error())
211211
}
212212

213-
executedEvents, workflowEvents, err := e.ExecuteTask(context.Background(), t)
213+
result, err := e.ExecuteTask(context.Background(), t)
214214
if err != nil {
215215
panic("Error while executing workflow" + err.Error())
216216
}
217217

218218
e.Close()
219219

220220
// Add all executed events to history
221-
tw.history = append(tw.history, executedEvents...)
221+
tw.history = append(tw.history, result.NewEvents...)
222222

223-
for _, event := range executedEvents {
223+
for _, event := range result.NewEvents {
224224
log.Println("Event", event.Type)
225225

226226
switch event.Type {
@@ -238,7 +238,7 @@ func (wt *workflowTester) Execute(args ...interface{}) {
238238
}
239239
}
240240

241-
for _, workflowEvent := range workflowEvents {
241+
for _, workflowEvent := range result.WorkflowEvents {
242242
gotNewEvents = true
243243
log.Println("Workflow event", workflowEvent.HistoryEvent.Type)
244244

internal/worker/workflow.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/benbjohnson/clock"
1010
"github.com/cschleiden/go-workflows/backend"
11-
"github.com/cschleiden/go-workflows/internal/history"
1211
"github.com/cschleiden/go-workflows/internal/task"
1312
"github.com/cschleiden/go-workflows/internal/workflow"
1413
"github.com/pkg/errors"
@@ -118,20 +117,29 @@ func (ww *workflowWorker) runDispatcher(ctx context.Context) {
118117
}
119118

120119
func (ww *workflowWorker) handle(ctx context.Context, t *task.Workflow) {
121-
executedEvents, workflowEvents, err := ww.handleTask(ctx, t)
120+
result, err := ww.handleTask(ctx, t)
122121
if err != nil {
123122
ww.logger.Panic(err)
124123
}
125124

126-
if err := ww.backend.CompleteWorkflowTask(ctx, t.WorkflowInstance, executedEvents, workflowEvents); err != nil {
125+
state := backend.WorkflowStateActive
126+
if result.Completed {
127+
state = backend.WorkflowStateFinished
128+
}
129+
130+
if err := ww.backend.CompleteWorkflowTask(
131+
ctx, t.WorkflowInstance, state, result.NewEvents, result.ActivityEvents, result.WorkflowEvents); err != nil {
127132
ww.logger.Panic(err)
128133
}
129134
}
130135

131-
func (ww *workflowWorker) handleTask(ctx context.Context, t *task.Workflow) ([]history.Event, []history.WorkflowEvent, error) {
136+
func (ww *workflowWorker) handleTask(
137+
ctx context.Context,
138+
t *task.Workflow,
139+
) (*workflow.ExecutionResult, error) {
132140
executor, err := ww.getExecutor(ctx, t)
133141
if err != nil {
134-
return nil, nil, err
142+
return nil, err
135143
}
136144

137145
if ww.options.HeartbeatWorkflowTasks {
@@ -141,12 +149,12 @@ func (ww *workflowWorker) handleTask(ctx context.Context, t *task.Workflow) ([]h
141149
go ww.heartbeatTask(heartbeatCtx, t)
142150
}
143151

144-
executedEvents, workflowEvents, err := executor.ExecuteTask(ctx, t)
152+
result, err := executor.ExecuteTask(ctx, t)
145153
if err != nil {
146-
return nil, nil, errors.Wrap(err, "could not execute workflow task")
154+
return nil, errors.Wrap(err, "could not execute workflow task")
147155
}
148156

149-
return executedEvents, workflowEvents, nil
157+
return result, nil
150158
}
151159

152160
func (ww *workflowWorker) getExecutor(ctx context.Context, t *task.Workflow) (workflow.WorkflowExecutor, error) {

0 commit comments

Comments
 (0)