Skip to content

Commit 45bb41e

Browse files
Fix #233 - Add support to 'switch' task (#234)
* Fix #233 - Add support to 'switch' task Signed-off-by: Ricardo Zanini <[email protected]> * Fix headers and linters Signed-off-by: Ricardo Zanini <[email protected]> --------- Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 5237b86 commit 45bb41e

17 files changed

+220
-20
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ The table below lists the current state of this implementation. This table is a
132132
| Task Raise ||
133133
| Task Run ||
134134
| Task Set ||
135-
| Task Switch | |
135+
| Task Switch | |
136136
| Task Try ||
137137
| Task Wait ||
138138
| Lifecycle Events | 🟡 |
@@ -157,7 +157,7 @@ The table below lists the current state of this implementation. This table is a
157157
| AsyncAPI Server ||
158158
| AsyncAPI Outbound Message ||
159159
| AsyncAPI Subscription ||
160-
| Workflow Definition Reference | |
160+
| Workflow Definition Reference | |
161161
| Subscription Iterator ||
162162

163163
We love contributions! Our aim is to have a complete implementation to serve as a reference or to become a project on its own to favor the CNCF Ecosystem.

impl/ctx/context.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ import (
1919
"encoding/json"
2020
"errors"
2121
"fmt"
22-
"github.com/google/uuid"
23-
"github.com/serverlessworkflow/sdk-go/v3/model"
2422
"sync"
2523
"time"
24+
25+
"github.com/google/uuid"
26+
"github.com/serverlessworkflow/sdk-go/v3/model"
2627
)
2728

2829
var ErrWorkflowContextNotFound = errors.New("workflow context not found")

impl/expr/expr.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
2122
"github.com/itchyny/gojq"
2223
"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
2324
"github.com/serverlessworkflow/sdk-go/v3/model"

impl/json_pointer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ package impl
1717
import (
1818
"encoding/json"
1919
"fmt"
20-
"github.com/serverlessworkflow/sdk-go/v3/model"
2120
"reflect"
2221
"strings"
22+
23+
"github.com/serverlessworkflow/sdk-go/v3/model"
2324
)
2425

