Skip to content

Commit 486723e

Browse files
committed
Added fix and test
1 parent df05d05 commit 486723e

File tree

2 files changed

+74
-6
lines changed

2 files changed

+74
-6
lines changed

internal/internal_workflow_testsuite.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,13 +2279,23 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelChildWorkflow(_, workflowID
22792279

22802280
func (env *testWorkflowEnvironmentImpl) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback ResultHandler) {
22812281
if env.workflowInfo.WorkflowExecution.ID == workflowID {
2282-
// cancel current workflow
2283-
env.workflowCancelHandler()
2284-
// check if current workflow is a child workflow
2285-
if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil {
2282+
// cancel current workflow from within workflow context
2283+
if sd, ok := env.workflowDef.(*syncWorkflowDefinition); ok {
22862284
env.postCallback(func() {
2287-
env.onChildWorkflowCanceledListener(env.workflowInfo)
2288-
}, false)
2285+
sd.dispatcher.NewCoroutine(sd.rootCtx, "cancel-self", true, func(ctx Context) {
2286+
env.workflowCancelHandler()
2287+
})
2288+
if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil {
2289+
env.onChildWorkflowCanceledListener(env.workflowInfo)
2290+
}
2291+
}, true)
2292+
} else {
2293+
env.workflowCancelHandler()
2294+
if env.isChildWorkflow() && env.onChildWorkflowCanceledListener != nil {
2295+
env.postCallback(func() {
2296+
env.onChildWorkflowCanceledListener(env.workflowInfo)
2297+
}, false)
2298+
}
22892299
}
22902300
return
22912301
} else if childHandle, ok := env.runningWorkflows[workflowID]; ok && !childHandle.handled {

internal/workflow_testsuite_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,3 +1232,61 @@ func TestDynamicWorkflows(t *testing.T) {
12321232
require.NoError(t, err)
12331233
require.Equal(t, "dynamic-activity - grape - cherry", result)
12341234
}
1235+
1236+
func SleepHour(ctx Context) error {
1237+
Sleep(ctx, time.Hour)
1238+
return nil
1239+
}
1240+
1241+
func SleepThenCancel(ctx Context) error {
1242+
selector := NewSelector(ctx)
1243+
var activationWorkflow *WorkflowExecution
1244+
selector.AddReceive(GetSignalChannel(ctx, "activate"), func(c ReceiveChannel, more bool) {
1245+
c.Receive(ctx, nil)
1246+
GetLogger(ctx).Info("Received activation signal")
1247+
if activationWorkflow != nil {
1248+
RequestCancelExternalWorkflow(ctx, activationWorkflow.ID, activationWorkflow.RunID)
1249+
}
1250+
1251+
cwf := ExecuteChildWorkflow(
1252+
ctx,
1253+
SleepHour,
1254+
)
1255+
1256+
var res WorkflowExecution
1257+
if err := cwf.GetChildWorkflowExecution().Get(ctx, &res); err != nil {
1258+
GetLogger(ctx).Error("Failed to start child workflow", "error", err)
1259+
return
1260+
}
1261+
activationWorkflow = &res
1262+
1263+
selector.AddFuture(cwf, func(f Future) {
1264+
if err := f.Get(ctx, nil); err != nil {
1265+
GetLogger(ctx).Error("Child workflow failed", "error", err)
1266+
} else {
1267+
GetLogger(ctx).Info("Child workflow completed successfully")
1268+
}
1269+
activationWorkflow = nil
1270+
})
1271+
})
1272+
1273+
for selector.HasPending() || activationWorkflow != nil {
1274+
selector.Select(ctx)
1275+
}
1276+
return nil
1277+
}
1278+
1279+
func TestRequestCancelExternalWorkflowInSelector(t *testing.T) {
1280+
testSuite := &WorkflowTestSuite{}
1281+
env := testSuite.NewTestWorkflowEnvironment()
1282+
env.RegisterWorkflow(SleepHour)
1283+
env.RegisterDelayedCallback(func() {
1284+
env.SignalWorkflow("activate", nil)
1285+
}, 0)
1286+
env.RegisterDelayedCallback(func() {
1287+
env.SignalWorkflow("activate", nil)
1288+
}, time.Second)
1289+
env.ExecuteWorkflow(SleepThenCancel)
1290+
require.NoError(t, env.GetWorkflowError())
1291+
env.IsWorkflowCompleted()
1292+
}

0 commit comments

Comments
 (0)