Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ $(THRIFT_GEN): $(THRIFT_FILES) $(BIN)/thriftrw $(BIN)/thriftrw-plugin-yarpc

# mockery is quite noisy so it's worth being kinda precise with the files.
# this needs to be both the files defining the generate command, AND the files that define the interfaces.
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go $(BIN)/mockery
$(BUILD)/generate: client/client.go encoded/encoded.go internal/internal_workflow_client.go internal/internal_public.go internal/internal_task_pollers.go $(BIN)/mockery
$Q $(BIN_PATH) go generate ./...
$Q touch $@

Expand Down
6 changes: 4 additions & 2 deletions internal/internal_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
s "go.uber.org/cadence/.gen/go/shared"
)

//go:generate mockery --name WorkflowTaskHandler --inpackage --with-expecter --case snake --filename internal_workflow_task_handler_mock.go --boilerplate-file ../LICENSE

type (
decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)

Expand Down Expand Up @@ -71,7 +73,7 @@ type (

// WorkflowTaskHandler represents decision task handlers.
WorkflowTaskHandler interface {
// Processes the workflow task
// ProcessWorkflowTask processes the workflow task
// The response could be:
// - RespondDecisionTaskCompletedRequest
// - RespondDecisionTaskFailedRequest
Expand All @@ -84,7 +86,7 @@ type (

// ActivityTaskHandler represents activity task handlers.
ActivityTaskHandler interface {
// Executes the activity task
// Execute executes the activity task
// The response is one of the types:
// - RespondActivityTaskCompletedRequest
// - RespondActivityTaskFailedRequest
Expand Down
203 changes: 112 additions & 91 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
"sync"
"time"

"go.uber.org/yarpc"

"go.uber.org/cadence/internal/common/debug"

"github.com/opentracing/opentracing-go"
Expand All @@ -45,6 +47,8 @@
"go.uber.org/cadence/internal/common/serializer"
)

//go:generate mockery --name localDispatcher --inpackage --with-expecter --case snake --filename local_dispatcher_mock.go --boilerplate-file ../LICENSE

const (
pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta

Expand Down Expand Up @@ -76,7 +80,7 @@
identity string
service workflowserviceclient.Interface
taskHandler WorkflowTaskHandler
ldaTunnel *locallyDispatchedActivityTunnel
ldaTunnel localDispatcher
metricsScope *metrics.TaggedScope
logger *zap.Logger

Expand Down Expand Up @@ -159,6 +163,11 @@
stopCh <-chan struct{}
metricsScope *metrics.TaggedScope
}

// LocalDispatcher is an interface to dispatch locally dispatched activities.
localDispatcher interface {
SendTask(task *locallyDispatchedActivityTask) bool
}
)

func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
Expand Down Expand Up @@ -214,7 +223,7 @@
}
}

func (ldat *locallyDispatchedActivityTunnel) sendTask(task *locallyDispatchedActivityTask) bool {
func (ldat *locallyDispatchedActivityTunnel) SendTask(task *locallyDispatchedActivityTask) bool {
select {
case ldat.taskCh <- task:
return true
Expand Down Expand Up @@ -365,7 +374,7 @@
if completedRequest == nil && err == nil {
return nil
}
if _, ok := err.(decisionHeartbeatError); ok {
if errors.As(err, new(*decisionHeartbeatError)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, looks good - it's only returned from one place right now, and no wrapping, so this won't change behavior 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not working before at all.
The error was returned as &decicisionHeartbeatError so type assertion err.(decisionHeartbeatError) was always false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good point. and good fix 👍

return err
}
response, err = wtp.RespondTaskCompletedWithMetrics(completedRequest, err, task.task, startTime)
Expand Down Expand Up @@ -398,7 +407,6 @@
}

func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(completedRequest interface{}, taskErr error, task *s.PollForDecisionTaskResponse, startTime time.Time) (response *s.RespondDecisionTaskCompletedResponse, err error) {

metricsScope := wtp.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
if taskErr != nil {
metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1)
Expand All @@ -416,7 +424,7 @@
metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(startTime))

responseStartTime := time.Now()
if response, err = wtp.RespondTaskCompleted(completedRequest, task); err != nil {
if response, err = wtp.respondTaskCompleted(completedRequest, task); err != nil {
metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1)
return
}
Expand All @@ -425,103 +433,116 @@
return
}

func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
func (wtp *workflowTaskPoller) respondTaskCompleted(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (response *s.RespondDecisionTaskCompletedResponse, err error) {
ctx := context.Background()
// Respond task completion.
err = backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx, wtp.featureFlags)
defer cancel()
var err1 error
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
err1 = wtp.service.RespondDecisionTaskFailed(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskFailed failed.", zap.Error(err1))
})
}
response, err = wtp.respondTaskCompletedAttempt(completedRequest, task)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return response, err
}

