diff --git a/Makefile b/Makefile index 57a9cd362..689d7e29f 100644 --- a/Makefile +++ b/Makefile @@ -217,7 +217,7 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc # mockery is quite noisy so it's worth being kinda precise with the files. # this needs to be both the files defining the generate command, AND the files that define the interfaces. -$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery +$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go internal/internal_public.go internal/internal_task_pollers.go $(BIN)/mockery $Q $(BIN_PATH) go generate ./... $Q touch $@ diff --git a/internal/internal_public.go b/internal/internal_public.go index 6e2d85fd4..822c5c8dd 100644 --- a/internal/internal_public.go +++ b/internal/internal_public.go @@ -33,6 +33,8 @@ import ( s "go.uber.org/cadence/.gen/go/shared" ) +//go:generate mockery --name WorkflowTaskHandler --inpackage --with-expecter --case snake --filename internal_workflow_task_handler_mock.go --boilerplate-file ../LICENSE + type ( decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error) @@ -71,7 +73,7 @@ type ( // WorkflowTaskHandler represents decision task handlers. WorkflowTaskHandler interface { - // Processes the workflow task + // ProcessWorkflowTask processes the workflow task // The response could be: // - RespondDecisionTaskCompletedRequest // - RespondDecisionTaskFailedRequest @@ -84,7 +86,7 @@ type ( // ActivityTaskHandler represents activity task handlers. ActivityTaskHandler interface { - // Executes the activity task + // Execute executes the activity task // The response is one of the types: // - RespondActivityTaskCompletedRequest // - RespondActivityTaskFailedRequest diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index c565a5833..3b8a2e6c0 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -30,6 +30,8 @@ import ( "sync" "time" + "go.uber.org/yarpc" + "go.uber.org/cadence/internal/common/debug" "github.com/opentracing/opentracing-go" @@ -45,6 +47,8 @@ import ( "go.uber.org/cadence/internal/common/serializer" ) +//go:generate mockery --name localDispatcher --inpackage --with-expecter --case snake --filename local_dispatcher_mock.go --boilerplate-file ../LICENSE + const ( pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta @@ -76,7 +80,7 @@ type ( identity string service workflowserviceclient.Interface taskHandler WorkflowTaskHandler - ldaTunnel *locallyDispatchedActivityTunnel + ldaTunnel localDispatcher metricsScope *metrics.TaggedScope logger *zap.Logger @@ -159,6 +163,11 @@ type ( stopCh <-chan struct{} metricsScope *metrics.TaggedScope } + + // LocalDispatcher is an interface to dispatch locally dispatched activities. + localDispatcher interface { + SendTask(task *locallyDispatchedActivityTask) bool + } ) func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel { @@ -214,7 +223,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit } } -func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool { +func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool { select { case ldat.taskCh <- task: return true @@ -365,7 +374,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { if completedRequest == nil && err == nil { return nil } - if _, ok := err.(decisionHeartbeatError); ok { + if errors.As(err, new(*decisionHeartbeatError)) { return err } response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime) @@ -398,7 +407,6 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa } func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) { - metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName()) if taskErr != nil { metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1) @@ -416,7 +424,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime)) responseStartTime := time.Now() - if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil { + if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil { metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1) return } @@ -425,103 +433,116 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest return } -func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) { +func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) { ctx := context.Background() // Respond task completion. err = backoff.Retry(ctx, func() error { - tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags) - defer cancel() - var err1 error - switch request := completedRequest.(type) { - case *s.RespondDecisionTaskFailedRequest: - // Only fail decision on first attempt, subsequent failure on the same decision task will timeout. - // This is to avoid spin on the failed decision task. Checking Attempt not nil for older server. - if task.Attempt != nil && task.GetAttempt() == 0 { - err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1)) - }) - } + response, err = wtp.respondTaskCompletedAttempt(completedRequest, task) + return err + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) + + return response, err +} + +func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) { + ctx, cancel, opts := newChannelContext(context.Background(), wtp.featureFlags) + defer cancel() + var ( + err error + response *s.RespondDecisionTaskCompletedResponse + operation string + ) + switch request := completedRequest.(type) { + case *s.RespondDecisionTaskFailedRequest: + err = wtp.handleDecisionFailedRequest(ctx, task, request, opts...) + operation = "RespondDecisionTaskFailed" + case *s.RespondDecisionTaskCompletedRequest: + response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request, opts...) + operation = "RespondDecisionTaskCompleted" + case *s.RespondQueryTaskCompletedRequest: + err = wtp.service.RespondQueryTaskCompleted(ctx, request, opts...) + operation = "RespondQueryTaskCompleted" + default: + // should not happen + panic("unknown request type from ProcessWorkflowTask()") + } + + traceLog(func() { + if err != nil { + wtp.logger.Debug(fmt.Sprintf("%s failed.", operation), zap.Error(err)) + } + }) + + return response, err +} + +func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error { + // Only fail decision on first attempt, subsequent failure on the same decision task will timeout. + // This is to avoid spin on the failed decision task. Checking Attempt not nil for older server. + if task.Attempt != nil && task.GetAttempt() == 0 { + return wtp.service.RespondDecisionTaskFailed(ctx, request, opts...) + } + return nil +} + +func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (response *s.RespondDecisionTaskCompletedResponse, err error) { + if request.StickyAttributes == nil && !wtp.disableStickyExecution { + request.StickyAttributes = &s.StickyExecutionAttributes{ + WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))}, + ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())), + } + } else { + request.ReturnNewDecisionTask = common.BoolPtr(false) + } + + if wtp.ldaTunnel != nil { + var activityTasks []*locallyDispatchedActivityTask + for _, decision := range request.Decisions { + attr := decision.ScheduleActivityTaskDecisionAttributes + if attr != nil && wtp.taskListName == attr.TaskList.GetName() { + // assume the activity type is in registry otherwise the activity would be failed and retried from server + activityTask := &locallyDispatchedActivityTask{ + readyCh: make(chan bool, 1), + ActivityId: attr.ActivityId, + ActivityType: attr.ActivityType, + Input: attr.Input, + Header: attr.Header, + WorkflowDomain: common.StringPtr(wtp.domain), + ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds, + StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds, + HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds, + WorkflowExecution: task.WorkflowExecution, + WorkflowType: task.WorkflowType, } - case *s.RespondDecisionTaskCompletedRequest: - if request.StickyAttributes == nil && !wtp.disableStickyExecution { - request.StickyAttributes = &s.StickyExecutionAttributes{ - WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))}, - ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())), - } + if wtp.ldaTunnel.SendTask(activityTask) { + wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1) + decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true) + activityTasks = append(activityTasks, activityTask) } else { - request.ReturnNewDecisionTask = common.BoolPtr(false) - } - var activityTasks []*locallyDispatchedActivityTask - if wtp.ldaTunnel != nil { - for _, decision := range request.Decisions { - attr := decision.ScheduleActivityTaskDecisionAttributes - if attr != nil && wtp.taskListName == attr.TaskList.GetName() { - // assume the activity type is in registry otherwise the activity would be failed and retried from server - activityTask := &locallyDispatchedActivityTask{ - readyCh: make(chan bool, 1), - ActivityId: attr.ActivityId, - ActivityType: attr.ActivityType, - Input: attr.Input, - Header: attr.Header, - WorkflowDomain: common.StringPtr(wtp.domain), - ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds, - StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds, - WorkflowExecution: task.WorkflowExecution, - WorkflowType: task.WorkflowType, - } - if wtp.ldaTunnel.sendTask(activityTask) { - wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1) - decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true) - activityTasks = append(activityTasks, activityTask) - } else { - // all pollers are busy - no room to optimize - wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1) - } - } - } + // all pollers are busy - no room to optimize + wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1) } - defer func() { - for _, at := range activityTasks { - started := false - if response != nil && err1 == nil { - if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok { - at.ScheduledTimestamp = adl.ScheduledTimestamp - at.StartedTimestamp = adl.StartedTimestamp - at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt - at.TaskToken = adl.TaskToken - started = true - } - } - at.readyCh <- started + } + } + defer func() { + for _, at := range activityTasks { + started := false + if response != nil && err == nil { + if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok { + at.ScheduledTimestamp = adl.ScheduledTimestamp + at.StartedTimestamp = adl.StartedTimestamp + at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt + at.TaskToken = adl.TaskToken + started = true } - }() - response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1)) - }) - } - - case *s.RespondQueryTaskCompletedRequest: - err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1)) - }) } - default: - // should not happen - panic("unknown request type from ProcessWorkflowTask()") + at.readyCh <- started } + }() + } - return err1 - }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) - - return + return wtp.service.RespondDecisionTaskCompleted(ctx, request, opts...) } func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller { diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index b39084ca7..c81fa7db2 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -26,9 +26,23 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" "go.uber.org/zap/zaptest" + + "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" + s "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common" + "go.uber.org/cadence/internal/common/metrics" +) + +const ( + _testDomainName = "test-domain" + _testTaskList = "test-tasklist" + _testIdentity = "test-worker" ) func TestLocalActivityPanic(t *testing.T) { @@ -54,3 +68,132 @@ func TestLocalActivityPanic(t *testing.T) { assert.Contains(t, perr.StackTrace(), "panic") assert.Contains(t, perr.StackTrace(), t.Name(), "should mention the source location of the local activity that panicked") } + +func TestRespondTaskCompleted_failed(t *testing.T) { + t.Run("fail sends RespondDecisionTaskFailedRequest", func(t *testing.T) { + testTaskToken := []byte("test-task-token") + + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().RespondDecisionTaskFailed(gomock.Any(), &s.RespondDecisionTaskFailedRequest{ + TaskToken: testTaskToken, + Cause: s.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), + Details: []byte(assert.AnError.Error()), + Identity: common.StringPtr(_testIdentity), + BinaryChecksum: common.StringPtr(getBinaryChecksum()), + }, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + res, err := poller.RespondTaskCompletedWithMetrics(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + TaskToken: testTaskToken, + Attempt: common.Int64Ptr(0), + }, time.Now()) + assert.NoError(t, err) + assert.Nil(t, res) + }) + t.Run("fail fails to send RespondDecisionTaskFailedRequest", func(t *testing.T) { + testTaskToken := []byte("test-task-token") + + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().RespondDecisionTaskFailed(gomock.Any(), &s.RespondDecisionTaskFailedRequest{ + TaskToken: testTaskToken, + Cause: s.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), + Details: []byte(assert.AnError.Error()), + Identity: common.StringPtr(_testIdentity), + BinaryChecksum: common.StringPtr(getBinaryChecksum()), + }, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(assert.AnError) + + // We cannot test RespondTaskCompleted since it uses backoff and has a hardcoded retry mechanism for 60 seconds. + _, err := poller.respondTaskCompletedAttempt(errorToFailDecisionTask(testTaskToken, assert.AnError, _testIdentity), &s.PollForDecisionTaskResponse{ + TaskToken: testTaskToken, + Attempt: common.Int64Ptr(0), + }) + assert.ErrorIs(t, err, assert.AnError) + }) + t.Run("fail skips sending for not the first attempt", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + res, err := poller.RespondTaskCompletedWithMetrics(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + Attempt: common.Int64Ptr(1), + }, time.Now()) + assert.NoError(t, err) + assert.Nil(t, res) + }) +} + +func TestRespondTaskCompleted_Unsupported(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + assert.PanicsWithValue(t, "unknown request type from ProcessWorkflowTask()", func() { + _, _ = poller.RespondTaskCompletedWithMetrics(assert.AnError, nil, &s.PollForDecisionTaskResponse{}, time.Now()) + }) +} + +func TestProcessTask_failures(t *testing.T) { + t.Run("shutdown", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + ch := make(chan struct{}) + poller.shutdownC = ch + close(ch) + + err := poller.ProcessTask(&workflowTask{}) + assert.ErrorIs(t, err, errShutdown) + }) + t.Run("unsupported task type", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + assert.PanicsWithValue(t, "unknown task type.", func() { + _ = poller.ProcessTask(10) + }) + }) + t.Run("nil task", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + err := poller.ProcessTask(&workflowTask{}) + assert.NoError(t, err) + }) + t.Run("heartbeat error", func(t *testing.T) { + poller, _, mockedTaskHandler, _ := buildWorkflowTaskPoller(t) + hearbeatErr := &decisionHeartbeatError{} + mockedTaskHandler.EXPECT().ProcessWorkflowTask(mock.Anything, mock.Anything).Return(nil, hearbeatErr) + err := poller.ProcessTask(&workflowTask{ + task: &s.PollForDecisionTaskResponse{}, + }) + assert.ErrorIs(t, err, hearbeatErr) + }) + t.Run("ResetStickyTaskList fail", func(t *testing.T) { + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, assert.AnError) + err := poller.ProcessTask(&resetStickinessTask{ + task: &s.ResetStickyTaskListRequest{ + Execution: &s.WorkflowExecution{ + WorkflowId: common.StringPtr("test-workflow-id"), + RunId: common.StringPtr("test-run-id"), + }, + }, + }) + assert.ErrorIs(t, err, assert.AnError) + }) +} + +func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *mockLocalDispatcher) { + ctrl := gomock.NewController(t) + mockService := workflowservicetest.NewMockClient(ctrl) + taskHandler := &MockWorkflowTaskHandler{} + lda := &mockLocalDispatcher{} + + return &workflowTaskPoller{ + basePoller: basePoller{ + shutdownC: make(<-chan struct{}), + }, + domain: _testDomainName, + taskListName: _testTaskList, + identity: _testIdentity, + service: mockService, + taskHandler: taskHandler, + ldaTunnel: lda, + metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)}, + logger: zaptest.NewLogger(t), + stickyUUID: "", + disableStickyExecution: false, + StickyScheduleToStartTimeout: time.Millisecond, + featureFlags: FeatureFlags{}, + }, mockService, taskHandler, lda +} diff --git a/internal/internal_workflow_task_handler_mock.go b/internal/internal_workflow_task_handler_mock.go new file mode 100644 index 000000000..db64a8dc7 --- /dev/null +++ b/internal/internal_workflow_task_handler_mock.go @@ -0,0 +1,100 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package internal + +import mock "github.com/stretchr/testify/mock" + +// MockWorkflowTaskHandler is an autogenerated mock type for the WorkflowTaskHandler type +type MockWorkflowTaskHandler struct { + mock.Mock +} + +type MockWorkflowTaskHandler_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWorkflowTaskHandler) EXPECT() *MockWorkflowTaskHandler_Expecter { + return &MockWorkflowTaskHandler_Expecter{mock: &_m.Mock} +} + +// ProcessWorkflowTask provides a mock function with given fields: task, f +func (_m *MockWorkflowTaskHandler) ProcessWorkflowTask(task *workflowTask, f decisionHeartbeatFunc) (interface{}, error) { + ret := _m.Called(task, f) + + var r0 interface{} + if rf, ok := ret.Get(0).(func(*workflowTask, decisionHeartbeatFunc) interface{}); ok { + r0 = rf(task, f) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interface{}) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(*workflowTask, decisionHeartbeatFunc) error); ok { + r1 = rf(task, f) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWorkflowTaskHandler_ProcessWorkflowTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessWorkflowTask' +type MockWorkflowTaskHandler_ProcessWorkflowTask_Call struct { + *mock.Call +} + +// ProcessWorkflowTask is a helper method to define mock.On call +// - task *workflowTask +// - f decisionHeartbeatFunc +func (_e *MockWorkflowTaskHandler_Expecter) ProcessWorkflowTask(task interface{}, f interface{}) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + return &MockWorkflowTaskHandler_ProcessWorkflowTask_Call{Call: _e.mock.On("ProcessWorkflowTask", task, f)} +} + +func (_c *MockWorkflowTaskHandler_ProcessWorkflowTask_Call) Run(run func(task *workflowTask, f decisionHeartbeatFunc)) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*workflowTask), args[1].(decisionHeartbeatFunc)) + }) + return _c +} + +func (_c *MockWorkflowTaskHandler_ProcessWorkflowTask_Call) Return(response interface{}, err error) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + _c.Call.Return(response, err) + return _c +} + +type mockConstructorTestingTNewMockWorkflowTaskHandler interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockWorkflowTaskHandler creates a new instance of MockWorkflowTaskHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockWorkflowTaskHandler(t mockConstructorTestingTNewMockWorkflowTaskHandler) *MockWorkflowTaskHandler { + mock := &MockWorkflowTaskHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/local_dispatcher_mock.go b/internal/local_dispatcher_mock.go new file mode 100644 index 000000000..9ca65c883 --- /dev/null +++ b/internal/local_dispatcher_mock.go @@ -0,0 +1,90 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package internal + +import mock "github.com/stretchr/testify/mock" + +// mockLocalDispatcher is an autogenerated mock type for the localDispatcher type +type mockLocalDispatcher struct { + mock.Mock +} + +type mockLocalDispatcher_Expecter struct { + mock *mock.Mock +} + +func (_m *mockLocalDispatcher) EXPECT() *mockLocalDispatcher_Expecter { + return &mockLocalDispatcher_Expecter{mock: &_m.Mock} +} + +// SendTask provides a mock function with given fields: task +func (_m *mockLocalDispatcher) SendTask(task *locallyDispatchedActivityTask) bool { + ret := _m.Called(task) + + var r0 bool + if rf, ok := ret.Get(0).(func(*locallyDispatchedActivityTask) bool); ok { + r0 = rf(task) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// mockLocalDispatcher_SendTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendTask' +type mockLocalDispatcher_SendTask_Call struct { + *mock.Call +} + +// SendTask is a helper method to define mock.On call +// - task *locallyDispatchedActivityTask +func (_e *mockLocalDispatcher_Expecter) SendTask(task interface{}) *mockLocalDispatcher_SendTask_Call { + return &mockLocalDispatcher_SendTask_Call{Call: _e.mock.On("SendTask", task)} +} + +func (_c *mockLocalDispatcher_SendTask_Call) Run(run func(task *locallyDispatchedActivityTask)) *mockLocalDispatcher_SendTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*locallyDispatchedActivityTask)) + }) + return _c +} + +func (_c *mockLocalDispatcher_SendTask_Call) Return(_a0 bool) *mockLocalDispatcher_SendTask_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTnewMockLocalDispatcher interface { + mock.TestingT + Cleanup(func()) +} + +// newMockLocalDispatcher creates a new instance of mockLocalDispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockLocalDispatcher(t mockConstructorTestingTnewMockLocalDispatcher) *mockLocalDispatcher { + mock := &mockLocalDispatcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}