Skip to content

Commit 4be6b26

Browse files
authored
fix decision state machine (#532)
1 parent 5b2441d commit 4be6b26

File tree

4 files changed

+73
-53
lines changed

4 files changed

+73
-53
lines changed

internal/error_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,11 @@ func Test_TimeoutError(t *testing.T) {
9898
decisionsHelper: newDecisionsHelper(),
9999
dataConverter: newDefaultDataConverter(),
100100
}
101+
h := newDecisionsHelper()
101102
var actualErr error
102103
activityID := "activityID"
103104
context.decisionsHelper.scheduledEventIDToActivityID[5] = activityID
104-
di := newActivityDecisionStateMachine(
105+
di := h.newActivityDecisionStateMachine(
105106
&shared.ScheduleActivityTaskDecisionAttributes{ActivityId: common.StringPtr(activityID)})
106107
di.state = decisionStateInitiated
107108
di.setData(&scheduledActivity{

internal/internal_decision_state_machine.go

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type (
6565
state decisionState
6666
history []string
6767
data interface{}
68+
helper *decisionsHelper
6869
}
6970

7071
activityDecisionStateMachine struct {
@@ -207,67 +208,68 @@ func makeDecisionID(decisionType decisionType, id string) decisionID {
207208
return decisionID{decisionType: decisionType, id: id}
208209
}
209210

210-
func newDecisionStateMachineBase(decisionType decisionType, id string) *decisionStateMachineBase {
211+
func (h *decisionsHelper) newDecisionStateMachineBase(decisionType decisionType, id string) *decisionStateMachineBase {
211212
return &decisionStateMachineBase{
212213
id: makeDecisionID(decisionType, id),
213214
state: decisionStateCreated,
214215
history: []string{decisionStateCreated.String()},
216+
helper: h,
215217
}
216218
}
217219

218-
func newActivityDecisionStateMachine(attributes *s.ScheduleActivityTaskDecisionAttributes) *activityDecisionStateMachine {
219-
base := newDecisionStateMachineBase(decisionTypeActivity, attributes.GetActivityId())
220+
func (h *decisionsHelper) newActivityDecisionStateMachine(attributes *s.ScheduleActivityTaskDecisionAttributes) *activityDecisionStateMachine {
221+
base := h.newDecisionStateMachineBase(decisionTypeActivity, attributes.GetActivityId())
220222
return &activityDecisionStateMachine{
221223
decisionStateMachineBase: base,
222224
attributes: attributes,
223225
}
224226
}
225227

226-
func newTimerDecisionStateMachine(attributes *s.StartTimerDecisionAttributes) *timerDecisionStateMachine {
227-
base := newDecisionStateMachineBase(decisionTypeTimer, attributes.GetTimerId())
228+
func (h *decisionsHelper) newTimerDecisionStateMachine(attributes *s.StartTimerDecisionAttributes) *timerDecisionStateMachine {
229+
base := h.newDecisionStateMachineBase(decisionTypeTimer, attributes.GetTimerId())
228230
return &timerDecisionStateMachine{
229231
decisionStateMachineBase: base,
230232
attributes: attributes,
231233
}
232234
}
233235

234-
func newChildWorkflowDecisionStateMachine(attributes *s.StartChildWorkflowExecutionDecisionAttributes) *childWorkflowDecisionStateMachine {
235-
base := newDecisionStateMachineBase(decisionTypeChildWorkflow, attributes.GetWorkflowId())
236+
func (h *decisionsHelper) newChildWorkflowDecisionStateMachine(attributes *s.StartChildWorkflowExecutionDecisionAttributes) *childWorkflowDecisionStateMachine {
237+
base := h.newDecisionStateMachineBase(decisionTypeChildWorkflow, attributes.GetWorkflowId())
236238
return &childWorkflowDecisionStateMachine{
237239
decisionStateMachineBase: base,
238240
attributes: attributes,
239241
}
240242
}
241243

242-
func newNaiveDecisionStateMachine(decisionType decisionType, id string, decision *s.Decision) *naiveDecisionStateMachine {
243-
base := newDecisionStateMachineBase(decisionType, id)
244+
func (h *decisionsHelper) newNaiveDecisionStateMachine(decisionType decisionType, id string, decision *s.Decision) *naiveDecisionStateMachine {
245+
base := h.newDecisionStateMachineBase(decisionType, id)
244246
return &naiveDecisionStateMachine{
245247
decisionStateMachineBase: base,
246248
decision: decision,
247249
}
248250
}
249251

250-
func newMarkerDecisionStateMachine(id string, attributes *s.RecordMarkerDecisionAttributes) *markerDecisionStateMachine {
252+
func (h *decisionsHelper) newMarkerDecisionStateMachine(id string, attributes *s.RecordMarkerDecisionAttributes) *markerDecisionStateMachine {
251253
d := createNewDecision(s.DecisionTypeRecordMarker)
252254
d.RecordMarkerDecisionAttributes = attributes
253255
return &markerDecisionStateMachine{
254-
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeMarker, id, d),
256+
naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeMarker, id, d),
255257
}
256258
}
257259

258-
func newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes, cancellationID string) *cancelExternalWorkflowDecisionStateMachine {
260+
func (h *decisionsHelper) newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes, cancellationID string) *cancelExternalWorkflowDecisionStateMachine {
259261
d := createNewDecision(s.DecisionTypeRequestCancelExternalWorkflowExecution)
260262
d.RequestCancelExternalWorkflowExecutionDecisionAttributes = attributes
261263
return &cancelExternalWorkflowDecisionStateMachine{
262-
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeCancellation, cancellationID, d),
264+
naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeCancellation, cancellationID, d),
263265
}
264266
}
265267

266-
func newSignalExternalWorkflowStateMachine(attributes *s.SignalExternalWorkflowExecutionDecisionAttributes, signalID string) *signalExternalWorkflowDecisionStateMachine {
268+
func (h *decisionsHelper) newSignalExternalWorkflowStateMachine(attributes *s.SignalExternalWorkflowExecutionDecisionAttributes, signalID string) *signalExternalWorkflowDecisionStateMachine {
267269
d := createNewDecision(s.DecisionTypeSignalExternalWorkflowExecution)
268270
d.SignalExternalWorkflowExecutionDecisionAttributes = attributes
269271
return &signalExternalWorkflowDecisionStateMachine{
270-
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeSignal, signalID, d),
272+
naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeSignal, signalID, d),
271273
}
272274
}
273275

@@ -295,6 +297,13 @@ func (d *decisionStateMachineBase) moveState(newState decisionState, event strin
295297
d.history = append(d.history, event)
296298
d.state = newState
297299
d.history = append(d.history, newState.String())
300+
301+
if newState == decisionStateCompleted {
302+
if elem, ok := d.helper.decisions[d.getID()]; ok {
303+
d.helper.orderedDecisions.Remove(elem)
304+
delete(d.helper.decisions, d.getID())
305+
}
306+
}
298307
}
299308

300309
func (d *decisionStateMachineBase) failStateTransition(event string) {
@@ -660,12 +669,16 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine {
660669
}
661670

662671
func (h *decisionsHelper) addDecision(decision decisionStateMachine) {
672+
if _, ok := h.decisions[decision.getID()]; ok {
673+
panicMsg := fmt.Sprintf("adding duplicate decision %v", decision)
674+
panic(panicMsg)
675+
}
663676
element := h.orderedDecisions.PushBack(decision)
664677
h.decisions[decision.getID()] = element
665678
}
666679

667680
func (h *decisionsHelper) scheduleActivityTask(attributes *s.ScheduleActivityTaskDecisionAttributes) decisionStateMachine {
668-
decision := newActivityDecisionStateMachine(attributes)
681+
decision := h.newActivityDecisionStateMachine(attributes)
669682
h.addDecision(decision)
670683
return decision
671684
}
@@ -739,7 +752,7 @@ func (h *decisionsHelper) recordVersionMarker(changeID string, version Version,
739752
Details: details, // Keep
740753
}
741754

742-
decision := newMarkerDecisionStateMachine(markerID, recordMarker)
755+
decision := h.newMarkerDecisionStateMachine(markerID, recordMarker)
743756
h.addDecision(decision)
744757
return decision
745758
}
@@ -750,7 +763,7 @@ func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int32, data []byte
750763
MarkerName: common.StringPtr(sideEffectMarkerName),
751764
Details: data,
752765
}
753-
decision := newMarkerDecisionStateMachine(markerID, attributes)
766+
decision := h.newMarkerDecisionStateMachine(markerID, attributes)
754767
h.addDecision(decision)
755768
return decision
756769
}
@@ -761,7 +774,7 @@ func (h *decisionsHelper) recordLocalActivityMarker(activityID string, result []
761774
MarkerName: common.StringPtr(localActivityMarkerName),
762775
Details: result,
763776
}
764-
decision := newMarkerDecisionStateMachine(markerID, attributes)
777+
decision := h.newMarkerDecisionStateMachine(markerID, attributes)
765778
h.addDecision(decision)
766779
return decision
767780
}
@@ -772,13 +785,13 @@ func (h *decisionsHelper) recordMutableSideEffectMarker(mutableSideEffectID stri
772785
MarkerName: common.StringPtr(mutableSideEffectMarkerName),
773786
Details: data,
774787
}
775-
decision := newMarkerDecisionStateMachine(markerID, attributes)
788+
decision := h.newMarkerDecisionStateMachine(markerID, attributes)
776789
h.addDecision(decision)
777790
return decision
778791
}
779792

780793
func (h *decisionsHelper) startChildWorkflowExecution(attributes *s.StartChildWorkflowExecutionDecisionAttributes) decisionStateMachine {
781-
decision := newChildWorkflowDecisionStateMachine(attributes)
794+
decision := h.newChildWorkflowDecisionStateMachine(attributes)
782795
h.addDecision(decision)
783796
return decision
784797
}
@@ -833,7 +846,7 @@ func (h *decisionsHelper) requestCancelExternalWorkflowExecution(domain, workflo
833846
Control: []byte(cancellationID),
834847
ChildWorkflowOnly: common.BoolPtr(false),
835848
}
836-
decision := newCancelExternalWorkflowStateMachine(attributes, cancellationID)
849+
decision := h.newCancelExternalWorkflowStateMachine(attributes, cancellationID)
837850
h.addDecision(decision)
838851

839852
return decision
@@ -893,7 +906,7 @@ func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, ru
893906
Control: []byte(signalID),
894907
ChildWorkflowOnly: common.BoolPtr(childWorkflowOnly),
895908
}
896-
decision := newSignalExternalWorkflowStateMachine(attributes, signalID)
909+
decision := h.newSignalExternalWorkflowStateMachine(attributes, signalID)
897910
h.addDecision(decision)
898911
return decision
899912
}
@@ -925,7 +938,7 @@ func (h *decisionsHelper) getSignalID(initiatedEventID int64) string {
925938
}
926939

