Skip to content

Commit d64c4ef

Browse files
authored
Prevent local-activity panics from taking down the worker process (#1169)
Previously, a local activity that panicked would lead to a panic exiting the goroutine that performed the call, terminating the whole worker process immediately. Obviously this is not good behavior. So now that's fixed. Seems like it has been this way since local activities were introduced, in commit 79c072a , so it's pretty surprising that it went unnoticed / unfixed for this long. Particularly since it stands a decent chance of killing every worker instance due to decision task retries.
1 parent eead117 commit d64c4ef

File tree

2 files changed

+54
-19
lines changed

2 files changed

+54
-19
lines changed

internal/internal_task_pollers.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -602,25 +602,8 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
602602
}
603603
}
604604

605-
// panic handler
605+
// count all failures beyond this point, as they come from the activity itself
606606
defer func() {
607-
if p := recover(); p != nil {
608-
topLine := fmt.Sprintf("local activity for %s [panic]:", activityType)
609-
st := getStackTraceRaw(topLine, 7, 0)
610-
lath.logger.Error("LocalActivity panic.",
611-
zap.String(tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID),
612-
zap.String(tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID),
613-
zap.String(tagActivityType, activityType),
614-
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
615-
zap.String(tagPanicStack, st))
616-
metricsScope.Counter(metrics.LocalActivityPanicCounter).Inc(1)
617-
panicErr := newPanicError(p, st)
618-
result = &localActivityResult{
619-
task: task,
620-
result: nil,
621-
err: panicErr,
622-
}
623-
}
624607
if result.err != nil {
625608
metricsScope.Counter(metrics.LocalActivityFailedCounter).Inc(1)
626609
}
@@ -648,12 +631,28 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
648631
var err error
649632
doneCh := make(chan struct{})
650633
go func(ch chan struct{}) {
634+
defer close(ch)
635+
636+
defer func() {
637+
if p := recover(); p != nil {
638+
topLine := fmt.Sprintf("local activity for %s [panic]:", activityType)
639+
st := getStackTraceRaw(topLine, 7, 0)
640+
lath.logger.Error("LocalActivity panic.",
641+
zap.String(tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID),
642+
zap.String(tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID),
643+
zap.String(tagActivityType, activityType),
644+
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
645+
zap.String(tagPanicStack, st))
646+
metricsScope.Counter(metrics.LocalActivityPanicCounter).Inc(1)
647+
err = newPanicError(p, st)
648+
}
649+
}()
650+
651651
laStartTime := time.Now()
652652
ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID)
653653
defer span.Finish()
654654
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
655655
executionLatency := time.Now().Sub(laStartTime)
656-
close(ch)
657656
metricsScope.Timer(metrics.LocalActivityExecutionLatency).Record(executionLatency)
658657
if executionLatency > timeoutDuration {
659658
// If local activity takes longer than expected timeout, the context would already be DeadlineExceeded and
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"go.uber.org/zap/zaptest"
12+
)
13+
14+
func TestLocalActivityPanic(t *testing.T) {
15+
// regression: panics in local activities should not terminate the process
16+
s := WorkflowTestSuite{logger: zaptest.NewLogger(t)}
17+
env := s.NewTestWorkflowEnvironment()
18+
19+
wf := "panicky_local_activity"
20+
env.RegisterWorkflowWithOptions(func(ctx Context) error {
21+
ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{
22+
ScheduleToCloseTimeout: time.Second,
23+
})
24+
return ExecuteLocalActivity(ctx, func(ctx context.Context) error {
25+
panic("should not kill process")
26+
}).Get(ctx, nil)
27+
}, RegisterWorkflowOptions{Name: wf})
28+
29+
env.ExecuteWorkflow(wf)
30+
err := env.GetWorkflowError()
31+
require.Error(t, err)
32+
var perr *PanicError
33+
require.True(t, errors.As(err, &perr), "error should be a panic error")
34+
assert.Contains(t, perr.StackTrace(), "panic")
35+
assert.Contains(t, perr.StackTrace(), t.Name(), "should mention the source location of the local activity that panicked")
36+
}

0 commit comments

Comments
 (0)