Skip to content

Commit e2aea83

Browse files
authored
Make workflow instance a struct
1 parent 0972c2d commit e2aea83

File tree

34 files changed

+191
-216
lines changed

34 files changed

+191
-216
lines changed

backend/backend.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ type Backend interface {
2222
CreateWorkflowInstance(ctx context.Context, event history.WorkflowEvent) error
2323

2424
// CancelWorkflowInstance cancels a running workflow instance
25-
CancelWorkflowInstance(ctx context.Context, instance workflow.Instance) error
25+
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
2626

2727
// GetWorkflowInstanceState returns the state of the given workflow instance
28-
GetWorkflowInstanceState(ctx context.Context, instance workflow.Instance) (WorkflowState, error)
28+
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (WorkflowState, error)
2929

3030
// GetWorkflowInstanceHistory returns the full workflow history for the given instance
31-
GetWorkflowInstanceHistory(ctx context.Context, instance workflow.Instance) ([]history.Event, error)
31+
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance) ([]history.Event, error)
3232

3333
// SignalWorkflow signals a running workflow instance
3434
SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error
@@ -37,22 +37,22 @@ type Backend interface {
3737
GetWorkflowTask(ctx context.Context) (*task.Workflow, error)
3838

3939
// ExtendWorkflowTask extends the lock of a workflow task
40-
ExtendWorkflowTask(ctx context.Context, taskID string, instance core.WorkflowInstance) error
40+
ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error
4141

4242
// CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
4343
//
4444
// This checkpoints the execution. events are new events from the last workflow execution
4545
// which will be added to the workflow instance history. workflowEvents are new events for the
4646
// completed or other workflow instances.
4747
CompleteWorkflowTask(
48-
ctx context.Context, taskID string, instance workflow.Instance, state WorkflowState,
48+
ctx context.Context, taskID string, instance *workflow.Instance, state WorkflowState,
4949
executedEvents []history.Event, activityEvents []history.Event, workflowEvents []history.WorkflowEvent) error
5050

5151
// GetActivityTask returns a pending activity task or nil if there are no pending activities
5252
GetActivityTask(ctx context.Context) (*task.Activity, error)
5353

5454
// CompleteActivityTask completes a activity task retrieved using GetActivityTask
55-
CompleteActivityTask(ctx context.Context, instance workflow.Instance, activityID string, event history.Event) error
55+
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event history.Event) error
5656

5757
// ExtendActivityTask extends the lock of an activity task
5858
ExtendActivityTask(ctx context.Context, activityID string) error

backend/mock_Backend.go

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
7070
}
7171

7272
// Initial history is empty, store only new events
73-
if err := insertNewEvents(ctx, tx, m.WorkflowInstance.GetInstanceID(), []history.Event{m.HistoryEvent}); err != nil {
73+
if err := insertNewEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
7474
return errors.Wrap(err, "could not insert new event")
7575
}
7676

@@ -81,14 +81,14 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
8181
return nil
8282
}
8383

84-
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance workflow.Instance) error {
84+
func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance) error {
8585
tx, err := b.db.BeginTx(ctx, nil)
8686
if err != nil {
8787
return err
8888
}
8989
defer tx.Rollback()
9090

91-
instanceID := instance.GetInstanceID()
91+
instanceID := instance.InstanceID
9292

9393
// Cancel workflow instance
9494
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{history.NewWorkflowCancellationEvent(time.Now())}); err != nil {
@@ -120,7 +120,7 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance work
120120
return tx.Commit()
121121
}
122122

