Skip to content

Commit 598a8e0

Browse files
committed
Fix #229 - Implement For task and refactor jq expr into context
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent cf3064e commit 598a8e0

20 files changed

+903
-423
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ The table below lists the current state of this implementation. This table is a
126126
| Task Call ||
127127
| Task Do ||
128128
| Task Emit ||
129-
| Task For | |
129+
| Task For | |
130130
| Task Fork ||
131131
| Task Listen ||
132132
| Task Raise ||

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ require (
1919
github.com/go-playground/locales v0.14.1 // indirect
2020
github.com/go-playground/universal-translator v0.18.1 // indirect
2121
github.com/google/go-cmp v0.7.0 // indirect
22+
github.com/google/uuid v1.6.0 // indirect
2223
github.com/itchyny/timefmt-go v0.1.6 // indirect
2324
github.com/leodido/go-urn v1.4.0 // indirect
2425
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
26+
github.com/relvacode/iso8601 v1.6.0 // indirect
2527
github.com/tidwall/match v1.1.1 // indirect
2628
github.com/tidwall/pretty v1.2.1 // indirect
2729
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ github.com/go-playground/validator/v10 v10.25.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1
1414
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1515
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
1616
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
17+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
18+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1719
github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg=
1820
github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY=
1921
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
@@ -23,6 +25,8 @@ github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjS
2325
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2426
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
2527
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
28+
github.com/relvacode/iso8601 v1.6.0 h1:eFXUhMJN3Gz8Rcq82f9DTMW0svjtAVuIEULglM7QHTU=
29+
github.com/relvacode/iso8601 v1.6.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I=
2630
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2731
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
2832
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=

impl/ctx/context.go

Lines changed: 247 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@ package ctx
1616

1717
import (
1818
"context"
19+
"encoding/json"
1920
"errors"
21+
"fmt"
22+
"github.com/google/uuid"
2023
"github.com/serverlessworkflow/sdk-go/v3/model"
2124
"sync"
25+
"time"
2226
)
2327

2428
var ErrWorkflowContextNotFound = errors.New("workflow context not found")
@@ -29,34 +33,56 @@ type ctxKey string
2933

3034
const (
3135
runnerCtxKey ctxKey = "wfRunnerContext"
32-
varsContext = "$context"
33-
varsInput = "$input"
34-
varsOutput = "$output"
35-
varsWorkflow = "$workflow"
36+
37+
varsContext = "$context"
38+
varsInput = "$input"
39+
varsOutput = "$output"
40+
varsWorkflow = "$workflow"
41+
varsRuntime = "$runtime"
42+
varsTask = "$task"
43+
44+
// TODO: script during the release to update this value programmatically
45+
runtimeVersion = "v3.1.0"
46+
runtimeName = "CNCF Serverless Workflow Specification Go SDK"
3647
)
3748

3849
type WorkflowContext interface {
50+
SetStartedAt(t time.Time)
3951
SetStatus(status StatusPhase)
40-
SetTaskStatus(task string, status StatusPhase)
52+
SetRawInput(input interface{})
4153
SetInstanceCtx(value interface{})
4254
GetInstanceCtx() interface{}
4355
SetInput(input interface{})
4456
GetInput() interface{}
4557
SetOutput(output interface{})
4658
GetOutput() interface{}
4759
GetOutputAsMap() map[string]interface{}
48-
AsJQVars() map[string]interface{}
60+
GetVars() map[string]interface{}
61+
SetTaskStatus(task string, status StatusPhase)
62+
SetTaskRawInput(input interface{})
63+
SetTaskRawOutput(output interface{})
64+
SetTaskDef(task model.Task) error
65+
SetTaskStartedAt(startedAt time.Time)
66+
SetTaskName(name string)
67+
SetTaskReference(ref string)
68+
GetTaskReference() string
69+
ClearTaskContext()
70+
SetLocalExprVars(vars map[string]interface{})
71+
AddLocalExprVars(vars map[string]interface{})
72+
RemoveLocalExprVars(keys ...string)
4973
}
5074

5175
// workflowContext holds the necessary data for the workflow execution within the instance.
5276
type workflowContext struct {
53-
mu sync.Mutex
54-
input interface{} // $input can hold any type
55-
output interface{} // $output can hold any type
56-
context map[string]interface{} // Holds `$context` as the key
57-
definition map[string]interface{} // $workflow representation in the context
58-
StatusPhase []StatusPhaseLog
59-
TasksStatusPhase map[string][]StatusPhaseLog
77+
mu sync.Mutex
78+
input interface{} // $input can hold any type
79+
output interface{} // $output can hold any type
80+
context map[string]interface{} // Holds `$context` as the key
81+
workflowDescriptor map[string]interface{} // $workflow representation in the context
82+
taskDescriptor map[string]interface{} // $task representation in the context
83+
localExprVars map[string]interface{} // Local expression variables defined in a given task or private context. E.g. a For task $item.
84+
StatusPhase []StatusPhaseLog
85+
TasksStatusPhase map[string][]StatusPhaseLog
6086
}
6187

6288
func NewWorkflowContext(workflow *model.Workflow) (WorkflowContext, error) {
@@ -65,19 +91,110 @@ func NewWorkflowContext(workflow *model.Workflow) (WorkflowContext, error) {
6591
if err != nil {
6692
return nil, err
6793
}
68-
69-
workflowCtx.definition = workflowDef
94+
workflowCtx.taskDescriptor = map[string]interface{}{}
95+
workflowCtx.workflowDescriptor = map[string]interface{}{
96+
varsWorkflow: map[string]interface{}{
97+
"id": uuid.NewString(),
98+
"definition": workflowDef,
99+
},
100+
}
70101
workflowCtx.SetStatus(PendingStatus)
71102

72103
return workflowCtx, nil
73104
}
74105

75-
func (ctx *workflowContext) AsJQVars() map[string]interface{} {
106+
// WithWorkflowContext adds the workflowContext to a parent context
107+
func WithWorkflowContext(parent context.Context, wfCtx WorkflowContext) context.Context {
108+
return context.WithValue(parent, runnerCtxKey, wfCtx)
109+
}
110+
111+
// GetWorkflowContext retrieves the workflowContext from a context
112+
func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) {
113+
wfCtx, ok := ctx.Value(runnerCtxKey).(*workflowContext)
114+
if !ok {
115+
return nil, ErrWorkflowContextNotFound
116+
}
117+
return wfCtx, nil
118+
}
119+
120+
func (ctx *workflowContext) SetStartedAt(t time.Time) {
121+
ctx.mu.Lock()
122+
defer ctx.mu.Unlock()
123+
124+
wf, ok := ctx.workflowDescriptor[varsWorkflow].(map[string]interface{})
125+
if !ok {
126+
wf = make(map[string]interface{})
127+
ctx.workflowDescriptor[varsWorkflow] = wf
128+
}
129+
130+
startedAt, ok := wf["startedAt"].(map[string]interface{})
131+
if !ok {
132+
startedAt = make(map[string]interface{})
133+
wf["startedAt"] = startedAt
134+
}
135+
136+
startedAt["iso8601"] = t.UTC().Format(time.RFC3339)
137+
}
138+
139+
func (ctx *workflowContext) SetRawInput(input interface{}) {
140+
ctx.mu.Lock()
141+
defer ctx.mu.Unlock()
142+
143+
// Ensure the outer "workflow" map
144+
wf, ok := ctx.workflowDescriptor[varsWorkflow].(map[string]interface{})
145+
if !ok {
146+
wf = make(map[string]interface{})
147+
ctx.workflowDescriptor[varsWorkflow] = wf
148+
}
149+
150+
// Store the input
151+
wf["input"] = input
152+
}
153+
154+
func (ctx *workflowContext) AddLocalExprVars(vars map[string]interface{}) {
155+
ctx.mu.Lock()
156+
defer ctx.mu.Unlock()
157+
if ctx.localExprVars == nil {
158+
ctx.localExprVars = map[string]interface{}{}
159+
}
160+
for k, v := range vars {
161+
ctx.localExprVars[k] = v
162+
}
163+
}
164+
165+
func (ctx *workflowContext) RemoveLocalExprVars(keys ...string) {
166+
ctx.mu.Lock()
167+
defer ctx.mu.Unlock()
168+
169+
if ctx.localExprVars == nil {
170+
return
171+
}
172+
173+
for _, k := range keys {
174+
delete(ctx.localExprVars, k)
175+
}
176+
}
177+
178+
func (ctx *workflowContext) SetLocalExprVars(vars map[string]interface{}) {
179+
ctx.mu.Lock()
180+
defer ctx.mu.Unlock()
181+
ctx.localExprVars = vars
182+
}
183+
184+
func (ctx *workflowContext) GetVars() map[string]interface{} {
76185
vars := make(map[string]interface{})
77186
vars[varsInput] = ctx.GetInput()
78187
vars[varsOutput] = ctx.GetOutput()
79188
vars[varsContext] = ctx.GetInstanceCtx()
80-
vars[varsOutput] = ctx.definition
189+
vars[varsTask] = ctx.taskDescriptor[varsTask]
190+
vars[varsWorkflow] = ctx.workflowDescriptor[varsWorkflow]
191+
vars[varsRuntime] = map[string]interface{}{
192+
"name": runtimeName,
193+
"version": runtimeVersion,
194+
}
195+
for varName, varValue := range ctx.localExprVars {
196+
vars[varName] = varValue
197+
}
81198
return vars
82199
}
83200

@@ -90,15 +207,6 @@ func (ctx *workflowContext) SetStatus(status StatusPhase) {
90207
ctx.StatusPhase = append(ctx.StatusPhase, NewStatusPhaseLog(status))
91208
}
92209

93-
func (ctx *workflowContext) SetTaskStatus(task string, status StatusPhase) {
94-
ctx.mu.Lock()
95-
defer ctx.mu.Unlock()
96-
if ctx.TasksStatusPhase == nil {
97-
ctx.TasksStatusPhase = map[string][]StatusPhaseLog{}
98-
}
99-
ctx.TasksStatusPhase[task] = append(ctx.TasksStatusPhase[task], NewStatusPhaseLog(status))
100-
}
101-
102210
// SetInstanceCtx safely sets the `$context` value
103211
func (ctx *workflowContext) SetInstanceCtx(value interface{}) {
104212
ctx.mu.Lock()
@@ -179,16 +287,121 @@ func (ctx *workflowContext) GetOutputAsMap() map[string]interface{} {
179287
}
180288
}
181289

182-
// WithWorkflowContext adds the workflowContext to a parent context
183-
func WithWorkflowContext(parent context.Context, wfCtx WorkflowContext) context.Context {
184-
return context.WithValue(parent, runnerCtxKey, wfCtx)
290+
func (ctx *workflowContext) SetTaskStatus(task string, status StatusPhase) {
291+
ctx.mu.Lock()
292+
defer ctx.mu.Unlock()
293+
if ctx.TasksStatusPhase == nil {
294+
ctx.TasksStatusPhase = map[string][]StatusPhaseLog{}
295+
}
296+
ctx.TasksStatusPhase[task] = append(ctx.TasksStatusPhase[task], NewStatusPhaseLog(status))
185297
}
186298

187-
// GetWorkflowContext retrieves the workflowContext from a context
188-
func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) {
189-
wfCtx, ok := ctx.Value(runnerCtxKey).(*workflowContext)
299+
func (ctx *workflowContext) SetTaskRawInput(input interface{}) {
300+
ctx.mu.Lock()
301+
defer ctx.mu.Unlock()
302+
303+
task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{})
190304
if !ok {
191-
return nil, ErrWorkflowContextNotFound
305+
task = make(map[string]interface{})
306+
ctx.taskDescriptor[varsTask] = task
192307
}
193-
return wfCtx, nil
308+
309+
task["input"] = input
310+
}
311+
312+
func (ctx *workflowContext) SetTaskRawOutput(output interface{}) {
313+
ctx.mu.Lock()
314+
defer ctx.mu.Unlock()
315+
316+
task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{})
317+
if !ok {
318+
task = make(map[string]interface{})
319+
ctx.taskDescriptor[varsTask] = task
320+
}
321+
322+
task["output"] = output
323+
}
324+
325+
func (ctx *workflowContext) SetTaskDef(task model.Task) error {
326+
ctx.mu.Lock()
327+
defer ctx.mu.Unlock()
328+
329+
if task == nil {
330+
return errors.New("SetTaskDef called with nil model.Task")
331+
}
332+
333+
defBytes, err := json.Marshal(task)
334+
if err != nil {
335+
return fmt.Errorf("failed to marshal task: %w", err)
336+
}
337+
338+
var defMap map[string]interface{}
339+
if err := json.Unmarshal(defBytes, &defMap); err != nil {
340+
return fmt.Errorf("failed to unmarshal task into map: %w", err)
341+
}
342+
343+
taskMap, ok := ctx.taskDescriptor[varsTask].(map[string]interface{})
344+
if !ok {
345+
taskMap = make(map[string]interface{})
346+
ctx.taskDescriptor[varsTask] = taskMap
347+
}
348+
349+
taskMap["definition"] = defMap
350+
351+
return nil
352+
}
353+
354+
func (ctx *workflowContext) SetTaskStartedAt(startedAt time.Time) {
355+
ctx.mu.Lock()
356+
defer ctx.mu.Unlock()
357+
358+
task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{})
359+
if !ok {
360+
task = make(map[string]interface{})
361+
ctx.taskDescriptor[varsTask] = task
362+
}
363+
364+
task["startedAt"] = startedAt.UTC().Format(time.RFC3339)
365+
}
366+
367+
func (ctx *workflowContext) SetTaskName(name string) {
368+
ctx.mu.Lock()
369+
defer ctx.mu.Unlock()
370+
371+
task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{})
372+
if !ok {
373+
task = make(map[string]interface{})
374+
ctx.taskDescriptor[varsTask] = task
375+
}
376+
377+
task["name"] = name
378+
}
379+
380+
func (ctx *workflowContext) SetTaskReference(ref string) {
381+
ctx.mu.Lock()
382+
defer ctx.mu.Unlock()
383+
384+
task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{})
385+
if !ok {
386+
task = make(map[string]interface{})
387+
ctx.taskDescriptor[varsTask] = task
388+
}
389+
390+
task["reference"] = ref
391+
}
392+
393+
func (ctx *workflowContext) GetTaskReference() string {
394+
ctx.mu.Lock()
395+
defer ctx.mu.Unlock()
396+
task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{})
397+
if !ok {
398+
return ""
399+
}
400+
return task["reference"].(string)
401+
}
402+
403+
func (ctx *workflowContext) ClearTaskContext() {
404+
ctx.mu.Lock()
405+
defer ctx.mu.Unlock()
406+
ctx.taskDescriptor[varsTask] = make(map[string]interface{})
194407
}

0 commit comments

Comments
 (0)