Skip to content

Commit baa368d

Browse files
authored
child workflow client change (#128)
* child workflow client change * CR comments * use GetWorkflowId() instead of *attributes.WorkflowId * assign result when err is nil * fix bug * fix wrong method called
1 parent 0bd0ffd commit baa368d

File tree

8 files changed

+471
-46
lines changed

8 files changed

+471
-46
lines changed

error.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (e *panicError) panicError() {}
253253
type continueAsNewError struct {
254254
wfn interface{}
255255
args []interface{}
256-
options *wfEnvironmentOptions
256+
options *workflowOptions
257257
}
258258

259259
// Error from error interface

internal_event_handlers.go

Lines changed: 203 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ type (
4949
workflowDefinition workflowDefinition
5050
}
5151

52+
childWorkflowHandle struct {
53+
resultCallback resultHandler
54+
startedCallback func(r WorkflowExecution, e error)
55+
waitForCancellation bool
56+
workflowExecution *WorkflowExecution
57+
}
58+
5259
// workflowEnvironmentImpl an implementation of workflowEnvironment represents a environment for workflow execution.
5360
workflowEnvironmentImpl struct {
5461
workflowInfo *WorkflowInfo
@@ -59,6 +66,7 @@ type (
5966
scheduledEventIDToActivityID map[int64]string // Mapping from scheduled event ID to activity ID
6067
scheduledTimers map[string]resultHandler // Map of scheduledTimers(timer ID ->) and their response handlers
6168
sideEffectResult map[int32][]byte
69+
scheduledChildWorkflows map[string]*childWorkflowHandle // Map of scheduledChildWorkflows
6270
counterID int32 // To generate activity IDs
6371
executeDecisions []*m.Decision // Decisions made during the execute of the workflow
6472
completeHandler completionHandler // events completion handler
@@ -110,6 +118,7 @@ func newWorkflowExecutionEventHandler(workflowInfo *WorkflowInfo, workflowDefini
110118
scheduledTimers: make(map[string]resultHandler),
111119
executeDecisions: make([]*m.Decision, 0),
112120
sideEffectResult: make(map[int32][]byte),
121+
scheduledChildWorkflows: make(map[string]*childWorkflowHandle),
113122
completeHandler: completeHandler,
114123
postEventHooks: []func(){},
115124
enableLoggingInReplay: enableLoggingInReplay,
@@ -148,13 +157,53 @@ func (wc *workflowEnvironmentImpl) RequestCancelWorkflow(domainName, workflowID,
148157
decision.RequestCancelExternalWorkflowExecutionDecisionAttributes = attributes
149158

150159
wc.executeDecisions = append(wc.executeDecisions, decision)
160+
161+
childWorkflowHandle, ok := wc.scheduledChildWorkflows[workflowID]
162+
if ok && childWorkflowHandle.workflowExecution != nil &&
163+
childWorkflowHandle.workflowExecution.RunID == runID &&
164+
!childWorkflowHandle.waitForCancellation {
165+
delete(wc.scheduledChildWorkflows, workflowID)
166+
wc.addPostEventHooks(func() {
167+
childWorkflowHandle.resultCallback(nil, NewCanceledError())
168+
})
169+
}
151170
return nil
152171
}
153172

154173
func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) {
155174
wc.cancelHandler = handler
156175
}
157176

177+
func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
178+
options workflowOptions, callback resultHandler, startedHandler func(r WorkflowExecution, e error)) error {
179+
if options.workflowID == "" {
180+
options.workflowID = wc.workflowInfo.WorkflowExecution.RunID + "_" + wc.GenerateSequenceID()
181+
}
182+
183+
attributes := m.NewStartChildWorkflowExecutionDecisionAttributes()
184+
185+
attributes.Domain = options.domain
186+
attributes.TaskList = &m.TaskList{Name: options.taskListName}
187+
attributes.WorkflowId = common.StringPtr(options.workflowID)
188+
attributes.ExecutionStartToCloseTimeoutSeconds = options.executionStartToCloseTimeoutSeconds
189+
attributes.TaskStartToCloseTimeoutSeconds = options.taskStartToCloseTimeoutSeconds
190+
attributes.Input = options.input
191+
attributes.WorkflowType = workflowTypePtr(*options.workflowType)
192+
attributes.ChildPolicy = options.childPolicy.toThriftChildPolicyPtr()
193+
194+
decision := wc.CreateNewDecision(m.DecisionType_StartChildWorkflowExecution)
195+
decision.StartChildWorkflowExecutionDecisionAttributes = attributes
196+
197+
wc.scheduledChildWorkflows[options.workflowID] = &childWorkflowHandle{
198+
resultCallback: callback,
199+
startedCallback: startedHandler,
200+
waitForCancellation: options.waitForCancellation,
201+
}
202+
wc.executeDecisions = append(wc.executeDecisions, decision)
203+
204+
return nil
205+
}
206+
158207
func (wc *workflowEnvironmentImpl) RegisterSignalHandler(handler func(name string, input []byte)) {
159208
wc.signalHandler = handler
160209
}
@@ -231,6 +280,7 @@ func (wc *workflowEnvironmentImpl) RequestCancelActivity(activityID string) {
231280
wc.executeDecisions = append(wc.executeDecisions, decision)
232281

233282
if wait, ok := wc.waitForCancelRequestActivities[activityID]; ok && !wait {
283+
delete(wc.scheduledActivities, activityID)
234284
wc.addPostEventHooks(func() {
235285
handler(nil, NewCanceledError())
236286
})
@@ -441,7 +491,10 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
441491

442492
case m.EventType_RequestCancelExternalWorkflowExecutionInitiated:
443493
// No Operation.
494+
case m.EventType_ExternalWorkflowExecutionCancelRequested:
495+
// No Operation.
444496
case m.EventType_RequestCancelExternalWorkflowExecutionFailed:
497+
// No Operation.
445498
case m.EventType_WorkflowExecutionContinuedAsNew:
446499
// No Operation.
447500

@@ -453,10 +506,47 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
453506
if err != nil {
454507
return nil, err
455508
}
509+
case m.EventType_StartChildWorkflowExecutionInitiated:
510+
// No Operation.
511+
case m.EventType_StartChildWorkflowExecutionFailed:
512+
err := weh.handleStartChildWorkflowExecutionFailed(event.StartChildWorkflowExecutionFailedEventAttributes)
513+
if err != nil {
514+
return nil, err
515+
}
516+
case m.EventType_ChildWorkflowExecutionStarted:
517+
err := weh.handleChildWorkflowExecutionStarted(event.ChildWorkflowExecutionStartedEventAttributes)
518+
if err != nil {
519+
return nil, err
520+
}
521+
case m.EventType_ChildWorkflowExecutionCompleted:
522+
err := weh.handleChildWorkflowExecutionCompleted(event.ChildWorkflowExecutionCompletedEventAttributes)
523+
if err != nil {
524+
return nil, err
525+
}
526+
case m.EventType_ChildWorkflowExecutionFailed:
527+
err := weh.handleChildWorkflowExecutionFailed(event.ChildWorkflowExecutionFailedEventAttributes)
528+
if err != nil {
529+
return nil, err
530+
}
531+
case m.EventType_ChildWorkflowExecutionCanceled:
532+
err := weh.handleChildWorkflowExecutionCanceled(event.ChildWorkflowExecutionCanceledEventAttributes)
533+
if err != nil {
534+
return nil, err
535+
}
536+
case m.EventType_ChildWorkflowExecutionTimedOut:
537+
err := weh.handleChildWorkflowExecutionTimedOut(event.ChildWorkflowExecutionTimedOutEventAttributes)
538+
if err != nil {
539+
return nil, err
540+
}
541+
case m.EventType_ChildWorkflowExecutionTerminated:
542+
err := weh.handleChildWorkflowExecutionTerminated(event.ChildWorkflowExecutionTerminatedEventAttributes)
543+
if err != nil {
544+
return nil, err
545+
}
456546
default:
457547
weh.logger.Error("unknown event type",
458548
zap.Int64(tagEventID, event.GetEventId()),
459-
zap.String(tagEventType, string(event.GetEventType())))
549+
zap.String(tagEventType, event.GetEventType().String()))
460550
// Do not fail to be forward compatible with new events
461551
}
462552

@@ -653,3 +743,115 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionSignaled(
653743
attributes *m.WorkflowExecutionSignaledEventAttributes) {
654744
weh.signalHandler(attributes.GetSignalName(), attributes.GetInput())
655745
}
746+
747+
func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionInitiated(
748+
attributes *m.StartChildWorkflowExecutionInitiatedEventAttributes) error {
749+
return nil
750+
}
751+
752+
func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionFailed(
753+
attributes *m.StartChildWorkflowExecutionFailedEventAttributes) error {
754+
childWorkflowID := attributes.GetWorkflowId()
755+
childWorkflowHandle, ok := weh.scheduledChildWorkflows[childWorkflowID]
756+
if !ok {
757+
return fmt.Errorf("unable to find child workflow callback: %v", attributes)
758+
}
759+
delete(weh.scheduledChildWorkflows, childWorkflowID)
760+
err := fmt.Errorf("ChildWorkflowFailed: %v", attributes.GetCause())
761+
childWorkflowHandle.resultCallback(nil, err)
762+
763+
return nil
764+
}
765+
766+
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarted(
767+
attributes *m.ChildWorkflowExecutionStartedEventAttributes) error {
768+
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
769+
childWorkflowHandle, ok := weh.scheduledChildWorkflows[childWorkflowID]
770+
if !ok {
771+
return fmt.Errorf("unable to find child workflow callback: %v", attributes)
772+
}
773+
774+
childWorkflowExecution := WorkflowExecution{
775+
ID: childWorkflowID,
776+
RunID: attributes.WorkflowExecution.GetRunId(),
777+
}
778+
779+
childWorkflowHandle.workflowExecution = &childWorkflowExecution
780+
childWorkflowHandle.startedCallback(childWorkflowExecution, nil)
781+
782+
return nil
783+
}
784+
785+
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCompleted(
786+
attributes *m.ChildWorkflowExecutionCompletedEventAttributes) error {
787+
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
788+
childWorkflowHandle, ok := weh.scheduledChildWorkflows[childWorkflowID]
789+
if !ok {
790+
return fmt.Errorf("unable to find child workflow callback: %v", attributes)
791+
}
792+
delete(weh.scheduledChildWorkflows, childWorkflowID)
793+
794+
childWorkflowHandle.resultCallback(attributes.GetResult_(), nil)
795+
796+
return nil
797+
}
798+
799+
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionFailed(
800+
attributes *m.ChildWorkflowExecutionFailedEventAttributes) error {
801+
802+
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
803+
childWorkflowHandle, ok := weh.scheduledChildWorkflows[childWorkflowID]
804+
if !ok {
805+
return fmt.Errorf("unable to find child workflow callback: %v", attributes)
806+
}
807+
delete(weh.scheduledChildWorkflows, childWorkflowID)
808+
809+
err := NewErrorWithDetails(attributes.GetReason(), attributes.GetDetails())
810+
childWorkflowHandle.resultCallback(nil, err)
811+
812+
return nil
813+
}
814+
815+
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCanceled(
816+
attributes *m.ChildWorkflowExecutionCanceledEventAttributes) error {
817+
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
818+
childWorkflowHandle, ok := weh.scheduledChildWorkflows[childWorkflowID]
819+
if !ok {
820+
// this could happen if waitForCancellation is false
821+
return nil
822+
}
823+
delete(weh.scheduledChildWorkflows, childWorkflowID)
824+
err := NewCanceledError(attributes.GetDetails())
825+
826+
childWorkflowHandle.resultCallback(nil, err)
827+
return nil
828+
}
829+
830+
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTimedOut(
831+
attributes *m.ChildWorkflowExecutionTimedOutEventAttributes) error {
832+
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
833+
childWorkflowHandle, ok := weh.scheduledChildWorkflows[childWorkflowID]
834+
if !ok {
835+
return fmt.Errorf("unable to find child workflow callback: %v", attributes)
836+
}
837+
delete(weh.scheduledChildWorkflows, childWorkflowID)
838+
err := NewTimeoutError(attributes.GetTimeoutType())
839+
840+
childWorkflowHandle.resultCallback(nil, err)
841+
842+
return nil
843+
}
844+
845+
func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTerminated(
846+
attributes *m.ChildWorkflowExecutionTerminatedEventAttributes) error {
847+
childWorkflowID := attributes.WorkflowExecution.GetWorkflowId()
848+
childWorkflowHandle, ok := weh.scheduledChildWorkflows[childWorkflowID]
849+
if !ok {
850+
return fmt.Errorf("unable to find child workflow callback: %v", attributes)
851+
}
852+
delete(weh.scheduledChildWorkflows, childWorkflowID)
853+
err := errors.New("terminated")
854+
childWorkflowHandle.resultCallback(nil, err)
855+
856+
return nil
857+
}

internal_logging_tags.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,20 @@
2121
package cadence
2222

2323
const (
24-
tagActivityID = "ActivityID"
25-
tagActivityType = "ActivityType"
26-
tagDomain = "Domain"
27-
tagEventID = "EventID"
28-
tagEventType = "EventType"
29-
tagRoutineID = "routineID"
30-
tagRunID = "RunID"
31-
tagTaskList = "TaskList"
32-
tagTimerID = "TimerID"
33-
tagWorkflowID = "WorkflowID"
34-
tagWorkflowType = "WorkflowType"
35-
tagWorkerID = "WorkerID"
36-
tagWorkerType = "WorkerType"
37-
tagSideEffectID = "SideEffectID"
38-
tagMarkerName = "MarkerName"
24+
tagActivityID = "ActivityID"
25+
tagActivityType = "ActivityType"
26+
tagDomain = "Domain"
27+
tagEventID = "EventID"
28+
tagEventType = "EventType"
29+
tagRoutineID = "routineID"
30+
tagRunID = "RunID"
31+
tagTaskList = "TaskList"
32+
tagTimerID = "TimerID"
33+
tagWorkflowID = "WorkflowID"
34+
tagWorkflowType = "WorkflowType"
35+
tagWorkerID = "WorkerID"
36+
tagWorkerType = "WorkerType"
37+
tagSideEffectID = "SideEffectID"
38+
tagMarkerName = "MarkerName"
39+
tagChildWorkflowID = "ChildWorkflowID"
3940
)

internal_task_handlers.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ func isDecisionEvent(eventType s.EventType) bool {
156156
s.EventType_TimerStarted,
157157
s.EventType_TimerCanceled,
158158
s.EventType_MarkerRecorded,
159+
s.EventType_StartChildWorkflowExecutionInitiated,
159160
s.EventType_RequestCancelExternalWorkflowExecutionInitiated:
160161
return true
161162
default:
@@ -600,6 +601,19 @@ func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) boo
600601
}
601602
return true
602603

604+
case s.DecisionType_StartChildWorkflowExecution:
605+
if e.GetEventType() != s.EventType_StartChildWorkflowExecutionInitiated {
606+
return false
607+
}
608+
eventAttributes := e.GetStartChildWorkflowExecutionInitiatedEventAttributes()
609+
decisionAttributes := d.GetStartChildWorkflowExecutionDecisionAttributes()
610+
if eventAttributes.GetDomain() != decisionAttributes.GetDomain() ||
611+
eventAttributes.GetTaskList().GetName() != decisionAttributes.GetTaskList().GetName() ||
612+
eventAttributes.GetWorkflowType().GetName() != decisionAttributes.GetWorkflowType().GetName() {
613+
return false
614+
}
615+
616+
return true
603617
}
604618

