Skip to content

Commit ebfce49

Browse files
authored
Merge pull request #303 from cschleiden/pending-future-output
Track pending futures with a name
2 parents 9566d44 + 662072c commit ebfce49

File tree

9 files changed

+85
-38
lines changed

9 files changed

+85
-38
lines changed

internal/workflow/executor.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log/slog"
88
"reflect"
9+
"slices"
910

1011
"github.com/benbjohnson/clock"
1112
"github.com/cschleiden/go-workflows/backend"
@@ -264,8 +265,15 @@ func (e *executor) executeNewEvents(newEvents []*history.Event) ([]*history.Even
264265
if e.workflow.Completed() {
265266
// TODO: Is this too early? We haven't committed some of the commands
266267
if e.workflowState.HasPendingFutures() {
267-
e.logger.Error("workflow completed, but there are still pending futures")
268-
panic("workflow completed, but there are still pending futures")
268+
var pending []string
269+
pf := e.workflowState.PendingFutureNames()
270+
for id, name := range pf {
271+
pending = append(pending, fmt.Sprintf("%d-%s", id, name))
272+
}
273+
slices.Sort(pending)
274+
275+
e.logger.Error("workflow completed, but there are still pending futures", "pending", pending)
276+
panic(fmt.Sprintf("workflow completed, but there are still pending futures: %s", pending))
269277
}
270278

271279
if canErr, ok := e.workflow.Error().(*continueasnew.Error); ok {
@@ -411,7 +419,7 @@ func (e *executor) handleActivityCompleted(event *history.Event, a *history.Acti
411419
return fmt.Errorf("could not find pending future for activity completion")
412420
}
413421

414-
err := f(a.Result, nil)
422+
err := f.Set(a.Result, nil)
415423
if err != nil {
416424
return fmt.Errorf("setting activity completed result: %w", err)
417425
}
@@ -440,7 +448,7 @@ func (e *executor) handleActivityFailed(event *history.Event, a *history.Activit
440448
}
441449

442450
actErr := workflowerrors.ToError(a.Error)
443-
if err := f(nil, actErr); err != nil {
451+
if err := f.Set(nil, actErr); err != nil {
444452
return fmt.Errorf("setting activity failed result: %w", err)
445453
}
446454

@@ -483,7 +491,7 @@ func (e *executor) handleTimerFired(event *history.Event, a *history.TimerFiredA
483491
return nil
484492
}
485493

486-
if err := f(nil, nil); err != nil {
494+
if err := f.Set(nil, nil); err != nil {
487495
return fmt.Errorf("setting timer fired result: %w", err)
488496
}
489497

@@ -523,7 +531,7 @@ func (e *executor) handleTimerCanceled(event *history.Event, a *history.TimerCan
523531
return nil
524532
}
525533

526-
if err := f(nil, sync.Canceled); err != nil {
534+
if err := f.Set(nil, sync.Canceled); err != nil {
527535
return fmt.Errorf("setting timer canceled result: %w", err)
528536
}
529537

@@ -580,7 +588,7 @@ func (e *executor) handleSubWorkflowFailed(event *history.Event, a *history.SubW
580588

581589
wfErr := workflowerrors.ToError(a.Error)
582590

583-
if err := f(nil, wfErr); err != nil {
591+
if err := f.Set(nil, wfErr); err != nil {
584592
return fmt.Errorf("setting sub workflow failed result: %w", err)
585593
}
586594

@@ -606,7 +614,7 @@ func (e *executor) handleSubWorkflowCompleted(event *history.Event, a *history.S
606614
return errors.New("no pending future found for sub workflow completed event")
607615
}
608616

609-
if err := f(a.Result, nil); err != nil {
617+
if err := f.Set(a.Result, nil); err != nil {
610618
return fmt.Errorf("setting sub workflow completed result: %w", err)
611619
}
612620

@@ -651,7 +659,7 @@ func (e *executor) handleSideEffectResult(event *history.Event, a *history.SideE
651659
return errors.New("no pending future found for side effect result event")
652660
}
653661

654-
if err := f(a.Result, nil); err != nil {
662+
if err := f.Set(a.Result, nil); err != nil {
655663
return fmt.Errorf("setting side effect result result: %w", err)
656664
}
657665

internal/workflow/executor_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,35 @@ func Test_Executor(t *testing.T) {
580580
require.True(t, e.workflow.Completed())
581581
},
582582
},
583+
{
584+
name: "Pending futures result in panic",
585+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
586+
subworkflow := func(ctx wf.Context) error {
587+
return nil
588+
}
589+
590+
workflow := func(ctx wf.Context) error {
591+
// Start but not wait for sub-workflow
592+
wf.CreateSubWorkflowInstance[any](ctx, wf.SubWorkflowOptions{
593+
InstanceID: "subworkflow",
594+
}, subworkflow)
595+
596+
// Schedule but not wait for timer
597+
wf.ScheduleTimer(ctx, time.Second*2)
598+
599+
return nil
600+
}
601+
602+
r.RegisterWorkflow(workflow)
603+
r.RegisterWorkflow(subworkflow)
604+
605+
task := startWorkflowTask("instanceID", workflow)
606+
607+
require.PanicsWithValue(t, "workflow completed, but there are still pending futures: [1-subworkflow:1 2-timer:2s]", func() {
608+
e.ExecuteTask(context.Background(), task)
609+
})
610+
},
611+
},
583612
}
584613

585614
for _, tt := range tests {

internal/workflowstate/workflowstate.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,32 @@ type key int
1818

1919
var workflowCtxKey key
2020

21-
type DecodingSettable func(v payload.Payload, err error) error
21+
type DecodingSettable struct {
22+
Set func(v payload.Payload, err error) error
23+
Name string
24+
}
2225

2326
// Use this to track futures for the workflow state. It's required to map the generic Future interface
2427
// to a type without type parameters.
25-
func AsDecodingSettable[T any](cv converter.Converter, f sync.SettableFuture[T]) DecodingSettable {
26-
return func(v payload.Payload, err error) error {
27-
if f.HasValue() {
28-
return fmt.Errorf("future already has value")
29-
}
28+
func AsDecodingSettable[T any](cv converter.Converter, name string, f sync.SettableFuture[T]) *DecodingSettable {
29+
return &DecodingSettable{
30+
Name: name,
31+
Set: func(v payload.Payload, err error) error {
32+
if f.HasValue() {
33+
return fmt.Errorf("future already has value")
34+
}
3035

31-
var t T
32-
if v != nil {
33-
if err := cv.From(v, &t); err != nil {
34-
return fmt.Errorf("failed to decode future: %v", err)
36+
var t T
37+
if v != nil {
38+
if err := cv.From(v, &t); err != nil {
39+
return fmt.Errorf("failed to decode future: %v", err)
40+
}
3541
}
36-
}
3742

38-
f.Set(t, err)
43+
f.Set(t, err)
3944

40-
return nil
45+
return nil
46+
},
4147
}
4248
}
4349

@@ -50,7 +56,7 @@ type WfState struct {
5056
instance *core.WorkflowInstance
5157
scheduleEventID int64
5258
commands []command.Command
53-
pendingFutures map[int64]DecodingSettable
59+
pendingFutures map[int64]*DecodingSettable
5460
replaying bool
5561

5662
pendingSignals map[string][]payload.Payload
@@ -67,7 +73,7 @@ func NewWorkflowState(instance *core.WorkflowInstance, logger *slog.Logger, cloc
6773
instance: instance,
6874
commands: []command.Command{},
6975
scheduleEventID: 1,
70-
pendingFutures: map[int64]DecodingSettable{},
76+
pendingFutures: map[int64]*DecodingSettable{},
7177

7278
pendingSignals: map[string][]payload.Payload{},
7379
signalChannels: make(map[string]*signalChannel),
@@ -96,15 +102,24 @@ func (wf *WfState) GetNextScheduleEventID() int64 {
96102
return scheduleEventID
97103
}
98104

99-
func (wf *WfState) TrackFuture(scheduleEventID int64, f DecodingSettable) {
105+
func (wf *WfState) TrackFuture(scheduleEventID int64, f *DecodingSettable) {
100106
wf.pendingFutures[scheduleEventID] = f
101107
}
102108

109+
func (wf *WfState) PendingFutureNames() map[int64]string {
110+
result := map[int64]string{}
111+
for id, f := range wf.pendingFutures {
112+
result[id] = f.Name
113+
}
114+
115+
return result
116+
}
117+
103118
func (wf *WfState) HasPendingFutures() bool {
104119
return len(wf.pendingFutures) > 0
105120
}
106121

107-
func (wf *WfState) FutureByScheduleEventID(scheduleEventID int64) (DecodingSettable, bool) {
122+
func (wf *WfState) FutureByScheduleEventID(scheduleEventID int64) (*DecodingSettable, bool) {
108123
f, ok := wf.pendingFutures[scheduleEventID]
109124
return f, ok
110125
}

internal/workflowstate/workflowstate_test.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
"github.com/benbjohnson/clock"
88
"github.com/cschleiden/go-workflows/backend/converter"
9-
"github.com/cschleiden/go-workflows/backend/payload"
109
"github.com/cschleiden/go-workflows/core"
1110
"github.com/cschleiden/go-workflows/internal/sync"
1211
"github.com/google/uuid"
@@ -21,12 +20,7 @@ func Test_PendingFutures(t *testing.T) {
2120
require.False(t, wfState.HasPendingFutures())
2221

2322
f := sync.NewFuture[int]()
24-
wfState.TrackFuture(1, func(v payload.Payload, err error) error {
25-
var r int
26-
require.NoError(t, converter.DefaultConverter.From(v, &r))
27-
f.Set(r, err)
28-
return nil
29-
})
23+
wfState.TrackFuture(1, AsDecodingSettable[int](converter.DefaultConverter, "f", f))
3024

3125
require.True(t, wfState.HasPendingFutures())
3226

workflow/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
7373

7474
cmd := command.NewScheduleActivityCommand(scheduleEventID, name, inputs, metadata)
7575
wfState.AddCommand(cmd)
76-
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, f))
76+
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, fmt.Sprintf("activity: %s", name), f))
7777

7878
ctx, span := workflowtracer.Tracer(ctx).Start(ctx,
7979
fmt.Sprintf("ExecuteActivity: %s", name),

workflow/sideeffect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func SideEffect[TResult any](ctx Context, f func(ctx Context) TResult) Future[TR
2828
scheduleEventID := wfState.GetNextScheduleEventID()
2929

3030
cv := contextvalue.Converter(ctx)
31-
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, future))
31+
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, "sideeffect", future))
3232

3333
cmd := command.NewSideEffectCommand(scheduleEventID)
3434
wfState.AddCommand(cmd)

workflow/subworkflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
9898
cmd := command.NewScheduleSubWorkflowCommand(scheduleEventID, wfState.Instance(), options.InstanceID, workflowName, inputs, metadata)
9999

100100
wfState.AddCommand(cmd)
101-
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, f))
101+
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, fmt.Sprintf("subworkflow:%s", workflowName), f))
102102

103103
// Check if the channel is cancelable
104104
if c, cancelable := ctx.Done().(sync.CancelChannel); cancelable {

workflow/timer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package workflow
22

33
import (
4+
"fmt"
45
"time"
56

67
"github.com/cschleiden/go-workflows/internal/command"
@@ -30,7 +31,7 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[any] {
3031

3132
timerCmd := command.NewScheduleTimerCommand(scheduleEventID, at)
3233
wfState.AddCommand(timerCmd)
33-
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(contextvalue.Converter(ctx), f))
34+
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(contextvalue.Converter(ctx), fmt.Sprintf("timer:%v", delay), f))
3435

3536
cancelReceiver := &sync.Receiver[struct{}]{
3637
Receive: func(v struct{}, ok bool) {

workflow/timer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func Test_Timer_Cancellation(t *testing.T) {
4242
cmd.Done()
4343
fs, ok := state.FutureByScheduleEventID(1)
4444
require.True(t, ok)
45-
fs(nil, nil)
45+
fs.Set(nil, nil)
4646

4747
c.Execute()
4848
require.False(t, c.Finished())

0 commit comments

Comments
 (0)