24
24
import io .serverlessworkflow .impl .WorkflowDefinition ;
25
25
import io .serverlessworkflow .impl .WorkflowState ;
26
26
import io .serverlessworkflow .impl .WorkflowUtils ;
27
- import io .serverlessworkflow .impl .generic .SortedArrayList ;
28
27
import io .serverlessworkflow .impl .json .JsonUtils ;
29
28
import java .lang .reflect .UndeclaredThrowableException ;
29
+ import java .util .ArrayList ;
30
30
import java .util .HashMap ;
31
31
import java .util .List ;
32
32
import java .util .Map ;
33
33
import java .util .concurrent .ExecutionException ;
34
34
import java .util .concurrent .ExecutorService ;
35
35
import java .util .concurrent .Future ;
36
36
import java .util .stream .Collectors ;
37
+ import java .util .stream .Stream ;
37
38
import org .slf4j .Logger ;
38
39
import org .slf4j .LoggerFactory ;
39
40
@@ -47,8 +48,6 @@ protected ForkExecutor(ForkTask task, WorkflowDefinition definition) {
47
48
service = definition .executorService ();
48
49
}
49
50
50
- private record BranchContext (String taskName , TaskContext <?> taskContext ) {}
51
-
52
51
@ Override
53
52
protected void internalExecute (WorkflowContext workflow , TaskContext <ForkTask > taskContext ) {
54
53
ForkTaskConfiguration forkConfig = task .getFork ();
@@ -62,13 +61,10 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
62
61
item .getName (),
63
62
service .submit (() -> executeBranch (workflow , taskContext .copy (), item , i )));
64
63
}
65
- List <BranchContext > results =
66
- new SortedArrayList <>(
67
- (arg1 , arg2 ) ->
68
- arg1 .taskContext .completedAt ().compareTo (arg2 .taskContext .completedAt ()));
64
+ List <Map .Entry <String , TaskContext <?>>> results = new ArrayList <>();
69
65
for (Map .Entry <String , Future <TaskContext <?>>> entry : futures .entrySet ()) {
70
66
try {
71
- results .add (new BranchContext (entry .getKey (), entry .getValue ().get ()));
67
+ results .add (Map . entry (entry .getKey (), entry .getValue ().get ()));
72
68
} catch (ExecutionException ex ) {
73
69
Throwable cause = ex .getCause ();
74
70
if (cause instanceof RuntimeException ) {
@@ -83,19 +79,22 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
83
79
ex );
84
80
}
85
81
}
86
- if (!results .isEmpty ()) {
87
- taskContext .rawOutput (
88
- forkConfig .isCompete ()
89
- ? results .get (0 ).taskContext ().output ()
90
- : JsonUtils .fromValue (
91
- results .stream ()
92
- .map (
93
- e ->
94
- JsonUtils .mapper ()
95
- .createObjectNode ()
96
- .set (e .taskName (), e .taskContext ().output ()))
97
- .collect (Collectors .toList ())));
98
- }
82
+ Stream <Map .Entry <String , TaskContext <?>>> sortedStream =
83
+ results .stream ()
84
+ .sorted (
85
+ (arg1 , arg2 ) ->
86
+ arg1 .getValue ().completedAt ().compareTo (arg2 .getValue ().completedAt ()));
87
+ taskContext .rawOutput (
88
+ forkConfig .isCompete ()
89
+ ? sortedStream .map (e -> e .getValue ().output ()).findFirst ().orElseThrow ()
90
+ : JsonUtils .fromValue (
91
+ sortedStream
92
+ .map (
93
+ e ->
94
+ JsonUtils .mapper ()
95
+ .createObjectNode ()
96
+ .set (e .getKey (), e .getValue ().output ()))
97
+ .collect (Collectors .toList ())));
99
98
}
100
99
}
101
100
0 commit comments