15
15
*/
16
16
package io .serverlessworkflow .impl .executors ;
17
17
18
+ import com .fasterxml .jackson .databind .JsonNode ;
18
19
import io .serverlessworkflow .api .types .FlowDirectiveEnum ;
19
20
import io .serverlessworkflow .api .types .ForkTask ;
20
21
import io .serverlessworkflow .api .types .ForkTaskConfiguration ;
24
25
import io .serverlessworkflow .impl .WorkflowDefinition ;
25
26
import io .serverlessworkflow .impl .WorkflowState ;
26
27
import io .serverlessworkflow .impl .WorkflowUtils ;
27
- import io .serverlessworkflow .impl .generic .SortedArrayList ;
28
28
import io .serverlessworkflow .impl .json .JsonUtils ;
29
29
import java .lang .reflect .UndeclaredThrowableException ;
30
+ import java .util .ArrayList ;
30
31
import java .util .HashMap ;
31
32
import java .util .List ;
32
33
import java .util .Map ;
33
34
import java .util .concurrent .ExecutionException ;
34
35
import java .util .concurrent .ExecutorService ;
35
36
import java .util .concurrent .Future ;
36
- import java .util .stream .Collectors ;
37
+ import java .util .stream .Stream ;
37
38
import org .slf4j .Logger ;
38
39
import org .slf4j .LoggerFactory ;
39
40
@@ -47,8 +48,6 @@ protected ForkExecutor(ForkTask task, WorkflowDefinition definition) {
47
48
service = definition .executorService ();
48
49
}
49
50
50
- private record BranchContext (String taskName , TaskContext <?> taskContext ) {}
51
-
52
51
@ Override
53
52
protected void internalExecute (WorkflowContext workflow , TaskContext <ForkTask > taskContext ) {
54
53
ForkTaskConfiguration forkConfig = task .getFork ();
@@ -62,13 +61,10 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
62
61
item .getName (),
63
62
service .submit (() -> executeBranch (workflow , taskContext .copy (), item , i )));
64
63
}
65
- List <BranchContext > results =
66
- new SortedArrayList <>(
67
- (arg1 , arg2 ) ->
68
- arg1 .taskContext .completedAt ().compareTo (arg2 .taskContext .completedAt ()));
64
+ List <Map .Entry <String , TaskContext <?>>> results = new ArrayList <>();
69
65
for (Map .Entry <String , Future <TaskContext <?>>> entry : futures .entrySet ()) {
70
66
try {
71
- results .add (new BranchContext (entry .getKey (), entry .getValue ().get ()));
67
+ results .add (Map . entry (entry .getKey (), entry .getValue ().get ()));
72
68
} catch (ExecutionException ex ) {
73
69
Throwable cause = ex .getCause ();
74
70
if (cause instanceof RuntimeException ) {
@@ -77,24 +73,25 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
77
73
throw new UndeclaredThrowableException (ex );
78
74
}
79
75
} catch (InterruptedException ex ) {
80
- logger .warn (
81
- "Thred executing branch {} was interrupted, this branch will be ignored" ,
82
- entry .getKey (),
83
- ex );
76
+ logger .warn ("Branch {} was interrupted, no result will be recorded" , entry .getKey (), ex );
84
77
}
85
78
}
86
79
if (!results .isEmpty ()) {
80
+ Stream <Map .Entry <String , TaskContext <?>>> sortedStream =
81
+ results .stream ()
82
+ .sorted (
83
+ (arg1 , arg2 ) ->
84
+ arg1 .getValue ().completedAt ().compareTo (arg2 .getValue ().completedAt ()));
87
85
taskContext .rawOutput (
88
86
forkConfig .isCompete ()
89
- ? results .get (0 ).taskContext ().output ()
90
- : JsonUtils .fromValue (
91
- results .stream ()
92
- .map (
93
- e ->
94
- JsonUtils .mapper ()
95
- .createObjectNode ()
96
- .set (e .taskName (), e .taskContext ().output ()))
97
- .collect (Collectors .toList ())));
87
+ ? sortedStream .map (e -> e .getValue ().output ()).findFirst ().orElseThrow ()
88
+ : sortedStream
89
+ .<JsonNode >map (
90
+ e ->
91
+ JsonUtils .mapper ()
92
+ .createObjectNode ()
93
+ .set (e .getKey (), e .getValue ().output ()))
94
+ .collect (JsonUtils .arrayNodeCollector ()));
98
95
}
99
96
}
100
97
}
0 commit comments