@@ -11,6 +11,7 @@ import (
11
11
"github.com/cschleiden/go-workflows/internal/command"
12
12
"github.com/cschleiden/go-workflows/internal/converter"
13
13
"github.com/cschleiden/go-workflows/internal/core"
14
+ "github.com/cschleiden/go-workflows/internal/fn"
14
15
"github.com/cschleiden/go-workflows/internal/history"
15
16
"github.com/cschleiden/go-workflows/internal/logger"
16
17
"github.com/cschleiden/go-workflows/internal/payload"
@@ -29,14 +30,14 @@ func (t *testHistoryProvider) GetWorkflowInstanceHistory(ctx context.Context, in
29
30
return t .history , nil
30
31
}
31
32
32
- func newExecutor (r * Registry , i * core.WorkflowInstance , historyProvider WorkflowHistoryProvider ) * executor {
33
+ func newExecutor (r * Registry , i * core.WorkflowInstance , workflow interface {}, historyProvider WorkflowHistoryProvider ) * executor {
33
34
logger := logger .NewDefaultLogger ()
34
35
s := workflowstate .NewWorkflowState (i , logger , clock .New ())
35
36
wfCtx , cancel := sync .WithCancel (workflowstate .WithWorkflowState (sync .Background (), s ))
36
37
37
38
return & executor {
38
39
registry : r ,
39
- workflow : NewWorkflow (reflect .ValueOf (workflow1 )),
40
+ workflow : NewWorkflow (reflect .ValueOf (workflow )),
40
41
historyProvider : historyProvider ,
41
42
workflowState : s ,
42
43
workflowCtx : wfCtx ,
@@ -52,15 +53,13 @@ func activity1(ctx context.Context, r int) (int, error) {
52
53
return r , nil
53
54
}
54
55
55
- var workflowHits int
56
-
57
- func workflow1 (ctx sync.Context ) error {
58
- workflowHits ++
59
-
60
- return nil
61
- }
62
-
63
56
func Test_ExecuteWorkflow (t * testing.T ) {
57
+ var workflowHits int
58
+ workflow1 := func (ctx sync.Context ) error {
59
+ workflowHits ++
60
+ return nil
61
+ }
62
+
64
63
r := NewRegistry ()
65
64
66
65
r .RegisterWorkflow (workflow1 )
@@ -74,14 +73,14 @@ func Test_ExecuteWorkflow(t *testing.T) {
74
73
time .Now (),
75
74
history .EventType_WorkflowExecutionStarted ,
76
75
& history.ExecutionStartedAttributes {
77
- Name : " workflow1" ,
76
+ Name : fn . Name ( workflow1 ) ,
78
77
Inputs : []payload.Payload {},
79
78
},
80
79
),
81
80
},
82
81
}
83
82
84
- e := newExecutor (r , task .WorkflowInstance , & testHistoryProvider {})
83
+ e := newExecutor (r , task .WorkflowInstance , workflow1 , & testHistoryProvider {})
85
84
86
85
_ , err := e .ExecuteTask (context .Background (), task )
87
86
require .NoError (t , err )
@@ -125,13 +124,13 @@ func Test_ReplayWorkflowWithActivityResult(t *testing.T) {
125
124
LastSequenceID : 3 ,
126
125
}
127
126
128
- e := newExecutor (r , task .WorkflowInstance , & testHistoryProvider {[]history.Event {
127
+ e := newExecutor (r , task .WorkflowInstance , workflowWithActivity , & testHistoryProvider {[]history.Event {
129
128
history .NewHistoryEvent (
130
129
1 ,
131
130
time .Now (),
132
131
history .EventType_WorkflowExecutionStarted ,
133
132
& history.ExecutionStartedAttributes {
134
- Name : " workflowWithActivity" ,
133
+ Name : fn . Name ( workflowWithActivity ) ,
135
134
Inputs : []payload.Payload {inputs },
136
135
},
137
136
),
@@ -181,14 +180,14 @@ func Test_ExecuteWorkflowWithActivityCommand(t *testing.T) {
181
180
time .Now (),
182
181
history .EventType_WorkflowExecutionStarted ,
183
182
& history.ExecutionStartedAttributes {
184
- Name : " workflowWithActivity" ,
183
+ Name : fn . Name ( workflowWithActivity ) ,
185
184
Inputs : []payload.Payload {},
186
185
},
187
186
),
188
187
},
189
188
}
190
189
191
- e := newExecutor (r , task .WorkflowInstance , & testHistoryProvider {})
190
+ e := newExecutor (r , task .WorkflowInstance , workflowWithActivity , & testHistoryProvider {})
192
191
193
192
e .ExecuteTask (context .Background (), task )
194
193
@@ -237,14 +236,14 @@ func Test_ExecuteWorkflowWithTimer(t *testing.T) {
237
236
time .Now (),
238
237
history .EventType_WorkflowExecutionStarted ,
239
238
& history.ExecutionStartedAttributes {
240
- Name : " workflowWithTimer" ,
239
+ Name : fn . Name ( workflowWithTimer ) ,
241
240
Inputs : []payload.Payload {},
242
241
},
243
242
),
244
243
},
245
244
}
246
245
247
- e := newExecutor (r , task .WorkflowInstance , & testHistoryProvider {})
246
+ e := newExecutor (r , task .WorkflowInstance , workflowWithTimer , & testHistoryProvider {})
248
247
249
248
e .ExecuteTask (context .Background (), task )
250
249
@@ -293,14 +292,14 @@ func Test_ExecuteWorkflowWithSelector(t *testing.T) {
293
292
time .Now (),
294
293
history .EventType_WorkflowExecutionStarted ,
295
294
& history.ExecutionStartedAttributes {
296
- Name : " workflowWithSelector" ,
295
+ Name : fn . Name ( workflowWithSelector ) ,
297
296
Inputs : []payload.Payload {},
298
297
},
299
298
),
300
299
},
301
300
}
302
301
303
- e := newExecutor (r , task .WorkflowInstance , & testHistoryProvider {})
302
+ e := newExecutor (r , task .WorkflowInstance , workflowWithSelector , & testHistoryProvider {})
304
303
305
304
e .ExecuteTask (context .Background (), task )
306
305
@@ -330,7 +329,7 @@ func Test_ExecuteNewEvents(t *testing.T) {
330
329
time .Now (),
331
330
history .EventType_WorkflowExecutionStarted ,
332
331
& history.ExecutionStartedAttributes {
333
- Name : " workflowWithActivity" ,
332
+ Name : fn . Name ( workflowWithActivity ) ,
334
333
Inputs : []payload.Payload {inputs },
335
334
},
336
335
),
@@ -346,7 +345,7 @@ func Test_ExecuteNewEvents(t *testing.T) {
346
345
},
347
346
}
348
347
349
- e := newExecutor (r , oldTask .WorkflowInstance , & testHistoryProvider {[]history.Event {}})
348
+ e := newExecutor (r , oldTask .WorkflowInstance , workflowWithActivity , & testHistoryProvider {[]history.Event {}})
350
349
351
350
taskResult , err := e .ExecuteTask (context .Background (), oldTask )
352
351
@@ -385,21 +384,21 @@ func Test_ExecuteNewEvents(t *testing.T) {
385
384
require .Len (t , e .workflowState .Commands (), 1 )
386
385
}
387
386
388
- var workflowSignalHits int
387
+ func Test_ExecuteWorkflowWithSignal (t * testing.T ) {
388
+ r := NewRegistry ()
389
389
390
- func workflowWithSignal1 (ctx sync.Context ) error {
391
- c := wf .NewSignalChannel [string ](ctx , "signal1" )
392
- c .Receive (ctx )
390
+ var workflowSignalHits int
393
391
394
- workflowSignalHits ++
392
+ workflowWithSignal := func (ctx sync.Context ) error {
393
+ c := wf .NewSignalChannel [string ](ctx , "signal1" )
394
+ c .Receive (ctx )
395
395
396
- return nil
397
- }
396
+ workflowSignalHits ++
398
397
399
- func Test_ExecuteWorkflowWithSignal ( t * testing. T ) {
400
- r := NewRegistry ()
398
+ return nil
399
+ }
401
400
402
- r .RegisterWorkflow (workflowWithSignal1 )
401
+ r .RegisterWorkflow (workflowWithSignal )
403
402
404
403
s , err := converter .DefaultConverter .To ("" )
405
404
require .NoError (t , err )
@@ -412,7 +411,7 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
412
411
time .Now (),
413
412
history .EventType_WorkflowExecutionStarted ,
414
413
& history.ExecutionStartedAttributes {
415
- Name : "workflowWithSignal1" ,
414
+ Name : fn . Name ( workflowWithSignal ) ,
416
415
Inputs : []payload.Payload {},
417
416
},
418
417
),
@@ -427,7 +426,7 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
427
426
},
428
427
}
429
428
430
- e := newExecutor (r , task .WorkflowInstance , & testHistoryProvider {})
429
+ e := newExecutor (r , task .WorkflowInstance , workflowWithSignal , & testHistoryProvider {})
431
430
432
431
_ , err = e .ExecuteTask (context .Background (), task )
433
432
require .NoError (t , err )
@@ -437,6 +436,40 @@ func Test_ExecuteWorkflowWithSignal(t *testing.T) {
437
436
require .Len (t , e .workflowState .Commands (), 1 )
438
437
}
439
438
439
+ func Test_CompletesWorkflowOnError (t * testing.T ) {
440
+ r := NewRegistry ()
441
+
442
+ workflowPanic := func (ctx sync.Context ) error {
443
+ panic ("wf error" )
444
+ }
445
+
446
+ r .RegisterWorkflow (workflowPanic )
447
+
448
+ task1 := & task.Workflow {
449
+ ID : "taskid" ,
450
+ WorkflowInstance : core .NewWorkflowInstance ("instanceID" , "executionID" ),
451
+ NewEvents : []history.Event {
452
+ history .NewPendingEvent (
453
+ time .Now (),
454
+ history .EventType_WorkflowExecutionStarted ,
455
+ & history.ExecutionStartedAttributes {
456
+ Name : fn .Name (workflowPanic ),
457
+ Inputs : []payload.Payload {},
458
+ },
459
+ ),
460
+ },
461
+ }
462
+
463
+ historyProvider := & testHistoryProvider {[]history.Event {}}
464
+ e := newExecutor (r , task1 .WorkflowInstance , workflowPanic , historyProvider )
465
+
466
+ r1 , err := e .ExecuteTask (context .Background (), task1 )
467
+ require .NoError (t , err )
468
+ require .True (t , e .workflow .Completed ())
469
+ require .Len (t , e .workflowState .Commands (), 1 )
470
+ require .True (t , r1 .Completed )
471
+ }
472
+
440
473
func Test_ClearCommandsBetweenRuns (t * testing.T ) {
441
474
r := NewRegistry ()
442
475
@@ -461,7 +494,7 @@ func Test_ClearCommandsBetweenRuns(t *testing.T) {
461
494
}
462
495
463
496
historyProvider := & testHistoryProvider {[]history.Event {}}
464
- e := newExecutor (r , task1 .WorkflowInstance , historyProvider )
497
+ e := newExecutor (r , task1 .WorkflowInstance , workflowWithActivity , historyProvider )
465
498
466
499
r1 , err := e .ExecuteTask (context .Background (), task1 )
467
500
require .NoError (t , err )
0 commit comments