2526
func findJsonPointer(data interface{}, target string, path string) (string, bool) {

impl/json_pointer_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
package impl
1616

1717
import (
18+
"testing"
19+
1820
"github.com/serverlessworkflow/sdk-go/v3/model"
1921
"github.com/stretchr/testify/assert"
20-
"testing"
2122
)
2223

2324
// TestGenerateJSONPointer_SimpleTask tests a simple workflow task.
@@ -60,8 +61,8 @@ func TestGenerateJSONPointer_ForkTask(t *testing.T) {
6061
Fork: model.ForkTaskConfiguration{
6162
Compete: true,
6263
Branches: &model.TaskList{
63-
{Key: "callNurse", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/nurses")}}},
64-
{Key: "callDoctor", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/doctor")}}},
64+
&model.TaskItem{Key: "callNurse", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/nurses")}}},
65+
&model.TaskItem{Key: "callDoctor", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/doctor")}}},
6566
},
6667
},
6768
},
@@ -85,12 +86,12 @@ func TestGenerateJSONPointer_DeepNestedTask(t *testing.T) {
8586
Fork: model.ForkTaskConfiguration{
8687
Compete: false,
8788
Branches: &model.TaskList{
88-
{
89+
&model.TaskItem{
8990
Key: "branchA",
9091
Task: &model.ForkTask{
9192
Fork: model.ForkTaskConfiguration{
9293
Branches: &model.TaskList{
93-
{
94+
&model.TaskItem{
9495
Key: "deepTask",
9596
Task: &model.SetTask{Set: map[string]interface{}{"result": "done"}},
9697
},

impl/runner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ package impl
1717
import (
1818
"context"
1919
"fmt"
20+
"time"
21+
2022
"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
2123
"github.com/serverlessworkflow/sdk-go/v3/model"
22-
"time"
2324
)
2425

2526
var _ WorkflowRunner = &workflowRunnerImpl{}

impl/runner_test.go

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ package impl
1717
import (
1818
"context"
1919
"fmt"
20+
"os"
21+
"path/filepath"
22+
"testing"
23+
2024
"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
2125
"github.com/serverlessworkflow/sdk-go/v3/model"
2226
"github.com/serverlessworkflow/sdk-go/v3/parser"
2327
"github.com/stretchr/testify/assert"
24-
"os"
25-
"path/filepath"
26-
"testing"
2728
)
2829

2930
type taskSupportOpts func(*workflowRunnerImpl)
@@ -407,3 +408,51 @@ func TestForTaskRunner_Run(t *testing.T) {
407408
})
408409

409410
}
411+
412+
func TestSwitchTaskRunner_Run(t *testing.T) {
413+
t.Run("Color is red", func(t *testing.T) {
414+
workflowPath := "./testdata/switch_match.yaml"
415+
input := map[string]interface{}{
416+
"color": "red",
417+
}
418+
expectedOutput := map[string]interface{}{
419+
"colors": []interface{}{"red"},
420+
}
421+
runWorkflowTest(t, workflowPath, input, expectedOutput)
422+
})
423+
424+
t.Run("Color is green", func(t *testing.T) {
425+
workflowPath := "./testdata/switch_match.yaml"
426+
input := map[string]interface{}{
427+
"color": "green",
428+
}
429+
expectedOutput := map[string]interface{}{
430+
"colors": []interface{}{"green"},
431+
}
432+
runWorkflowTest(t, workflowPath, input, expectedOutput)
433+
})
434+
435+
t.Run("Color is blue", func(t *testing.T) {
436+
workflowPath := "./testdata/switch_match.yaml"
437+
input := map[string]interface{}{
438+
"color": "blue",
439+
}
440+
expectedOutput := map[string]interface{}{
441+
"colors": []interface{}{"blue"},
442+
}
443+
runWorkflowTest(t, workflowPath, input, expectedOutput)
444+
})
445+
}
446+
447+
func TestSwitchTaskRunner_DefaultCase(t *testing.T) {
448+
t.Run("Color is unknown, should match default", func(t *testing.T) {
449+
workflowPath := "./testdata/switch_with_default.yaml"
450+
input := map[string]interface{}{
451+
"color": "yellow",
452+
}
453+
expectedOutput := map[string]interface{}{
454+
"colors": []interface{}{"default"},
455+
}
456+
runWorkflowTest(t, workflowPath, input, expectedOutput)
457+
})
458+
}

impl/task_runner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ package impl
1616

1717
import (
1818
"context"
19+
"time"
20+
1921
"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
2022
"github.com/serverlessworkflow/sdk-go/v3/model"
21-
"time"
2223
)
2324

2425
var _ TaskRunner = &SetTaskRunner{}

impl/task_runner_do.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ package impl
1616

1717
import (
1818
"fmt"
19+
"time"
20+
1921
"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
2022
"github.com/serverlessworkflow/sdk-go/v3/model"
21-
"time"
2223
)
2324

2425
// NewTaskRunner creates a TaskRunner instance based on the task type.
@@ -86,6 +87,24 @@ func (d *DoTaskRunner) runTasks(input interface{}, tasks *model.TaskList) (outpu
8687
}
8788

8889
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.PendingStatus)
90+
91+
// Check if this task is a SwitchTask and handle it
92+
if switchTask, ok := currentTask.Task.(*model.SwitchTask); ok {
93+
flowDirective, err := d.evaluateSwitchTask(input, currentTask.Key, switchTask)
94+
if err != nil {
95+
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus)
96+
return output, err
97+
}
98+
d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
99+
100+
// Process FlowDirective: update idx/currentTask accordingly
101+
idx, currentTask = tasks.KeyAndIndex(flowDirective.Value)
102+
if currentTask == nil {
103+
return nil, fmt.Errorf("flow directive target '%s' not found", flowDirective.Value)
104+
}
105+
continue
106+
}
107+
89108
runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, d.TaskSupport)
90109
if err != nil {
91110
return output, err
@@ -116,6 +135,32 @@ func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (b
116135
return true, nil
117136
}
118137

138+
func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, switchTask *model.SwitchTask) (*model.FlowDirective, error) {
139+
var defaultThen *model.FlowDirective
140+
for _, switchItem := range switchTask.Switch {
141+
for _, switchCase := range switchItem {
142+
if switchCase.When == nil {
143+
defaultThen = switchCase.Then
144+
continue
145+
}
146+
result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, d.TaskSupport.GetContext())
147+
if err != nil {
148+
return nil, model.NewErrExpression(err, taskKey)
149+
}
150+
if result {
151+
if switchCase.Then == nil {
152+
return nil, model.NewErrExpression(fmt.Errorf("missing 'then' directive in matched switch case"), taskKey)
153+
}
154+
return switchCase.Then, nil
155+
}
156+
}
157+
}
158+
if defaultThen != nil {
159+
return defaultThen, nil
160+
}
161+
return nil, model.NewErrExpression(fmt.Errorf("no matching switch case"), taskKey)
162+
}
163+
119164
// runTask executes an individual task.
120165
func (d *DoTaskRunner) runTask(input interface{}, runner TaskRunner, task *model.TaskBase) (output interface{}, err error) {
121166
taskName := runner.GetTaskName()

impl/task_runner_for.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ package impl
1616

1717
import (
1818
"fmt"
19-
"github.com/serverlessworkflow/sdk-go/v3/impl/expr"
20-
"github.com/serverlessworkflow/sdk-go/v3/model"
2119
"reflect"
2220
"strings"
21+
22+
"github.com/serverlessworkflow/sdk-go/v3/impl/expr"
23+
"github.com/serverlessworkflow/sdk-go/v3/model"
2324
)
2425

2526
const (

0 commit comments

Comments
 (0)