@@ -8,84 +8,124 @@ import (
8
8
9
9
type ctxKey string
10
10
11
- const executorCtxKey ctxKey = "executorContext"
12
-
13
- // ExecutorContext to not confound with Workflow Context as "$context" in the specification.
14
- // This holds the necessary data for the workflow execution within the instance.
15
- type ExecutorContext struct {
16
- mu sync.Mutex
17
- Input map [string ]interface {}
18
- Output map [string ]interface {}
19
- // Context or `$context` passed through the task executions see https://github.com/serverlessworkflow/specification/blob/main/dsl.md#data-flow
20
- Context map [string ]interface {}
11
+ const runnerCtxKey ctxKey = "wfRunnerContext"
12
+
13
+ // WorkflowRunnerContext holds the necessary data for the workflow execution within the instance.
14
+ type WorkflowRunnerContext struct {
15
+ mu sync.Mutex
16
+ input interface {} // input can hold any type
17
+ output interface {} // output can hold any type
18
+ context map [string ]interface {}
19
+ StatusPhase []StatusPhaseLog
20
+ TasksStatusPhase map [string ][]StatusPhaseLog // Holds `$context` as the key
21
+ }
22
+
23
+ func (runnerCtx * WorkflowRunnerContext ) SetStatus (status StatusPhase ) {
24
+ runnerCtx .mu .Lock ()
25
+ defer runnerCtx .mu .Unlock ()
26
+ if runnerCtx .StatusPhase == nil {
27
+ runnerCtx .StatusPhase = []StatusPhaseLog {}
28
+ }
29
+ runnerCtx .StatusPhase = append (runnerCtx .StatusPhase , NewStatusPhaseLog (status ))
21
30
}
22
31
23
- // SetWorkflowCtx safely sets the $context
24
- func (execCtx * ExecutorContext ) SetWorkflowCtx (wfCtx map [string ]interface {}) {
25
- execCtx .mu .Lock ()
26
- defer execCtx .mu .Unlock ()
27
- execCtx .Context = wfCtx
32
+ func (runnerCtx * WorkflowRunnerContext ) SetTaskStatus (task string , status StatusPhase ) {
33
+ runnerCtx .mu .Lock ()
34
+ defer runnerCtx .mu .Unlock ()
35
+ if runnerCtx .TasksStatusPhase == nil {
36
+ runnerCtx .TasksStatusPhase = map [string ][]StatusPhaseLog {}
37
+ }
38
+ runnerCtx .TasksStatusPhase [task ] = append (runnerCtx .TasksStatusPhase [task ], NewStatusPhaseLog (status ))
39
+ }
40
+
41
+ // SetWorkflowCtx safely sets the `$context` value
42
+ func (runnerCtx * WorkflowRunnerContext ) SetWorkflowCtx (value interface {}) {
43
+ runnerCtx .mu .Lock ()
44
+ defer runnerCtx .mu .Unlock ()
45
+ if runnerCtx .context == nil {
46
+ runnerCtx .context = make (map [string ]interface {})
47
+ }
48
+ runnerCtx .context ["$context" ] = value
28
49
}
29
50
30
- // GetWorkflowCtx safely retrieves the $context
31
- func (execCtx * ExecutorContext ) GetWorkflowCtx () map [string ]interface {} {
32
- execCtx .mu .Lock ()
33
- defer execCtx .mu .Unlock ()
34
- return execCtx .Context
51
+ // GetWorkflowCtx safely retrieves the `$context` value
52
+ func (runnerCtx * WorkflowRunnerContext ) GetWorkflowCtx () interface {} {
53
+ runnerCtx .mu .Lock ()
54
+ defer runnerCtx .mu .Unlock ()
55
+ if runnerCtx .context == nil {
56
+ return nil
57
+ }
58
+ return runnerCtx .context ["$context" ]
35
59
}
36
60
37
- // SetInput safely sets the input map
38
- func (execCtx * ExecutorContext ) SetInput (input map [ string ] interface {}) {
39
- execCtx .mu .Lock ()
40
- defer execCtx .mu .Unlock ()
41
- execCtx . Input = input
61
+ // SetInput safely sets the input
62
+ func (runnerCtx * WorkflowRunnerContext ) SetInput (input interface {}) {
63
+ runnerCtx .mu .Lock ()
64
+ defer runnerCtx .mu .Unlock ()
65
+ runnerCtx . input = input
42
66
}
43
67
44
- // GetInput safely retrieves the input map
45
- func (execCtx * ExecutorContext ) GetInput () map [ string ] interface {} {
46
- execCtx .mu .Lock ()
47
- defer execCtx .mu .Unlock ()
48
- return execCtx . Input
68
+ // GetInput safely retrieves the input
69
+ func (runnerCtx * WorkflowRunnerContext ) GetInput () interface {} {
70
+ runnerCtx .mu .Lock ()
71
+ defer runnerCtx .mu .Unlock ()
72
+ return runnerCtx . input
49
73
}
50
74
51
- // SetOutput safely sets the output map
52
- func (execCtx * ExecutorContext ) SetOutput (output map [ string ] interface {}) {
53
- execCtx .mu .Lock ()
54
- defer execCtx .mu .Unlock ()
55
- execCtx . Output = output
75
+ // SetOutput safely sets the output
76
+ func (runnerCtx * WorkflowRunnerContext ) SetOutput (output interface {}) {
77
+ runnerCtx .mu .Lock ()
78
+ defer runnerCtx .mu .Unlock ()
79
+ runnerCtx . output = output
56
80
}
57
81
58
- // GetOutput safely retrieves the output map
59
- func (execCtx * ExecutorContext ) GetOutput () map [ string ] interface {} {
60
- execCtx .mu .Lock ()
61
- defer execCtx .mu .Unlock ()
62
- return execCtx . Output
82
+ // GetOutput safely retrieves the output
83
+ func (runnerCtx * WorkflowRunnerContext ) GetOutput () interface {} {
84
+ runnerCtx .mu .Lock ()
85
+ defer runnerCtx .mu .Unlock ()
86
+ return runnerCtx . output
63
87
}
64
88
65
- // UpdateOutput allows adding or updating a single key-value pair in the output map
66
- func (execCtx * ExecutorContext ) UpdateOutput (key string , value interface {}) {
67
- execCtx .mu .Lock ()
68
- defer execCtx .mu .Unlock ()
69
- if execCtx .Output == nil {
70
- execCtx .Output = make (map [string ]interface {})
89
+ // GetInputAsMap safely retrieves the input as a map[string]interface{}.
90
+ // If input is not a map, it creates a map with an empty string key and the input as the value.
91
+ func (runnerCtx * WorkflowRunnerContext ) GetInputAsMap () map [string ]interface {} {
92
+ runnerCtx .mu .Lock ()
93
+ defer runnerCtx .mu .Unlock ()
94
+
95
+ if inputMap , ok := runnerCtx .input .(map [string ]interface {}); ok {
96
+ return inputMap
97
+ }
98
+
99
+ // If input is not a map, create a map with an empty key and set input as the value
100
+ return map [string ]interface {}{
101
+ "" : runnerCtx .input ,
71
102
}
72
- execCtx .Output [key ] = value
73
103
}
74
104
75
- // GetOutputValue safely retrieves a single key from the output map
76
- func (execCtx * ExecutorContext ) GetOutputValue (key string ) (interface {}, bool ) {
77
- execCtx .mu .Lock ()
78
- defer execCtx .mu .Unlock ()
79
- value , exists := execCtx .Output [key ]
80
- return value , exists
105
+ // GetOutputAsMap safely retrieves the output as a map[string]interface{}.
106
+ // If output is not a map, it creates a map with an empty string key and the output as the value.
107
+ func (runnerCtx * WorkflowRunnerContext ) GetOutputAsMap () map [string ]interface {} {
108
+ runnerCtx .mu .Lock ()
109
+ defer runnerCtx .mu .Unlock ()
110
+
111
+ if outputMap , ok := runnerCtx .output .(map [string ]interface {}); ok {
112
+ return outputMap
113
+ }
114
+
115
+ // If output is not a map, create a map with an empty key and set output as the value
116
+ return map [string ]interface {}{
117
+ "" : runnerCtx .output ,
118
+ }
81
119
}
82
120
83
- func WithExecutorContext (parent context.Context , wfCtx * ExecutorContext ) context.Context {
84
- return context .WithValue (parent , executorCtxKey , wfCtx )
121
+ // WithRunnerContext adds the WorkflowRunnerContext to a parent context
122
+ func WithRunnerContext (parent context.Context , wfCtx * WorkflowRunnerContext ) context.Context {
123
+ return context .WithValue (parent , runnerCtxKey , wfCtx )
85
124
}
86
125
87
- func GetExecutorContext (ctx context.Context ) (* ExecutorContext , error ) {
88
- wfCtx , ok := ctx .Value (executorCtxKey ).(* ExecutorContext )
126
+ // GetRunnerContext retrieves the WorkflowRunnerContext from a context
127
+ func GetRunnerContext (ctx context.Context ) (* WorkflowRunnerContext , error ) {
128
+ wfCtx , ok := ctx .Value (runnerCtxKey ).(* WorkflowRunnerContext )
89
129
if ! ok {
90
130
return nil , errors .New ("workflow context not found" )
91
131
}
0 commit comments