123-
func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance workflow.Instance) ([]history.Event, error) {
123+
func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance) ([]history.Event, error) {
124124
tx, err := b.db.BeginTx(ctx, nil)
125125
if err != nil {
126126
return nil, err
@@ -130,7 +130,7 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
130130
historyEvents, err := tx.QueryContext(
131131
ctx,
132132
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
133-
instance.GetInstanceID(),
133+
instance.InstanceID,
134134
)
135135
if err != nil {
136136
return nil, errors.Wrap(err, "could not get history")
@@ -169,12 +169,12 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
169169
return h, nil
170170
}
171171

172-
func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance workflow.Instance) (backend.WorkflowState, error) {
172+
func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (backend.WorkflowState, error) {
173173
row := b.db.QueryRowContext(
174174
ctx,
175175
"SELECT completed_at FROM instances WHERE instance_id = ? AND execution_id = ?",
176-
instance.GetInstanceID(),
177-
instance.GetExecutionID(),
176+
instance.InstanceID,
177+
instance.ExecutionID,
178178
)
179179

180180
var completedAt sql.NullTime
@@ -191,22 +191,22 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance wo
191191
return backend.WorkflowStateActive, nil
192192
}
193193

194-
func createInstance(ctx context.Context, tx *sql.Tx, wfi workflow.Instance, ignoreDuplicate bool) error {
194+
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ignoreDuplicate bool) error {
195195
var parentInstanceID *string
196196
var parentEventID *int
197197
if wfi.SubWorkflow() {
198-
i := wfi.ParentInstance().GetInstanceID()
198+
i := wfi.ParentInstanceID
199199
parentInstanceID = &i
200200

201-
n := wfi.ParentEventID()
201+
n := wfi.ParentEventID
202202
parentEventID = &n
203203
}
204204

205205
res, err := tx.ExecContext(
206206
ctx,
207207
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id) VALUES (?, ?, ?, ?)",
208-
wfi.GetInstanceID(),
209-
wfi.GetExecutionID(),
208+
wfi.InstanceID,
209+
wfi.ExecutionID,
210210
parentInstanceID,
211211
parentEventID,
212212
)
@@ -316,15 +316,15 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
316316
kind = task.Continuation
317317
}
318318

319-
var wfi workflow.Instance
319+
var wfi *workflow.Instance
320320
if parentInstanceID != nil {
321-
wfi = core.NewSubWorkflowInstance(instanceID, executionID, core.NewWorkflowInstance(*parentInstanceID, ""), *parentEventID)
321+
wfi = core.NewSubWorkflowInstance(instanceID, executionID, *parentInstanceID, *parentEventID)
322322
} else {
323323
wfi = core.NewWorkflowInstance(instanceID, executionID)
324324
}
325325

326326
t := &task.Workflow{
327-
ID: wfi.GetInstanceID(),
327+
ID: wfi.InstanceID,
328328
WorkflowInstance: wfi,
329329
NewEvents: []history.Event{},
330330
History: []history.Event{},
@@ -451,7 +451,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
451451
func (b *mysqlBackend) CompleteWorkflowTask(
452452
ctx context.Context,
453453
taskID string,
454-
instance workflow.Instance,
454+
instance *workflow.Instance,
455455
state backend.WorkflowState,
456456
executedEvents []history.Event,
457457
activityEvents []history.Event,
@@ -477,8 +477,8 @@ func (b *mysqlBackend) CompleteWorkflowTask(
477477
`UPDATE instances SET locked_until = NULL, sticky_until = ?, completed_at = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
478478
time.Now().Add(b.options.StickyTimeout),
479479
completedAt,
480-
instance.GetInstanceID(),
481-
instance.GetExecutionID(),
480+
instance.InstanceID,
481+
instance.ExecutionID,
482482
b.workerName,
483483
)
484484
if err != nil {
@@ -495,7 +495,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
495495
// Remove handled events from task
496496
if len(executedEvents) > 0 {
497497
args := make([]interface{}, 0, len(executedEvents)+1)
498-
args = append(args, instance.GetInstanceID())
498+
args = append(args, instance.InstanceID)
499499
for _, e := range executedEvents {
500500
args = append(args, e.ID)
501501
}
@@ -510,19 +510,19 @@ func (b *mysqlBackend) CompleteWorkflowTask(
510510
}
511511

512512
// Insert new events generated during this workflow execution to the history
513-
if err := insertHistoryEvents(ctx, tx, instance.GetInstanceID(), executedEvents); err != nil {
513+
if err := insertHistoryEvents(ctx, tx, instance.InstanceID, executedEvents); err != nil {
514514
return errors.Wrap(err, "could not insert new history events")
515515
}
516516

517517
// Schedule activities
518518
for _, e := range activityEvents {
519-
if err := scheduleActivity(ctx, tx, instance.GetInstanceID(), instance.GetExecutionID(), e); err != nil {
519+
if err := scheduleActivity(ctx, tx, instance.InstanceID, instance.ExecutionID, e); err != nil {
520520
return errors.Wrap(err, "could not schedule activity")
521521
}
522522
}
523523

524524
// Insert new workflow events
525-
groupedEvents := make(map[workflow.Instance][]history.Event)
525+
groupedEvents := make(map[*workflow.Instance][]history.Event)
526526
for _, m := range workflowEvents {
527527
if _, ok := groupedEvents[m.WorkflowInstance]; !ok {
528528
groupedEvents[m.WorkflowInstance] = []history.Event{}
@@ -532,14 +532,14 @@ func (b *mysqlBackend) CompleteWorkflowTask(
532532
}
533533

534534
for targetInstance, events := range groupedEvents {
535-
if targetInstance.GetInstanceID() != instance.GetInstanceID() {
535+
if targetInstance.InstanceID != instance.InstanceID {
536536
// Create new instance
537537
if err := createInstance(ctx, tx, targetInstance, true); err != nil {
538538
return err
539539
}
540540
}
541541

542-
if err := insertNewEvents(ctx, tx, targetInstance.GetInstanceID(), events); err != nil {
542+
if err := insertNewEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
543543
return errors.Wrap(err, "could not insert messages")
544544
}
545545
}
@@ -551,7 +551,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
551551
return nil
552552
}
553553

554-
func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, taskID string, instance core.WorkflowInstance) error {
554+
func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error {
555555
tx, err := b.db.BeginTx(ctx, nil)
556556
if err != nil {
557557
return err
@@ -563,8 +563,8 @@ func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, taskID string, in
563563
ctx,
564564
`UPDATE instances SET locked_until = ? WHERE instance_id = ? AND execution_id = ? AND worker = ?`,
565565
until,
566-
instance.GetInstanceID(),
567-
instance.GetExecutionID(),
566+
instance.InstanceID,
567+
instance.ExecutionID,
568568
b.workerName,
569569
)
570570
if err != nil {
@@ -646,7 +646,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
646646
}
647647

648648
// CompleteActivityTask completes a activity task retrieved using GetActivityTask
649-
func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance workflow.Instance, id string, event history.Event) error {
649+
func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workflow.Instance, id string, event history.Event) error {
650650
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
651651
Isolation: sql.LevelReadCommitted,
652652
})
@@ -660,8 +660,8 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance workfl
660660
ctx,
661661
`DELETE FROM activities WHERE activity_id = ? AND instance_id = ? AND execution_id = ? AND worker = ?`,
662662
id,
663-
instance.GetInstanceID(),
664-
instance.GetExecutionID(),
663+
instance.InstanceID,
664+
instance.ExecutionID,
665665
b.workerName,
666666
); err != nil {
667667
return errors.Wrap(err, "could not complete activity")
@@ -677,7 +677,7 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance workfl
677677
}
678678

679679
// Insert new event generated during this workflow execution
680-
if err := insertNewEvents(ctx, tx, instance.GetInstanceID(), []history.Event{event}); err != nil {
680+
if err := insertNewEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
681681
return errors.Wrap(err, "could not insert new events for completed activity")
682682
}
683683

backend/redis/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (rb *redisBackend) ExtendActivityTask(ctx context.Context, activityID strin
2929
return rb.activityQueue.Extend(ctx, activityID)
3030
}
3131

32-
func (rb *redisBackend) CompleteActivityTask(ctx context.Context, instance core.WorkflowInstance, activityID string, event history.Event) error {
32+
func (rb *redisBackend) CompleteActivityTask(ctx context.Context, instance *core.WorkflowInstance, activityID string, event history.Event) error {
3333
if err := rb.addWorkflowInstanceEvent(ctx, instance, event); err != nil {
3434
return err
3535
}

0 commit comments

Comments
 (0)