Skip to content

Commit bc4176f

Browse files
committed
Fix out-of-order events bug + add test
This commit: - Modifies executor.go to be aware of whether or not it has seen a WorkflowExecutionStarted event while executing tasks for the workflow instance it is tied to. If it has seen this event previously, execution continues as normal. If it hasn't seen the event previously, it checks through the events in the current task. If one of those events is WorkflowExecutionStarted, it shifts that event to the front of the task.NewEvents slice, ensuring that WorkflowExecutionStarted is processed immediately after the WorkflowTaskStarted event. This in turn ensures that the Scheduler the workflow struct contains is never nil, preventing the nil pointer dereference error described in issue #143. - Adds a test inside of executor_test.go to verify the reordering behavior.
1 parent c740772 commit bc4176f

File tree

2 files changed

+113
-22
lines changed

2 files changed

+113
-22
lines changed

internal/workflow/executor.go

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,18 @@ type WorkflowExecutor interface {
4040
}
4141

4242
type executor struct {
43-
registry *Registry
44-
historyProvider WorkflowHistoryProvider
45-
workflow *workflow
46-
workflowTracer *workflowtracer.WorkflowTracer
47-
workflowState *workflowstate.WfState
48-
workflowCtx sync.Context
49-
workflowCtxCancel sync.CancelFunc
50-
clock clock.Clock
51-
logger log.Logger
52-
tracer trace.Tracer
53-
lastSequenceID int64
43+
registry *Registry
44+
historyProvider WorkflowHistoryProvider
45+
workflow *workflow
46+
workflowTracer *workflowtracer.WorkflowTracer
47+
workflowState *workflowstate.WfState
48+
workflowCtx sync.Context
49+
workflowCtxCancel sync.CancelFunc
50+
clock clock.Clock
51+
logger log.Logger
52+
tracer trace.Tracer
53+
lastSequenceID int64
54+
wfStartedEventSeen bool
5455
}
5556

5657
func NewExecutor(logger log.Logger, tracer trace.Tracer, registry *Registry, historyProvider WorkflowHistoryProvider, instance *core.WorkflowInstance, clock clock.Clock) (WorkflowExecutor, error) {
@@ -69,15 +70,16 @@ func NewExecutor(logger log.Logger, tracer trace.Tracer, registry *Registry, his
6970
)
7071

7172
return &executor{
72-
registry: registry,
73-
historyProvider: historyProvider,
74-
workflowTracer: wfTracer,
75-
workflowState: s,
76-
workflowCtx: wfCtx,
77-
workflowCtxCancel: cancel,
78-
clock: clock,
79-
logger: logger,
80-
tracer: tracer,
73+
registry: registry,
74+
historyProvider: historyProvider,
75+
workflowTracer: wfTracer,
76+
workflowState: s,
77+
workflowCtx: wfCtx,
78+
workflowCtxCancel: cancel,
79+
clock: clock,
80+
logger: logger,
81+
tracer: tracer,
82+
wfStartedEventSeen: false,
8183
}, nil
8284
}
8385

@@ -141,6 +143,36 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
141143
return nil, fmt.Errorf("task has older history than current state, cannot execute")
142144
}
143145

146+
// Potentially reorder new events here, protecting against
147+
// cases in which new events are received for a workflow instance
148+
// before the scheduler for that workflow instance has been
149+
// created. To reorder, we find the WorkflowExecutionStarted
150+
// event, then move it to the first position in t.NewEvents.
151+
// t.NewEvents is modified in-place.
152+
// See: https://github.com/cschleiden/go-workflows/issues/143
153+
if !e.wfStartedEventSeen {
154+
for i, ev := range t.NewEvents {
155+
if ev.Type == history.EventType_WorkflowExecutionStarted {
156+
if i > 0 {
157+
// Shift elements before the WorkflowExecutionStarted
158+
// event 1 index right, making space at index 0 to reinsert
159+
// the WorkflowExecutionStarted event. Shifting instead of
160+
// re-slicing and calling append() twice, i.e.:
161+
// t.NewEvents = append(t.NewEvents[0:i], t.NewEvents[i+1:]...)
162+
// t.NewEvents = append([]history.Event{ev}, t.NewEvents...)
163+
// is faster and avoids the possibility of copying
164+
// slices if t.NewEvents is large
165+
for j := i; j >= 1; j-- {
166+
t.NewEvents[j] = t.NewEvents[j-1]
167+
}
168+
t.NewEvents[0] = ev
169+
}
170+
e.wfStartedEventSeen = true
171+
break
172+
}
173+
}
174+
}
175+
144176
// Always add a WorkflowTaskStarted event before executing new tasks
145177
toExecute := []history.Event{e.createNewEvent(history.EventType_WorkflowTaskStarted, &history.WorkflowTaskStartedAttributes{})}
146178
executedEvents := toExecute
@@ -205,9 +237,9 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
205237
}, nil
206238
}
207239

