Skip to content

Commit ab0bf9b

Browse files
authored
fix flakey unit test (#238)
* fix flakey unit test * define activityProvider
1 parent c194d87 commit ab0bf9b

File tree

3 files changed

+61
-50
lines changed

3 files changed

+61
-50
lines changed

internal_task_handlers.go

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,17 @@ type (
8585
hostEnv *hostEnvImpl
8686
}
8787

88+
activityProvider func(name string) activity
8889
// activityTaskHandlerImpl is the implementation of ActivityTaskHandler
8990
activityTaskHandlerImpl struct {
90-
taskListName string
91-
identity string
92-
service m.TChanWorkflowService
93-
metricsScope tally.Scope
94-
logger *zap.Logger
95-
userContext context.Context
96-
hostEnv *hostEnvImpl
91+
taskListName string
92+
identity string
93+
service m.TChanWorkflowService
94+
metricsScope tally.Scope
95+
logger *zap.Logger
96+
userContext context.Context
97+
hostEnv *hostEnvImpl
98+
activityProvider activityProvider
9799
}
98100

99101
// history wrapper method to help information about events.
@@ -809,15 +811,25 @@ func newActivityTaskHandler(
809811
service m.TChanWorkflowService,
810812
params workerExecutionParameters,
811813
env *hostEnvImpl,
814+
) ActivityTaskHandler {
815+
return newActivityTaskHandlerWithCustomProvider(service, params, env, nil)
816+
}
817+
818+
func newActivityTaskHandlerWithCustomProvider(
819+
service m.TChanWorkflowService,
820+
params workerExecutionParameters,
821+
env *hostEnvImpl,
822+
activityProvider activityProvider,
812823
) ActivityTaskHandler {
813824
return &activityTaskHandlerImpl{
814-
taskListName: params.TaskList,
815-
identity: params.Identity,
816-
service: service,
817-
logger: params.Logger,
818-
metricsScope: params.MetricsScope,
819-
userContext: params.UserContext,
820-
hostEnv: env,
825+
taskListName: params.TaskList,
826+
identity: params.Identity,
827+
service: service,
828+
logger: params.Logger,
829+
metricsScope: params.MetricsScope,
830+
userContext: params.UserContext,
831+
hostEnv: env,
832+
activityProvider: activityProvider,
821833
}
822834
}
823835

@@ -958,10 +970,10 @@ func (ath *activityTaskHandlerImpl) Execute(t *s.PollForActivityTaskResponse) (r
958970
defer invoker.Close()
959971
ctx := WithActivityTask(canCtx, t, invoker, ath.logger, ath.metricsScope)
960972
activityType := *t.GetActivityType()
961-
activityImplementation, ok := ath.hostEnv.getActivity(activityType.GetName())
962-
if !ok {
973+
activityImplementation := ath.getActivity(activityType.GetName())
974+
if activityImplementation == nil {
963975
// Couldn't find the activity implementation.
964-
return nil, fmt.Errorf("Unable to find activityType=%v", activityType.GetName())
976+
return nil, fmt.Errorf("unable to find activityType=%v", activityType.GetName())
965977
}
966978

967979
// panic handler
@@ -999,6 +1011,18 @@ func (ath *activityTaskHandlerImpl) Execute(t *s.PollForActivityTaskResponse) (r
9991011
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, output, err), nil
10001012
}
10011013

1014+
func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
1015+
if ath.activityProvider != nil {
1016+
return ath.activityProvider(name)
1017+
}
1018+
1019+
if a, ok := ath.hostEnv.getActivity(name); ok {
1020+
return a
1021+
}
1022+
1023+
return nil
1024+
}
1025+
10021026
func createNewDecision(decisionType s.DecisionType) *s.Decision {
10031027
return &s.Decision{
10041028
DecisionType: common.DecisionTypePtr(decisionType),

internal_workflow_testsuite.go

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -945,42 +945,29 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri
945945
}
946946
ensureRequiredParams(&params)
947947

948-
for fnName, tasklistActivity := range env.taskListSpecificActivities {
949-
if _, ok := tasklistActivity.taskLists[taskList]; ok {
950-
ae := &activityExecutor{name: fnName, fn: tasklistActivity.fn}
951-
getHostEnvironment().addActivity(
952-
fnName,
953-
&activityExecutorWrapper{activityExecutor: ae, env: env},
954-
)
955-
}
956-
}
957-
958948
if len(getHostEnvironment().getRegisteredActivities()) == 0 {
959949
panic(fmt.Sprintf("no activity is registered for tasklist '%v'", taskList))
960950
}
961951

962-
for _, a := range getHostEnvironment().getRegisteredActivities() {
963-
fnName := a.ActivityType().Name
964-
if _, ok := env.taskListSpecificActivities[fnName]; ok {
965-
// activity is registered to a specific taskList, so ignore it from the global registered activities.
966-
continue
952+
getActivity := func(name string) activity {
953+
tlsa, ok := env.taskListSpecificActivities[name]
954+
if ok {
955+
_, ok := tlsa.taskLists[taskList]
956+
if !ok {
957+
// activity are bind to specific task list but not to current task list
958+
return nil
959+
}
967960
}
968-
var ae *activityExecutor
969-
switch v := a.(type) {
970-
case *activityExecutor:
971-
ae = v
972-
case *activityExecutorWrapper:
973-
ae = v.activityExecutor
974-
default:
975-
ae = &activityExecutor{name: v.ActivityType().Name, fn: v.GetFunction()}
961+
962+
activity, ok := getHostEnvironment().getActivity(name)
963+
if !ok {
964+
return nil
976965
}
977-
getHostEnvironment().addActivity(
978-
fnName,
979-
&activityExecutorWrapper{activityExecutor: ae, env: env},
980-
)
966+
ae := &activityExecutor{name: activity.ActivityType().Name, fn: activity.GetFunction()}
967+
return &activityExecutorWrapper{activityExecutor: ae, env: env}
981968
}
982969

983-
taskHandler := newActivityTaskHandler(env.service, params, getHostEnvironment())
970+
taskHandler := newActivityTaskHandlerWithCustomProvider(env.service, params, getHostEnvironment(), getActivity)
984971
return taskHandler
985972
}
986973

internal_workflow_testsuite_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,11 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityMockValues() {
9292
}
9393

9494
func (s *WorkflowTestSuiteUnitTest) Test_OnActivityStartedListener() {
95+
runCount := 100
9596
workflowFn := func(ctx Context) error {
9697
ctx = WithActivityOptions(ctx, s.activityOptions)
9798

98-
for i := 1; i <= 3; i++ {
99+
for i := 1; i <= runCount; i++ {
99100
err := ExecuteActivity(ctx, testActivityHello, fmt.Sprintf("msg%d", i)).Get(ctx, nil)
100101
if err != nil {
101102
return err
@@ -113,10 +114,9 @@ func (s *WorkflowTestSuiteUnitTest) Test_OnActivityStartedListener() {
113114
s.NoError(args.Get(&input))
114115
activityCalls = append(activityCalls, fmt.Sprintf("%s:%s", activityInfo.ActivityType.Name, input))
115116
})
116-
expectedCalls := []string{
117-
"testActivityHello:msg1",
118-
"testActivityHello:msg2",
119-
"testActivityHello:msg3",
117+
expectedCalls := []string{}
118+
for i := 1; i <= runCount; i++ {
119+
expectedCalls = append(expectedCalls, fmt.Sprintf("testActivityHello:msg%v", i))
120120
}
121121

122122
env.ExecuteWorkflow(workflowFn)

0 commit comments

Comments
 (0)