Skip to content

Commit 70d1c0c

Browse files
authored
Fix client side cancellation && signal (#401)
* bugfix: child / external workflow signal / cancellation combinations were not handled correctly if the child does continue-as-new. * bugfix: child workflow cancellation does not validate the input run ID, which can corrupt the internal state machine. * bugfix: the workflow cancellation state machine should not use the target workflow ID as identifier; it now uses a sequence ID as the unique identifier. * bugfix: *childWorkflowFutureImpl should be used as the function receiver, instead of childWorkflowFutureImpl * add test suite case * go back to using the child workflow state machine for child workflow cancellation
1 parent 9ec7b2a commit 70d1c0c

13 files changed

+1324
-200
lines changed

.gen/go/shared/idl.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.gen/go/shared/types.go

Lines changed: 812 additions & 53 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

idl/github.com/uber/cadence/shared.thrift

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ enum DecisionTaskFailedCause {
160160
RESET_STICKY_TASKLIST,
161161
WORKFLOW_WORKER_UNHANDLED_FAILURE,
162162
BAD_SIGNAL_WORKFLOW_EXECUTION_ATTRIBUTES,
163+
BAD_START_CHILD_EXECUTION_ATTRIBUTES,
163164
}
164165

165166
enum CancelExternalWorkflowExecutionFailedCause {
@@ -296,6 +297,7 @@ struct RequestCancelExternalWorkflowExecutionDecisionAttributes {
296297
20: optional string workflowId
297298
30: optional string runId
298299
40: optional binary control
300+
50: optional bool childWorkflowOnly
299301
}
300302

301303
struct SignalExternalWorkflowExecutionDecisionAttributes {
@@ -304,6 +306,7 @@ struct SignalExternalWorkflowExecutionDecisionAttributes {
304306
30: optional string signalName
305307
40: optional binary input
306308
50: optional binary control
309+
60: optional bool childWorkflowOnly
307310
}
308311

309312
struct RecordMarkerDecisionAttributes {
@@ -535,6 +538,7 @@ struct RequestCancelExternalWorkflowExecutionInitiatedEventAttributes {
535538
20: optional string domain
536539
30: optional WorkflowExecution workflowExecution
537540
40: optional binary control
541+
50: optional bool childWorkflowOnly
538542
}
539543

540544
struct RequestCancelExternalWorkflowExecutionFailedEventAttributes {
@@ -559,6 +563,7 @@ struct SignalExternalWorkflowExecutionInitiatedEventAttributes {
559563
40: optional string signalName
560564
50: optional binary input
561565
60: optional binary control
566+
70: optional bool childWorkflowOnly
562567
}
563568

564569
struct SignalExternalWorkflowExecutionFailedEventAttributes {
@@ -734,12 +739,23 @@ struct UpdateDomainInfo {
734739
20: optional string ownerEmail
735740
}
736741

742+
struct ClusterReplicationConfiguration {
743+
10: optional string clusterName
744+
}
745+
746+
struct DomainReplicationConfiguration {
747+
10: optional string activeClusterName
748+
20: optional list<ClusterReplicationConfiguration> clusters
749+
}
750+
737751
struct RegisterDomainRequest {
738752
10: optional string name
739753
20: optional string description
740754
30: optional string ownerEmail
741755
40: optional i32 workflowExecutionRetentionPeriodInDays
742756
50: optional bool emitMetric
757+
60: optional list<ClusterReplicationConfiguration> clusters
758+
70: optional string activeClusterName
743759
}
744760

745761
struct DescribeDomainRequest {
@@ -749,17 +765,24 @@ struct DescribeDomainRequest {
749765
struct DescribeDomainResponse {
750766
10: optional DomainInfo domainInfo
751767
20: optional DomainConfiguration configuration
768+
30: optional DomainReplicationConfiguration replicationConfiguration
769+
40: optional i64 (js.type = "Long") failoverVersion
770+
50: optional bool isGlobalDomain
752771
}
753772

754773
struct UpdateDomainRequest {
755774
10: optional string name
756775
20: optional UpdateDomainInfo updatedInfo
757776
30: optional DomainConfiguration configuration
777+
40: optional DomainReplicationConfiguration replicationConfiguration
758778
}
759779

760780
struct UpdateDomainResponse {
761781
10: optional DomainInfo domainInfo
762782
20: optional DomainConfiguration configuration
783+
30: optional DomainReplicationConfiguration replicationConfiguration
784+
40: optional i64 (js.type = "Long") failoverVersion
785+
50: optional bool isGlobalDomain
763786
}
764787

765788
struct DeprecateDomainRequest {

internal/internal_decision_state_machine.go

Lines changed: 96 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ type (
8080
childWorkflowDecisionStateMachine struct {
8181
*decisionStateMachineBase
8282
attributes *s.StartChildWorkflowExecutionDecisionAttributes
83-
runID string
8483
}
8584

8685
naiveDecisionStateMachine struct {
@@ -106,9 +105,9 @@ type (
106105
orderedDecisions *list.List
107106
decisions map[decisionID]decisionStateMachine
108107

109-
scheduledEventIDToActivityID map[int64]string
110-
111-
scheduledEventIDToSignalID map[int64]string
108+
scheduledEventIDToActivityID map[int64]string
109+
scheduledEventIDToCancellationID map[int64]string
110+
scheduledEventIDToSignalID map[int64]string
112111
}
113112
)
114113

@@ -126,12 +125,12 @@ const (
126125
)
127126

128127
const (
129-
decisionTypeActivity decisionType = 0
130-
decisionTypeChildWorkflow decisionType = 1
131-
decisionTypeExternalWorkflow decisionType = 2
132-
decisionTypeMarker decisionType = 3
133-
decisionTypeTimer decisionType = 4
134-
decisionTypeSignal decisionType = 5
128+
decisionTypeActivity decisionType = 0
129+
decisionTypeChildWorkflow decisionType = 1
130+
decisionTypeCancellation decisionType = 2
131+
decisionTypeMarker decisionType = 3
132+
decisionTypeTimer decisionType = 4
133+
decisionTypeSignal decisionType = 5
135134
)
136135

137136
const (
@@ -185,12 +184,14 @@ func (d decisionType) String() string {
185184
return "Activity"
186185
case decisionTypeChildWorkflow:
187186
return "ChildWorkflow"
188-
case decisionTypeExternalWorkflow:
189-
return "ExternalWorkflow"
187+
case decisionTypeCancellation:
188+
return "Cancellation"
190189
case decisionTypeMarker:
191190
return "Marker"
192191
case decisionTypeTimer:
193192
return "Timer"
193+
case decisionTypeSignal:
194+
return "Signal"
194195
default:
195196
return "Unknown"
196197
}
@@ -252,11 +253,11 @@ func newMarkerDecisionStateMachine(id string, attributes *s.RecordMarkerDecision
252253
}
253254
}
254255

255-
func newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes) *cancelExternalWorkflowDecisionStateMachine {
256+
func newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes, cancellationID string) *cancelExternalWorkflowDecisionStateMachine {
256257
d := createNewDecision(s.DecisionTypeRequestCancelExternalWorkflowExecution)
257258
d.RequestCancelExternalWorkflowExecutionDecisionAttributes = attributes
258259
return &cancelExternalWorkflowDecisionStateMachine{
259-
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeExternalWorkflow, attributes.GetWorkflowId(), d),
260+
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeCancellation, cancellationID, d),
260261
}
261262
}
262263

@@ -477,9 +478,9 @@ func (d *childWorkflowDecisionStateMachine) getDecision() *s.Decision {
477478
case decisionStateCanceledAfterStarted:
478479
decision := createNewDecision(s.DecisionTypeRequestCancelExternalWorkflowExecution)
479480
decision.RequestCancelExternalWorkflowExecutionDecisionAttributes = &s.RequestCancelExternalWorkflowExecutionDecisionAttributes{
480-
Domain: d.attributes.Domain,
481-
WorkflowId: d.attributes.WorkflowId,
482-
RunId: common.StringPtr(d.runID),
481+
Domain: d.attributes.Domain,
482+
WorkflowId: d.attributes.WorkflowId,
483+
ChildWorkflowOnly: common.BoolPtr(true),
483484
}
484485
return decision
485486
default:
@@ -637,9 +638,9 @@ func newDecisionsHelper() *decisionsHelper {
637638
orderedDecisions: list.New(),
638639
decisions: make(map[decisionID]decisionStateMachine),
639640

640-
scheduledEventIDToActivityID: make(map[int64]string),
641-
642-
scheduledEventIDToSignalID: make(map[int64]string),
641+
scheduledEventIDToActivityID: make(map[int64]string),
642+
scheduledEventIDToCancellationID: make(map[int64]string),
643+
scheduledEventIDToSignalID: make(map[int64]string),
643644
}
644645
}
645646

@@ -777,71 +778,104 @@ func (h *decisionsHelper) handleStartChildWorkflowExecutionFailed(workflowID str
777778
return decision
778779
}
779780

780-
func (h *decisionsHelper) requestCancelExternalWorkflowExecution(domain, workflowID, runID string) (bool, decisionStateMachine) {
781-
decision, ok := h.decisions[makeDecisionID(decisionTypeChildWorkflow, workflowID)]
782-
if ok {
783-
// this is for child workflow
784-
decision.(*childWorkflowDecisionStateMachine).runID = runID
781+
func (h *decisionsHelper) requestCancelExternalWorkflowExecution(domain, workflowID, runID string, cancellationID string, childWorkflowOnly bool) decisionStateMachine {
782+
if childWorkflowOnly {
783+
// For cancellation of child workflow only, we do not use cancellation ID
784+
// since the child workflow cancellation goes through the existing child workflow
785+
// state machine, and we use workflow ID as identifier
786+
// we also do not use run ID, since child workflow can do continue-as-new
787+
// which will have different run ID
788+
// there will be server side validation that target workflow is child workflow
789+
790+
// sanity check that cancellation ID is not set
791+
if len(cancellationID) != 0 {
792+
panic("cancellation on child workflow should not use cancellation ID")
793+
}
794+
// sanity check that run ID is not set
795+
if len(runID) != 0 {
796+
panic("cancellation on child workflow should not use run ID")
797+
}
798+
// targeting child workflow
799+
decision := h.getDecision(makeDecisionID(decisionTypeChildWorkflow, workflowID))
785800
decision.cancel()
786-
return true, decision
801+
return decision
787802
}
788803

789-
// this is for external non-child workflow
804+
// For cancellation of external workflow, we have to use cancellation ID
805+
// to identify different cancellation request (decision) / response (history event)
806+
// client can also use this code path to cancel its own child workflow, however, there will
807+
// be no server side validation that target workflow is the child
808+
809+
// sanity check that cancellation ID is set
810+
if len(cancellationID) == 0 {
811+
panic("cancellation on external workflow should use cancellation ID")
812+
}
790813
attributes := &s.RequestCancelExternalWorkflowExecutionDecisionAttributes{
791-
Domain: common.StringPtr(domain),
792-
WorkflowId: common.StringPtr(workflowID),
793-
RunId: common.StringPtr(runID),
814+
Domain: common.StringPtr(domain),
815+
WorkflowId: common.StringPtr(workflowID),
816+
RunId: common.StringPtr(runID),
817+
Control: []byte(cancellationID),
818+
ChildWorkflowOnly: common.BoolPtr(false),
794819
}
795-
decision = newCancelExternalWorkflowStateMachine(attributes)
820+
decision := newCancelExternalWorkflowStateMachine(attributes, cancellationID)
796821
h.addDecision(decision)
797822

798-
return false, decision
823+
return decision
799824
}
800825

801-
func (h *decisionsHelper) handleRequestCancelExternalWorkflowExecutionInitiated(workflowID string) {
802-
decision, ok := h.decisions[makeDecisionID(decisionTypeChildWorkflow, workflowID)]
803-
if ok {
804-
// for child workflow
826+
func (h *decisionsHelper) handleRequestCancelExternalWorkflowExecutionInitiated(initiatedeventID int64, workflowID, cancellationID string) {
827+
if h.isCancelExternalWorkflowEventForChildWorkflow(cancellationID) {
828+
// this is cancellation for child workflow only
829+
decision := h.getDecision(makeDecisionID(decisionTypeChildWorkflow, workflowID))
805830
decision.handleCancelInitiatedEvent()
806831
} else {
807-
// for non-child external workflow
808-
decision = h.getDecision(makeDecisionID(decisionTypeExternalWorkflow, workflowID))
832+
// this is cancellation for external workflow
833+
h.scheduledEventIDToCancellationID[initiatedeventID] = cancellationID
834+
decision := h.getDecision(makeDecisionID(decisionTypeCancellation, cancellationID))
809835
decision.handleInitiatedEvent()
810836
}
811837
}
812838

813-
func (h *decisionsHelper) handleExternalWorkflowExecutionCancelRequested(workflowID string) {
814-
decision, ok := h.decisions[makeDecisionID(decisionTypeChildWorkflow, workflowID)]
815-
// no state change for child workflow, it is still in CancellationDecisionSent
816-
if !ok {
817-
// for non-child external workflow, this is the end of the decision state
818-
decision = h.getDecision(makeDecisionID(decisionTypeExternalWorkflow, workflowID))
839+
func (h *decisionsHelper) handleExternalWorkflowExecutionCancelRequested(initiatedeventID int64, workflowID string) (bool, decisionStateMachine) {
840+
var decision decisionStateMachine
841+
cancellationID, isExternal := h.scheduledEventIDToCancellationID[initiatedeventID]
842+
if !isExternal {
843+
decision = h.getDecision(makeDecisionID(decisionTypeChildWorkflow, workflowID))
844+
// no state change for child workflow, it is still in CancellationDecisionSent
845+
} else {
846+
// this is cancellation for external workflow
847+
decision = h.getDecision(makeDecisionID(decisionTypeCancellation, cancellationID))
819848
decision.handleCompletionEvent()
820849
}
850+
return isExternal, decision
821851
}
822852

823-
func (h *decisionsHelper) handleRequestCancelExternalWorkflowExecutionFailed(workflowID string) {
824-
decision, ok := h.decisions[makeDecisionID(decisionTypeChildWorkflow, workflowID)]
825-
if ok {
826-
// for child workflow
853+
func (h *decisionsHelper) handleRequestCancelExternalWorkflowExecutionFailed(initiatedeventID int64, workflowID string) (bool, decisionStateMachine) {
854+
var decision decisionStateMachine
855+
cancellationID, isExternal := h.scheduledEventIDToCancellationID[initiatedeventID]
856+
if !isExternal {
857+
// this is cancellation for child workflow only
858+
decision = h.getDecision(makeDecisionID(decisionTypeChildWorkflow, workflowID))
827859
decision.handleCancelFailedEvent()
828860
} else {
829-
// for non-child external workflow, this is the end of the decision state
830-
decision = h.getDecision(makeDecisionID(decisionTypeExternalWorkflow, workflowID))
861+
// this is cancellation for external workflow
862+
decision = h.getDecision(makeDecisionID(decisionTypeCancellation, cancellationID))
831863
decision.handleCompletionEvent()
832864
}
865+
return isExternal, decision
833866
}
834867

835-
func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, runID, signalName string, input []byte, signalID string) decisionStateMachine {
868+
func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, runID, signalName string, input []byte, signalID string, childWorkflowOnly bool) decisionStateMachine {
836869
attributes := &s.SignalExternalWorkflowExecutionDecisionAttributes{
837870
Domain: common.StringPtr(domain),
838871
Execution: &s.WorkflowExecution{
839872
WorkflowId: common.StringPtr(workflowID),
840873
RunId: common.StringPtr(runID),
841874
},
842-
SignalName: common.StringPtr(signalName),
843-
Input: input,
844-
Control: []byte(signalID),
875+
SignalName: common.StringPtr(signalName),
876+
Input: input,
877+
Control: []byte(signalID),
878+
ChildWorkflowOnly: common.BoolPtr(childWorkflowOnly),
845879
}
846880
decision := newSignalExternalWorkflowStateMachine(attributes, signalID)
847881
h.addDecision(decision)
@@ -950,3 +984,11 @@ func (h *decisionsHelper) getDecisions(markAsSent bool) []*s.Decision {
950984

951985
return result
952986
}
987+
988+
func (h *decisionsHelper) isCancelExternalWorkflowEventForChildWorkflow(cancellationID string) bool {
989+
// the cancellationID, i.e. Control in RequestCancelExternalWorkflowExecutionInitiatedEventAttributes
990+
// will be empty if the event is for child workflow.
991+
// for cancellation external workflow, Control in RequestCancelExternalWorkflowExecutionInitiatedEventAttributes
992+
// will have a client-generated sequence ID
993+
return len(cancellationID) == 0
994+
}

0 commit comments

Comments
 (0)