From 105b0161aacb92d79f5c5822c7b919c032674dd8 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Wed, 30 Oct 2024 18:08:57 +0100 Subject: [PATCH 01/11] [internal] Improve code coverage of internal_task_pollers.go --- Makefile | 1 - codecov.yml | 1 + internal/internal_public.go | 6 +- internal/internal_task_pollers.go | 205 +++++++++++++------------ internal/internal_task_pollers_test.go | 107 ++++++++++++- internal/mock_local_dispatcher.go | 90 +++++++++++ internal/mock_workflow_task_handler.go | 100 ++++++++++++ 7 files changed, 410 insertions(+), 100 deletions(-) create mode 100644 internal/mock_local_dispatcher.go create mode 100644 internal/mock_workflow_task_handler.go diff --git a/Makefile b/Makefile index 57a9cd362..61d69c264 100644 --- a/Makefile +++ b/Makefile @@ -219,7 +219,6 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc # this needs to be both the files defining the generate command, AND the files that define the interfaces. $(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery $Q $(BIN_PATH) go generate ./... - $Q touch $@ # ==================================== # other intermediates diff --git a/codecov.yml b/codecov.yml index df8694f03..041fbf497 100644 --- a/codecov.yml +++ b/codecov.yml @@ -31,6 +31,7 @@ codecov: ignore: - "**/*_generated.go" - "**/*_mock.go" + - "**/mock_*.go" - "**/testdata/**" - "**/*_test.go" - "**/*_testsuite.go" diff --git a/internal/internal_public.go b/internal/internal_public.go index 6e2d85fd4..e874b89c4 100644 --- a/internal/internal_public.go +++ b/internal/internal_public.go @@ -33,6 +33,8 @@ import ( s "go.uber.org/cadence/.gen/go/shared" ) +//go:generate mockery --srcpkg . --name WorkflowTaskHandler --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE + type ( decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error) @@ -71,7 +73,7 @@ type ( // WorkflowTaskHandler represents decision task handlers. WorkflowTaskHandler interface { - // Processes the workflow task + // ProcessWorkflowTask processes the workflow task // The response could be: // - RespondDecisionTaskCompletedRequest // - RespondDecisionTaskFailedRequest @@ -84,7 +86,7 @@ type ( // ActivityTaskHandler represents activity task handlers. ActivityTaskHandler interface { - // Executes the activity task + // Execute executes the activity task // The response is one of the types: // - RespondActivityTaskCompletedRequest // - RespondActivityTaskFailedRequest diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index c565a5833..382ed0623 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -45,6 +45,8 @@ import ( "go.uber.org/cadence/internal/common/serializer" ) +//go:generate mockery --srcpkg . --name LocalDispatcher --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE + const ( pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta @@ -76,7 +78,7 @@ type ( identity string service workflowserviceclient.Interface taskHandler WorkflowTaskHandler - ldaTunnel *locallyDispatchedActivityTunnel + ldaTunnel LocalDispatcher metricsScope *metrics.TaggedScope logger *zap.Logger @@ -159,6 +161,11 @@ type ( stopCh <-chan struct{} metricsScope *metrics.TaggedScope } + + // LocalDispatcher is an interface to dispatch locally dispatched activities. + LocalDispatcher interface { + SendTask(task *locallyDispatchedActivityTask) bool + } ) func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel { @@ -214,7 +221,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit } } -func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool { +func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool { select { case ldat.taskCh <- task: return true @@ -349,7 +356,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { func(response interface{}, startTime time.Time) (*workflowTask, error) { wtp.logger.Debug("Force RespondDecisionTaskCompleted.", zap.Int64("TaskStartedEventID", task.task.GetStartedEventId())) wtp.metricsScope.Counter(metrics.DecisionTaskForceCompleted).Inc(1) - heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime) + heartbeatResponse, err := wtp.RespondTaskCompleted(response, nil, task.task, startTime) if err != nil { return nil, err } @@ -365,10 +372,10 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { if completedRequest == nil && err == nil { return nil } - if _, ok := err.(decisionHeartbeatError); ok { + if errors.As(err, new(*decisionHeartbeatError)) { return err } - response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime) + response, err = wtp.RespondTaskCompleted(completedRequest, err, task.task, startTime) if err != nil { return err } @@ -397,8 +404,7 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa return nil } -func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) { - +func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) { metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName()) if taskErr != nil { metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1) @@ -416,7 +422,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime)) responseStartTime := time.Now() - if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil { + if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil { metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1) return } @@ -425,103 +431,114 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest return } -func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) { +func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) { ctx := context.Background() // Respond task completion. err = backoff.Retry(ctx, func() error { - tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags) - defer cancel() - var err1 error - switch request := completedRequest.(type) { - case *s.RespondDecisionTaskFailedRequest: - // Only fail decision on first attempt, subsequent failure on the same decision task will timeout. - // This is to avoid spin on the failed decision task. Checking Attempt not nil for older server. - if task.Attempt != nil && task.GetAttempt() == 0 { - err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1)) - }) - } + response, err = wtp.respondTaskCompletedAttempt(completedRequest, task) + return err + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) + + return response, err +} + +func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) { + ctx, cancel, _ := newChannelContext(context.Background(), wtp.featureFlags) + defer cancel() + var ( + err error + response *s.RespondDecisionTaskCompletedResponse + operation string + ) + switch request := completedRequest.(type) { + case *s.RespondDecisionTaskFailedRequest: + err = wtp.handleDecisionFailedRequest(ctx, task, request) + operation = "RespondDecisionTaskFailed" + case *s.RespondDecisionTaskCompletedRequest: + response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request) + operation = "RespondDecisionTaskCompleted" + case *s.RespondQueryTaskCompletedRequest: + err = wtp.service.RespondQueryTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) + operation = "RespondQueryTaskCompleted" + default: + // should not happen + panic("unknown request type from ProcessWorkflowTask()") + } + + traceLog(func() { + wtp.logger.Debug("Call failed.", zap.Error(err), zap.String("Operation", operation)) + }) + + return response, err +} + +func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest) error { + // Only fail decision on first attempt, subsequent failure on the same decision task will timeout. + // This is to avoid spin on the failed decision task. Checking Attempt not nil for older server. + if task.Attempt != nil && task.GetAttempt() == 0 { + return wtp.service.RespondDecisionTaskFailed(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) + } + return nil +} + +func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest) (response *s.RespondDecisionTaskCompletedResponse, err error) { + if request.StickyAttributes == nil && !wtp.disableStickyExecution { + request.StickyAttributes = &s.StickyExecutionAttributes{ + WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))}, + ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())), + } + } else { + request.ReturnNewDecisionTask = common.BoolPtr(false) + } + + if wtp.ldaTunnel != nil { + var activityTasks []*locallyDispatchedActivityTask + for _, decision := range request.Decisions { + attr := decision.ScheduleActivityTaskDecisionAttributes + if attr != nil && wtp.taskListName == attr.TaskList.GetName() { + // assume the activity type is in registry otherwise the activity would be failed and retried from server + activityTask := &locallyDispatchedActivityTask{ + readyCh: make(chan bool, 1), + ActivityId: attr.ActivityId, + ActivityType: attr.ActivityType, + Input: attr.Input, + Header: attr.Header, + WorkflowDomain: common.StringPtr(wtp.domain), + ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds, + StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds, + HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds, + WorkflowExecution: task.WorkflowExecution, + WorkflowType: task.WorkflowType, } - case *s.RespondDecisionTaskCompletedRequest: - if request.StickyAttributes == nil && !wtp.disableStickyExecution { - request.StickyAttributes = &s.StickyExecutionAttributes{ - WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))}, - ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())), - } + if wtp.ldaTunnel.SendTask(activityTask) { + wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1) + decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true) + activityTasks = append(activityTasks, activityTask) } else { - request.ReturnNewDecisionTask = common.BoolPtr(false) + // all pollers are busy - no room to optimize + wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1) } - var activityTasks []*locallyDispatchedActivityTask - if wtp.ldaTunnel != nil { - for _, decision := range request.Decisions { - attr := decision.ScheduleActivityTaskDecisionAttributes - if attr != nil && wtp.taskListName == attr.TaskList.GetName() { - // assume the activity type is in registry otherwise the activity would be failed and retried from server - activityTask := &locallyDispatchedActivityTask{ - readyCh: make(chan bool, 1), - ActivityId: attr.ActivityId, - ActivityType: attr.ActivityType, - Input: attr.Input, - Header: attr.Header, - WorkflowDomain: common.StringPtr(wtp.domain), - ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds, - StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds, - WorkflowExecution: task.WorkflowExecution, - WorkflowType: task.WorkflowType, - } - if wtp.ldaTunnel.sendTask(activityTask) { - wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1) - decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true) - activityTasks = append(activityTasks, activityTask) - } else { - // all pollers are busy - no room to optimize - wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1) - } - } - } - } - defer func() { - for _, at := range activityTasks { - started := false - if response != nil && err1 == nil { - if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok { - at.ScheduledTimestamp = adl.ScheduledTimestamp - at.StartedTimestamp = adl.StartedTimestamp - at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt - at.TaskToken = adl.TaskToken - started = true - } - } - at.readyCh <- started + } + } + defer func() { + for _, at := range activityTasks { + started := false + if response != nil && err == nil { + if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok { + at.ScheduledTimestamp = adl.ScheduledTimestamp + at.StartedTimestamp = adl.StartedTimestamp + at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt + at.TaskToken = adl.TaskToken + started = true } - }() - response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1)) - }) } - - case *s.RespondQueryTaskCompletedRequest: - err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...) - if err1 != nil { - traceLog(func() { - wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1)) - }) - } - default: - // should not happen - panic("unknown request type from ProcessWorkflowTask()") + at.readyCh <- started } + }() + } - return err1 - }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) - - return + return wtp.service.RespondDecisionTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) } func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller { diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index b39084ca7..5318ddfd2 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -23,14 +23,31 @@ package internal import ( "context" "errors" - "testing" - "time" - + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" + s "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common" + "go.uber.org/cadence/internal/common/metrics" "go.uber.org/zap/zaptest" + "testing" + "time" +) + +const ( + _testDomainName = "test-domain" + _testTaskList = "test-tasklist" + _testIdentity = "test-worker" ) +// Enable verbose logging for tests. +func TestMain(m *testing.M) { + EnableVerboseLogging(true) + m.Run() +} + func TestLocalActivityPanic(t *testing.T) { // regression: panics in local activities should not terminate the process s := WorkflowTestSuite{logger: zaptest.NewLogger(t)} @@ -54,3 +71,87 @@ func TestLocalActivityPanic(t *testing.T) { assert.Contains(t, perr.StackTrace(), "panic") assert.Contains(t, perr.StackTrace(), t.Name(), "should mention the source location of the local activity that panicked") } + +func TestRespondTaskCompleted_failed(t *testing.T) { + t.Run("fail sends RespondDecisionTaskFailedRequest", func(t *testing.T) { + testTaskToken := []byte("test-task-token") + + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().RespondDecisionTaskFailed(gomock.Any(), &s.RespondDecisionTaskFailedRequest{ + TaskToken: testTaskToken, + Cause: s.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), + Details: []byte(assert.AnError.Error()), + Identity: common.StringPtr(_testIdentity), + BinaryChecksum: common.StringPtr(getBinaryChecksum()), + }, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + res, err := poller.RespondTaskCompleted(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + TaskToken: testTaskToken, + Attempt: common.Int64Ptr(0), + }, time.Now()) + assert.NoError(t, err) + assert.Nil(t, res) + }) + t.Run("fail fails to send RespondDecisionTaskFailedRequest", func(t *testing.T) { + testTaskToken := []byte("test-task-token") + + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().RespondDecisionTaskFailed(gomock.Any(), &s.RespondDecisionTaskFailedRequest{ + TaskToken: testTaskToken, + Cause: s.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(), + Details: []byte(assert.AnError.Error()), + Identity: common.StringPtr(_testIdentity), + BinaryChecksum: common.StringPtr(getBinaryChecksum()), + }, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(assert.AnError) + + // We cannot test RespondTaskCompleted since it uses backoff and has a hardcoded retry mechanism for 60 seconds. + _, err := poller.respondTaskCompletedAttempt(errorToFailDecisionTask(testTaskToken, assert.AnError, _testIdentity), &s.PollForDecisionTaskResponse{ + TaskToken: testTaskToken, + Attempt: common.Int64Ptr(0), + }) + assert.ErrorIs(t, err, assert.AnError) + }) + t.Run("fail skips sending for not the first attempt", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + res, err := poller.RespondTaskCompleted(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + Attempt: common.Int64Ptr(1), + }, time.Now()) + assert.NoError(t, err) + assert.Nil(t, res) + }) + +} + +func TestRespondTaskCompleted_Unsupported(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + assert.Panics(t, func() { + _, _ = poller.RespondTaskCompleted(assert.AnError, nil, &s.PollForDecisionTaskResponse{}, time.Now()) + }) +} + +func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *MockLocalDispatcher) { + ctrl := gomock.NewController(t) + mockService := workflowservicetest.NewMockClient(ctrl) + taskHandler := &MockWorkflowTaskHandler{} + lda := &MockLocalDispatcher{} + + return &workflowTaskPoller{ + basePoller: basePoller{ + shutdownC: make(<-chan struct{}), + }, + domain: _testDomainName, + taskListName: _testTaskList, + identity: _testIdentity, + service: mockService, + taskHandler: taskHandler, + ldaTunnel: lda, + metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)}, + logger: zaptest.NewLogger(t), + stickyUUID: "", + disableStickyExecution: false, + StickyScheduleToStartTimeout: time.Millisecond, + featureFlags: FeatureFlags{}, + }, mockService, taskHandler, lda +} diff --git a/internal/mock_local_dispatcher.go b/internal/mock_local_dispatcher.go new file mode 100644 index 000000000..fd72c5484 --- /dev/null +++ b/internal/mock_local_dispatcher.go @@ -0,0 +1,90 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package internal + +import mock "github.com/stretchr/testify/mock" + +// MockLocalDispatcher is an autogenerated mock type for the LocalDispatcher type +type MockLocalDispatcher struct { + mock.Mock +} + +type MockLocalDispatcher_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLocalDispatcher) EXPECT() *MockLocalDispatcher_Expecter { + return &MockLocalDispatcher_Expecter{mock: &_m.Mock} +} + +// SendTask provides a mock function with given fields: task +func (_m *MockLocalDispatcher) SendTask(task *locallyDispatchedActivityTask) bool { + ret := _m.Called(task) + + var r0 bool + if rf, ok := ret.Get(0).(func(*locallyDispatchedActivityTask) bool); ok { + r0 = rf(task) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockLocalDispatcher_SendTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendTask' +type MockLocalDispatcher_SendTask_Call struct { + *mock.Call +} + +// SendTask is a helper method to define mock.On call +// - task *locallyDispatchedActivityTask +func (_e *MockLocalDispatcher_Expecter) SendTask(task interface{}) *MockLocalDispatcher_SendTask_Call { + return &MockLocalDispatcher_SendTask_Call{Call: _e.mock.On("SendTask", task)} +} + +func (_c *MockLocalDispatcher_SendTask_Call) Run(run func(task *locallyDispatchedActivityTask)) *MockLocalDispatcher_SendTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*locallyDispatchedActivityTask)) + }) + return _c +} + +func (_c *MockLocalDispatcher_SendTask_Call) Return(_a0 bool) *MockLocalDispatcher_SendTask_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewMockLocalDispatcher interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockLocalDispatcher creates a new instance of MockLocalDispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockLocalDispatcher(t mockConstructorTestingTNewMockLocalDispatcher) *MockLocalDispatcher { + mock := &MockLocalDispatcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mock_workflow_task_handler.go b/internal/mock_workflow_task_handler.go new file mode 100644 index 000000000..db64a8dc7 --- /dev/null +++ b/internal/mock_workflow_task_handler.go @@ -0,0 +1,100 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package internal + +import mock "github.com/stretchr/testify/mock" + +// MockWorkflowTaskHandler is an autogenerated mock type for the WorkflowTaskHandler type +type MockWorkflowTaskHandler struct { + mock.Mock +} + +type MockWorkflowTaskHandler_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWorkflowTaskHandler) EXPECT() *MockWorkflowTaskHandler_Expecter { + return &MockWorkflowTaskHandler_Expecter{mock: &_m.Mock} +} + +// ProcessWorkflowTask provides a mock function with given fields: task, f +func (_m *MockWorkflowTaskHandler) ProcessWorkflowTask(task *workflowTask, f decisionHeartbeatFunc) (interface{}, error) { + ret := _m.Called(task, f) + + var r0 interface{} + if rf, ok := ret.Get(0).(func(*workflowTask, decisionHeartbeatFunc) interface{}); ok { + r0 = rf(task, f) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interface{}) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(*workflowTask, decisionHeartbeatFunc) error); ok { + r1 = rf(task, f) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWorkflowTaskHandler_ProcessWorkflowTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessWorkflowTask' +type MockWorkflowTaskHandler_ProcessWorkflowTask_Call struct { + *mock.Call +} + +// ProcessWorkflowTask is a helper method to define mock.On call +// - task *workflowTask +// - f decisionHeartbeatFunc +func (_e *MockWorkflowTaskHandler_Expecter) ProcessWorkflowTask(task interface{}, f interface{}) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + return &MockWorkflowTaskHandler_ProcessWorkflowTask_Call{Call: _e.mock.On("ProcessWorkflowTask", task, f)} +} + +func (_c *MockWorkflowTaskHandler_ProcessWorkflowTask_Call) Run(run func(task *workflowTask, f decisionHeartbeatFunc)) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*workflowTask), args[1].(decisionHeartbeatFunc)) + }) + return _c +} + +func (_c *MockWorkflowTaskHandler_ProcessWorkflowTask_Call) Return(response interface{}, err error) *MockWorkflowTaskHandler_ProcessWorkflowTask_Call { + _c.Call.Return(response, err) + return _c +} + +type mockConstructorTestingTNewMockWorkflowTaskHandler interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockWorkflowTaskHandler creates a new instance of MockWorkflowTaskHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockWorkflowTaskHandler(t mockConstructorTestingTNewMockWorkflowTaskHandler) *MockWorkflowTaskHandler { + mock := &MockWorkflowTaskHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} From 44a96b5aab7bdf47b3dfb8af1f09eb92dc14b17d Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Thu, 31 Oct 2024 21:47:00 +0100 Subject: [PATCH 02/11] rebase and add tests --- internal/internal_task_pollers_test.go | 54 ++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index 5318ddfd2..91d30efeb 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -25,6 +25,7 @@ import ( "errors" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/uber-go/tally" "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" @@ -42,12 +43,6 @@ const ( _testIdentity = "test-worker" ) -// Enable verbose logging for tests. -func TestMain(m *testing.M) { - EnableVerboseLogging(true) - m.Run() -} - func TestLocalActivityPanic(t *testing.T) { // regression: panics in local activities should not terminate the process s := WorkflowTestSuite{logger: zaptest.NewLogger(t)} @@ -120,7 +115,6 @@ func TestRespondTaskCompleted_failed(t *testing.T) { assert.NoError(t, err) assert.Nil(t, res) }) - } func TestRespondTaskCompleted_Unsupported(t *testing.T) { @@ -131,6 +125,52 @@ func TestRespondTaskCompleted_Unsupported(t *testing.T) { }) } +func TestProcessTask_failures(t *testing.T) { + t.Run("shutdown", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + ch := make(chan struct{}) + poller.shutdownC = ch + close(ch) + + err := poller.ProcessTask(&workflowTask{}) + assert.ErrorIs(t, err, errShutdown) + }) + t.Run("unsupported task type", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + assert.Panics(t, func() { + _ = poller.ProcessTask(10) + }) + }) + t.Run("nil task", func(t *testing.T) { + poller, _, _, _ := buildWorkflowTaskPoller(t) + + err := poller.ProcessTask(&workflowTask{}) + assert.NoError(t, err) + }) + t.Run("heartbeat error", func(t *testing.T) { + poller, _, mockedTaskHandler, _ := buildWorkflowTaskPoller(t) + hearbeatErr := &decisionHeartbeatError{} + mockedTaskHandler.EXPECT().ProcessWorkflowTask(mock.Anything, mock.Anything).Return(nil, hearbeatErr) + err := poller.ProcessTask(&workflowTask{ + task: &s.PollForDecisionTaskResponse{}, + }) + assert.ErrorIs(t, err, hearbeatErr) + }) + t.Run("ResetStickyTaskList fail", func(t *testing.T) { + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, assert.AnError) + err := poller.ProcessTask(&resetStickinessTask{ + task: &s.ResetStickyTaskListRequest{ + Execution: &s.WorkflowExecution{ + WorkflowId: common.StringPtr("test-workflow-id"), + RunId: common.StringPtr("test-run-id"), + }, + }, + }) + assert.ErrorIs(t, err, assert.AnError) + }) +} + func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *MockLocalDispatcher) { ctrl := gomock.NewController(t) mockService := workflowservicetest.NewMockClient(ctrl) From 62dc6c352ec2e4206c8eb23750628648ad09921d Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Thu, 31 Oct 2024 21:53:11 +0100 Subject: [PATCH 03/11] fmt --- internal/internal_task_pollers_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index 91d30efeb..3d01017d6 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -23,18 +23,20 @@ package internal import ( "context" "errors" + "testing" + "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "go.uber.org/zap/zaptest" + "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" s "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/internal/common" "go.uber.org/cadence/internal/common/metrics" - "go.uber.org/zap/zaptest" - "testing" - "time" ) const ( From d2257cb2005db0bf06a6afce9140c60bf1b9234e Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 00:13:42 +0100 Subject: [PATCH 04/11] Update internal/internal_public.go Co-authored-by: Steven L --- internal/internal_public.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/internal_public.go b/internal/internal_public.go index e874b89c4..d51b65e7d 100644 --- a/internal/internal_public.go +++ b/internal/internal_public.go @@ -33,7 +33,7 @@ import ( s "go.uber.org/cadence/.gen/go/shared" ) -//go:generate mockery --srcpkg . --name WorkflowTaskHandler --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE +//go:generate mockery --name WorkflowTaskHandler --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE type ( decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error) From 5f793cd8892b3af2e1ec742d50c11d9950428703 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 00:13:54 +0100 Subject: [PATCH 05/11] Update internal/internal_task_pollers.go Co-authored-by: Steven L --- internal/internal_task_pollers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 382ed0623..4b725d316 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -45,7 +45,7 @@ import ( "go.uber.org/cadence/internal/common/serializer" ) -//go:generate mockery --srcpkg . --name LocalDispatcher --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE +//go:generate mockery --name LocalDispatcher --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE const ( pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta From 436c928d6e46282ebf1508a58ae7ea097766b093 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 00:16:40 +0100 Subject: [PATCH 06/11] make localDispatcher package local --- internal/internal_task_pollers.go | 6 ++--- internal/internal_task_pollers_test.go | 2 +- internal/mock_local_dispatcher.go | 32 +++++++++++++------------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 4b725d316..8b6aa80ab 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -45,7 +45,7 @@ import ( "go.uber.org/cadence/internal/common/serializer" ) -//go:generate mockery --name LocalDispatcher --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE +//go:generate mockery --name localDispatcher --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE const ( pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta @@ -78,7 +78,7 @@ type ( identity string service workflowserviceclient.Interface taskHandler WorkflowTaskHandler - ldaTunnel LocalDispatcher + ldaTunnel localDispatcher metricsScope *metrics.TaggedScope logger *zap.Logger @@ -163,7 +163,7 @@ type ( } // LocalDispatcher is an interface to dispatch locally dispatched activities. - LocalDispatcher interface { + localDispatcher interface { SendTask(task *locallyDispatchedActivityTask) bool } ) diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index 3d01017d6..e47b0e1d5 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -177,7 +177,7 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic ctrl := gomock.NewController(t) mockService := workflowservicetest.NewMockClient(ctrl) taskHandler := &MockWorkflowTaskHandler{} - lda := &MockLocalDispatcher{} + lda := &mockLocalDispatcher{} return &workflowTaskPoller{ basePoller: basePoller{ diff --git a/internal/mock_local_dispatcher.go b/internal/mock_local_dispatcher.go index fd72c5484..9ca65c883 100644 --- a/internal/mock_local_dispatcher.go +++ b/internal/mock_local_dispatcher.go @@ -24,21 +24,21 @@ package internal import mock "github.com/stretchr/testify/mock" -// MockLocalDispatcher is an autogenerated mock type for the LocalDispatcher type -type MockLocalDispatcher struct { +// mockLocalDispatcher is an autogenerated mock type for the localDispatcher type +type mockLocalDispatcher struct { mock.Mock } -type MockLocalDispatcher_Expecter struct { +type mockLocalDispatcher_Expecter struct { mock *mock.Mock } -func (_m *MockLocalDispatcher) EXPECT() *MockLocalDispatcher_Expecter { - return &MockLocalDispatcher_Expecter{mock: &_m.Mock} +func (_m *mockLocalDispatcher) EXPECT() *mockLocalDispatcher_Expecter { + return &mockLocalDispatcher_Expecter{mock: &_m.Mock} } // SendTask provides a mock function with given fields: task -func (_m *MockLocalDispatcher) SendTask(task *locallyDispatchedActivityTask) bool { +func (_m *mockLocalDispatcher) SendTask(task *locallyDispatchedActivityTask) bool { ret := _m.Called(task) var r0 bool @@ -51,37 +51,37 @@ func (_m *MockLocalDispatcher) SendTask(task *locallyDispatchedActivityTask) boo return r0 } -// MockLocalDispatcher_SendTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendTask' -type MockLocalDispatcher_SendTask_Call struct { +// mockLocalDispatcher_SendTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendTask' +type mockLocalDispatcher_SendTask_Call struct { *mock.Call } // SendTask is a helper method to define mock.On call // - task *locallyDispatchedActivityTask -func (_e *MockLocalDispatcher_Expecter) SendTask(task interface{}) *MockLocalDispatcher_SendTask_Call { - return &MockLocalDispatcher_SendTask_Call{Call: _e.mock.On("SendTask", task)} +func (_e *mockLocalDispatcher_Expecter) SendTask(task interface{}) *mockLocalDispatcher_SendTask_Call { + return &mockLocalDispatcher_SendTask_Call{Call: _e.mock.On("SendTask", task)} } -func (_c *MockLocalDispatcher_SendTask_Call) Run(run func(task *locallyDispatchedActivityTask)) *MockLocalDispatcher_SendTask_Call { +func (_c *mockLocalDispatcher_SendTask_Call) Run(run func(task *locallyDispatchedActivityTask)) *mockLocalDispatcher_SendTask_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(*locallyDispatchedActivityTask)) }) return _c } -func (_c *MockLocalDispatcher_SendTask_Call) Return(_a0 bool) *MockLocalDispatcher_SendTask_Call { +func (_c *mockLocalDispatcher_SendTask_Call) Return(_a0 bool) *mockLocalDispatcher_SendTask_Call { _c.Call.Return(_a0) return _c } -type mockConstructorTestingTNewMockLocalDispatcher interface { +type mockConstructorTestingTnewMockLocalDispatcher interface { mock.TestingT Cleanup(func()) } -// NewMockLocalDispatcher creates a new instance of MockLocalDispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockLocalDispatcher(t mockConstructorTestingTNewMockLocalDispatcher) *MockLocalDispatcher { - mock := &MockLocalDispatcher{} +// newMockLocalDispatcher creates a new instance of mockLocalDispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockLocalDispatcher(t mockConstructorTestingTnewMockLocalDispatcher) *mockLocalDispatcher { + mock := &mockLocalDispatcher{} mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) From 7106a857e4d119d9846a5b420d8721547d98f502 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 00:32:33 +0100 Subject: [PATCH 07/11] fix mock usage --- internal/internal_task_pollers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index e47b0e1d5..97263b094 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -173,7 +173,7 @@ func TestProcessTask_failures(t *testing.T) { }) } -func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *MockLocalDispatcher) { +func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservicetest.MockClient, *MockWorkflowTaskHandler, *mockLocalDispatcher) { ctrl := gomock.NewController(t) mockService := workflowservicetest.NewMockClient(ctrl) taskHandler := &MockWorkflowTaskHandler{} From d80d5d5cfab46d1b26aafb82504b71628773bddf Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 00:39:00 +0100 Subject: [PATCH 08/11] use _mock suffix for mocks --- codecov.yml | 1 - internal/internal_public.go | 2 +- internal/internal_task_pollers.go | 2 +- ...w_task_handler.go => internal_workflow_task_handler_mock.go} | 0 internal/{mock_local_dispatcher.go => local_dispatcher_mock.go} | 0 5 files changed, 2 insertions(+), 3 deletions(-) rename internal/{mock_workflow_task_handler.go => internal_workflow_task_handler_mock.go} (100%) rename internal/{mock_local_dispatcher.go => local_dispatcher_mock.go} (100%) diff --git a/codecov.yml b/codecov.yml index 041fbf497..df8694f03 100644 --- a/codecov.yml +++ b/codecov.yml @@ -31,7 +31,6 @@ codecov: ignore: - "**/*_generated.go" - "**/*_mock.go" - - "**/mock_*.go" - "**/testdata/**" - "**/*_test.go" - "**/*_testsuite.go" diff --git a/internal/internal_public.go b/internal/internal_public.go index d51b65e7d..822c5c8dd 100644 --- a/internal/internal_public.go +++ b/internal/internal_public.go @@ -33,7 +33,7 @@ import ( s "go.uber.org/cadence/.gen/go/shared" ) -//go:generate mockery --name WorkflowTaskHandler --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE +//go:generate mockery --name WorkflowTaskHandler --inpackage --with-expecter --case snake --filename internal_workflow_task_handler_mock.go --boilerplate-file ../LICENSE type ( decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 8b6aa80ab..49abaede5 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -45,7 +45,7 @@ import ( "go.uber.org/cadence/internal/common/serializer" ) -//go:generate mockery --name localDispatcher --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE +//go:generate mockery --name localDispatcher --inpackage --with-expecter --case snake --filename local_dispatcher_mock.go --boilerplate-file ../LICENSE const ( pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta diff --git a/internal/mock_workflow_task_handler.go b/internal/internal_workflow_task_handler_mock.go similarity index 100% rename from internal/mock_workflow_task_handler.go rename to internal/internal_workflow_task_handler_mock.go diff --git a/internal/mock_local_dispatcher.go b/internal/local_dispatcher_mock.go similarity index 100% rename from internal/mock_local_dispatcher.go rename to internal/local_dispatcher_mock.go From 6b04477942c1c9d6934bb02a97a7c3e4a57c138e Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 07:53:02 +0100 Subject: [PATCH 09/11] addressing review comments --- internal/internal_task_pollers.go | 27 ++++++++++++++------------ internal/internal_task_pollers_test.go | 10 +++++----- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 49abaede5..d075b360f 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -27,6 +27,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/yarpc" "sync" "time" @@ -356,7 +357,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { func(response interface{}, startTime time.Time) (*workflowTask, error) { wtp.logger.Debug("Force RespondDecisionTaskCompleted.", zap.Int64("TaskStartedEventID", task.task.GetStartedEventId())) wtp.metricsScope.Counter(metrics.DecisionTaskForceCompleted).Inc(1) - heartbeatResponse, err := wtp.RespondTaskCompleted(response, nil, task.task, startTime) + heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime) if err != nil { return nil, err } @@ -375,7 +376,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { if errors.As(err, new(*decisionHeartbeatError)) { return err } - response, err = wtp.RespondTaskCompleted(completedRequest, err, task.task, startTime) + response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime) if err != nil { return err } @@ -404,7 +405,7 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa return nil } -func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) { +func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) { metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName()) if taskErr != nil { metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1) @@ -444,7 +445,7 @@ func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{} } func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) { - ctx, cancel, _ := newChannelContext(context.Background(), wtp.featureFlags) + ctx, cancel, opts := newChannelContext(context.Background(), wtp.featureFlags) defer cancel() var ( err error @@ -453,13 +454,13 @@ func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest inte ) switch request := completedRequest.(type) { case *s.RespondDecisionTaskFailedRequest: - err = wtp.handleDecisionFailedRequest(ctx, task, request) + err = wtp.handleDecisionFailedRequest(ctx, task, request, opts...) operation = "RespondDecisionTaskFailed" case *s.RespondDecisionTaskCompletedRequest: - response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request) + response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request, opts...) operation = "RespondDecisionTaskCompleted" case *s.RespondQueryTaskCompletedRequest: - err = wtp.service.RespondQueryTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) + err = wtp.service.RespondQueryTaskCompleted(ctx, request, opts...) operation = "RespondQueryTaskCompleted" default: // should not happen @@ -467,22 +468,24 @@ func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest inte } traceLog(func() { - wtp.logger.Debug("Call failed.", zap.Error(err), zap.String("Operation", operation)) + if err != nil { + wtp.logger.Debug(fmt.Sprintf("%s failed.", operation), zap.Error(err)) + } }) return response, err } -func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest) error { +func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error { // Only fail decision on first attempt, subsequent failure on the same decision task will timeout. // This is to avoid spin on the failed decision task. Checking Attempt not nil for older server. if task.Attempt != nil && task.GetAttempt() == 0 { - return wtp.service.RespondDecisionTaskFailed(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) + return wtp.service.RespondDecisionTaskFailed(ctx, request, opts...) } return nil } -func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest) (response *s.RespondDecisionTaskCompletedResponse, err error) { +func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (response *s.RespondDecisionTaskCompletedResponse, err error) { if request.StickyAttributes == nil && !wtp.disableStickyExecution { request.StickyAttributes = &s.StickyExecutionAttributes{ WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))}, @@ -538,7 +541,7 @@ func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Co }() } - return wtp.service.RespondDecisionTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...) + return wtp.service.RespondDecisionTaskCompleted(ctx, request, opts...) } func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller { diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index 97263b094..c81fa7db2 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -82,7 +82,7 @@ func TestRespondTaskCompleted_failed(t *testing.T) { BinaryChecksum: common.StringPtr(getBinaryChecksum()), }, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - res, err := poller.RespondTaskCompleted(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + res, err := poller.RespondTaskCompletedWithMetrics(nil, assert.AnError, &s.PollForDecisionTaskResponse{ TaskToken: testTaskToken, Attempt: common.Int64Ptr(0), }, time.Now()) @@ -111,7 +111,7 @@ func TestRespondTaskCompleted_failed(t *testing.T) { t.Run("fail skips sending for not the first attempt", func(t *testing.T) { poller, _, _, _ := buildWorkflowTaskPoller(t) - res, err := poller.RespondTaskCompleted(nil, assert.AnError, &s.PollForDecisionTaskResponse{ + res, err := poller.RespondTaskCompletedWithMetrics(nil, assert.AnError, &s.PollForDecisionTaskResponse{ Attempt: common.Int64Ptr(1), }, time.Now()) assert.NoError(t, err) @@ -122,8 +122,8 @@ func TestRespondTaskCompleted_failed(t *testing.T) { func TestRespondTaskCompleted_Unsupported(t *testing.T) { poller, _, _, _ := buildWorkflowTaskPoller(t) - assert.Panics(t, func() { - _, _ = poller.RespondTaskCompleted(assert.AnError, nil, &s.PollForDecisionTaskResponse{}, time.Now()) + assert.PanicsWithValue(t, "unknown request type from ProcessWorkflowTask()", func() { + _, _ = poller.RespondTaskCompletedWithMetrics(assert.AnError, nil, &s.PollForDecisionTaskResponse{}, time.Now()) }) } @@ -139,7 +139,7 @@ func TestProcessTask_failures(t *testing.T) { }) t.Run("unsupported task type", func(t *testing.T) { poller, _, _, _ := buildWorkflowTaskPoller(t) - assert.Panics(t, func() { + assert.PanicsWithValue(t, "unknown task type.", func() { _ = poller.ProcessTask(10) }) }) From a01a6251189555fa8d6fe55fff6fdfbaae735be3 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 07:59:58 +0100 Subject: [PATCH 10/11] fmt --- internal/internal_task_pollers.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index d075b360f..3b8a2e6c0 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -27,10 +27,11 @@ import ( "context" "errors" "fmt" - "go.uber.org/yarpc" "sync" "time" + "go.uber.org/yarpc" + "go.uber.org/cadence/internal/common/debug" "github.com/opentracing/opentracing-go" From d36df234647689bcb9e475fa0231370437ef3ed3 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 1 Nov 2024 08:56:46 +0100 Subject: [PATCH 11/11] revert to previous behaviour --- Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 61d69c264..689d7e29f 100644 --- a/Makefile +++ b/Makefile @@ -217,8 +217,9 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc # mockery is quite noisy so it's worth being kinda precise with the files. # this needs to be both the files defining the generate command, AND the files that define the interfaces. -$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery +$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go internal/internal_public.go internal/internal_task_pollers.go $(BIN)/mockery $Q $(BIN_PATH) go generate ./... + $Q touch $@ # ==================================== # other intermediates