208-
func (e *executor) replayHistory(history []history.Event) error {
240+
func (e *executor) replayHistory(h []history.Event) error {
209241
e.workflowState.SetReplaying(true)
210-
for _, event := range history {
242+
for _, event := range h {
211243
if event.SequenceID < e.lastSequenceID {
212244
e.logger.Panic("history has older events than current state")
213245
}
@@ -216,6 +248,14 @@ func (e *executor) replayHistory(history []history.Event) error {
216248
return err
217249
}
218250

251+
// If we need to replay history before continuing execution of
252+
// a new task, the executor must know if WorkflowExecutionStarted
253+
// was seen during replay so it can determine if events should
254+
// be reordered before it starts executing events for the new task
255+
if event.Type == history.EventType_WorkflowExecutionStarted {
256+
e.wfStartedEventSeen = true
257+
}
258+
219259
e.lastSequenceID = event.SequenceID
220260
}
221261

internal/workflow/executor_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,57 @@ func Test_Executor(t *testing.T) {
585585
require.True(t, e.workflow.Completed())
586586
},
587587
},
588+
{
589+
name: "Reorder events to protect against nil deref error",
590+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
591+
workflow := func(ctx wf.Context) error {
592+
return nil
593+
}
594+
595+
r.RegisterWorkflow(workflow)
596+
597+
// Input for signal
598+
input, err := converter.DefaultConverter.To("<Insert signal arg here>")
599+
if err != nil {
600+
require.NoError(t, err)
601+
}
602+
// Create a SignalReceived event
603+
signalReceivedEvent := history.NewPendingEvent(
604+
time.Now(),
605+
history.EventType_SignalReceived,
606+
&history.SignalReceivedAttributes{
607+
Name: fn.Name(workflow),
608+
Arg: input,
609+
},
610+
)
611+
612+
// Create a TimerFired event
613+
timerFiredEvent := history.NewPendingEvent(
614+
time.Now(),
615+
history.EventType_TimerFired,
616+
&history.TimerFiredAttributes{
617+
At: time.Now().Add(time.Second),
618+
},
619+
)
620+
621+
task := startWorkflowTask("instanceID", workflow)
622+
// Here, we reorder the history so that the WorkflowExecutionStarted
623+
// event does not appear first. This simulates the condition that caused
624+
// the bug previously. To do so, we insert two events in front of the
625+
// WorkflowExecutionStarted event.
626+
require.False(t, e.wfStartedEventSeen)
627+
task.NewEvents = append([]history.Event{signalReceivedEvent, timerFiredEvent}, task.NewEvents...)
628+
result, err := e.ExecuteTask(context.Background(), task)
629+
require.True(t, e.wfStartedEventSeen)
630+
require.NoError(t, err)
631+
require.NoError(t, e.workflow.err)
632+
require.Len(t, result.Executed, 5)
633+
// verify the events were executed in the reordered order
634+
require.Equal(t, history.EventType_WorkflowExecutionStarted, result.Executed[1].Type)
635+
require.Equal(t, history.EventType_SignalReceived, result.Executed[2].Type)
636+
require.Equal(t, history.EventType_TimerFired, result.Executed[3].Type)
637+
},
638+
},
588639
}
589640

590641
for _, tt := range tests {

0 commit comments

Comments
 (0)