Skip to content

Commit 9ec7b2a

Browse files
authored
should use initiated event ID instead of control / signal ID (#415)
1 parent c8237b3 commit 9ec7b2a

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

internal/internal_decision_state_machine.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ type (
107107
decisions map[decisionID]decisionStateMachine
108108

109109
scheduledEventIDToActivityID map[int64]string
110+
111+
scheduledEventIDToSignalID map[int64]string
110112
}
111113
)
112114

@@ -636,6 +638,8 @@ func newDecisionsHelper() *decisionsHelper {
636638
decisions: make(map[decisionID]decisionStateMachine),
637639

638640
scheduledEventIDToActivityID: make(map[int64]string),
641+
642+
scheduledEventIDToSignalID: make(map[int64]string),
639643
}
640644
}
641645

@@ -844,23 +848,32 @@ func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, ru
844848
return decision
845849
}
846850

847-
func (h *decisionsHelper) handleSignalExternalWorkflowExecutionInitiated(signalID string) {
851+
func (h *decisionsHelper) handleSignalExternalWorkflowExecutionInitiated(initiatedEventID int64, signalID string) {
852+
h.scheduledEventIDToSignalID[initiatedEventID] = signalID
848853
decision := h.getDecision(makeDecisionID(decisionTypeSignal, signalID))
849854
decision.handleInitiatedEvent()
850855
}
851856

852-
func (h *decisionsHelper) handleSignalExternalWorkflowExecutionCompleted(signalID string) decisionStateMachine {
853-
decision := h.getDecision(makeDecisionID(decisionTypeSignal, signalID))
857+
func (h *decisionsHelper) handleSignalExternalWorkflowExecutionCompleted(initiatedEventID int64) decisionStateMachine {
858+
decision := h.getDecision(makeDecisionID(decisionTypeSignal, h.getSignalID(initiatedEventID)))
854859
decision.handleCompletionEvent()
855860
return decision
856861
}
857862

858-
func (h *decisionsHelper) handleSignalExternalWorkflowExecutionFailed(signalID string) decisionStateMachine {
859-
decision := h.getDecision(makeDecisionID(decisionTypeSignal, signalID))
863+
func (h *decisionsHelper) handleSignalExternalWorkflowExecutionFailed(initiatedEventID int64) decisionStateMachine {
864+
decision := h.getDecision(makeDecisionID(decisionTypeSignal, h.getSignalID(initiatedEventID)))
860865
decision.handleCompletionEvent()
861866
return decision
862867
}
863868

869+
func (h *decisionsHelper) getSignalID(initiatedEventID int64) string {
870+
signalID, ok := h.scheduledEventIDToSignalID[initiatedEventID]
871+
if !ok {
872+
panic(fmt.Sprintf("unable to find signal ID: %v", initiatedEventID))
873+
}
874+
return signalID
875+
}
876+
864877
func (h *decisionsHelper) startTimer(attributes *s.StartTimerDecisionAttributes) decisionStateMachine {
865878
decision := newTimerDecisionStateMachine(attributes)
866879
h.addDecision(decision)

internal/internal_event_handlers.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
616616

617617
case m.EventTypeSignalExternalWorkflowExecutionInitiated:
618618
signalID := string(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
619-
weh.decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(signalID)
619+
weh.decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(event.GetEventId(), signalID)
620620

621621
case m.EventTypeSignalExternalWorkflowExecutionFailed:
622622
weh.handleSignalExternalWorkflowExecutionFailed(event)
@@ -980,8 +980,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTermin
980980

981981
func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionCompleted(event *m.HistoryEvent) error {
982982
attributes := event.ExternalWorkflowExecutionSignaledEventAttributes
983-
signalID := string(attributes.Control)
984-
decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionCompleted(signalID)
983+
decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionCompleted(attributes.GetInitiatedEventId())
985984
signal := decision.getData().(*scheduledSignal)
986985
if signal.handled {
987986
return nil
@@ -993,8 +992,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecut
993992

994993
func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionFailed(event *m.HistoryEvent) error {
995994
attributes := event.SignalExternalWorkflowExecutionFailedEventAttributes
996-
signalID := string(attributes.Control)
997-
decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionFailed(signalID)
995+
decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionFailed(attributes.GetInitiatedEventId())
998996
signal := decision.getData().(*scheduledSignal)
999997
if signal.handled {
1000998
return nil

0 commit comments

Comments
 (0)