24
24
import io .serverlessworkflow .impl .WorkflowDefinition ;
25
25
import io .serverlessworkflow .impl .WorkflowState ;
26
26
import io .serverlessworkflow .impl .WorkflowUtils ;
27
- import io .serverlessworkflow .impl .generic .SortedArrayList ;
28
27
import io .serverlessworkflow .impl .json .JsonUtils ;
29
28
import java .lang .reflect .UndeclaredThrowableException ;
29
+ import java .util .ArrayList ;
30
30
import java .util .HashMap ;
31
31
import java .util .List ;
32
32
import java .util .Map ;
33
33
import java .util .concurrent .ExecutionException ;
34
34
import java .util .concurrent .ExecutorService ;
35
35
import java .util .concurrent .Future ;
36
36
import java .util .stream .Collectors ;
37
+ import java .util .stream .Stream ;
37
38
import org .slf4j .Logger ;
38
39
import org .slf4j .LoggerFactory ;
39
40
@@ -62,10 +63,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
62
63
item .getName (),
63
64
service .submit (() -> executeBranch (workflow , taskContext .copy (), item , i )));
64
65
}
65
- List <BranchContext > results =
66
- new SortedArrayList <>(
67
- (arg1 , arg2 ) ->
68
- arg1 .taskContext .completedAt ().compareTo (arg2 .taskContext .completedAt ()));
66
+ List <BranchContext > results = new ArrayList <>();
69
67
for (Map .Entry <String , Future <TaskContext <?>>> entry : futures .entrySet ()) {
70
68
try {
71
69
results .add (new BranchContext (entry .getKey (), entry .getValue ().get ()));
@@ -83,19 +81,23 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
83
81
ex );
84
82
}
85
83
}
86
- if (!results .isEmpty ()) {
87
- taskContext .rawOutput (
88
- forkConfig .isCompete ()
89
- ? results .get (0 ).taskContext ().output ()
90
- : JsonUtils .fromValue (
91
- results .stream ()
92
- .map (
93
- e ->
94
- JsonUtils .mapper ()
95
- .createObjectNode ()
96
- .set (e .taskName (), e .taskContext ().output ()))
97
- .collect (Collectors .toList ())));
98
- }
84
+
85
+ Stream <BranchContext > sortedStream =
86
+ results .stream ()
87
+ .sorted (
88
+ (arg1 , arg2 ) ->
89
+ arg1 .taskContext .completedAt ().compareTo (arg2 .taskContext .completedAt ()));
90
+ taskContext .rawOutput (
91
+ forkConfig .isCompete ()
92
+ ? sortedStream .map (e -> e .taskContext ().output ()).findFirst ().orElseThrow ()
93
+ : JsonUtils .fromValue (
94
+ sortedStream
95
+ .map (
96
+ e ->
97
+ JsonUtils .mapper ()
98
+ .createObjectNode ()
99
+ .set (e .taskName (), e .taskContext ().output ()))
100
+ .collect (Collectors .toList ())));
99
101
}
100
102
}
101
103
0 commit comments