Skip to content

Commit 00508cd

Browse files
authored
Ensure min number of scan workflow result for shadowing (#1084)
1 parent dd3ec45 commit 00508cd

File tree

3 files changed

+123
-36
lines changed

3 files changed

+123
-36
lines changed

internal/internal_workflow_testsuite.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,21 @@ func (env *testWorkflowEnvironmentImpl) getWorkflowDefinition(wt WorkflowType) (
529529
func (env *testWorkflowEnvironmentImpl) executeActivity(
530530
activityFn interface{},
531531
args ...interface{},
532+
) (Value, error) {
533+
return env.executeActivityWithOptions(
534+
activityOptions{
535+
ScheduleToCloseTimeoutSeconds: 600,
536+
StartToCloseTimeoutSeconds: 600,
537+
},
538+
activityFn,
539+
args...,
540+
)
541+
}
542+
543+
func (env *testWorkflowEnvironmentImpl) executeActivityWithOptions(
544+
activityOptions activityOptions,
545+
activityFn interface{},
546+
args ...interface{},
532547
) (Value, error) {
533548
activityType, err := getValidatedActivityFunction(activityFn, args, env.registry)
534549
if err != nil {
@@ -541,13 +556,10 @@ func (env *testWorkflowEnvironmentImpl) executeActivity(
541556
}
542557

543558
params := executeActivityParams{
544-
activityOptions: activityOptions{
545-
ScheduleToCloseTimeoutSeconds: 600,
546-
StartToCloseTimeoutSeconds: 600,
547-
},
548-
ActivityType: *activityType,
549-
Input: input,
550-
Header: env.header,
559+
activityOptions: activityOptions,
560+
ActivityType: *activityType,
561+
Input: input,
562+
Header: env.header,
551563
}
552564

553565
task := newTestActivityTask(

internal/workflow_shadower_activities.go

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"context"
2525
"math/rand"
2626
"strings"
27+
"time"
2728

2829
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
2930
"go.uber.org/cadence/.gen/go/shadower"
@@ -46,6 +47,12 @@ const (
4647
workflowReplayerContextKey contextKey = "workflowReplayer"
4748
)
4849

50+
const (
51+
minScanWorkflowResultSize = 10
52+
ratioToCompleteScanWorkflow = 0.8
53+
scanWorkflowWaitPeriod = 100 * time.Millisecond
54+
)
55+
4956
func scanWorkflowActivity(
5057
ctx context.Context,
5158
params shadower.ScanWorkflowActivityParams,
@@ -69,6 +76,12 @@ func scanWorkflowExecutionsHelper(
6976
params shadower.ScanWorkflowActivityParams,
7077
logger *zap.Logger,
7178
) (shadower.ScanWorkflowActivityResult, error) {
79+
var completionTime time.Time
80+
if deadline, ok := ctx.Deadline(); ok {
81+
now := time.Now()
82+
activityTimeout := deadline.Sub(now)
83+
completionTime = now.Add(time.Duration(ratioToCompleteScanWorkflow * float32(activityTimeout)))
84+
}
7285

7386
request := &shared.ListWorkflowExecutionsRequest{
7487
Domain: params.Domain,
@@ -77,39 +90,48 @@ func scanWorkflowExecutionsHelper(
7790
PageSize: params.PageSize,
7891
}
7992

80-
var resp *shared.ListWorkflowExecutionsResponse
81-
if err := backoff.Retry(ctx,
82-
func() error {
83-
tchCtx, cancel, opt := newChannelContext(ctx)
84-
85-
var err error
86-
resp, err = service.ScanWorkflowExecutions(tchCtx, request, opt...)
87-
cancel()
88-
89-
return err
90-
},
91-
createDynamicServiceRetryPolicy(ctx),
92-
isServiceTransientError,
93-
); err != nil {
94-
logger.Error("Failed to scan workflow executions",
95-
zap.String(tagDomain, params.GetDomain()),
96-
zap.String(tagVisibilityQuery, params.GetWorkflowQuery()),
97-
zap.Error(err),
98-
)
99-
return shadower.ScanWorkflowActivityResult{}, err
100-
}
93+
result := shadower.ScanWorkflowActivityResult{}
94+
for {
95+
var resp *shared.ListWorkflowExecutionsResponse
96+
if err := backoff.Retry(ctx,
97+
func() error {
98+
tchCtx, cancel, opt := newChannelContext(ctx)
99+
100+
var err error
101+
resp, err = service.ScanWorkflowExecutions(tchCtx, request, opt...)
102+
cancel()
101103

102-
executions := make([]*shared.WorkflowExecution, 0, len(resp.Executions))
103-
for _, execution := range resp.Executions {
104-
if shouldReplay(params.GetSamplingRate()) {
105-
executions = append(executions, execution.Execution)
104+
return err
105+
},
106+
createDynamicServiceRetryPolicy(ctx),
107+
isServiceTransientError,
108+
); err != nil {
109+
logger.Error("Failed to scan workflow executions",
110+
zap.String(tagDomain, params.GetDomain()),
111+
zap.String(tagVisibilityQuery, params.GetWorkflowQuery()),
112+
zap.Error(err),
113+
)
114+
return shadower.ScanWorkflowActivityResult{}, err
106115
}
116+
117+
for _, execution := range resp.Executions {
118+
if shouldReplay(params.GetSamplingRate()) {
119+
result.Executions = append(result.Executions, execution.Execution)
120+
}
121+
}
122+
123+
request.NextPageToken = resp.NextPageToken
124+
if len(request.NextPageToken) == 0 ||
125+
len(result.Executions) >= minScanWorkflowResultSize ||
126+
(!completionTime.IsZero() && time.Now().After(completionTime)) {
127+
result.NextPageToken = request.NextPageToken
128+
break
129+
}
130+
131+
time.Sleep(scanWorkflowWaitPeriod)
107132
}
108133

109-
return shadower.ScanWorkflowActivityResult{
110-
Executions: executions,
111-
NextPageToken: resp.NextPageToken,
112-
}, nil
134+
return result, nil
113135
}
114136

115137
func shouldReplay(probability float64) bool {

internal/workflow_shadower_activities_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ package internal
2222

2323
import (
2424
"context"
25+
"math"
2526
"math/rand"
2627
"testing"
28+
"time"
2729

2830
"github.com/golang/mock/gomock"
2931
"github.com/stretchr/testify/require"
@@ -106,6 +108,57 @@ func (s *workflowShadowerActivitiesSuite) TestScanWorkflowActivity_Succeed() {
106108
s.True(len(result.Executions) < numExecutions)
107109
}
108110

111+
func (s *workflowShadowerActivitiesSuite) TestScanWorkflowActivity_MinResultSize() {
112+
numExecutionsPerScan := 3
113+
s.mockService.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), callOptions...).Return(&shared.ListWorkflowExecutionsResponse{
114+
Executions: newTestWorkflowExecutions(numExecutionsPerScan),
115+
NextPageToken: []byte{1, 2, 3},
116+
}, nil).Times(int(math.Ceil(float64(minScanWorkflowResultSize) / float64(numExecutionsPerScan))))
117+
118+
params := shadower.ScanWorkflowActivityParams{
119+
Domain: common.StringPtr(defaultTestDomain),
120+
WorkflowQuery: common.StringPtr("some random workflow visibility query"),
121+
SamplingRate: common.Float64Ptr(1),
122+
}
123+
124+
resultValue, err := s.env.ExecuteActivity(shadower.ScanWorkflowActivityName, params)
125+
s.NoError(err)
126+
127+
var result shadower.ScanWorkflowActivityResult
128+
s.NoError(resultValue.Get(&result))
129+
s.NotNil(result.NextPageToken)
130+
s.True(len(result.Executions) >= minScanWorkflowResultSize)
131+
}
132+
133+
func (s *workflowShadowerActivitiesSuite) TestScanWorkflowActivity_CompletionTime() {
134+
activityTimeoutSeconds := int32(1)
135+
s.mockService.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), callOptions...).Return(&shared.ListWorkflowExecutionsResponse{
136+
Executions: newTestWorkflowExecutions(1),
137+
NextPageToken: []byte{1, 2, 3},
138+
}, nil).MaxTimes(int(time.Duration(activityTimeoutSeconds) * time.Second / scanWorkflowWaitPeriod))
139+
140+
params := shadower.ScanWorkflowActivityParams{
141+
Domain: common.StringPtr(defaultTestDomain),
142+
WorkflowQuery: common.StringPtr("some random workflow visibility query"),
143+
SamplingRate: common.Float64Ptr(0.00000001),
144+
}
145+
146+
resultValue, err := s.env.impl.executeActivityWithOptions(
147+
activityOptions{
148+
ScheduleToCloseTimeoutSeconds: activityTimeoutSeconds,
149+
StartToCloseTimeoutSeconds: activityTimeoutSeconds,
150+
},
151+
shadower.ScanWorkflowActivityName,
152+
params,
153+
)
154+
s.NoError(err)
155+
156+
var result shadower.ScanWorkflowActivityResult
157+
s.NoError(resultValue.Get(&result))
158+
s.NotNil(result.NextPageToken)
159+
s.Empty(result.Executions)
160+
}
161+
109162
func (s *workflowShadowerActivitiesSuite) TestScanWorkflowActivity_InvalidQuery() {
110163
s.mockService.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), callOptions...).Return(nil, &shared.BadRequestError{
111164
Message: "invalid query",

0 commit comments

Comments
 (0)