Skip to content

Commit 2e11d9b

Browse files
authored
Support Cron Schedule as part of the TestWorkflowEnvironment (#1020)
* Support Cron Schedule as part of the TestWorkflowEnvironment
1 parent 6a5830b commit 2e11d9b

File tree

3 files changed

+172
-41
lines changed

3 files changed

+172
-41
lines changed

internal/internal_workflow_testsuite.go

Lines changed: 123 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ type (
153153
onTimerScheduledListener func(timerID string, duration time.Duration)
154154
onTimerFiredListener func(timerID string)
155155
onTimerCancelledListener func(timerID string)
156+
157+
cronMaxIterations int
156158
}
157159

158160
// testWorkflowEnvironmentImpl is the environment that runs the workflow/activity unit tests.
@@ -183,6 +185,11 @@ type (
183185

184186
workerStopChannel chan struct{}
185187
sessionEnvironment *testSessionEnvironmentImpl
188+
189+
cronSchedule string
190+
cronIterations int
191+
workflowInput []byte
192+
186193
}
187194

188195
testSessionEnvironmentImpl struct {
@@ -225,6 +232,8 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist
225232
testTimeout: time.Second * 3,
226233

227234
expectedMockCalls: make(map[string]struct{}),
235+
236+
cronMaxIterations: -1,
228237
},
229238

230239
workflowInfo: &WorkflowInfo{
@@ -246,13 +255,19 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist
246255

247256
doneChannel: make(chan struct{}),
248257
workerStopChannel: make(chan struct{}),
258+
259+
cronIterations: 0,
249260
}
250261

251262
// move forward the mock clock to start time.
252263
env.setStartTime(time.Now())
253264

254265
// put current workflow as a running workflow so child can send signal to parent
255-
env.runningWorkflows[env.workflowInfo.WorkflowExecution.ID] = &testWorkflowHandle{env: env, callback: func(result []byte, err error) {}}
266+
testWorkflowHandle := &testWorkflowHandle{env: env, callback: func(result []byte, err error) {}}
267+
if env.workflowInfo.CronSchedule != nil && len(*env.workflowInfo.CronSchedule) > 0 {
268+
testWorkflowHandle.params.cronSchedule = *env.workflowInfo.CronSchedule
269+
}
270+
env.runningWorkflows[env.workflowInfo.WorkflowExecution.ID] = testWorkflowHandle
256271

257272
if env.logger == nil {
258273
logger, _ := zap.NewDevelopment()
@@ -329,7 +344,14 @@ func (env *testWorkflowEnvironmentImpl) setStartTime(startTime time.Time) {
329344
startTime = env.wallClock.Now()
330345
}
331346
env.mockClock.Add(startTime.Sub(env.mockClock.Now()))
347+
}
348+
349+
func (env *testWorkflowEnvironmentImpl) setCronSchedule(cronSchedule string) {
350+
env.workflowInfo.CronSchedule = &cronSchedule
351+
}
332352

353+
func (env *testWorkflowEnvironmentImpl) setCronMaxIterationas(cronMaxIterations int) {
354+
env.cronMaxIterations = cronMaxIterations
333355
}
334356

335357
func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(params *executeWorkflowParams, callback resultHandler, startedHandler func(r WorkflowExecution, e error)) (*testWorkflowEnvironmentImpl, error) {
@@ -457,6 +479,8 @@ func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time.
457479
panic(err)
458480
}
459481
env.workflowDef = workflowDefinition
482+
// Store the Workflow input for potential Cron
483+
env.workflowInput = input
460484

461485
// env.workflowDef.Execute() method will execute dispatcher. We want the dispatcher to only run in main loop.
462486
// In case of child workflow, this executeWorkflowInternal() is run in separate goroutinue, so use postCallback
@@ -784,7 +808,10 @@ func (env *testWorkflowEnvironmentImpl) Complete(result []byte, err error) {
784808
}
785809

786810
dc := env.GetDataConverter()
787-
env.isTestCompleted = true
811+
// Test is potentially not over, for parent Cron workflows
812+
if (!env.isChildWorkflow() && !env.IsCron()) || env.isChildWorkflow() {
813+
env.isTestCompleted = true
814+
}
788815

789816
if err != nil {
790817
switch err := err.(type) {
@@ -800,7 +827,12 @@ func (env *testWorkflowEnvironmentImpl) Complete(result []byte, err error) {
800827
env.testResult = newEncodedValue(result, dc)
801828
}
802829

803-
close(env.doneChannel)
830+
// Only close on:
831+
// 1. Child-Workflows
832+
// 2. non-cron Workflows
833+
if env.isChildWorkflow() && !env.IsCron() {
834+
close(env.doneChannel)
835+
}
804836

805837
if env.isChildWorkflow() {
806838
// this is completion of child workflow
@@ -810,7 +842,7 @@ func (env *testWorkflowEnvironmentImpl) Complete(result []byte, err error) {
810842
// would have already been removed from the runningWorkflows map by RequestCancelWorkflow().
811843
childWorkflowHandle.handled = true
812844
// check if a retry is needed
813-
if childWorkflowHandle.rerunAsChild() {
845+
if childWorkflowHandle.rerun(true) {
814846
// rerun requested, so we don't want to post the error to parent workflow, return here.
815847
return
816848
}
@@ -825,12 +857,27 @@ func (env *testWorkflowEnvironmentImpl) Complete(result []byte, err error) {
825857
}
826858
}, true /* true to trigger parent workflow to resume to handle child workflow's result */)
827859
}
860+
} else {
861+
if env.IsCron() {
862+
workflowID := env.workflowInfo.WorkflowExecution.ID
863+
if workflowHandle, ok := env.runningWorkflows[workflowID]; ok {
864+
// On rerun, consider Workflow as not-handled
865+
if workflowHandle.rerun(false) {
866+
return
867+
}
868+
}
869+
}
828870
}
871+
// No Reruns....Test is Complete
872+
env.isTestCompleted = true
829873
}
830874

831-
func (h *testWorkflowHandle) rerunAsChild() bool {
875+
func (h *testWorkflowHandle) rerun(asChild bool) bool {
832876
env := h.env
833-
if !env.isChildWorkflow() {
877+
if asChild && !env.isChildWorkflow() {
878+
return false
879+
}
880+
if !asChild && env.isChildWorkflow() {
834881
return false
835882
}
836883
params := h.params
@@ -844,40 +891,74 @@ func (h *testWorkflowHandle) rerunAsChild() bool {
844891
// not successful run this time, carry over from whatever previous run pass to this run.
845892
result = env.workflowInfo.lastCompletionResult
846893
}
847-
params.lastCompletionResult = result
894+
if asChild {
895+
params.lastCompletionResult = result
848896

849-
if params.retryPolicy != nil && env.testError != nil {
850-
errReason, _ := getErrorDetails(env.testError, env.GetDataConverter())
851-
var expireTime time.Time
852-
if params.retryPolicy.GetExpirationIntervalInSeconds() > 0 {
853-
expireTime = params.scheduledTime.Add(time.Second * time.Duration(params.retryPolicy.GetExpirationIntervalInSeconds()))
854-
}
855-
backoff := getRetryBackoffFromThriftRetryPolicy(params.retryPolicy, env.workflowInfo.Attempt, errReason, env.Now(), expireTime)
856-
if backoff > 0 {
857-
// remove the current child workflow from the pending child workflow map because
858-
// the childWorkflowID will be the same for retry run.
859-
delete(env.runningWorkflows, env.workflowInfo.WorkflowExecution.ID)
860-
params.attempt++
861-
env.parentEnv.executeChildWorkflowWithDelay(backoff, *params, h.callback, nil /* child workflow already started */)
862-
863-
return true
897+
if params.retryPolicy != nil && env.testError != nil {
898+
errReason, _ := getErrorDetails(env.testError, env.GetDataConverter())
899+
var expireTime time.Time
900+
if params.retryPolicy.GetExpirationIntervalInSeconds() > 0 {
901+
expireTime = params.scheduledTime.Add(time.Second * time.Duration(params.retryPolicy.GetExpirationIntervalInSeconds()))
902+
}
903+
backoff := getRetryBackoffFromThriftRetryPolicy(params.retryPolicy, env.workflowInfo.Attempt, errReason, env.Now(), expireTime)
904+
if backoff > 0 {
905+
// remove the current child workflow from the pending child workflow map because
906+
// the childWorkflowID will be the same for retry run.
907+
delete(env.runningWorkflows, env.workflowInfo.WorkflowExecution.ID)
908+
params.attempt++
909+
env.parentEnv.executeChildWorkflowWithDelay(backoff, *params, h.callback, nil /* child workflow already started */)
910+
return true
911+
}
864912
}
865-
}
866-
867-
if len(params.cronSchedule) > 0 {
868-
schedule, err := cron.ParseStandard(params.cronSchedule)
869-
if err != nil {
870-
panic(fmt.Errorf("invalid cron schedule %v, err: %v", params.cronSchedule, err))
913+
if len(params.cronSchedule) > 0 {
914+
if env.cronMaxIterations < 0 || (env.cronMaxIterations > 0 && env.cronIterations < env.cronMaxIterations) {
915+
schedule, err := cron.ParseStandard(params.cronSchedule)
916+
if err != nil {
917+
panic(fmt.Errorf("invalid cron schedule %v, err: %v", params.cronSchedule, err))
918+
}
919+
workflowNow := env.Now().In(time.UTC)
920+
backoff := schedule.Next(workflowNow).Sub(workflowNow)
921+
if backoff > 0 {
922+
env.cronIterations += 1
923+
delete(env.runningWorkflows, env.workflowInfo.WorkflowExecution.ID)
924+
params.attempt = 0
925+
params.scheduledTime = env.Now()
926+
env.parentEnv.executeChildWorkflowWithDelay(backoff, *params, h.callback, nil /* child workflow already started */)
927+
return true
928+
}
929+
}
871930
}
872-
873-
workflowNow := env.Now().In(time.UTC)
874-
backoff := schedule.Next(workflowNow).Sub(workflowNow)
875-
if backoff > 0 {
876-
delete(env.runningWorkflows, env.workflowInfo.WorkflowExecution.ID)
877-
params.attempt = 0
878-
params.scheduledTime = env.Now()
879-
env.parentEnv.executeChildWorkflowWithDelay(backoff, *params, h.callback, nil /* child workflow already started */)
880-
return true
931+
} else {
932+
// Re-run a non-Child workflow if it has a Cron Schedule
933+
if h.env.workflowInfo.CronSchedule != nil {
934+
if env.cronMaxIterations < 0 || (env.cronMaxIterations > 0 && env.cronIterations < env.cronMaxIterations) {
935+
cronSchedule := *h.env.workflowInfo.CronSchedule
936+
if len(cronSchedule) == 0 {
937+
return false
938+
}
939+
schedule, err := cron.ParseStandard(cronSchedule)
940+
if err != nil {
941+
panic(fmt.Errorf("invalid cron schedule %v, err: %v", cronSchedule, err))
942+
}
943+
workflowNow := env.Now().In(time.UTC)
944+
backoff := schedule.Next(workflowNow).Sub(workflowNow)
945+
if backoff > 0 {
946+
env.cronIterations += 1
947+
// Prepare the env for the next iteration
948+
env.runningCount--
949+
env.setLastCompletionResult(result)
950+
// Since MainLoop is already running, we just want to execute the dispatcher
951+
// which will run the Workflow,
952+
env.registerDelayedCallback(func() {
953+
env.runningCount++
954+
env.workflowDef, _ = env.getWorkflowDefinition(env.workflowInfo.WorkflowType)
955+
// Use the existing headers and input
956+
env.workflowDef.Execute(env, env.header, env.workflowInput)
957+
env.startDecisionTask()
958+
}, backoff - backoff)
959+
return true
960+
}
961+
}
881962
}
882963
}
883964

@@ -1722,6 +1803,11 @@ func (env *testWorkflowEnvironmentImpl) IsReplaying() bool {
17221803
return false
17231804
}
17241805

1806+
func (env *testWorkflowEnvironmentImpl) IsCron() bool {
1807+
// this test environment never replay
1808+
return env.workflowInfo.CronSchedule != nil && len(*env.workflowInfo.CronSchedule) > 0
1809+
}
1810+
17251811
func (env *testWorkflowEnvironmentImpl) SignalExternalWorkflow(domainName, workflowID, runID, signalName string, input []byte, arg interface{}, childWorkflowOnly bool, callback resultHandler) {
17261812
// check if target workflow is a known workflow
17271813
if childHandle, ok := env.runningWorkflows[workflowID]; ok {

internal/internal_workflow_testsuite_test.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2796,16 +2796,15 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowAlreadyRunning() {
27962796
s.Equal("heartbeat_child1 ", result)
27972797
}
27982798

2799-
func (s *WorkflowTestSuiteUnitTest) Test_CronWorkflow() {
2800-
2799+
func (s *WorkflowTestSuiteUnitTest) Test_CronChildWorkflow() {
28012800
failedCount, successCount, lastCompletionResult := 0, 0, 0
28022801
cronWorkflow := func(ctx Context) (int, error) {
28032802
info := GetWorkflowInfo(ctx)
28042803
var result int
28052804
if HasLastCompletionResult(ctx) {
28062805
GetLastCompletionResult(ctx, &result)
28072806
}
2808-
Sleep(ctx, time.Second*3)
2807+
Sleep(ctx, time.Second * 3)
28092808
if info.Attempt == 0 {
28102809
failedCount++
28112810
return 0, errors.New("please-retry")
@@ -2832,7 +2831,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_CronWorkflow() {
28322831

28332832
cronFuture := ExecuteChildWorkflow(ctx1, cronWorkflow) // cron never stop so this future won't return
28342833

2835-
timeoutTimer := NewTimer(ctx, time.Hour*3)
2834+
timeoutTimer := NewTimer(ctx, time.Hour * 3)
28362835
selector := NewSelector(ctx)
28372836
var err error
28382837
selector.AddFuture(cronFuture, func(f Future) {
@@ -2861,6 +2860,35 @@ func (s *WorkflowTestSuiteUnitTest) Test_CronWorkflow() {
28612860
s.Equal(4, lastCompletionResult)
28622861
}
28632862

2863+
func (s *WorkflowTestSuiteUnitTest) Test_CronWorkflow() {
2864+
var totalRuns int
2865+
cronWorkflow := func(ctx Context) (int, error) {
2866+
var result int
2867+
if HasLastCompletionResult(ctx) {
2868+
GetLastCompletionResult(ctx, &result)
2869+
}
2870+
Sleep(ctx, time.Second * 3)
2871+
result++
2872+
return result, nil
2873+
}
2874+
2875+
env := s.NewTestWorkflowEnvironment()
2876+
env.SetWorkflowCronSchedule("0 * * * *") // hourly)
2877+
env.SetWorkflowCronMaxIterations(1)
2878+
env.RegisterWorkflow(cronWorkflow)
2879+
2880+
startTime, _ := time.Parse(time.RFC3339, "2018-12-20T16:30:00+08:00")
2881+
env.SetStartTime(startTime)
2882+
env.ExecuteWorkflow(cronWorkflow)
2883+
2884+
env.GetWorkflowResult(&totalRuns)
2885+
s.True(env.IsWorkflowCompleted())
2886+
err := env.GetWorkflowError()
2887+
s.NoError(err)
2888+
2889+
s.Equal(2, totalRuns)
2890+
}
2891+
28642892
func (s *WorkflowTestSuiteUnitTest) Test_CronHasLastResult() {
28652893
cronWorkflow := func(ctx Context) (int, error) {
28662894
var result int

internal/workflow_testsuite.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,23 @@ func (t *TestWorkflowEnvironment) SetWorkflowTimeout(executionTimeout time.Durat
447447
return t
448448
}
449449

450+
// SetWorkflowCronSchedule sets the Cron schedule for this tested workflow.
451+
// The first execution of the workflow will not adhere to the Cron schedule and will start executing immediately.
452+
// Consecutive iterations will follow the specified schedule.
453+
// Use SetWorkflowCronMaxIterations() to enforce a limit on the number of consecutive iterations after the initial
454+
// execution.
455+
func (t *TestWorkflowEnvironment) SetWorkflowCronSchedule(cron string) *TestWorkflowEnvironment {
456+
t.impl.setCronSchedule(cron)
457+
return t
458+
}
459+
460+
// SetWorkflowCronMaxIterations sets the a limit on the number of Cron iterations, not including the first one
461+
// of the tested workflow.
462+
func (t *TestWorkflowEnvironment) SetWorkflowCronMaxIterations(maxIterations int) *TestWorkflowEnvironment {
463+
t.impl.setCronMaxIterationas(maxIterations )
464+
return t
465+
}
466+
450467
// SetOnActivityStartedListener sets a listener that will be called before activity starts execution.
451468
// Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
452469
func (t *TestWorkflowEnvironment) SetOnActivityStartedListener(

0 commit comments

Comments
 (0)