Skip to content

Commit 0f5a6e8

Browse files
committed
Improve error handling for activities and workflows
1 parent c47338e commit 0f5a6e8

37 files changed

+745
-127
lines changed

backend/test/e2e.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ import (
1919
"github.com/stretchr/testify/require"
2020
)
2121

22+
type backendTest struct {
23+
name string
24+
options []backend.BackendOption
25+
withoutCache bool // If set, test will only be run when the cache is disabled
26+
f func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend)
27+
}
28+
2229
func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOption) TestBackend, teardown func(b TestBackend)) {
23-
tests := []struct {
24-
name string
25-
options []backend.BackendOption
26-
withoutCache bool // If set, test will only be run when the cache is disabled
27-
f func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend)
28-
}{
30+
tests := []backendTest{
2931
{
3032
name: "SimpleWorkflow",
3133
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
@@ -652,13 +654,42 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
652654

653655
instance := runWorkflow(t, ctx, c, wf, 0)
654656

657+
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*10)
658+
require.NoError(t, err)
659+
require.Equal(t, 3, r)
660+
},
661+
},
662+
{
663+
name: "ContinueAsNew_Subworkflow",
664+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
665+
swf := func(ctx workflow.Context, run int) (int, error) {
666+
l := workflow.Logger(ctx)
667+
668+
run = run + 1
669+
if run < 3 {
670+
l.Debug("continue as new", "run", run)
671+
return run, workflow.ContinueAsNew(ctx, run)
672+
}
673+
674+
return run, nil
675+
}
676+
677+
wf := func(ctx workflow.Context, run int) (int, error) {
678+
return workflow.CreateSubWorkflowInstance[int](ctx, workflow.DefaultSubWorkflowOptions, swf, run).Get(ctx)
679+
}
680+
register(t, ctx, w, []interface{}{wf, swf}, nil)
681+
682+
instance := runWorkflow(t, ctx, c, wf, 0)
683+
655684
r, err := client.GetWorkflowResult[int](ctx, c, instance, time.Second*10)
656685
require.NoError(t, err)
657686
require.Equal(t, 3, r)
658687
},
659688
},
660689
}
661690

691+
tests = append(tests, e2eActivityTests...)
692+
662693
run := func(suffix string, workerOptions *worker.Options) {
663694
for _, tt := range tests {
664695
if tt.withoutCache && workerOptions.WorkflowExecutorCache != nil {

backend/test/e2e_activity.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/cschleiden/go-workflows/client"
9+
"github.com/cschleiden/go-workflows/worker"
10+
"github.com/cschleiden/go-workflows/workflow"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
type CustomError struct {
15+
msg string
16+
}
17+
18+
func (e *CustomError) Error() string {
19+
return e.msg
20+
}
21+
22+
var e2eActivityTests = []backendTest{
23+
{
24+
name: "Activity_Panic",
25+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
26+
a := func(context.Context) error {
27+
panic("activity panic")
28+
}
29+
30+
wf := func(ctx workflow.Context) (bool, error) {
31+
_, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
32+
RetryOptions: workflow.RetryOptions{
33+
MaxAttempts: 1,
34+
},
35+
}, a).Get(ctx)
36+
37+
var perr *workflow.PanicError
38+
return errors.As(err, &perr), nil
39+
}
40+
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
41+
42+
output, err := runWorkflowWithResult[bool](t, ctx, c, wf)
43+
44+
require.True(t, output, "error should be PanicError")
45+
require.NoError(t, err)
46+
},
47+
},
48+
{
49+
name: "Activity_CustomError",
50+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker, b TestBackend) {
51+
a := func(context.Context) error {
52+
return &CustomError{msg: "custom error"}
53+
}
54+
55+
wf := func(ctx workflow.Context) (bool, error) {
56+
_, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
57+
RetryOptions: workflow.RetryOptions{
58+
MaxAttempts: 1,
59+
},
60+
}, a).Get(ctx)
61+
62+
var werr *workflow.Error
63+
if errors.As(err, &werr) {
64+
return werr.Type == "CustomError" && werr.Error() == "custom error", nil
65+
}
66+
67+
return false, nil
68+
}
69+
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
70+
71+
output, err := runWorkflowWithResult[bool](t, ctx, c, wf)
72+
73+
require.True(t, output, "error should be PanicError")
74+
require.NoError(t, err)
75+
},
76+
},
77+
}

