@@ -15,13 +15,7 @@ import (
15
15
"github.com/cschleiden/go-workflows/log"
16
16
)
17
17
18
- type WorkflowWorker interface {
19
- Start (context.Context ) error
20
-
21
- WaitForCompletion () error
22
- }
23
-
24
- type workflowWorker struct {
18
+ type WorkflowWorker struct {
25
19
backend backend.Backend
26
20
27
21
options * Options
@@ -37,15 +31,15 @@ type workflowWorker struct {
37
31
wg * sync.WaitGroup
38
32
}
39
33
40
- func NewWorkflowWorker (backend backend.Backend , registry * workflow.Registry , options * Options ) WorkflowWorker {
34
+ func NewWorkflowWorker (backend backend.Backend , registry * workflow.Registry , options * Options ) * WorkflowWorker {
41
35
var c workflow.ExecutorCache
42
36
if options .WorkflowExecutorCache != nil {
43
37
c = options .WorkflowExecutorCache
44
38
} else {
45
39
c = cache .NewWorkflowExecutorLRUCache (options .WorkflowExecutorCacheSize , options .WorkflowExecutorCacheTTL )
46
40
}
47
41
48
- return & workflowWorker {
42
+ return & WorkflowWorker {
49
43
backend : backend ,
50
44
51
45
options : options ,
@@ -61,7 +55,7 @@ func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, opt
61
55
}
62
56
}
63
57
64
- func (ww * workflowWorker ) Start (ctx context.Context ) error {
58
+ func (ww * WorkflowWorker ) Start (ctx context.Context ) error {
65
59
for i := 0 ; i <= ww .options .WorkflowPollers ; i ++ {
66
60
go ww .runPoll (ctx )
67
61
}
@@ -71,15 +65,15 @@ func (ww *workflowWorker) Start(ctx context.Context) error {
71
65
return nil
72
66
}
73
67
74
- func (ww * workflowWorker ) WaitForCompletion () error {
68
+ func (ww * WorkflowWorker ) WaitForCompletion () error {
75
69
close (ww .workflowTaskQueue )
76
70
77
71
ww .wg .Wait ()
78
72
79
73
return nil
80
74
}
81
75
82
- func (ww * workflowWorker ) runPoll (ctx context.Context ) {
76
+ func (ww * WorkflowWorker ) runPoll (ctx context.Context ) {
83
77
for {
84
78
select {
85
79
case <- ctx .Done ():
@@ -99,7 +93,7 @@ func (ww *workflowWorker) runPoll(ctx context.Context) {
99
93
}
100
94
}
101
95
102
- func (ww * workflowWorker ) runDispatcher () {
96
+ func (ww * WorkflowWorker ) runDispatcher () {
103
97
var sem chan (struct {})
104
98
105
99
if ww .options .MaxParallelWorkflowTasks > 0 {
@@ -128,7 +122,7 @@ func (ww *workflowWorker) runDispatcher() {
128
122
}
129
123
}
130
124
131
- func (ww * workflowWorker ) handle (ctx context.Context , t * task.Workflow ) {
125
+ func (ww * WorkflowWorker ) handle (ctx context.Context , t * task.Workflow ) {
132
126
result , err := ww .handleTask (ctx , t )
133
127
if err != nil {
134
128
ww .logger .Panic ("could not handle workflow task" , "error" , err )
@@ -145,7 +139,7 @@ func (ww *workflowWorker) handle(ctx context.Context, t *task.Workflow) {
145
139
}
146
140
}
147
141
148
- func (ww * workflowWorker ) handleTask (
142
+ func (ww * WorkflowWorker ) handleTask (
149
143
ctx context.Context ,
150
144
t * task.Workflow ,
151
145
) (* workflow.ExecutionResult , error ) {
@@ -169,7 +163,7 @@ func (ww *workflowWorker) handleTask(
169
163
return result , nil
170
164
}
171
165
172
- func (ww * workflowWorker ) getExecutor (ctx context.Context , t * task.Workflow ) (workflow.WorkflowExecutor , error ) {
166
+ func (ww * WorkflowWorker ) getExecutor (ctx context.Context , t * task.Workflow ) (workflow.WorkflowExecutor , error ) {
173
167
// Try to get a cached executor
174
168
executor , ok , err := ww .cache .Get (ctx , t .WorkflowInstance )
175
169
if err != nil {
@@ -192,7 +186,7 @@ func (ww *workflowWorker) getExecutor(ctx context.Context, t *task.Workflow) (wo
192
186
return executor , nil
193
187
}
194
188
195
- func (ww * workflowWorker ) heartbeatTask (ctx context.Context , task * task.Workflow ) {
189
+ func (ww * WorkflowWorker ) heartbeatTask (ctx context.Context , task * task.Workflow ) {
196
190
t := time .NewTicker (ww .options .WorkflowHeartbeatInterval )
197
191
defer t .Stop ()
198
192
@@ -208,7 +202,7 @@ func (ww *workflowWorker) heartbeatTask(ctx context.Context, task *task.Workflow
208
202
}
209
203
}
210
204
211
- func (ww * workflowWorker ) poll (ctx context.Context , timeout time.Duration ) (* task.Workflow , error ) {
205
+ func (ww * WorkflowWorker ) poll (ctx context.Context , timeout time.Duration ) (* task.Workflow , error ) {
212
206
if timeout == 0 {
213
207
timeout = 30 * time .Second
214
208
}
0 commit comments