605619
return false
@@ -612,11 +626,11 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
612626
startAttributes *s.WorkflowExecutionStartedEventAttributes,
613627
) ([]*s.Decision, error) {
614628
decisions := []*s.Decision{}
615-
if err == ErrCanceled {
629+
if canceledErr, ok := err.(*canceledError); ok {
616630
// Workflow cancelled
617631
cancelDecision := createNewDecision(s.DecisionType_CancelWorkflowExecution)
618632
cancelDecision.CancelWorkflowExecutionDecisionAttributes = &s.CancelWorkflowExecutionDecisionAttributes{
619-
Details: completionResult,
633+
Details: canceledErr.details,
620634
}
621635
decisions = append(decisions, cancelDecision)
622636
} else if contErr, ok := err.(*continueAsNewError); ok {

internal_worker_base.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type (
5757
Complete(result []byte, err error)
5858
RegisterCancelHandler(handler func())
5959
RequestCancelWorkflow(domainName, workflowID, runID string) error
60+
ExecuteChildWorkflow(options workflowOptions, callback resultHandler, startedHandler func(r WorkflowExecution, e error)) error
6061
GetLogger() *zap.Logger
6162
RegisterSignalHandler(handler func(name string, input []byte))
6263
}

0 commit comments

Comments
 (0)