func (wtp *workflowTaskPoller) respondTaskCompletedAttempt(completedRequest interface{}, task *s.PollForDecisionTaskResponse) (*s.RespondDecisionTaskCompletedResponse, error) {
ctx, cancel, opts := newChannelContext(context.Background(), wtp.featureFlags)
defer cancel()
var (
err error
response *s.RespondDecisionTaskCompletedResponse
operation string
)
switch request := completedRequest.(type) {
case *s.RespondDecisionTaskFailedRequest:
err = wtp.handleDecisionFailedRequest(ctx, task, request, opts...)
operation = "RespondDecisionTaskFailed"
case *s.RespondDecisionTaskCompletedRequest:
response, err = wtp.handleDecisionTaskCompletedRequest(ctx, task, request, opts...)
operation = "RespondDecisionTaskCompleted"
case *s.RespondQueryTaskCompletedRequest:
err = wtp.service.RespondQueryTaskCompleted(ctx, request, opts...)
operation = "RespondQueryTaskCompleted"
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
}

traceLog(func() {
if err != nil {
wtp.logger.Debug(fmt.Sprintf("%s failed.", operation), zap.Error(err))
}
})

return response, err
}

func (wtp *workflowTaskPoller) handleDecisionFailedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error {
// Only fail decision on first attempt, subsequent failure on the same decision task will timeout.
// This is to avoid spin on the failed decision task. Checking Attempt not nil for older server.
if task.Attempt != nil && task.GetAttempt() == 0 {
return wtp.service.RespondDecisionTaskFailed(ctx, request, opts...)
}
return nil
}

func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Context, task *s.PollForDecisionTaskResponse, request *s.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (response *s.RespondDecisionTaskCompletedResponse, err error) {
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
}

Check warning on line 497 in internal/internal_task_pollers.go

View check run for this annotation

Codecov / codecov/patch

internal/internal_task_pollers.go#L496-L497

Added lines #L496 - L497 were not covered by tests

if wtp.ldaTunnel != nil {
var activityTasks []*locallyDispatchedActivityTask
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
case *s.RespondDecisionTaskCompletedRequest:
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
request.StickyAttributes = &s.StickyExecutionAttributes{
WorkerTaskList: &s.TaskList{Name: common.StringPtr(getWorkerTaskList(wtp.stickyUUID))},
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Ceil(wtp.StickyScheduleToStartTimeout.Seconds())),
}
if wtp.ldaTunnel.SendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
request.ReturnNewDecisionTask = common.BoolPtr(false)
}
var activityTasks []*locallyDispatchedActivityTask
if wtp.ldaTunnel != nil {
for _, decision := range request.Decisions {
attr := decision.ScheduleActivityTaskDecisionAttributes
if attr != nil && wtp.taskListName == attr.TaskList.GetName() {
// assume the activity type is in registry otherwise the activity would be failed and retried from server
activityTask := &locallyDispatchedActivityTask{
readyCh: make(chan bool, 1),
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Input: attr.Input,
Header: attr.Header,
WorkflowDomain: common.StringPtr(wtp.domain),
ScheduleToCloseTimeoutSeconds: attr.ScheduleToCloseTimeoutSeconds,
StartToCloseTimeoutSeconds: attr.StartToCloseTimeoutSeconds,
HeartbeatTimeoutSeconds: attr.HeartbeatTimeoutSeconds,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
}
if wtp.ldaTunnel.sendTask(activityTask) {
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchSucceedCounter).Inc(1)
decision.ScheduleActivityTaskDecisionAttributes.RequestLocalDispatch = common.BoolPtr(true)
activityTasks = append(activityTasks, activityTask)
} else {
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
}
}
// all pollers are busy - no room to optimize
wtp.metricsScope.Counter(metrics.ActivityLocalDispatchFailedCounter).Inc(1)
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err1 == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}
at.readyCh <- started
}
}
defer func() {
for _, at := range activityTasks {
started := false
if response != nil && err == nil {
if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok {
at.ScheduledTimestamp = adl.ScheduledTimestamp
at.StartedTimestamp = adl.StartedTimestamp
at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt
at.TaskToken = adl.TaskToken
started = true
}
}()
response, err1 = wtp.service.RespondDecisionTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondDecisionTaskCompleted failed.", zap.Error(err1))
})
}

case *s.RespondQueryTaskCompletedRequest:
err1 = wtp.service.RespondQueryTaskCompleted(tchCtx, request, opt...)
if err1 != nil {
traceLog(func() {
wtp.logger.Debug("RespondQueryTaskCompleted failed.", zap.Error(err1))
})
}
default:
// should not happen
panic("unknown request type from ProcessWorkflowTask()")
at.readyCh <- started
}
}()
}

return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)

return
return wtp.service.RespondDecisionTaskCompleted(ctx, request, opts...)
}

func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localActivityTunnel) *localActivityTaskPoller {
Expand Down
Loading
Loading