Skip to content

Commit 1686320

Browse files
committed
Support stacktraces
1 parent 0f5a6e8 commit 1686320

File tree

12 files changed

+191
-49
lines changed

12 files changed

+191
-49
lines changed

internal/sync/coroutine.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import (
1111

1212
const DeadlockDetection = 40 * time.Second
1313

14+
type CoroutineCreator interface {
15+
NewCoroutine(ctx Context, fn func(Context) error)
16+
}
17+
1418
type Coroutine interface {
1519
// Execute continues execution of a blocked corouting and waits until
1620
// it is finished or blocked again
@@ -28,7 +32,9 @@ type Coroutine interface {
2832

2933
Error() error
3034

31-
SetScheduler(s Scheduler)
35+
SetCoroutineCreator(creator CoroutineCreator)
36+
37+
SetPanicHandler(handler func(interface{}) error)
3238
}
3339

3440
type key int
@@ -47,13 +53,14 @@ type coState struct {
4753
shouldExit atomic.Value // coroutine should exit
4854
progress atomic.Value // did the coroutine make progress since last yield?
4955

50-
err error
56+
err error
57+
panicHandler func(interface{}) error
5158

5259
logger logger
5360

5461
deadlockDetection time.Duration
5562

56-
scheduler Scheduler
63+
creator CoroutineCreator
5764
}
5865

5966
func NewCoroutine(ctx Context, fn func(ctx Context) error) Coroutine {
@@ -64,7 +71,11 @@ func NewCoroutine(ctx Context, fn func(ctx Context) error) Coroutine {
6471
defer s.finish() // Ensure we always mark the coroutine as finished
6572
defer func() {
6673
if r := recover(); r != nil {
67-
s.err = fmt.Errorf("panic: %v", r)
74+
if s.panicHandler != nil {
75+
s.err = s.panicHandler(r)
76+
} else {
77+
s.err = fmt.Errorf("panic: %v", r)
78+
}
6879
}
6980
}()
7081

@@ -99,8 +110,12 @@ func (s *coState) finish() {
99110
s.logger.Println("finish")
100111
}
101112

102-
func (s *coState) SetScheduler(scheduler Scheduler) {
103-
s.scheduler = scheduler
113+
func (s *coState) SetCoroutineCreator(creator CoroutineCreator) {
114+
s.creator = creator
115+
}
116+
117+
func (s *coState) SetPanicHandler(handler func(interface{}) error) {
118+
s.panicHandler = handler
104119
}
105120

106121
func (s *coState) Finished() bool {

internal/sync/coroutine_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sync
22

33
import (
44
"errors"
5+
"fmt"
56
"testing"
67
"time"
78

@@ -177,3 +178,18 @@ func Test_Coroutine_Panic(t *testing.T) {
177178
require.Error(t, c.Error())
178179
require.Equal(t, c.Error().Error(), "panic: test panic")
179180
}
181+
182+
func Test_Coroutine_CallPanicHandlerIfSet(t *testing.T) {
183+
c := NewCoroutine(Background(), func(ctx Context) error {
184+
panic("test panic")
185+
})
186+
187+
c.SetPanicHandler(func(r interface{}) error {
188+
return fmt.Errorf("handled panic: %v", r)
189+
})
190+
191+
c.Execute()
192+
193+
require.True(t, c.Finished())
194+
require.EqualError(t, c.Error(), "handled panic: test panic")
195+
}

internal/sync/go.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package sync
33
func Go(ctx Context, f func(ctx Context)) {
44
cs := getCoState(ctx)
55

6-
cs.scheduler.NewCoroutine(ctx, func(ctx Context) error {
6+
cs.creator.NewCoroutine(ctx, func(ctx Context) error {
77
f(ctx)
88

99
return nil

internal/sync/scheduler.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,32 @@
11
package sync
22

3-
type Scheduler interface {
4-
// Starts a new co-routine and tracks it in this scheduler
5-
NewCoroutine(ctx Context, fn func(Context) error)
6-
7-
// Execute executes all coroutines until they are all blocked
8-
Execute() error
9-
10-
RunningCoroutines() int
11-
12-
Exit()
13-
}
14-
15-
type scheduler struct {
16-
coroutines []Coroutine
3+
type Scheduler struct {
4+
coroutines []Coroutine
5+
panicHandler func(interface{}) error
176
}
187

19-
func NewScheduler() Scheduler {
20-
return &scheduler{
8+
func NewScheduler() *Scheduler {
9+
return &Scheduler{
2110
coroutines: make([]Coroutine, 0),
2211
}
2312
}
2413

25-
func (s *scheduler) NewCoroutine(ctx Context, fn func(Context) error) {
14+
func (s *Scheduler) SetPanicHandler(handler func(interface{}) error) {
15+
s.panicHandler = handler
16+
}
17+
18+
// Starts a new co-routine and tracks it in this scheduler
19+
func (s *Scheduler) NewCoroutine(ctx Context, fn func(Context) error) {
2620
c := NewCoroutine(ctx, fn)
21+
if s.panicHandler != nil {
22+
c.SetPanicHandler(s.panicHandler)
23+
}
2724
s.coroutines = append(s.coroutines, c)
28-
c.SetScheduler(s)
25+
c.SetCoroutineCreator(s)
2926
}
3027

31-
func (s *scheduler) Execute() error {
28+
// Execute executes all coroutines until they are all blocked
29+
func (s *Scheduler) Execute() error {
3230
allBlocked := false
3331
for !allBlocked {
3432
allBlocked = true
@@ -60,11 +58,11 @@ func (s *scheduler) Execute() error {
6058
return nil
6159
}
6260

63-
func (s *scheduler) RunningCoroutines() int {
61+
func (s *Scheduler) RunningCoroutines() int {
6462
return len(s.coroutines)
6563
}
6664

67-
func (s *scheduler) Exit() {
65+
func (s *Scheduler) Exit() {
6866
for _, c := range s.coroutines {
6967
c.Exit()
7068
}

internal/workflow/workflow.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,17 @@ import (
1414
type Workflow interface{}
1515

1616
type workflow struct {
17-
s sync.Scheduler
17+
s *sync.Scheduler
1818
fn reflect.Value
1919
result payload.Payload
2020
err error
2121
}
2222

2323
func NewWorkflow(workflowFn reflect.Value) *workflow {
2424
s := sync.NewScheduler()
25+
s.SetPanicHandler(func(i interface{}) error {
26+
return workflowerrors.
27+
})
2528

2629
return &workflow{
2730
s: s,

internal/workflowerrors/error.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,34 @@
11
package workflowerrors
22

3-
import "errors"
3+
import (
4+
"encoding/json"
5+
"errors"
6+
)
47

58
type Error struct {
69
Type string `json:"type,omitempty"`
710
Message string `json:"message,omitempty"`
811

912
Permanent bool `json:"permanent,omitempty"`
1013
Cause error `json:"cause,omitempty"`
11-
StackTrace string `json:"stackTrace,omitempty"`
14+
Stacktrace string `json:"stacktrace,omitempty"`
15+
}
16+
17+
func (e *Error) UnmarshalJSON(b []byte) error {
18+
type Alias Error
19+
a := &struct {
20+
Cause *Error `json:"cause,omitempty"`
21+
*Alias
22+
}{}
23+
24+
if err := json.Unmarshal(b, &a); err != nil {
25+
return err
26+
}
27+
28+
*e = *(*Error)(a.Alias)
29+
e.Cause = a.Cause
30+
31+
return nil
1232
}
1333

1434
func (we *Error) Error() string {
@@ -19,6 +39,10 @@ func (we *Error) Unwrap() error {
1939
return we.Cause
2040
}
2141

42+
func (we *Error) Stack() string {
43+
return we.Stacktrace
44+
}
45+
2246
var _ error = (*Error)(nil)
2347

2448
// FromError wraps the given error into a workflow error which can be persisted and restored
@@ -37,6 +61,10 @@ func FromError(err error) *Error {
3761
Message: err.Error(),
3862
}
3963

64+
if stackTracer, ok := err.(interface{ Stack() string }); ok {
65+
e.Stacktrace = stackTracer.Stack()
66+
}
67+
4068
if cause := errors.Unwrap(err); cause != nil {
4169
e.Cause = FromError(cause)
4270
}
@@ -55,7 +83,7 @@ func ToError(err *Error) error {
5583

5684
switch err.Type {
5785
case getErrorType(&PanicError{}):
58-
return &PanicError{message: e.Message, stacktrace: e.StackTrace}
86+
return &PanicError{message: e.Message, stacktrace: e.Stacktrace}
5987

6088
default:
6189
// Keep *Error

internal/workflowerrors/panic.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ func (pe *PanicError) Error() string {
99
return pe.message
1010
}
1111

12-
func (pe *PanicError) Stacktrace() string {
12+
func (pe *PanicError) Stack() string {
1313
return pe.stacktrace
1414
}
1515

16-
func NewPanicError(msg string) error {
16+
func NewPanicError(msg string) *PanicError {
1717
return &PanicError{
18-
message: msg,
18+
message: msg,
19+
stacktrace: stack(3), // Skip new panic error and immediate caller
1920
}
2021
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package workflowerrors
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func Test_NewPanicError(t *testing.T) {
10+
e := func() *PanicError {
11+
return NewPanicError("test")
12+
}()
13+
14+
require.NotContains(t, e.Stack(), "Test_NewPanicError.func1")
15+
require.NotContains(t, e.Stack(), "NewPanicError")
16+
}

internal/workflowerrors/stack.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,34 @@
11
package workflowerrors
22

3-
import goerrors "github.com/go-errors/errors"
3+
import (
4+
"bytes"
5+
"runtime"
46

5-
func stack(err error) string {
6-
goerr := goerrors.New(err)
7-
return string(goerr.Stack())
7+
goerrors "github.com/go-errors/errors"
8+
)
9+
10+
const MaxStackDepth = 50
11+
12+
// stack returns a stacktrace formatted via go-errors/errors
13+
func stack(skip int) string {
14+
// get stack
15+
stack := make([]uintptr, MaxStackDepth)
16+
length := runtime.Callers(2+skip, stack[:])
17+
18+
// trim
19+
stack = stack[:length]
20+
21+
frames := make([]goerrors.StackFrame, len(stack))
22+
for i, pc := range stack {
23+
frames[i] = goerrors.NewStackFrame(pc)
24+
}
25+
26+
// Convert frames to trace
27+
buf := bytes.Buffer{}
28+
29+
for _, frame := range frames {
30+
buf.WriteString(frame.String())
31+
}
32+
33+
return string(buf.Bytes())
834
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package workflowerrors
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func Test_stack(t *testing.T) {
10+
fn := func() {
11+
s := stack(1)
12+
require.NotContains(t, s, "Test_stack.func1")
13+
}
14+
15+
foo(fn)
16+
}
17+
18+
func foo(fn func()) {
19+
bar(fn)
20+
}
21+
22+
func bar(fn func()) {
23+
fn()
24+
}

0 commit comments

Comments
 (0)