Skip to content

Commit 6427c91

Browse files
committed
[internal] Improve code coverage of internal_task_pollers.go
1 parent bcd0462 commit 6427c91

File tree

7 files changed

+410
-100
lines changed

7 files changed

+410
-100
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,6 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc
219219
# this needs to be both the files defining the generate command, AND the files that define the interfaces.
220220
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery
221221
$Q $(BIN_PATH) go generate ./...
222-
$Q touch $@
223222

224223
# ====================================
225224
# other intermediates

codecov.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ codecov:
3131
ignore:
3232
- "**/*_generated.go"
3333
- "**/*_mock.go"
34+
- "**/mock_*.go"
3435
- "**/testdata/**"
3536
- "**/*_test.go"
3637
- "**/*_testsuite.go"

internal/internal_public.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
s "go.uber.org/cadence/.gen/go/shared"
3434
)
3535

36+
//go:generate mockery --srcpkg . --name WorkflowTaskHandler --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE
37+
3638
type (
3739
decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)
3840

@@ -71,7 +73,7 @@ type (
7173

7274
// WorkflowTaskHandler represents decision task handlers.
7375
WorkflowTaskHandler interface {
74-
// Processes the workflow task
76+
// ProcessWorkflowTask processes the workflow task
7577
// The response could be:
7678
// - RespondDecisionTaskCompletedRequest
7779
// - RespondDecisionTaskFailedRequest
@@ -84,7 +86,7 @@ type (
8486

8587
// ActivityTaskHandler represents activity task handlers.
8688
ActivityTaskHandler interface {
87-
// Executes the activity task
89+
// Execute executes the activity task
8890
// The response is one of the types:
8991
// - RespondActivityTaskCompletedRequest
9092
// - RespondActivityTaskFailedRequest

internal/internal_task_pollers.go

Lines changed: 111 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ import (
4545
"go.uber.org/cadence/internal/common/serializer"
4646
)
4747

48+
//go:generate mockery --srcpkg . --name LocalDispatcher --output . --outpkg internal --inpackage --with-expecter --case snake --boilerplate-file ../LICENSE
49+
4850
const (
4951
pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta
5052

@@ -76,7 +78,7 @@ type (
7678
identity string
7779
service workflowserviceclient.Interface
7880
taskHandler WorkflowTaskHandler
79-
ldaTunnel *locallyDispatchedActivityTunnel
81+
ldaTunnel LocalDispatcher
8082
metricsScope *metrics.TaggedScope
8183
logger *zap.Logger
8284

@@ -159,6 +161,11 @@ type (
159161
stopCh <-chan struct{}
160162
metricsScope *metrics.TaggedScope
161163
}
164+
165+
// LocalDispatcher is an interface to dispatch locally dispatched activities.
166+
LocalDispatcher interface {
167+
SendTask(task *locallyDispatchedActivityTask) bool
168+
}
162169
)
163170

164171
func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
@@ -214,7 +221,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit
214221
}
215222
}
216223

217-
func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool {
224+
func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool {
218225
select {
219226
case ldat.taskCh <- task:
220227
return true
@@ -349,7 +356,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
349356
func(response interface{}, startTime time.Time) (*workflowTask, error) {
350357
wtp.logger.Debug("Force RespondDecisionTaskCompleted.", zap.Int64("TaskStartedEventID", task.task.GetStartedEventId()))
351358
wtp.metricsScope.Counter(metrics.DecisionTaskForceCompleted).Inc(1)
352-
heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime)
359+
heartbeatResponse, err := wtp.RespondTaskCompleted(response, nil, task.task, startTime)
353360
if err != nil {
354361
return nil, err
355362
}
@@ -365,10 +372,10 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
365372
if completedRequest == nil && err == nil {
366373
return nil
367374
}
368-
if _, ok := err.(decisionHeartbeatError); ok {
375+
if errors.As(err, new(*decisionHeartbeatError)) {
369376
return err
370377
}
371-
response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime)
378+
response, err = wtp.RespondTaskCompleted(completedRequest, err, task.task, startTime)
372379
if err != nil {
373380
return err
374381
}
@@ -397,8 +404,7 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa
397404
return nil
398405
}
399406

400-
func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {
401-
407+
func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {
402408
metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
403409
if taskErr != nil {
404410
metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1)
@@ -416,7 +422,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
416422
metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime))
417423

418424
responseStartTime := time.Now()
419-
if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil {
425+
if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil {
420426
metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1)
421427
return
422428
}
@@ -425,103 +431,114 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
425431
return
426432
}
427433

428-
func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
434+
func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
429435
ctx := context.Background()
430436
// Respond task completion.
431437
err = backoff.Retry(ctx,
432438
func() error {
433-
tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags)
434-
defer cancel()
435-
var err1 error
436-
switch request := completedRequest.(type) {
437-
case *s.RespondDecisionTaskFailedRequest:
438-
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
439-
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
440-
if task.Attempt != nil && task.GetAttempt() == 0 {
441-
err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...)
442-
if err1 != nil {
443-
traceLog(func() {
444-
wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1))
445-
})
446-
}
439+
response, err = wtp.respondTaskCompletedAttempt(completedRequest, task)
440+
return err
441+
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
442+
443+
return response, err
444+
}
445+
446+
func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) {
447+
ctx, cancel, _ := newChannelContext(context.Background(), wtp.featureFlags)
448+
defer cancel()
449+
var (
450+
err error
451+
response *s.RespondDecisionTaskCompletedResponse
452+
operation string
453+
)
454+
switch request := completedRequest.(type) {
455+
case *s.RespondDecisionTaskFailedRequest:
456+
err = wtp.handleDecisionFailedRequest(ctx, task, request)
457+
operation = "RespondDecisionTaskFailed"
458+
case *s.RespondDecisionTaskCompletedRequest:
459+
response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request)
460+
operation = "RespondDecisionTaskCompleted"
461+
case *s.RespondQueryTaskCompletedRequest:
462+
err = wtp.service.RespondQueryTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
463+
operation = "RespondQueryTaskCompleted"
464+
default:
465+
// should not happen
466+
panic("unknown request type from ProcessWorkflowTask()")
467+
}
468+
469+
traceLog(func() {
470+
wtp.logger.Debug("Call failed.", zap.Error(err), zap.String("Operation", operation))
471+
})
472+
473+
return response, err
474+
}
475+
476+
func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest) error {
477+
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
478+
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
479+
if task.Attempt != nil && task.GetAttempt() == 0 {
480+
return wtp.service.RespondDecisionTaskFailed(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
481+
}
482+
return nil
483+
}
484+
485+
func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest) (response *s.RespondDecisionTaskCompletedResponse, err error) {
486+
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
487+
request.StickyAttributes = &s.StickyExecutionAttributes{
488+
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
489+
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
490+
}
491+
} else {
492+
request.ReturnNewDecisionTask = common.BoolPtr(false)
493+
}
494+
495+
if wtp.ldaTunnel != nil {
496+
var activityTasks []*locallyDispatchedActivityTask
497+
for _, decision := range request.Decisions {
498+
attr := decision.ScheduleActivityTaskDecisionAttributes
499+
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
500+
// assume the activity type is in registry otherwise the activity would be failed and retried from server
501+
activityTask := &locallyDispatchedActivityTask{
502+
readyCh: make(chan bool, 1),
503+
ActivityId: attr.ActivityId,
504+
ActivityType: attr.ActivityType,
505+
Input: attr.Input,
506+
Header: attr.Header,
507+
WorkflowDomain: common.StringPtr(wtp.domain),
508+
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
509+
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
510+
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
511+
WorkflowExecution: task.WorkflowExecution,
512+
WorkflowType: task.WorkflowType,
447513
}
448-
case *s.RespondDecisionTaskCompletedRequest:
449-
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
450-
request.StickyAttributes = &s.StickyExecutionAttributes{
451-
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
452-
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
453-
}
514+
if wtp.ldaTunnel.SendTask(activityTask) {
515+
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
516+
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
517+
activityTasks = append(activityTasks, activityTask)
454518
} else {
455-
request.ReturnNewDecisionTask = common.BoolPtr(false)
519+
// all pollers are busy - no room to optimize
520+
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
456521
}
457-
var activityTasks []*locallyDispatchedActivityTask
458-
if wtp.ldaTunnel != nil {
459-
for _, decision := range request.Decisions {
460-
attr := decision.ScheduleActivityTaskDecisionAttributes
461-
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
462-
// assume the activity type is in registry otherwise the activity would be failed and retried from server
463-
activityTask := &locallyDispatchedActivityTask{
464-
readyCh: make(chan bool, 1),
465-
ActivityId: attr.ActivityId,
466-
ActivityType: attr.ActivityType,
467-
Input: attr.Input,
468-
Header: attr.Header,
469-
WorkflowDomain: common.StringPtr(wtp.domain),
470-
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
471-
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
472-
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
473-
WorkflowExecution: task.WorkflowExecution,
474-
WorkflowType: task.WorkflowType,
475-
}
476-
if wtp.ldaTunnel.sendTask(activityTask) {
477-
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
478-
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
479-
activityTasks = append(activityTasks, activityTask)
480-
} else {
481-
// all pollers are busy - no room to optimize
482-
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
483-
}
484-
}
485-
}
486-
}
487-
defer func() {
488-
for _, at := range activityTasks {
489-
started := false
490-
if response != nil && err1 == nil {
491-
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
492-
at.ScheduledTimestamp = adl.ScheduledTimestamp
493-
at.StartedTimestamp = adl.StartedTimestamp
494-
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
495-
at.TaskToken = adl.TaskToken
496-
started = true
497-
}
498-
}
499-
at.readyCh <- started
522+
}
523+
}
524+
defer func() {
525+
for _, at := range activityTasks {
526+
started := false
527+
if response != nil && err == nil {
528+
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
529+
at.ScheduledTimestamp = adl.ScheduledTimestamp
530+
at.StartedTimestamp = adl.StartedTimestamp
531+
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
532+
at.TaskToken = adl.TaskToken
533+
started = true
500534
}
501-
}()
502-
response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...)
503-
if err1 != nil {
504-
traceLog(func() {
505-
wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1))
506-
})
507535
}
508-
509-
case *s.RespondQueryTaskCompletedRequest:
510-
err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...)
511-
if err1 != nil {
512-
traceLog(func() {
513-
wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1))
514-
})
515-
}
516-
default:
517-
// should not happen
518-
panic("unknown request type from ProcessWorkflowTask()")
536+
at.readyCh <- started
519537
}
538+
}()
539+
}
520540

521-
return err1
522-
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
523-
524-
return
541+
return wtp.service.RespondDecisionTaskCompleted(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
525542
}
526543

527544
func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller {

0 commit comments

Comments
 (0)