@@ -49,12 +49,14 @@ public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskEx
4949 private final Optional <SchemaValidator > inputSchemaValidator ;
5050 private final Optional <SchemaValidator > outputSchemaValidator ;
5151 private final Optional <SchemaValidator > contextSchemaValidator ;
52+ private final Optional <WorkflowFilter > ifFilter ;
5253
5354 public abstract static class AbstractTaskExecutorBuilder <T extends TaskBase >
5455 implements TaskExecutorBuilder <T > {
5556 private Optional <WorkflowFilter > inputProcessor = Optional .empty ();
5657 private Optional <WorkflowFilter > outputProcessor = Optional .empty ();
5758 private Optional <WorkflowFilter > contextProcessor = Optional .empty ();
59+ private Optional <WorkflowFilter > ifFilter = Optional .empty ();
5860 private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
5961 private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
6062 private Optional <SchemaValidator > contextSchemaValidator = Optional .empty ();
@@ -100,6 +102,7 @@ protected AbstractTaskExecutorBuilder(
100102 this .contextSchemaValidator =
101103 getSchemaValidator (application .validatorFactory (), resourceLoader , export .getSchema ());
102104 }
105+ this .ifFilter = optionalFilter (application .expressionFactory (), task .getIf ());
103106 }
104107
105108 protected final TransitionInfoBuilder next (
@@ -153,6 +156,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T> builder) {
153156 this .inputSchemaValidator = builder .inputSchemaValidator ;
154157 this .outputSchemaValidator = builder .outputSchemaValidator ;
155158 this .contextSchemaValidator = builder .contextSchemaValidator ;
159+ this .ifFilter = builder .ifFilter ;
156160 }
157161
158162 protected final CompletableFuture <TaskContext > executeNext (
@@ -177,40 +181,49 @@ public CompletableFuture<TaskContext> apply(
177181 if (!TaskExecutorHelper .isActive (workflowContext )) {
178182 return completable ;
179183 }
180- return executeNext (
181- completable
182- .thenApply (
183- t -> {
184- workflowContext
185- .definition ()
186- .listeners ()
187- .forEach (l -> l .onTaskStarted (position , task ));
188- inputSchemaValidator .ifPresent (s -> s .validate (t .rawInput ()));
189- inputProcessor .ifPresent (
190- p -> taskContext .input (p .apply (workflowContext , t , t .rawInput ())));
191- return t ;
192- })
193- .thenCompose (t -> execute (workflowContext , t ))
194- .thenApply (
195- t -> {
196- outputProcessor .ifPresent (
197- p -> t .output (p .apply (workflowContext , t , t .rawOutput ())));
198- outputSchemaValidator .ifPresent (s -> s .validate (t .output ()));
199- contextProcessor .ifPresent (
200- p ->
201- workflowContext .context (
202- p .apply (workflowContext , t , workflowContext .context ())));
203- contextSchemaValidator .ifPresent (s -> s .validate (workflowContext .context ()));
204- t .completedAt (Instant .now ());
205- workflowContext
206- .definition ()
207- .listeners ()
208- .forEach (l -> l .onTaskEnded (position , task ));
209- return t ;
210- }),
211- workflowContext );
184+ if (ifFilter
185+ .map (f -> f .apply (workflowContext , taskContext , input ).asBoolean (true ))
186+ .orElse (true )) {
187+ return executeNext (
188+ completable
189+ .thenApply (
190+ t -> {
191+ workflowContext
192+ .definition ()
193+ .listeners ()
194+ .forEach (l -> l .onTaskStarted (position , task ));
195+ inputSchemaValidator .ifPresent (s -> s .validate (t .rawInput ()));
196+ inputProcessor .ifPresent (
197+ p -> taskContext .input (p .apply (workflowContext , t , t .rawInput ())));
198+ return t ;
199+ })
200+ .thenCompose (t -> execute (workflowContext , t ))
201+ .thenApply (
202+ t -> {
203+ outputProcessor .ifPresent (
204+ p -> t .output (p .apply (workflowContext , t , t .rawOutput ())));
205+ outputSchemaValidator .ifPresent (s -> s .validate (t .output ()));
206+ contextProcessor .ifPresent (
207+ p ->
208+ workflowContext .context (
209+ p .apply (workflowContext , t , workflowContext .context ())));
210+ contextSchemaValidator .ifPresent (s -> s .validate (workflowContext .context ()));
211+ t .completedAt (Instant .now ());
212+ workflowContext
213+ .definition ()
214+ .listeners ()
215+ .forEach (l -> l .onTaskEnded (position , task ));
216+ return t ;
217+ }),
218+ workflowContext );
219+ } else {
220+ taskContext .transition (getSkipTransition ());
221+ return executeNext (completable , workflowContext );
222+ }
212223 }
213224
225+ protected abstract TransitionInfo getSkipTransition ();
226+
214227 protected abstract CompletableFuture <TaskContext > execute (
215228 WorkflowContext workflow , TaskContext taskContext );
216229}
0 commit comments