client/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cschleiden/go-workflows/internal/fn"
1515
"github.com/cschleiden/go-workflows/internal/history"
1616
"github.com/cschleiden/go-workflows/internal/metrickeys"
17+
"github.com/cschleiden/go-workflows/internal/workflowerrors"
1718
"github.com/cschleiden/go-workflows/log"
1819
"github.com/cschleiden/go-workflows/metrics"
1920
"github.com/cschleiden/go-workflows/workflow"
@@ -195,7 +196,7 @@ func GetWorkflowResult[T any](ctx context.Context, c Client, instance *workflow.
195196
return *new(T), fmt.Errorf("workflow did not finish in time: %w", err)
196197
}
197198

198-
h, err := b.GetWorkflowInstanceHistory(ctx, instance, nil)
199+
h, err := b.GetWorkflowInstanceHistory(ctx, instance, nil) // future: could optimize this by retriving only the very last entry in the history
199200
if err != nil {
200201
return *new(T), fmt.Errorf("getting workflow history: %w", err)
201202
}
@@ -206,8 +207,8 @@ func GetWorkflowResult[T any](ctx context.Context, c Client, instance *workflow.
206207
switch event.Type {
207208
case history.EventType_WorkflowExecutionFinished:
208209
a := event.Attributes.(*history.ExecutionCompletedAttributes)
209-
if a.Error != "" {
210-
return *new(T), errors.New(a.Error)
210+
if a.Error != nil {
211+
return *new(T), workflowerrors.ToError(a.Error)
211212
}
212213

213214
var r T

client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func Test_Client_GetWorkflowResultSuccess(t *testing.T) {
8080
history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{}),
8181
history.NewHistoryEvent(2, time.Now(), history.EventType_WorkflowExecutionFinished, &history.ExecutionCompletedAttributes{
8282
Result: r,
83-
Error: "",
83+
Error: nil,
8484
}),
8585
}, nil)
8686
b.On("Converter").Return(converter.DefaultConverter)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ require (
2626
github.com/breml/bidichk v0.2.3 // indirect
2727
github.com/curioswitch/go-reassign v0.2.0 // indirect
2828
github.com/firefart/nonamedreturns v1.0.4 // indirect
29+
github.com/go-errors/errors v1.4.2 // indirect
2930
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
3031
github.com/kkHAIKE/contextcheck v1.1.2 // indirect
3132
github.com/lufeee/execinquery v1.2.1 // indirect

go.sum

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ github.com/alexkohler/prealloc v1.0.0/go.mod h1:VetnK3dIgFBBKmg0YnD9F9x6Icjd+9cv
6767
github.com/alingse/asasalint v0.0.11 h1:SFwnQXJ49Kx/1GghOFz1XGqHYKp21Kq1nHad/0WQRnw=
6868
github.com/alingse/asasalint v0.0.11/go.mod h1:nCaoMhw7a9kSJObvQyVzNTPBDbNpdocqrSP7t/cW5+I=
6969
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
70+
github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
7071
github.com/ashanbrown/forbidigo v1.3.0 h1:VkYIwb/xxdireGAdJNZoo24O4lmnEWkactplBlWTShc=
7172
github.com/ashanbrown/forbidigo v1.3.0/go.mod h1:vVW7PEdqEFqapJe95xHkTfB1+XvZXBFg8t0sG2FIxmI=
7273
github.com/ashanbrown/makezero v1.1.1 h1:iCQ87C0V0vSyO+M9E/FZYbu65auqH0lnsOkf5FcB28s=
@@ -115,6 +116,8 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH
115116
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
116117
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
117118
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
119+
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
120+
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
118121
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
119122
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
120123
github.com/cristalhq/acmd v0.8.1/go.mod h1:LG5oa43pE/BbxtfMoImHCQN++0Su7dzipdgBjMCBVDQ=
@@ -155,6 +158,8 @@ github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlya
155158
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
156159
github.com/go-critic/go-critic v0.6.5 h1:fDaR/5GWURljXwF8Eh31T2GZNz9X4jeboS912mWF8Uo=
157160
github.com/go-critic/go-critic v0.6.5/go.mod h1:ezfP/Lh7MA6dBNn4c6ab5ALv3sKnZVLx37tr00uuaOY=
161+
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
162+
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
158163
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
159164
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
160165
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@@ -198,12 +203,14 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA
198203
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
199204
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
200205
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
206+
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
201207
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
202208
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
203209
github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE=
204210
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
205211
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
206212
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
213+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
207214
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
208215
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
209216
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
@@ -309,13 +316,17 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QG
309316
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
310317
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
311318
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
319+
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
320+
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
312321
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
313322
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
323+
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
314324
github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
315325
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
316326
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
317327
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
318328
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
329+
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
319330
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
320331
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
321332
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
@@ -598,6 +609,7 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
598609
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
599610
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
600611
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
612+
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
601613
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
602614
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
603615
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 h1:t4ZwRPU+emrcvM2e9DHd0Fsf0JTPVcbfa/BhTDF03d0=
@@ -662,6 +674,7 @@ golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPI
662674
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
663675
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
664676
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
677+
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
665678
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
666679
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
667680
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=

internal/activity/executor.go

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cschleiden/go-workflows/internal/payload"
1414
"github.com/cschleiden/go-workflows/internal/task"
1515
"github.com/cschleiden/go-workflows/internal/workflow"
16+
"github.com/cschleiden/go-workflows/internal/workflowerrors"
1617
"github.com/cschleiden/go-workflows/log"
1718
"go.opentelemetry.io/otel/attribute"
1819
"go.opentelemetry.io/otel/trace"
@@ -26,8 +27,8 @@ type Executor struct {
2627
r *workflow.Registry
2728
}
2829

29-
func NewExecutor(logger log.Logger, tracer trace.Tracer, converter converter.Converter, propagators []contextpropagation.ContextPropagator, r *workflow.Registry) Executor {
30-
return Executor{
30+
func NewExecutor(logger log.Logger, tracer trace.Tracer, converter converter.Converter, propagators []contextpropagation.ContextPropagator, r *workflow.Registry) *Executor {
31+
return &Executor{
3132
logger: logger,
3233
tracer: tracer,
3334
converter: converter,
@@ -46,12 +47,12 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
4647

4748
activityFn := reflect.ValueOf(activity)
4849
if activityFn.Type().Kind() != reflect.Func {
49-
return nil, errors.New("activity not a function")
50+
return nil, workflowerrors.NewPermanentError(errors.New("activity not a function"))
5051
}
5152

5253
args, addContext, err := args.InputsToArgs(e.converter, activityFn, a.Inputs)
5354
if err != nil {
54-
return nil, fmt.Errorf("converting activity inputs: %w", err)
55+
return nil, workflowerrors.NewPermanentError(fmt.Errorf("converting activity inputs: %w", err))
5556
}
5657

5758
// Add activity state to context
@@ -64,7 +65,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
6465
for _, propagator := range e.propagators {
6566
activityCtx, err = propagator.Extract(activityCtx, a.Metadata)
6667
if err != nil {
67-
return nil, fmt.Errorf("extracting context from propagator: %w", err)
68+
return nil, workflowerrors.NewPermanentError(fmt.Errorf("extracting context from propagator: %w", err))
6869
}
6970
}
7071

@@ -79,31 +80,52 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
7980
if addContext {
8081
args[0] = reflect.ValueOf(activityCtx)
8182
}
82-
r := activityFn.Call(args)
8383

84-
if len(r) < 1 || len(r) > 2 {
85-
return nil, errors.New("activity has to return either (error) or (<result>, error)")
84+
done := make(chan struct{})
85+
var rv []reflect.Value
86+
87+
go func() {
88+
// Recover any panic encountered during activity execution
89+
defer func() {
90+
if r := recover(); r != nil {
91+
err = workflowerrors.NewPanicError(fmt.Sprintf("panic: %v", r))
92+
rv = []reflect.Value{reflect.ValueOf(err)}
93+
}
94+
95+
close(done)
96+
}()
97+
98+
rv = activityFn.Call(args)
99+
}()
100+
101+
<-done
102+
103+
if len(rv) < 1 || len(rv) > 2 {
104+
return nil, workflowerrors.NewPermanentError(errors.New("activity has to return either (error) or (<result>, error)"))
86105
}
87106

88107
var result payload.Payload
89108

90-
if len(r) > 1 {
109+
// Convert activity result to payload. We always expect at least an error
110+
if len(rv) > 1 {
91111
var err error
92-
result, err = e.converter.To(r[0].Interface())
112+
result, err = e.converter.To(rv[0].Interface())
93113
if err != nil {
94-
return nil, fmt.Errorf("converting activity result: %w", err)
114+
return nil, workflowerrors.NewPermanentError(fmt.Errorf("converting activity result: %w", err))
95115
}
96116
}
97117

98-
errResult := r[len(r)-1]
118+
// Was an error returned?
119+
errResult := rv[len(rv)-1]
99120
if errResult.IsNil() {
121+
// No error from activity execution
100122
return result, nil
101123
}
102124

103-
errInterface, ok := errResult.Interface().(error)
125+
err, ok := errResult.Interface().(error)
104126
if !ok {
105-
return nil, fmt.Errorf("activity error result does not satisfy error interface (%T): %v", errResult, errResult)
127+
return nil, workflowerrors.NewPermanentError(fmt.Errorf("activity error result does not satisfy error interface (%T): %v", errResult, errResult))
106128
}
107129

108-
return result, errInterface
130+
return result, workflowerrors.FromError(err)
109131
}

0 commit comments

Comments
 (0)