Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,22 @@ func PartitionDownscaleFactor(qps float64) Tag {
return newFloat64Tag("partition-downscale-factor", qps)
}

func MatchingTaskID(id int64) Tag {
return newInt64("matching-task-id", id)
}

func MatchingTaskScheduleID(id int64) Tag {
return newInt64("matching-task-schedule-id", id)
}

func DecisionTaskState(state int32) Tag {
return newInt32("decision-task-state", state)
}

func ActivityTaskState(state int32) Tag {
return newInt32("activity-task-state", state)
}

func Namespace(name string) Tag {
return newStringTag("namespace", name)
}
27 changes: 23 additions & 4 deletions service/matching/tasklist/task_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ func (tc *taskCompleterImpl) isTaskStarted(task *InternalTask, workflowExecution
// taskType can only be Activity or Decision, leaving the default case for future proofing
switch tc.taskListID.taskType {
case persistence.TaskListTypeActivity:
return isActivityTaskStarted(task, workflowExecutionResponse), nil
return tc.isActivityTaskStarted(task, workflowExecutionResponse), nil
case persistence.TaskListTypeDecision:
return isDecisionTaskStarted(task, workflowExecutionResponse), nil
return tc.isDecisionTaskStarted(task, workflowExecutionResponse), nil
default:
return false, errTaskTypeNotSupported
}
}

func isDecisionTaskStarted(task *InternalTask, workflowExecutionResponse *types.DescribeWorkflowExecutionResponse) bool {
func (tc *taskCompleterImpl) isDecisionTaskStarted(task *InternalTask, workflowExecutionResponse *types.DescribeWorkflowExecutionResponse) bool {
// if there is no pending decision task, that means that this task has been already started
if workflowExecutionResponse.PendingDecision == nil {
return true
Expand All @@ -189,16 +189,35 @@ func isDecisionTaskStarted(task *InternalTask, workflowExecutionResponse *types.
return true
}

tc.logger.Debug("Decision task not started.",
tag.WorkflowID(task.Event.WorkflowID),
tag.WorkflowRunID(task.Event.RunID),
tag.MatchingTaskID(task.Event.TaskID),
tag.MatchingTaskScheduleID(task.Event.ScheduleID),
tag.WorkflowScheduleID(workflowExecutionResponse.PendingDecision.ScheduleID),
tag.DecisionTaskState(int32(*workflowExecutionResponse.PendingDecision.State)),
)

return false
}

func isActivityTaskStarted(task *InternalTask, workflowExecutionResponse *types.DescribeWorkflowExecutionResponse) bool {
func (tc *taskCompleterImpl) isActivityTaskStarted(task *InternalTask, workflowExecutionResponse *types.DescribeWorkflowExecutionResponse) bool {
// if the scheduleID is different from all pending tasks' scheduleID or the pending activity has PendingActivityStateStarted, then the activity task with the task's scheduleID has already been started
for _, pendingActivity := range workflowExecutionResponse.PendingActivities {
if task.Event.ScheduleID == pendingActivity.ScheduleID {
if *pendingActivity.State == types.PendingActivityStateStarted {
return true
}

tc.logger.Debug("Activity task not started.",
tag.WorkflowID(task.Event.WorkflowID),
tag.WorkflowRunID(task.Event.RunID),
tag.MatchingTaskID(task.Event.TaskID),
tag.MatchingTaskScheduleID(task.Event.ScheduleID),
tag.WorkflowScheduleID(pendingActivity.ScheduleID),
tag.ActivityTaskState(int32(*pendingActivity.State)),
)

return false
}
}
Expand Down
1 change: 1 addition & 0 deletions service/matching/tasklist/task_completer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func TestCompleteTaskIfStarted(t *testing.T) {
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{},
PendingDecision: &types.PendingDecisionInfo{
ScheduleID: 2,
State: types.PendingDecisionStateScheduled.Ptr(),
},
}
mockHistoryService.EXPECT().DescribeWorkflowExecution(ctx, req).Return(resp, nil).Times(retryPolicyMaxAttempts + 1)
Expand Down
Loading