Skip to content

Commit 60adf88

Browse files
authored
Merge pull request #131 from cschleiden/panic-when-completing-pending-workflow
Panic when completing workflow with pending futures
2 parents 4e9d2e2 + 3a7b047 commit 60adf88

File tree

6 files changed

+78
-5
lines changed

6 files changed

+78
-5
lines changed

internal/workflow/executor.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ func (e *executor) executeNewEvents(newEvents []history.Event) ([]history.Event,
218218
}
219219

220220
if e.workflow.Completed() {
221+
// TODO: Is this too early? We haven't committed some of the commands
222+
if e.workflowState.HasPendingFutures() {
223+
panic("Workflow completed, but there are still pending futures")
224+
}
225+
221226
e.workflowCompleted(e.workflow.Result(), e.workflow.Error())
222227
}
223228

@@ -356,6 +361,8 @@ func (e *executor) handleActivityCompleted(event history.Event, a *history.Activ
356361
return fmt.Errorf("setting activity completed result: %w", err)
357362
}
358363

364+
e.workflowState.RemoveFuture(event.ScheduleEventID)
365+
359366
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
360367
if c == nil {
361368
return fmt.Errorf("previous workflow execution scheduled an activity which could not be found")
@@ -381,6 +388,8 @@ func (e *executor) handleActivityFailed(event history.Event, a *history.Activity
381388
return fmt.Errorf("setting activity failed result: %w", err)
382389
}
383390

391+
e.workflowState.RemoveFuture(event.ScheduleEventID)
392+
384393
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
385394
if c == nil {
386395
return fmt.Errorf("previous workflow execution scheduled an activity which could not be found")
@@ -422,6 +431,8 @@ func (e *executor) handleTimerFired(event history.Event, a *history.TimerFiredAt
422431
return fmt.Errorf("setting timer fired result: %w", err)
423432
}
424433

434+
e.workflowState.RemoveFuture(event.ScheduleEventID)
435+
425436
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
426437
if c == nil {
427438
return fmt.Errorf("no command found for timer fired event")
@@ -460,6 +471,8 @@ func (e *executor) handleTimerCanceled(event history.Event, a *history.TimerCanc
460471
return fmt.Errorf("setting timer canceled result: %w", err)
461472
}
462473

474+
e.workflowState.RemoveFuture(event.ScheduleEventID)
475+
463476
return e.workflow.Continue()
464477
}
465478

@@ -513,6 +526,8 @@ func (e *executor) handleSubWorkflowFailed(event history.Event, a *history.SubWo
513526
return fmt.Errorf("setting sub workflow failed result: %w", err)
514527
}
515528

529+
e.workflowState.RemoveFuture(event.ScheduleEventID)
530+
516531
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
517532
if c == nil {
518533
// TODO: Adjust
@@ -539,6 +554,8 @@ func (e *executor) handleSubWorkflowCompleted(event history.Event, a *history.Su
539554
return fmt.Errorf("setting sub workflow completed result: %w", err)
540555
}
541556

557+
e.workflowState.RemoveFuture(event.ScheduleEventID)
558+
542559
c := e.workflowState.CommandByScheduleEventID(event.ScheduleEventID)
543560
if c == nil {
544561
// TODO: Adjust
@@ -600,6 +617,8 @@ func (e *executor) handleSideEffectResult(event history.Event, a *history.SideEf
600617
return fmt.Errorf("setting side effect result result: %w", err)
601618
}
602619

620+
e.workflowState.RemoveFuture(event.ScheduleEventID)
621+
603622
return e.workflow.Continue()
604623
}
605624

internal/workflow/executor_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,14 +529,16 @@ func Test_Executor(t *testing.T) {
529529
workflow := func(ctx wf.Context) error {
530530
swctx, cancel := wf.WithCancel(ctx)
531531

532-
wf.CreateSubWorkflowInstance[any](swctx, wf.SubWorkflowOptions{
532+
f := wf.CreateSubWorkflowInstance[any](swctx, wf.SubWorkflowOptions{
533533
InstanceID: "subworkflow",
534534
}, subworkflow)
535535

536536
wf.Sleep(ctx, time.Millisecond)
537537

538538
cancel()
539539

540+
f.Get(ctx)
541+
540542
return nil
541543
}
542544

@@ -559,6 +561,7 @@ func Test_Executor(t *testing.T) {
559561
result, err = e.ExecuteTask(context.Background(), continueTask("instanceID", []history.Event{
560562
result.TimerEvents[0],
561563
}, result.Executed[len(result.Executed)-1].SequenceID))
564+
562565
require.NoError(t, err)
563566
require.Len(t, result.WorkflowEvents, 1, "Cancellation should have been requested")
564567
require.Equal(t, history.EventType_WorkflowExecutionCanceled, result.WorkflowEvents[0].HistoryEvent.Type)
@@ -567,8 +570,19 @@ func Test_Executor(t *testing.T) {
567570
subWorkflowInstance,
568571
result.WorkflowEvents[0].WorkflowInstance)
569572

573+
require.Len(t, e.workflowState.Commands(), 2)
574+
575+
// Complete subworkflow
576+
swr, _ := converter.DefaultConverter.To(nil)
577+
hp.history = append(hp.history, result.Executed...)
578+
result, err = e.ExecuteTask(context.Background(), continueTask("instanceID", []history.Event{
579+
history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowCompleted, &history.SubWorkflowCompletedAttributes{
580+
Result: swr,
581+
}, history.ScheduleEventID(1)),
582+
}, result.Executed[len(result.Executed)-1].SequenceID))
583+
584+
require.NoError(t, err)
570585
require.True(t, e.workflow.Completed())
571-
require.Len(t, e.workflowState.Commands(), 3)
572586
},
573587
},
574588
}

internal/workflowstate/workflowstate.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ func (wf *WfState) TrackFuture(scheduleEventID int64, f DecodingSettable) {
100100
wf.pendingFutures[scheduleEventID] = f
101101
}
102102

103+
func (wf *WfState) HasPendingFutures() bool {
104+
return len(wf.pendingFutures) > 0
105+
}
106+
103107
func (wf *WfState) FutureByScheduleEventID(scheduleEventID int64) (DecodingSettable, bool) {
104108
f, ok := wf.pendingFutures[scheduleEventID]
105109
return f, ok
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package workflowstate
2+
3+
import (
4+
"testing"
5+
6+
"github.com/benbjohnson/clock"
7+
"github.com/cschleiden/go-workflows/internal/converter"
8+
"github.com/cschleiden/go-workflows/internal/core"
9+
"github.com/cschleiden/go-workflows/internal/logger"
10+
"github.com/cschleiden/go-workflows/internal/payload"
11+
"github.com/cschleiden/go-workflows/internal/sync"
12+
"github.com/google/uuid"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func Test_PendingFutures(t *testing.T) {
17+
i := core.NewWorkflowInstance(uuid.NewString(), "")
18+
19+
wfState := NewWorkflowState(i, logger.NewDefaultLogger(), clock.New())
20+
21+
require.False(t, wfState.HasPendingFutures())
22+
23+
f := sync.NewFuture[int]()
24+
wfState.TrackFuture(1, func(v payload.Payload, err error) error {
25+
var r int
26+
require.NoError(t, converter.DefaultConverter.From(v, &r))
27+
f.Set(r, nil)
28+
return nil
29+
})
30+
31+
require.True(t, wfState.HasPendingFutures())
32+
33+
wfState.RemoveFuture(1)
34+
35+
require.False(t, wfState.HasPendingFutures())
36+
}

samples/timer/timer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ func Workflow1(ctx workflow.Context, msg string) (string, error) {
6868

6969
a1 := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12)
7070

71-
tctx, _ := workflow.WithCancel(ctx)
72-
// cancel()
71+
tctx, cancel := workflow.WithCancel(ctx)
7372

7473
workflow.Select(
7574
ctx,
@@ -89,7 +88,7 @@ func Workflow1(ctx workflow.Context, msg string) (string, error) {
8988
logger.Debug("Activity result", r)
9089

9190
// Cancel timer
92-
// cancel()
91+
cancel()
9392
}),
9493
)
9594

workflow/sideeffect.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func SideEffect[TResult any](ctx Context, f func(ctx Context) TResult) Future[TR
3838

3939
cmd.SetResult(payload)
4040
future.Set(r, nil)
41+
wfState.RemoveFuture(scheduleEventID)
4142
}
4243

4344
return future

0 commit comments

Comments
 (0)