927940
func (h *decisionsHelper) startTimer(attributes *s.StartTimerDecisionAttributes) decisionStateMachine {
928-
decision := newTimerDecisionStateMachine(attributes)
941+
decision := h.newTimerDecisionStateMachine(attributes)
929942
h.addDecision(decision)
930943
return decision
931944
}

internal/internal_event_handlers.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -593,9 +593,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
593593
event *m.HistoryEvent,
594594
isReplay bool,
595595
isLast bool,
596-
) (result []*m.Decision, err error) {
596+
) (err error) {
597597
if event == nil {
598-
return nil, errors.New("nil event provided")
598+
return errors.New("nil event provided")
599599
}
600600
defer func() {
601601
if p := recover(); p != nil {
@@ -746,7 +746,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
746746
}
747747

748748
if err != nil {
749-
return nil, err
749+
return err
750750
}
751751

752752
// When replaying histories to get stack trace or current state the last event might be not
@@ -756,7 +756,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
756756
weh.workflowDefinition.OnDecisionTaskStarted()
757757
}
758758

759-
return weh.decisionsHelper.getDecisions(true), nil
759+
return nil
760760
}
761761

762762
func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(queryType string, queryArgs []byte) ([]byte, error) {
@@ -928,7 +928,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerDa
928928
return nil
929929
}
930930

