Skip to content

Commit 9b2bf87

Browse files
authored
Merge pull request #145 from zaataylor/zaataylor/fix-out-of-order-events
Fix out of order events bug + add test
2 parents c740772 + bc4176f commit 9b2bf87

File tree

2 files changed

+113
-22
lines changed

2 files changed

+113
-22
lines changed

internal/workflow/executor.go

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,18 @@ type WorkflowExecutor interface {
4040
}
4141

4242
type executor struct {
43-
registry *Registry
44-
historyProvider WorkflowHistoryProvider
45-
workflow *workflow
46-
workflowTracer *workflowtracer.WorkflowTracer
47-
workflowState *workflowstate.WfState
48-
workflowCtx sync.Context
49-
workflowCtxCancel sync.CancelFunc
50-
clock clock.Clock
51-
logger log.Logger
52-
tracer trace.Tracer
53-
lastSequenceID int64
43+
registry *Registry
44+
historyProvider WorkflowHistoryProvider
45+
workflow *workflow
46+
workflowTracer *workflowtracer.WorkflowTracer
47+
workflowState *workflowstate.WfState
48+
workflowCtx sync.Context
49+
workflowCtxCancel sync.CancelFunc
50+
clock clock.Clock
51+
logger log.Logger
52+
tracer trace.Tracer
53+
lastSequenceID int64
54+
wfStartedEventSeen bool
5455
}
5556

5657
func NewExecutor(logger log.Logger, tracer trace.Tracer, registry *Registry, historyProvider WorkflowHistoryProvider, instance *core.WorkflowInstance, clock clock.Clock) (WorkflowExecutor, error) {
@@ -69,15 +70,16 @@ func NewExecutor(logger log.Logger, tracer trace.Tracer, registry *Registry, his
6970
)
7071

7172
return &executor{
72-
registry: registry,
73-
historyProvider: historyProvider,
74-
workflowTracer: wfTracer,
75-
workflowState: s,
76-
workflowCtx: wfCtx,
77-
workflowCtxCancel: cancel,
78-
clock: clock,
79-
logger: logger,
80-
tracer: tracer,
73+
registry: registry,
74+
historyProvider: historyProvider,
75+
workflowTracer: wfTracer,
76+
workflowState: s,
77+
workflowCtx: wfCtx,
78+
workflowCtxCancel: cancel,
79+
clock: clock,
80+
logger: logger,
81+
tracer: tracer,
82+
wfStartedEventSeen: false,
8183
}, nil
8284
}
8385

@@ -141,6 +143,36 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
141143
return nil, fmt.Errorf("task has older history than current state, cannot execute")
142144
}
143145

146+
// Potentially reorder new events here, protecting against
147+
// cases in which new events are received for a workflow instance
148+
// before the scheduler for that workflow instance has been
149+
// created. To reorder, we find the WorkflowExecutionStarted
150+
// event, then move it to the first position in t.NewEvents.
151+
// t.NewEvents is modified in-place.
152+
// See: https://github.com/cschleiden/go-workflows/issues/143
153+
if !e.wfStartedEventSeen {
154+
for i, ev := range t.NewEvents {
155+
if ev.Type == history.EventType_WorkflowExecutionStarted {
156+
if i > 0 {
157+
// Shift elements before the WorkflowExecutionStarted
158+
// event 1 index right, making space at index 0 to reinsert
159+
// the WorkflowExecutionStarted event. Shifting instead of
160+
// re-slicing and calling append() twice, i.e.:
161+
// t.NewEvents = append(t.NewEvents[0:i], t.NewEvents[i+1:]...)
162+
// t.NewEvents = append([]history.Event{ev}, t.NewEvents...)
163+
// is faster and avoids the possibility of copying
164+
// slices if t.NewEvents is large
165+
for j := i; j >= 1; j-- {
166+
t.NewEvents[j] = t.NewEvents[j-1]
167+
}
168+
t.NewEvents[0] = ev
169+
}
170+
e.wfStartedEventSeen = true
171+
break
172+
}
173+
}
174+
}
175+
144176
// Always add a WorkflowTaskStarted event before executing new tasks
145177
toExecute := []history.Event{e.createNewEvent(history.EventType_WorkflowTaskStarted, &history.WorkflowTaskStartedAttributes{})}
146178
executedEvents := toExecute
@@ -205,9 +237,9 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
205237
}, nil
206238
}
207239

208-
func (e *executor) replayHistory(history []history.Event) error {
240+
func (e *executor) replayHistory(h []history.Event) error {
209241
e.workflowState.SetReplaying(true)
210-
for _, event := range history {
242+
for _, event := range h {
211243
if event.SequenceID < e.lastSequenceID {
212244
e.logger.Panic("history has older events than current state")
213245
}
@@ -216,6 +248,14 @@ func (e *executor) replayHistory(history []history.Event) error {
216248
return err
217249
}
218250

251+
// If we need to replay history before continuing execution of
252+
// a new task, the executor must know if WorkflowExecutionStarted
253+
// was seen during replay so it can determine if events should
254+
// be reordered before it starts executing events for the new task
255+
if event.Type == history.EventType_WorkflowExecutionStarted {
256+
e.wfStartedEventSeen = true
257+
}
258+
219259
e.lastSequenceID = event.SequenceID
220260
}
221261

internal/workflow/executor_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,57 @@ func Test_Executor(t *testing.T) {
585585
require.True(t, e.workflow.Completed())
586586
},
587587
},
588+
{
589+
name: "Reorder events to protect against nil deref error",
590+
f: func(t *testing.T, r *Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
591+
workflow := func(ctx wf.Context) error {
592+
return nil
593+
}
594+
595+
r.RegisterWorkflow(workflow)
596+
597+
// Input for signal
598+
input, err := converter.DefaultConverter.To("<Insert signal arg here>")
599+
if err != nil {
600+
require.NoError(t, err)
601+
}
602+
// Create a SignalReceived event
603+
signalReceivedEvent := history.NewPendingEvent(
604+
time.Now(),
605+
history.EventType_SignalReceived,
606+
&history.SignalReceivedAttributes{
607+
Name: fn.Name(workflow),
608+
Arg: input,
609+
},
610+
)
611+
612+
// Create a TimerFired event
613+
timerFiredEvent := history.NewPendingEvent(
614+
time.Now(),
615+
history.EventType_TimerFired,
616+
&history.TimerFiredAttributes{
617+
At: time.Now().Add(time.Second),
618+
},
619+
)
620+
621+
task := startWorkflowTask("instanceID", workflow)
622+
// Here, we reorder the history so that the WorkflowExecutionStarted
623+
// event does not appear first. This simulates the condition that caused
624+
// the bug previously. To do so, we insert two events in front of the
625+
// WorkflowExecutionStarted event.
626+
require.False(t, e.wfStartedEventSeen)
627+
task.NewEvents = append([]history.Event{signalReceivedEvent, timerFiredEvent}, task.NewEvents...)
628+
result, err := e.ExecuteTask(context.Background(), task)
629+
require.True(t, e.wfStartedEventSeen)
630+
require.NoError(t, err)
631+
require.NoError(t, e.workflow.err)
632+
require.Len(t, result.Executed, 5)
633+
// verify the events were executed in the reordered order
634+
require.Equal(t, history.EventType_WorkflowExecutionStarted, result.Executed[1].Type)
635+
require.Equal(t, history.EventType_SignalReceived, result.Executed[2].Type)
636+
require.Equal(t, history.EventType_TimerFired, result.Executed[3].Type)
637+
},
638+
},
588639
}
589640

590641
for _, tt := range tests {

0 commit comments

Comments
 (0)