Skip to content

Commit 85ffc5e

Browse files
committed
Add raise task
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent feeb8db commit 85ffc5e

20 files changed

+661
-42
lines changed

expr/expr.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TraverseAndEvaluate(node interface{}, input interface{}) (interface{}, erro
6161
case string:
6262
// Check if the string is a runtime expression (e.g., ${ .some.path })
6363
if IsStrictExpr(v) {
64-
return EvaluateJQExpression(Sanitize(v), input)
64+
return evaluateJQExpression(Sanitize(v), input)
6565
}
6666
return v, nil
6767

@@ -71,8 +71,8 @@ func TraverseAndEvaluate(node interface{}, input interface{}) (interface{}, erro
7171
}
7272
}
7373

74-
// EvaluateJQExpression evaluates a jq expression against a given JSON input
75-
func EvaluateJQExpression(expression string, input interface{}) (interface{}, error) {
74+
// evaluateJQExpression evaluates a jq expression against a given JSON input
75+
func evaluateJQExpression(expression string, input interface{}) (interface{}, error) {
7676
// Parse the sanitized jq expression
7777
query, err := gojq.Parse(expression)
7878
if err != nil {

impl/runner.go

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er
5151
}()
5252

5353
// Process input
54-
if input, err = wr.processInput(input); err != nil {
54+
if input, err = wr.processWorkflowInput(input); err != nil {
5555
return nil, err
5656
}
5757

@@ -64,16 +64,24 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er
6464
output = wr.RunnerCtx.GetOutput()
6565

6666
// Process output
67-
if output, err = wr.processOutput(output); err != nil {
67+
if output, err = wr.processWorkflowOutput(output); err != nil {
6868
return nil, err
6969
}
7070

7171
wr.RunnerCtx.SetStatus(CompletedStatus)
7272
return output, nil
7373
}
7474

75-
// processInput validates and transforms input if needed.
76-
func (wr *workflowRunnerImpl) processInput(input interface{}) (interface{}, error) {
75+
// wrapWorkflowError ensures workflow errors have a proper instance reference.
76+
func (wr *workflowRunnerImpl) wrapWorkflowError(err error, taskName string) error {
77+
if knownErr := model.AsError(err); knownErr != nil {
78+
return knownErr.WithInstanceRef(wr.Workflow, taskName)
79+
}
80+
return model.NewErrRuntime(err, taskName)
81+
}
82+
83+
// processWorkflowInput validates and transforms input if needed.
84+
func (wr *workflowRunnerImpl) processWorkflowInput(input interface{}) (interface{}, error) {
7785
if wr.Workflow.Input != nil {
7886
var err error
7987
if err = validateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil {
@@ -99,39 +107,45 @@ func (wr *workflowRunnerImpl) executeTasks(tasks *model.TaskList) error {
99107
return nil
100108
}
101109

102-
// TODO: implement control flow: continue, end, then
103-
for _, taskItem := range *tasks {
110+
idx := 0
111+
currentTask := (*tasks)[idx]
112+
113+
for currentTask != nil {
104114
wr.RunnerCtx.SetInput(wr.RunnerCtx.GetOutput())
105-
if shouldRun, err := wr.shouldRunTask(taskItem); err != nil {
115+
if shouldRun, err := wr.shouldRunTask(currentTask); err != nil {
106116
return err
107117
} else if !shouldRun {
108118
wr.RunnerCtx.SetOutput(wr.RunnerCtx.GetInput())
119+
idx, currentTask = tasks.Next(idx)
109120
continue
110121
}
111122

112-
wr.RunnerCtx.SetTaskStatus(taskItem.Key, PendingStatus)
113-
runner, err := NewTaskRunner(taskItem.Key, taskItem.Task)
123+
wr.RunnerCtx.SetTaskStatus(currentTask.Key, PendingStatus)
124+
runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, wr)
114125
if err != nil {
115126
return err
116127
}
117128

118-
wr.RunnerCtx.SetTaskStatus(taskItem.Key, RunningStatus)
129+
wr.RunnerCtx.SetTaskStatus(currentTask.Key, RunningStatus)
119130
var output interface{}
120-
if output, err = wr.runTask(runner, taskItem.Task.GetBase()); err != nil {
121-
wr.RunnerCtx.SetTaskStatus(taskItem.Key, FaultedStatus)
131+
if output, err = wr.runTask(runner, currentTask.Task.GetBase()); err != nil {
132+
wr.RunnerCtx.SetTaskStatus(currentTask.Key, FaultedStatus)
122133
return err
123134
}
135+
// TODO: make sure that `output` is a map[string]interface{}, so compatible to JSON traversal.
124136

125-
wr.RunnerCtx.SetTaskStatus(taskItem.Key, CompletedStatus)
137+
wr.RunnerCtx.SetTaskStatus(currentTask.Key, CompletedStatus)
126138
wr.RunnerCtx.SetOutput(output)
139+
140+
idx, currentTask = tasks.Next(idx)
127141
}
128142

129143
return nil
130144
}
131145

132146
func (wr *workflowRunnerImpl) shouldRunTask(task *model.TaskItem) (bool, error) {
133147
if task.GetBase().If != nil {
134-
output, err := expr.EvaluateJQExpression(task.GetBase().If.String(), wr.RunnerCtx.GetInput())
148+
output, err := expr.TraverseAndEvaluate(task.GetBase().If.String(), wr.RunnerCtx.GetInput())
135149
if err != nil {
136150
return false, model.NewErrExpression(err, task.Key)
137151
}
@@ -142,8 +156,8 @@ func (wr *workflowRunnerImpl) shouldRunTask(task *model.TaskItem) (bool, error)
142156
return true, nil
143157
}
144158

145-
// processOutput applies output transformations.
146-
func (wr *workflowRunnerImpl) processOutput(output interface{}) (interface{}, error) {
159+
// processWorkflowOutput applies output transformations.
160+
func (wr *workflowRunnerImpl) processWorkflowOutput(output interface{}) (interface{}, error) {
147161
if wr.Workflow.Output != nil {
148162
var err error
149163
if output, err = traverseAndEvaluate(wr.Workflow.Output.As, wr.RunnerCtx.GetOutput(), "/"); err != nil {
@@ -161,16 +175,40 @@ func (wr *workflowRunnerImpl) processOutput(output interface{}) (interface{}, er
161175

162176
// ----------------- Task funcs ------------------- //
163177

178+
// TODO: refactor to receive a resolver handler instead of the workflow runner
179+
164180
// NewTaskRunner creates a TaskRunner instance based on the task type.
165-
func NewTaskRunner(taskName string, task model.Task) (TaskRunner, error) {
181+
func NewTaskRunner(taskName string, task model.Task, wr *workflowRunnerImpl) (TaskRunner, error) {
166182
switch t := task.(type) {
167183
case *model.SetTask:
168184
return NewSetTaskRunner(taskName, t)
185+
case *model.RaiseTask:
186+
if err := wr.resolveErrorDefinition(t); err != nil {
187+
return nil, err
188+
}
189+
return NewRaiseTaskRunner(taskName, t)
169190
default:
170191
return nil, fmt.Errorf("unsupported task type '%T' for task '%s'", t, taskName)
171192
}
172193
}
173194

195+
// TODO: can e refactored to a definition resolver callable from the context
196+
func (wr *workflowRunnerImpl) resolveErrorDefinition(t *model.RaiseTask) error {
197+
if t.Raise.Error.Ref != nil {
198+
notFoundErr := model.NewErrValidation(fmt.Errorf("%v error definition not found in 'uses'", t.Raise.Error.Ref), "")
199+
if wr.Workflow.Use != nil && wr.Workflow.Use.Errors != nil {
200+
definition, ok := wr.Workflow.Use.Errors[*t.Raise.Error.Ref]
201+
if !ok {
202+
return notFoundErr
203+
}
204+
t.Raise.Error.Definition = definition
205+
return nil
206+
}
207+
return notFoundErr
208+
}
209+
return nil
210+
}
211+
174212
// runTask executes an individual task.
175213
func (wr *workflowRunnerImpl) runTask(runner TaskRunner, task *model.TaskBase) (output interface{}, err error) {
176214
taskInput := wr.RunnerCtx.GetInput()
@@ -234,14 +272,6 @@ func (wr *workflowRunnerImpl) validateAndEvaluateTaskOutput(task *model.TaskBase
234272
return output, nil
235273
}
236274

237-
// wrapWorkflowError ensures workflow errors have a proper instance reference.
238-
func (wr *workflowRunnerImpl) wrapWorkflowError(err error, taskName string) error {
239-
if knownErr := model.AsError(err); knownErr != nil {
240-
return knownErr.WithInstanceRef(wr.Workflow, taskName)
241-
}
242-
return model.NewErrRuntime(err, taskName)
243-
}
244-
245275
func validateSchema(data interface{}, schema *model.Schema, taskName string) error {
246276
if schema != nil {
247277
if err := ValidateJSONSchema(data, schema); err != nil {

impl/task.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
)
88

99
var _ TaskRunner = &SetTaskRunner{}
10+
var _ TaskRunner = &RaiseTaskRunner{}
1011

1112
type TaskRunner interface {
1213
Run(input interface{}) (interface{}, error)
@@ -50,3 +51,65 @@ func (s *SetTaskRunner) Run(input interface{}) (output interface{}, err error) {
5051

5152
return output, nil
5253
}
54+
55+
func NewRaiseTaskRunner(taskName string, task *model.RaiseTask) (*RaiseTaskRunner, error) {
56+
if task == nil || task.Raise.Error.Definition == nil {
57+
return nil, model.NewErrValidation(fmt.Errorf("no raise configuration provided for RaiseTask %s", taskName), taskName)
58+
}
59+
return &RaiseTaskRunner{
60+
Task: task,
61+
TaskName: taskName,
62+
}, nil
63+
}
64+
65+
type RaiseTaskRunner struct {
66+
Task *model.RaiseTask
67+
TaskName string
68+
}
69+
70+
var raiseErrFuncMapping = map[string]func(error, string) *model.Error{
71+
model.ErrorTypeAuthentication: model.NewErrAuthentication,
72+
model.ErrorTypeValidation: model.NewErrValidation,
73+
model.ErrorTypeCommunication: model.NewErrCommunication,
74+
model.ErrorTypeAuthorization: model.NewErrAuthorization,
75+
model.ErrorTypeConfiguration: model.NewErrConfiguration,
76+
model.ErrorTypeExpression: model.NewErrExpression,
77+
model.ErrorTypeRuntime: model.NewErrRuntime,
78+
model.ErrorTypeTimeout: model.NewErrTimeout,
79+
}
80+
81+
func (r *RaiseTaskRunner) Run(input interface{}) (output interface{}, err error) {
82+
output = input
83+
// TODO: make this an external func so we can call it after getting the reference? Or we can get the reference from the workflow definition
84+
var detailResult interface{}
85+
detailResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Detail.AsObjectOrRuntimeExpr(), input, r.TaskName)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
var titleResult interface{}
91+
titleResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Title.AsObjectOrRuntimeExpr(), input, r.TaskName)
92+
if err != nil {
93+
return nil, err
94+
}
95+
96+
instance := &model.JsonPointerOrRuntimeExpression{Value: r.TaskName}
97+
98+
var raiseErr *model.Error
99+
if raiseErrF, ok := raiseErrFuncMapping[r.Task.Raise.Error.Definition.Type.String()]; ok {
100+
raiseErr = raiseErrF(fmt.Errorf("%v", detailResult), instance.String())
101+
} else {
102+
raiseErr = r.Task.Raise.Error.Definition
103+
raiseErr.Detail = model.NewStringOrRuntimeExpr(fmt.Sprintf("%v", detailResult))
104+
raiseErr.Instance = instance
105+
}
106+
107+
raiseErr.Title = model.NewStringOrRuntimeExpr(fmt.Sprintf("%v", titleResult))
108+
err = raiseErr
109+
110+
return output, err
111+
}
112+
113+
func (r *RaiseTaskRunner) GetTaskName() string {
114+
return r.TaskName
115+
}

impl/task_raise_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package impl
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"github.com/serverlessworkflow/sdk-go/v3/model"
7+
"github.com/stretchr/testify/assert"
8+
"testing"
9+
)
10+
11+
func TestRaiseTaskRunner_WithDefinedError(t *testing.T) {
12+
input := map[string]interface{}{}
13+
14+
raiseTask := &model.RaiseTask{
15+
Raise: model.RaiseTaskConfiguration{
16+
Error: model.RaiseTaskError{
17+
Definition: &model.Error{
18+
Type: model.NewUriTemplate(model.ErrorTypeValidation),
19+
Status: 400,
20+
Title: model.NewStringOrRuntimeExpr("Validation Error"),
21+
Detail: model.NewStringOrRuntimeExpr("Invalid input data"),
22+
},
23+
},
24+
},
25+
}
26+
27+
runner, err := NewRaiseTaskRunner("task_raise_defined", raiseTask)
28+
assert.NoError(t, err)
29+
30+
output, err := runner.Run(input)
31+
assert.Equal(t, output, input)
32+
assert.Error(t, err)
33+
34+
expectedErr := model.NewErrValidation(errors.New("Invalid input data"), "task_raise_defined")
35+
36+
assert.Equal(t, expectedErr.Type.String(), err.(*model.Error).Type.String())
37+
assert.Equal(t, expectedErr.Status, err.(*model.Error).Status)
38+
assert.Equal(t, expectedErr.Title.String(), err.(*model.Error).Title.String())
39+
assert.Equal(t, "Invalid input data", err.(*model.Error).Detail.String())
40+
assert.Equal(t, expectedErr.Instance.String(), err.(*model.Error).Instance.String())
41+
}
42+
43+
func TestRaiseTaskRunner_WithReferencedError(t *testing.T) {
44+
var ref string
45+
ref = "someErrorRef"
46+
raiseTask := &model.RaiseTask{
47+
Raise: model.RaiseTaskConfiguration{
48+
Error: model.RaiseTaskError{
49+
Ref: &ref,
50+
},
51+
},
52+
}
53+
54+
runner, err := NewRaiseTaskRunner("task_raise_ref", raiseTask)
55+
assert.Error(t, err)
56+
assert.Nil(t, runner)
57+
}
58+
59+
func TestRaiseTaskRunner_TimeoutErrorWithExpression(t *testing.T) {
60+
input := map[string]interface{}{
61+
"timeoutMessage": "Request took too long",
62+
}
63+
64+
raiseTask := &model.RaiseTask{
65+
Raise: model.RaiseTaskConfiguration{
66+
Error: model.RaiseTaskError{
67+
Definition: &model.Error{
68+
Type: model.NewUriTemplate(model.ErrorTypeTimeout),
69+
Status: 408,
70+
Title: model.NewStringOrRuntimeExpr("Timeout Error"),
71+
Detail: model.NewStringOrRuntimeExpr("${ .timeoutMessage }"),
72+
},
73+
},
74+
},
75+
}
76+
77+
runner, err := NewRaiseTaskRunner("task_raise_timeout_expr", raiseTask)
78+
assert.NoError(t, err)
79+
80+
output, err := runner.Run(input)
81+
assert.Equal(t, input, output)
82+
assert.Error(t, err)
83+
84+
expectedErr := model.NewErrTimeout(errors.New("Request took too long"), "task_raise_timeout_expr")
85+
86+
assert.Equal(t, expectedErr.Type.String(), err.(*model.Error).Type.String())
87+
assert.Equal(t, expectedErr.Status, err.(*model.Error).Status)
88+
assert.Equal(t, expectedErr.Title.String(), err.(*model.Error).Title.String())
89+
assert.Equal(t, "Request took too long", err.(*model.Error).Detail.String())
90+
assert.Equal(t, expectedErr.Instance.String(), err.(*model.Error).Instance.String())
91+
}
92+
93+
func TestRaiseTaskRunner_Serialization(t *testing.T) {
94+
raiseTask := &model.RaiseTask{
95+
Raise: model.RaiseTaskConfiguration{
96+
Error: model.RaiseTaskError{
97+
Definition: &model.Error{
98+
Type: model.NewUriTemplate(model.ErrorTypeRuntime),
99+
Status: 500,
100+
Title: model.NewStringOrRuntimeExpr("Runtime Error"),
101+
Detail: model.NewStringOrRuntimeExpr("Unexpected failure"),
102+
Instance: &model.JsonPointerOrRuntimeExpression{Value: "/task_runtime"},
103+
},
104+
},
105+
},
106+
}
107+
108+
data, err := json.Marshal(raiseTask)
109+
assert.NoError(t, err)
110+
111+
var deserializedTask model.RaiseTask
112+
err = json.Unmarshal(data, &deserializedTask)
113+
assert.NoError(t, err)
114+
115+
assert.Equal(t, raiseTask.Raise.Error.Definition.Type.String(), deserializedTask.Raise.Error.Definition.Type.String())
116+
assert.Equal(t, raiseTask.Raise.Error.Definition.Status, deserializedTask.Raise.Error.Definition.Status)
117+
assert.Equal(t, raiseTask.Raise.Error.Definition.Title.String(), deserializedTask.Raise.Error.Definition.Title.String())
118+
assert.Equal(t, raiseTask.Raise.Error.Definition.Detail.String(), deserializedTask.Raise.Error.Definition.Detail.String())
119+
assert.Equal(t, raiseTask.Raise.Error.Definition.Instance.String(), deserializedTask.Raise.Error.Definition.Instance.String())
120+
}
121+
122+
func TestRaiseTaskRunner_ReferenceSerialization(t *testing.T) {
123+
var ref string
124+
ref = "errorReference"
125+
raiseTask := &model.RaiseTask{
126+
Raise: model.RaiseTaskConfiguration{
127+
Error: model.RaiseTaskError{
128+
Ref: &ref,
129+
},
130+
},
131+
}
132+
133+
data, err := json.Marshal(raiseTask)
134+
assert.NoError(t, err)
135+
136+
var deserializedTask model.RaiseTask
137+
err = json.Unmarshal(data, &deserializedTask)
138+
assert.NoError(t, err)
139+
140+
assert.Equal(t, *raiseTask.Raise.Error.Ref, *deserializedTask.Raise.Error.Ref)
141+
assert.Nil(t, deserializedTask.Raise.Error.Definition)
142+
}

0 commit comments

Comments
 (0)