Skip to content

Commit 94b0e07

Browse files
authored
Ignore workflowPanicError in query task and add warn logs (#1094)
1 parent 2b88fb1 commit 94b0e07

File tree

5 files changed

+52
-16
lines changed

5 files changed

+52
-16
lines changed

internal/internal_event_handlers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -781,8 +781,8 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
781781
topLine := fmt.Sprintf("process event for %s [panic]:", weh.workflowInfo.TaskListName)
782782
st := getStackTraceRaw(topLine, 7, 0)
783783
weh.logger.Error("ProcessEvent panic.",
784-
zap.String("PanicError", fmt.Sprintf("%v", p)),
785-
zap.String("PanicStack", st))
784+
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
785+
zap.String(tagPanicStack, st))
786786

787787
weh.Complete(nil, newWorkflowPanicError(p, st))
788788
}

internal/internal_logging_tags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,6 @@ const (
3838
tagLocalActivityType = "LocalActivityType"
3939
tagQueryType = "QueryType"
4040
tagVisibilityQuery = "VisibilityQuery"
41+
tagPanicError = "PanicError"
42+
tagPanicStack = "PanicStack"
4143
)

internal/internal_task_handlers.go

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,13 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
823823

824824
skipReplayCheck := w.skipReplayCheck()
825825
isReplayTest := task.GetPreviousStartedEventId() == replayPreviousStartedEventID
826+
if isReplayTest {
827+
w.wth.logger.Info("Processing workflow task in replay test mode",
828+
zap.String(tagWorkflowType, task.WorkflowType.GetName()),
829+
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
830+
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
831+
)
832+
}
826833
// Process events
827834
ProcessEvents:
828835
for {
@@ -917,12 +924,19 @@ ProcessEvents:
917924
}
918925
}
919926
if nonDeterministicErr == nil && w.err != nil {
920-
if isReplayTest {
921-
// NOTE: we should check the following error regardless if it's in replay test or not
922-
// but since we are not checking it previously, it may break existing customers workflow
923-
if panicErr, ok := w.err.(*workflowPanicError); ok && panicErr.value != nil {
924-
if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic {
927+
if panicErr, ok := w.err.(*workflowPanicError); ok && panicErr.value != nil {
928+
if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic {
929+
if isReplayTest {
930+
// NOTE: we should do this regardless if it's in replay test or not
931+
// but since previously we checked the wrong error type, it may break existing customers workflow
925932
nonDeterministicErr = panicErr
933+
} else {
934+
w.wth.logger.Warn("Ignored workflow panic error",
935+
zap.String(tagWorkflowType, task.WorkflowType.GetName()),
936+
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
937+
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
938+
zap.Error(nonDeterministicErr),
939+
)
926940
}
927941
}
928942
}
@@ -1455,12 +1469,32 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
14551469
// for query task
14561470
if task.Query != nil {
14571471
queryCompletedRequest := &s.RespondQueryTaskCompletedRequest{TaskToken: task.TaskToken}
1458-
if panicErr, ok := workflowContext.err.(*workflowPanicError); ok {
1472+
if panicErr, ok := workflowContext.err.(*PanicError); ok {
1473+
// NOTE: this code path should never be executed, we should check for workflowPanicError instead of PanicError
1474+
wth.logger.Warn("Encountered PanicError in workflow query task",
1475+
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
1476+
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
1477+
zap.String(tagPanicError, panicErr.Error()),
1478+
zap.String(tagPanicStack, panicErr.StackTrace()),
1479+
)
1480+
14591481
queryCompletedRequest.CompletedType = common.QueryTaskCompletedTypePtr(s.QueryTaskCompletedTypeFailed)
14601482
queryCompletedRequest.ErrorMessage = common.StringPtr("Workflow panic: " + panicErr.Error())
14611483
return queryCompletedRequest
14621484
}
14631485

1486+
if workflowPanicErr, ok := workflowContext.err.(*workflowPanicError); ok {
1487+
// NOTE: in this case we should return complete query task with CompletedTypeFailed
1488+
// but we didn't check for the right error type before, this may break existing customer
1489+
wth.logger.Warn("Ignored workflow panic error for query, query result may be partial",
1490+
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
1491+
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
1492+
zap.String(tagPanicError, workflowPanicErr.Error()),
1493+
zap.String(tagPanicStack, workflowPanicErr.StackTrace()),
1494+
zap.Int64("PreviousStartedEventID", task.GetPreviousStartedEventId()),
1495+
)
1496+
}
1497+
14641498
result, err := eventHandler.ProcessQuery(task.Query.GetQueryType(), task.Query.QueryArgs)
14651499
if err != nil {
14661500
queryCompletedRequest.CompletedType = common.QueryTaskCompletedTypePtr(s.QueryTaskCompletedTypeFailed)
@@ -1481,8 +1515,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
14811515
wth.logger.Error("Workflow panic.",
14821516
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
14831517
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
1484-
zap.String("PanicError", panicErr.Error()),
1485-
zap.String("PanicStack", panicErr.StackTrace()))
1518+
zap.String(tagPanicError, panicErr.Error()),
1519+
zap.String(tagPanicStack, panicErr.StackTrace()))
14861520
return errorToFailDecisionTask(task.TaskToken, panicErr, wth.identity)
14871521
}
14881522

@@ -1840,8 +1874,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
18401874
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
18411875
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
18421876
zap.String(tagActivityType, activityType),
1843-
zap.String("PanicError", fmt.Sprintf("%v", p)),
1844-
zap.String("PanicStack", st))
1877+
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
1878+
zap.String(tagPanicStack, st))
18451879
metricsScope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
18461880
panicErr := newPanicError(p, st)
18471881
result, err = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr, ath.dataConverter), nil

internal/internal_task_pollers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -603,8 +603,8 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
603603
zap.String(tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID),
604604
zap.String(tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID),
605605
zap.String(tagActivityType, activityType),
606-
zap.String("PanicError", fmt.Sprintf("%v", p)),
607-
zap.String("PanicStack", st))
606+
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
607+
zap.String(tagPanicStack, st))
608608
metricsScope.Counter(metrics.LocalActivityPanicCounter).Inc(1)
609609
panicErr := newPanicError(p, st)
610610
result = &localActivityResult{

internal/internal_worker_base.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,8 @@ func (bw *baseWorker) processTask(task interface{}) {
313313
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
314314
st := getStackTraceRaw(topLine, 7, 0)
315315
bw.logger.Error("Unhandled panic.",
316-
zap.String("PanicError", fmt.Sprintf("%v", p)),
317-
zap.String("PanicStack", st))
316+
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
317+
zap.String(tagPanicStack, st))
318318
}
319319

320320
if isPolledTask {

0 commit comments

Comments
 (0)