931-
func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *localActivityResult) ([]*m.Decision, error) {
931+
func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *localActivityResult) error {
932932
// convert local activity result and error to marker data
933933
lamd := localActivityMarkerData{
934934
ActivityID: lar.task.activityID,
@@ -945,7 +945,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *lo
945945
// encode marker data
946946
markerData, err := weh.encodeArg(lamd)
947947
if err != nil {
948-
return nil, err
948+
return err
949949
}
950950

951951
// create marker event for local activity result

internal/internal_task_handlers.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type (
5454
workflowExecutionEventHandler interface {
5555
// Process a single event and return the assosciated decisions.
5656
// Return List of decisions made, any error.
57-
ProcessEvent(event *s.HistoryEvent, isReplay bool, isLast bool) ([]*s.Decision, error)
57+
ProcessEvent(event *s.HistoryEvent, isReplay bool, isLast bool) error
5858
// ProcessQuery process a query request.
5959
ProcessQuery(queryType string, queryArgs []byte) ([]byte, error)
6060
StackTrace() string
@@ -293,8 +293,7 @@ OrderEvents:
293293
}
294294

295295
func isPreloadMarkerEvent(event *s.HistoryEvent) bool {
296-
return event.GetEventType() == s.EventTypeMarkerRecorded &&
297-
event.MarkerRecordedEventAttributes.GetMarkerName() != localActivityMarkerName
296+
return event.GetEventType() == s.EventTypeMarkerRecorded
298297
}
299298

300299
// newWorkflowTaskHandler returns an implementation of workflow task handler.
@@ -653,9 +652,12 @@ ProcessEvents:
653652
}
654653
// Markers are from the events that are produced from the current decision
655654
for _, m := range markers {
656-
_, err := eventHandler.ProcessEvent(m, true, false)
657-
if err != nil {
658-
return nil, err
655+
if m.MarkerRecordedEventAttributes.GetMarkerName() != localActivityMarkerName {
656+
// local activity marker needs to be applied after decision task started event
657+
err := eventHandler.ProcessEvent(m, true, false)
658+
if err != nil {
659+
return nil, err
660+
}
659661
}
660662
}
661663

@@ -672,23 +674,26 @@ ProcessEvents:
672674
return nil, err
673675
}
674676

675-
eventDecisions, err := eventHandler.ProcessEvent(event, isInReplay, isLast)
677+
err = eventHandler.ProcessEvent(event, isInReplay, isLast)
676678
if err != nil {
677679
return nil, err
678680
}
681+
}
679682

680-
if eventDecisions != nil {
681-
if !isInReplay {
682-
w.newDecisions = append(w.newDecisions, eventDecisions...)
683-
} else if !skipReplayCheck {
684-
replayDecisions = append(replayDecisions, eventDecisions...)
683+
// now apply local activity markers
684+
for _, m := range markers {
685+
if m.MarkerRecordedEventAttributes.GetMarkerName() == localActivityMarkerName {
686+
err := eventHandler.ProcessEvent(m, true, false)
687+
if err != nil {
688+
return nil, err
685689
}
686690
}
687-
688-
if w.isWorkflowCompleted {
689-
// If workflow is already completed then we can break from processing
690-
// further decisions.
691-
break ProcessEvents
691+
}
692+
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
693+
if isReplay {
694+
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
695+
if len(eventDecisions) > 0 && !skipReplayCheck {
696+
replayDecisions = append(replayDecisions, eventDecisions...)
692697
}
693698
}
694699
}
@@ -728,15 +733,11 @@ ProcessEvents:
728733
}
729734

730735
func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(lar *localActivityResult) (interface{}, error) {
731-
eventDecisions, err := w.eventHandler.ProcessLocalActivityResult(lar)
736+
err := w.eventHandler.ProcessLocalActivityResult(lar)
732737
if err != nil {
733738
return nil, err
734739
}
735740

736-
if eventDecisions != nil {
737-
w.newDecisions = append(w.newDecisions, eventDecisions...)
738-
}
739-
740741
return w.CompleteDecisionTask(true), nil
741742
}
742743

@@ -760,6 +761,11 @@ func (w *workflowExecutionContextImpl) CompleteDecisionTask(waitLocalActivities
760761
}
761762
}
762763

764+
eventDecisions := w.eventHandler.decisionsHelper.getDecisions(true)
765+
if len(eventDecisions) > 0 {
766+
w.newDecisions = append(w.newDecisions, eventDecisions...)
767+
}
768+
763769
completeRequest := w.wth.completeWorkflow(w.eventHandler, w.currentDecisionTask, w, w.newDecisions, !waitLocalActivities)
764770
w.clearCurrentTask()
765771

0 commit comments

Comments
 (0)