Skip to content

Commit fff7f18

Browse files
authored
Add wait period between scan iterations (#1065)
1 parent d39d9ab commit fff7f18

File tree

5 files changed

+74
-6
lines changed

5 files changed

+74
-6
lines changed

internal/workflow_shadower.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ const (
4444
statusStopped int32 = 2
4545
)
4646

47+
const (
48+
defaultWaitDurationPerIteration = 5 * time.Minute
49+
)
50+
4751
type (
4852
// ShadowOptions is used to configure workflow shadowing.
4953
ShadowOptions struct {
@@ -134,6 +138,8 @@ const (
134138
ShadowModeNormal ShadowMode = iota
135139
// ShadowModeContinuous mode will start a new round of shadowing
136140
// after all workflows matches WorkflowQuery have been replayed.
141+
// There will be a 5 min wait period between each round,
142+
// currently this wait period is not configurable.
137143
// Shadowing will complete only when ExitCondition is met.
138144
// ExitCondition must be specified when using this mode
139145
ShadowModeContinuous
@@ -268,8 +274,12 @@ func (s *WorkflowShadower) shadowWorker() error {
268274
}
269275
}
270276

271-
if len(scanResult.NextPageToken) == 0 && s.options.Mode == ShadowModeNormal {
272-
return nil
277+
if len(scanResult.NextPageToken) == 0 {
278+
if s.options.Mode == ShadowModeNormal || s.clock.Now().Add(defaultWaitDurationPerIteration).After(expirationTime) {
279+
return nil
280+
}
281+
282+
s.clock.Sleep(defaultWaitDurationPerIteration)
273283
}
274284

275285
scanRequest.NextPageToken = scanResult.NextPageToken

internal/workflow_shadower_test.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package internal
2222

2323
import (
24+
"sync"
2425
"testing"
2526
"time"
2627

@@ -282,7 +283,7 @@ func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_MaxShadowingCount(
282283
s.NoError(s.testShadower.shadowWorker())
283284
}
284285

285-
func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_FullScan() {
286+
func (s *workflowShadowerSuite) TestShadowWorker_NormalMode() {
286287
workflowExecutions := newTestWorkflowExecutions(10)
287288
numScan := 3
288289
totalWorkflows := len(workflowExecutions) * numScan
@@ -305,7 +306,49 @@ func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_FullScan() {
305306
s.NoError(s.testShadower.shadowWorker())
306307
}
307308

308-
func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_ReplayFailed() {
309+
func (s *workflowShadowerSuite) TestShadowWorker_ContinuousMode() {
310+
workflowExecutions := newTestWorkflowExecutions(10)
311+
numScan := 3
312+
totalWorkflows := len(workflowExecutions) * numScan
313+
314+
s.testShadower.options.Mode = ShadowModeContinuous
315+
s.testShadower.options.ExitCondition = ShadowExitCondition{
316+
ShadowCount: totalWorkflows,
317+
}
318+
319+
for i := 0; i != numScan; i++ {
320+
scanResp := &shared.ListWorkflowExecutionsResponse{
321+
Executions: workflowExecutions,
322+
}
323+
s.mockService.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), callOptions...).Return(scanResp, nil).Times(1)
324+
}
325+
326+
s.mockService.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), callOptions...).Return(&shared.GetWorkflowExecutionHistoryResponse{
327+
History: s.testWorkflowHistory,
328+
}, nil).Times(totalWorkflows)
329+
330+
doneCh := make(chan struct{})
331+
var advanceTimeWG sync.WaitGroup
332+
advanceTimeWG.Add(1)
333+
go func() {
334+
defer advanceTimeWG.Done()
335+
for {
336+
time.Sleep(100 * time.Millisecond)
337+
select {
338+
case <-doneCh:
339+
return
340+
default:
341+
s.testShadower.clock.(*clock.Mock).Add(defaultWaitDurationPerIteration)
342+
}
343+
}
344+
}()
345+
346+
s.NoError(s.testShadower.shadowWorker())
347+
close(doneCh)
348+
advanceTimeWG.Wait()
349+
}
350+
351+
func (s *workflowShadowerSuite) TestShadowWorker_ReplayFailed() {
309352
successfullyReplayed := 5
310353
s.mockService.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), callOptions...).Return(&shared.ListWorkflowExecutionsResponse{
311354
Executions: newTestWorkflowExecutions(successfullyReplayed * 2),
@@ -321,7 +364,7 @@ func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_ReplayFailed() {
321364
s.Error(s.testShadower.shadowWorker())
322365
}
323366

324-
func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_ExpectedReplayError() {
367+
func (s *workflowShadowerSuite) TestShadowWorker_ExpectedReplayError() {
325368
testCases := []struct {
326369
msg string
327370
getHistoryErr error

internal/workflow_shadower_worker.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ func newShadowWorker(
6262
replayer := NewWorkflowReplayer()
6363
replayer.registry = registry
6464

65+
if len(params.TaskList) != 0 {
66+
// include domain name in tasklist to avoid confliction
67+
// since all shadow workflow will be run in a single system domain
68+
params.TaskList = generateShadowTaskList(domain, params.TaskList)
69+
}
70+
6571
params.UserContext = context.WithValue(params.UserContext, serviceClientContextKey, service)
6672
params.UserContext = context.WithValue(params.UserContext, workflowReplayerContextKey, replayer)
6773

@@ -159,3 +165,7 @@ func (sw *shadowWorker) startShadowWorkflow() error {
159165

160166
return backoff.Retry(ctx, startWorkflowOp, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
161167
}
168+
169+
func generateShadowTaskList(domain, taskList string) string {
170+
return domain + "-" + taskList
171+
}

internal/workflow_shadower_worker_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ func (s *shadowWorkerSuite) TestNewShadowWorker() {
8585
s.True(ok)
8686
_, ok = userContext.Value(workflowReplayerContextKey).(*WorkflowReplayer)
8787
s.True(ok)
88+
89+
taskList := shadowWorker.activityWorker.executionParameters.TaskList
90+
s.Contains(taskList, testDomain)
8891
}
8992

9093
func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_ShadowOptionNotSpecified() {
@@ -226,7 +229,7 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Succeed() {
226229
var workflowParams shadower.WorkflowParams
227230
getDefaultDataConverter().FromData(startRequest.Input, &workflowParams)
228231
s.Equal(testDomain, workflowParams.GetDomain())
229-
s.Equal(testTaskList, workflowParams.GetTaskList())
232+
s.Equal(generateShadowTaskList(testDomain, testTaskList), workflowParams.GetTaskList())
230233
s.Equal(workflowQuery, workflowParams.GetWorkflowQuery())
231234
s.Equal(samplingRate, workflowParams.GetSamplingRate())
232235
s.Equal(shadowMode.toThriftPtr(), workflowParams.ShadowMode)

worker/worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ const (
212212
ShadowModeNormal = internal.ShadowModeNormal
213213
// ShadowModeContinuous mode will start a new round of shadowing
214214
// after all workflows matches WorkflowQuery have been replayed.
215+
// There will be a 5 min wait period between each round,
216+
// currently this wait period is not configurable.
215217
// Shadowing will complete only when ExitCondition is met.
216218
// ExitCondition must be specified when using this mode
217219
ShadowModeContinuous = internal.ShadowModeContinuous

0 commit comments

Comments
 (0)