4
4
"context"
5
5
"database/sql"
6
6
_ "embed"
7
+ "errors"
7
8
"fmt"
8
9
"strings"
9
10
"time"
@@ -16,7 +17,6 @@ import (
16
17
"github.com/cschleiden/go-workflows/workflow"
17
18
_ "github.com/go-sql-driver/mysql"
18
19
"github.com/google/uuid"
19
- "github.com/pkg/errors"
20
20
)
21
21
22
22
//go:embed schema.sql
@@ -32,7 +32,7 @@ func NewMysqlBackend(host string, port int, user, password, database string, opt
32
32
}
33
33
34
34
if _ , err := db .Exec (schema ); err != nil {
35
- panic (errors . Wrap ( err , "could not initialize database" ))
35
+ panic (fmt . Errorf ( "initializing database: %w" , err ))
36
36
}
37
37
38
38
if err := db .Close (); err != nil {
@@ -63,7 +63,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
63
63
Isolation : sql .LevelReadCommitted ,
64
64
})
65
65
if err != nil {
66
- return errors . Wrap ( err , "could not start transaction" )
66
+ return fmt . Errorf ( "starting transaction: %w" , err )
67
67
}
68
68
defer tx .Rollback ()
69
69
@@ -74,11 +74,11 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
74
74
75
75
// Initial history is empty, store only new events
76
76
if err := insertNewEvents (ctx , tx , m .WorkflowInstance .InstanceID , []history.Event {m .HistoryEvent }); err != nil {
77
- return errors . Wrap ( err , "could not insert new event" )
77
+ return fmt . Errorf ( "inserting new event: %w" , err )
78
78
}
79
79
80
80
if err := tx .Commit (); err != nil {
81
- return errors . Wrap ( err , "could not create workflow instance" )
81
+ return fmt . Errorf ( "creating workflow instance: %w" , err )
82
82
}
83
83
84
84
return nil
@@ -99,7 +99,7 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
99
99
100
100
// Cancel workflow instance
101
101
if err := insertNewEvents (ctx , tx , instanceID , []history.Event {* event }); err != nil {
102
- return errors . Wrap ( err , "could not insert cancellation event" )
102
+ return fmt . Errorf ( "inserting cancellation event: %w" , err )
103
103
}
104
104
105
105
// Recursively, find any sub-workflow instance to cancel
@@ -113,12 +113,12 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
113
113
break
114
114
}
115
115
116
- return errors . Wrap ( err , "could not get workflow instance for cancelling" )
116
+ return fmt . Errorf ( "getting workflow instance for cancelling: %w" , err )
117
117
}
118
118
119
119
// Cancel sub-workflow instance
120
120
if err := insertNewEvents (ctx , tx , subWorkflowInstanceID , []history.Event {* event }); err != nil {
121
- return errors . Wrap ( err , "could not insert cancellation event" )
121
+ return fmt . Errorf ( "inserting cancellation event: %w" , err )
122
122
}
123
123
124
124
instanceID = subWorkflowInstanceID
@@ -150,7 +150,7 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
150
150
)
151
151
}
152
152
if err != nil {
153
- return nil , errors . Wrap ( err , "could not get history" )
153
+ return nil , fmt . Errorf ( "getting history: %w" , err )
154
154
}
155
155
156
156
h := make ([]history.Event , 0 )
@@ -171,12 +171,12 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
171
171
& attributes ,
172
172
& historyEvent .VisibleAt ,
173
173
); err != nil {
174
- return nil , errors . Wrap ( err , "could not scan event" )
174
+ return nil , fmt . Errorf ( "scanning event: %w" , err )
175
175
}
176
176
177
177
a , err := history .DeserializeAttributes (historyEvent .Type , attributes )
178
178
if err != nil {
179
- return nil , errors . Wrap ( err , "could not deserialize attributes" )
179
+ return nil , fmt . Errorf ( "deserializing attributes: %w" , err )
180
180
}
181
181
182
182
historyEvent .Attributes = a
@@ -229,7 +229,7 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ign
229
229
parentEventID ,
230
230
)
231
231
if err != nil {
232
- return errors . Wrap ( err , "could not insert workflow instance" )
232
+ return fmt . Errorf ( "inserting workflow instance: %w" , err )
233
233
}
234
234
235
235
if ! ignoreDuplicate {
@@ -263,7 +263,7 @@ func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, ev
263
263
}
264
264
265
265
if err := insertNewEvents (ctx , tx , instanceID , []history.Event {event }); err != nil {
266
- return errors . Wrap ( err , "could not insert signal event" )
266
+ return fmt . Errorf ( "inserting signal event: %w" , err )
267
267
}
268
268
269
269
return tx .Commit ()
@@ -309,7 +309,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
309
309
return nil , nil
310
310
}
311
311
312
- return nil , errors . Wrap ( err , "could not scan workflow instance" )
312
+ return nil , fmt . Errorf ( "scanning workflow instance: %w" , err )
313
313
}
314
314
315
315
// log.Println("Acquired workflow instance", instanceID)
@@ -324,11 +324,11 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
324
324
id ,
325
325
)
326
326
if err != nil {
327
- return nil , errors . Wrap ( err , "could not lock workflow instance" )
327
+ return nil , fmt . Errorf ( "locking workflow instance: %w" , err )
328
328
}
329
329
330
330
if affectedRows , err := res .RowsAffected (); err != nil {
331
- return nil , errors . Wrap ( err , "could not lock workflow instance" )
331
+ return nil , fmt . Errorf ( "locking workflow instance: %w" , err )
332
332
} else if affectedRows == 0 {
333
333
// No instance locked?
334
334
return nil , nil
@@ -355,7 +355,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
355
355
now ,
356
356
)
357
357
if err != nil {
358
- return nil , errors . Wrap ( err , "could not get new events" )
358
+ return nil , fmt . Errorf ( "getting new events: %w" , err )
359
359
}
360
360
361
361
for events .Next () {
@@ -374,12 +374,12 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
374
374
& attributes ,
375
375
& historyEvent .VisibleAt ,
376
376
); err != nil {
377
- return nil , errors . Wrap ( err , "could not scan event" )
377
+ return nil , fmt . Errorf ( "scanning event: %w" , err )
378
378
}
379
379
380
380
a , err := history .DeserializeAttributes (historyEvent .Type , attributes )
381
381
if err != nil {
382
- return nil , errors . Wrap ( err , "could not deserialize attributes" )
382
+ return nil , fmt . Errorf ( "deserializing attributes: %w" , err )
383
383
}
384
384
385
385
historyEvent .Attributes = a
@@ -398,7 +398,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
398
398
& t .LastSequenceID ,
399
399
); err != nil {
400
400
if err != sql .ErrNoRows {
401
- return nil , errors . Wrap ( err , "could not get most recent sequence id" )
401
+ return nil , fmt . Errorf ( "getting most recent sequence id: %w" , err )
402
402
}
403
403
}
404
404
@@ -448,12 +448,12 @@ func (b *mysqlBackend) CompleteWorkflowTask(
448
448
b .workerName ,
449
449
)
450
450
if err != nil {
451
- return errors . Wrap ( err , "could not unlock instance" )
451
+ return fmt . Errorf ( "unlocking instance: %w" , err )
452
452
}
453
453
454
454
changedRows , err := res .RowsAffected ()
455
455
if err != nil {
456
- return errors . Wrap ( err , "could not check for unlocked workflow instances" )
456
+ return fmt . Errorf ( "checking for unlocked workflow instances: %w" , err )
457
457
} else if changedRows != 1 {
458
458
return errors .New ("could not find workflow instance to unlock" )
459
459
}
@@ -471,19 +471,19 @@ func (b *mysqlBackend) CompleteWorkflowTask(
471
471
fmt .Sprintf (`DELETE FROM pending_events WHERE instance_id = ? AND event_id IN (?%v)` , strings .Repeat (",?" , len (executedEvents )- 1 )),
472
472
args ... ,
473
473
); err != nil {
474
- return errors . Wrap ( err , "could not delete handled new events" )
474
+ return fmt . Errorf ( "deleting handled new events: %w" , err )
475
475
}
476
476
}
477
477
478
478
// Insert new events generated during this workflow execution to the history
479
479
if err := insertHistoryEvents (ctx , tx , instance .InstanceID , executedEvents ); err != nil {
480
- return errors . Wrap ( err , "could not insert new history events" )
480
+ return fmt . Errorf ( "inserting new history events: %w" , err )
481
481
}
482
482
483
483
// Schedule activities
484
484
for _ , e := range activityEvents {
485
485
if err := scheduleActivity (ctx , tx , instance , e ); err != nil {
486
- return errors . Wrap ( err , "could not schedule activity" )
486
+ return fmt . Errorf ( "scheduling activity: %w" , err )
487
487
}
488
488
}
489
489
@@ -506,12 +506,12 @@ func (b *mysqlBackend) CompleteWorkflowTask(
506
506
}
507
507
508
508
if err := insertNewEvents (ctx , tx , targetInstance .InstanceID , events ); err != nil {
509
- return errors . Wrap ( err , "could not insert messages" )
509
+ return fmt . Errorf ( "inserting messages: %w" , err )
510
510
}
511
511
}
512
512
513
513
if err := tx .Commit (); err != nil {
514
- return errors . Wrap ( err , "could not commit complete workflow transaction" )
514
+ return fmt . Errorf ( "committing complete workflow transaction: %w" , err )
515
515
}
516
516
517
517
return nil
@@ -534,11 +534,11 @@ func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, taskID string, in
534
534
b .workerName ,
535
535
)
536
536
if err != nil {
537
- return errors . Wrap ( err , "could not extend workflow task lock" )
537
+ return fmt . Errorf ( "extending workflow task lock: %w" , err )
538
538
}
539
539
540
540
if rowsAffected , err := res .RowsAffected (); err != nil {
541
- return errors . Wrap ( err , "could not determine if workflow task was extended" )
541
+ return fmt . Errorf ( "determining if workflow task was extended: %w" , err )
542
542
} else if rowsAffected == 0 {
543
543
return errors .New ("could not extend workflow task" )
544
544
}
@@ -578,12 +578,12 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
578
578
return nil , nil
579
579
}
580
580
581
- return nil , errors . Wrap ( err , "could not find activity task to lock" )
581
+ return nil , fmt . Errorf ( "finding activity task to lock: %w" , err )
582
582
}
583
583
584
584
a , err := history .DeserializeAttributes (event .Type , attributes )
585
585
if err != nil {
586
- return nil , errors . Wrap ( err , "could not deserialize attributes" )
586
+ return nil , fmt . Errorf ( "deserializing attributes: %w" , err )
587
587
}
588
588
589
589
event .Attributes = a
@@ -595,7 +595,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
595
595
b .workerName ,
596
596
id ,
597
597
); err != nil {
598
- return nil , errors . Wrap ( err , "could not lock activity" )
598
+ return nil , fmt . Errorf ( "locking activity: %w" , err )
599
599
}
600
600
601
601
t := & task.Activity {
@@ -630,11 +630,11 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workf
630
630
instance .ExecutionID ,
631
631
b .workerName ,
632
632
); err != nil {
633
- return errors . Wrap ( err , "could not complete activity" )
633
+ return fmt . Errorf ( "completing activity: %w" , err )
634
634
} else {
635
635
affected , err := res .RowsAffected ()
636
636
if err != nil {
637
- return errors . Wrap ( err , "could not check for completed activity" )
637
+ return fmt . Errorf ( "checking for completed activity: %w" , err )
638
638
}
639
639
640
640
if affected == 0 {
@@ -644,7 +644,7 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workf
644
644
645
645
// Insert new event generated during this workflow execution
646
646
if err := insertNewEvents (ctx , tx , instance .InstanceID , []history.Event {event }); err != nil {
647
- return errors . Wrap ( err , "could not insert new events for completed activity" )
647
+ return fmt . Errorf ( "inserting new events for completed activity: %w" , err )
648
648
}
649
649
650
650
if err := tx .Commit (); err != nil {
@@ -670,11 +670,11 @@ func (b *mysqlBackend) ExtendActivityTask(ctx context.Context, activityID string
670
670
b .workerName ,
671
671
)
672
672
if err != nil {
673
- return errors . Wrap ( err , "could not extend activity lock" )
673
+ return fmt . Errorf ( "extending activity lock: %w" , err )
674
674
}
675
675
676
676
if rowsAffected , err := res .RowsAffected (); err != nil {
677
- return errors . Wrap ( err , "could not determine if activity was extended" )
677
+ return fmt . Errorf ( "determining if activity was extended: %w" , err )
678
678
} else if rowsAffected == 0 {
679
679
return errors .New ("could not extend activity" )
680
680
}
0 commit comments