Skip to content
This repository was archived by the owner on Feb 28, 2025. It is now read-only.

Commit f6349be

Browse files
3vilhamstermrombout
authored andcommitted
[internal] Improve code coverage of internal_task_pollers.go (cadence-workflow#1373)
* [internal] Improve code coverage of internal_task_pollers.go
1 parent b5c9e99 commit f6349be

File tree

6 files changed

+450
-94
lines changed

6 files changed

+450
-94
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc
217217

218218
# mockery is quite noisy so it's worth being kinda precise with the files.
219219
# this needs to be both the files defining the generate command, AND the files that define the interfaces.
220-
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery
220+
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go internal/internal_public.go internal/internal_task_pollers.go $(BIN)/mockery
221221
$Q $(BIN_PATH) go generate ./...
222222
$Q touch $@
223223

internal/internal_public.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
s "go.uber.org/cadence/.gen/go/shared"
3434
)
3535

36+
//go:generate mockery --name WorkflowTaskHandler --inpackage --with-expecter --case snake --filename internal_workflow_task_handler_mock.go --boilerplate-file ../LICENSE
37+
3638
type (
3739
decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)
3840

@@ -71,7 +73,7 @@ type (
7173

7274
// WorkflowTaskHandler represents decision task handlers.
7375
WorkflowTaskHandler interface {
74-
// Processes the workflow task
76+
// ProcessWorkflowTask processes the workflow task
7577
// The response could be:
7678
// - RespondDecisionTaskCompletedRequest
7779
// - RespondDecisionTaskFailedRequest
@@ -84,7 +86,7 @@ type (
8486

8587
// ActivityTaskHandler represents activity task handlers.
8688
ActivityTaskHandler interface {
87-
// Executes the activity task
89+
// Execute executes the activity task
8890
// The response is one of the types:
8991
// - RespondActivityTaskCompletedRequest
9092
// - RespondActivityTaskFailedRequest

internal/internal_task_pollers.go

Lines changed: 112 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"sync"
3131
"time"
3232

33+
"go.uber.org/yarpc"
34+
3335
"go.uber.org/cadence/internal/common/debug"
3436

3537
"github.com/opentracing/opentracing-go"
@@ -45,6 +47,8 @@ import (
4547
"go.uber.org/cadence/internal/common/serializer"
4648
)
4749

50+
//go:generate mockery --name localDispatcher --inpackage --with-expecter --case snake --filename local_dispatcher_mock.go --boilerplate-file ../LICENSE
51+
4852
const (
4953
pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta
5054

@@ -76,7 +80,7 @@ type (
7680
identity string
7781
service workflowserviceclient.Interface
7882
taskHandler WorkflowTaskHandler
79-
ldaTunnel *locallyDispatchedActivityTunnel
83+
ldaTunnel localDispatcher
8084
metricsScope *metrics.TaggedScope
8185
logger *zap.Logger
8286

@@ -159,6 +163,11 @@ type (
159163
stopCh <-chan struct{}
160164
metricsScope *metrics.TaggedScope
161165
}
166+
167+
// LocalDispatcher is an interface to dispatch locally dispatched activities.
168+
localDispatcher interface {
169+
SendTask(task *locallyDispatchedActivityTask) bool
170+
}
162171
)
163172

164173
func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
@@ -214,7 +223,7 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit
214223
}
215224
}
216225

217-
func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool {
226+
func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool {
218227
select {
219228
case ldat.taskCh <- task:
220229
return true
@@ -365,7 +374,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
365374
if completedRequest == nil && err == nil {
366375
return nil
367376
}
368-
if _, ok := err.(decisionHeartbeatError); ok {
377+
if errors.As(err, new(*decisionHeartbeatError)) {
369378
return err
370379
}
371380
response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime)
@@ -398,7 +407,6 @@ func (wtp *workflowTaskPoller) processResetStickinessTask(rst *resetStickinessTa
398407
}
399408

400409
func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {
401-
402410
metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
403411
if taskErr != nil {
404412
metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1)
@@ -416,7 +424,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
416424
metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime))
417425

418426
responseStartTime := time.Now()
419-
if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil {
427+
if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil {
420428
metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1)
421429
return
422430
}
@@ -425,103 +433,116 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest
425433
return
426434
}
427435

428-
func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
436+
func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
429437
ctx := context.Background()
430438
// Respond task completion.
431439
err = backoff.Retry(ctx,
432440
func() error {
433-
tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags)
434-
defer cancel()
435-
var err1 error
436-
switch request := completedRequest.(type) {
437-
case *s.RespondDecisionTaskFailedRequest:
438-
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
439-
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
440-
if task.Attempt != nil && task.GetAttempt() == 0 {
441-
err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...)
442-
if err1 != nil {
443-
traceLog(func() {
444-
wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1))
445-
})
446-
}
441+
response, err = wtp.respondTaskCompletedAttempt(completedRequest, task)
442+
return err
443+
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
444+
445+
return response, err
446+
}
447+
448+
func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) {
449+
ctx, cancel, opts := newChannelContext(context.Background(), wtp.featureFlags)
450+
defer cancel()
451+
var (
452+
err error
453+
response *s.RespondDecisionTaskCompletedResponse
454+
operation string
455+
)
456+
switch request := completedRequest.(type) {
457+
case *s.RespondDecisionTaskFailedRequest:
458+
err = wtp.handleDecisionFailedRequest(ctx, task, request, opts...)
459+
operation = "RespondDecisionTaskFailed"
460+
case *s.RespondDecisionTaskCompletedRequest:
461+
response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request, opts...)
462+
operation = "RespondDecisionTaskCompleted"
463+
case *s.RespondQueryTaskCompletedRequest:
464+
err = wtp.service.RespondQueryTaskCompleted(ctx, request, opts...)
465+
operation = "RespondQueryTaskCompleted"
466+
default:
467+
// should not happen
468+
panic("unknown request type from ProcessWorkflowTask()")
469+
}
470+
471+
traceLog(func() {
472+
if err != nil {
473+
wtp.logger.Debug(fmt.Sprintf("%s failed.", operation), zap.Error(err))
474+
}
475+
})
476+
477+
return response, err
478+
}
479+
480+
func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error {
481+
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
482+
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
483+
if task.Attempt != nil && task.GetAttempt() == 0 {
484+
return wtp.service.RespondDecisionTaskFailed(ctx, request, opts...)
485+
}
486+
return nil
487+
}
488+
489+
func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (response *s.RespondDecisionTaskCompletedResponse, err error) {
490+
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
491+
request.StickyAttributes = &s.StickyExecutionAttributes{
492+
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
493+
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
494+
}
495+
} else {
496+
request.ReturnNewDecisionTask = common.BoolPtr(false)
497+
}
498+
499+
if wtp.ldaTunnel != nil {
500+
var activityTasks []*locallyDispatchedActivityTask
501+
for _, decision := range request.Decisions {
502+
attr := decision.ScheduleActivityTaskDecisionAttributes
503+
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
504+
// assume the activity type is in registry otherwise the activity would be failed and retried from server
505+
activityTask := &locallyDispatchedActivityTask{
506+
readyCh: make(chan bool, 1),
507+
ActivityId: attr.ActivityId,
508+
ActivityType: attr.ActivityType,
509+
Input: attr.Input,
510+
Header: attr.Header,
511+
WorkflowDomain: common.StringPtr(wtp.domain),
512+
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
513+
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
514+
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
515+
WorkflowExecution: task.WorkflowExecution,
516+
WorkflowType: task.WorkflowType,
447517
}
448-
case *s.RespondDecisionTaskCompletedRequest:
449-
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
450-
request.StickyAttributes = &s.StickyExecutionAttributes{
451-
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
452-
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
453-
}
518+
if wtp.ldaTunnel.SendTask(activityTask) {
519+
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
520+
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
521+
activityTasks = append(activityTasks, activityTask)
454522
} else {
455-
request.ReturnNewDecisionTask = common.BoolPtr(false)
456-
}
457-
var activityTasks []*locallyDispatchedActivityTask
458-
if wtp.ldaTunnel != nil {
459-
for _, decision := range request.Decisions {
460-
attr := decision.ScheduleActivityTaskDecisionAttributes
461-
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
462-
// assume the activity type is in registry otherwise the activity would be failed and retried from server
463-
activityTask := &locallyDispatchedActivityTask{
464-
readyCh: make(chan bool, 1),
465-
ActivityId: attr.ActivityId,
466-
ActivityType: attr.ActivityType,
467-
Input: attr.Input,
468-
Header: attr.Header,
469-
WorkflowDomain: common.StringPtr(wtp.domain),
470-
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
471-
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
472-
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
473-
WorkflowExecution: task.WorkflowExecution,
474-
WorkflowType: task.WorkflowType,
475-
}
476-
if wtp.ldaTunnel.sendTask(activityTask) {
477-
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
478-
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
479-
activityTasks = append(activityTasks, activityTask)
480-
} else {
481-
// all pollers are busy - no room to optimize
482-
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
483-
}
484-
}
485-
}
523+
// all pollers are busy - no room to optimize
524+
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
486525
}
487-
defer func() {
488-
for _, at := range activityTasks {
489-
started := false
490-
if response != nil && err1 == nil {
491-
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
492-
at.ScheduledTimestamp = adl.ScheduledTimestamp
493-
at.StartedTimestamp = adl.StartedTimestamp
494-
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
495-
at.TaskToken = adl.TaskToken
496-
started = true
497-
}
498-
}
499-
at.readyCh <- started
526+
}
527+
}
528+
defer func() {
529+
for _, at := range activityTasks {
530+
started := false
531+
if response != nil && err == nil {
532+
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
533+
at.ScheduledTimestamp = adl.ScheduledTimestamp
534+
at.StartedTimestamp = adl.StartedTimestamp
535+
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
536+
at.TaskToken = adl.TaskToken
537+
started = true
500538
}
501-
}()
502-
response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...)
503-
if err1 != nil {
504-
traceLog(func() {
505-
wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1))
506-
})
507-
}
508-
509-
case *s.RespondQueryTaskCompletedRequest:
510-
err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...)
511-
if err1 != nil {
512-
traceLog(func() {
513-
wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1))
514-
})
515539
}
516-
default:
517-
// should not happen
518-
panic("unknown request type from ProcessWorkflowTask()")
540+
at.readyCh <- started
519541
}
542+
}()
543+
}
520544

521-
return err1
522-
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
523-
524-
return
545+
return wtp.service.RespondDecisionTaskCompleted(ctx, request, opts...)
525546
}
526547

527548
func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller {

0 commit comments

Comments
 (0)