2424import io .serverlessworkflow .api .types .Input ;
2525import io .serverlessworkflow .api .types .Output ;
2626import io .serverlessworkflow .api .types .TaskBase ;
27+ import io .serverlessworkflow .api .types .TaskTimeout ;
28+ import io .serverlessworkflow .api .types .Timeout ;
2729import io .serverlessworkflow .api .types .Workflow ;
2830import io .serverlessworkflow .impl .TaskContext ;
2931import io .serverlessworkflow .impl .WorkflowApplication ;
3032import io .serverlessworkflow .impl .WorkflowContext ;
3133import io .serverlessworkflow .impl .WorkflowDefinition ;
34+ import io .serverlessworkflow .impl .WorkflowError ;
35+ import io .serverlessworkflow .impl .WorkflowException ;
3236import io .serverlessworkflow .impl .WorkflowFilter ;
3337import io .serverlessworkflow .impl .WorkflowModel ;
3438import io .serverlessworkflow .impl .WorkflowMutablePosition ;
3539import io .serverlessworkflow .impl .WorkflowPosition ;
3640import io .serverlessworkflow .impl .WorkflowPredicate ;
3741import io .serverlessworkflow .impl .WorkflowStatus ;
42+ import io .serverlessworkflow .impl .WorkflowUtils ;
43+ import io .serverlessworkflow .impl .WorkflowValueResolver ;
3844import io .serverlessworkflow .impl .lifecycle .TaskCancelledEvent ;
3945import io .serverlessworkflow .impl .lifecycle .TaskCompletedEvent ;
4046import io .serverlessworkflow .impl .lifecycle .TaskFailedEvent ;
4147import io .serverlessworkflow .impl .lifecycle .TaskRetriedEvent ;
4248import io .serverlessworkflow .impl .lifecycle .TaskStartedEvent ;
4349import io .serverlessworkflow .impl .resources .ResourceLoader ;
4450import io .serverlessworkflow .impl .schema .SchemaValidator ;
51+ import java .time .Duration ;
4552import java .time .Instant ;
4653import java .util .Iterator ;
4754import java .util .Map ;
55+ import java .util .Objects ;
4856import java .util .Optional ;
4957import java .util .concurrent .CancellationException ;
5058import java .util .concurrent .CompletableFuture ;
5159import java .util .concurrent .CompletionException ;
60+ import java .util .concurrent .TimeUnit ;
5261
5362public abstract class AbstractTaskExecutor <T extends TaskBase > implements TaskExecutor <T > {
5463
@@ -62,6 +71,7 @@ public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskEx
6271 private final Optional <SchemaValidator > outputSchemaValidator ;
6372 private final Optional <SchemaValidator > contextSchemaValidator ;
6473 private final Optional <WorkflowPredicate > ifFilter ;
74+ private final Optional <WorkflowValueResolver <Duration >> timeout ;
6575
6676 public abstract static class AbstractTaskExecutorBuilder <
6777 T extends TaskBase , V extends AbstractTaskExecutor <T >>
@@ -80,6 +90,7 @@ public abstract static class AbstractTaskExecutorBuilder<
8090 protected final Workflow workflow ;
8191 protected final ResourceLoader resourceLoader ;
8292 private final WorkflowDefinition definition ;
93+ private Optional <WorkflowValueResolver <Duration >> timeout ;
8394
8495 private V instance ;
8596
@@ -113,6 +124,31 @@ protected AbstractTaskExecutorBuilder(
113124 getSchemaValidator (application .validatorFactory (), resourceLoader , export .getSchema ());
114125 }
115126 this .ifFilter = application .expressionFactory ().buildIfFilter (task );
127+ this .timeout = getTaskTimeout ();
128+ }
129+
130+ private Optional <WorkflowValueResolver <Duration >> getTaskTimeout () {
131+ TaskTimeout timeout = task .getTimeout ();
132+
133+ if (timeout != null ) {
134+ Timeout timeoutDef = timeout .getTaskTimeoutDefinition ();
135+ if (timeoutDef == null && timeout .getTaskTimeoutReference () != null ) {
136+ timeoutDef =
137+ Objects .requireNonNull (
138+ workflow .getUse ().getTimeouts (),
139+ "Timeout reference specified, but use timeout was not defined" )
140+ .getAdditionalProperties ()
141+ .get (timeout .getTaskTimeoutReference ());
142+ }
143+ return Optional .of (
144+ WorkflowUtils .fromTimeoutAfter (
145+ application ,
146+ Objects .requireNonNull (
147+ timeoutDef ,
148+ "Malformed workflow definition. There is a timeout without the required information" )
149+ .getAfter ()));
150+ }
151+ return Optional .empty ();
116152 }
117153
118154 protected final TransitionInfoBuilder next (
@@ -171,6 +207,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
171207 this .outputSchemaValidator = builder .outputSchemaValidator ;
172208 this .contextSchemaValidator = builder .contextSchemaValidator ;
173209 this .ifFilter = builder .ifFilter ;
210+ this .timeout = builder .timeout ;
174211 }
175212
176213 protected final CompletableFuture <TaskContext > executeNext (
@@ -200,7 +237,7 @@ public CompletableFuture<TaskContext> apply(
200237 } else if (taskContext .isCompleted ()) {
201238 return executeNext (completable , workflowContext );
202239 } else if (ifFilter .map (f -> f .test (workflowContext , taskContext , input )).orElse (true )) {
203- return executeNext (
240+ completable =
204241 completable
205242 .thenCompose (workflowContext .instance ()::suspendedCheck )
206243 .thenApply (
@@ -247,8 +284,22 @@ public CompletableFuture<TaskContext> apply(
247284 l .onTaskCompleted (
248285 new TaskCompletedEvent (workflowContext , taskContext )));
249286 return t ;
250- }),
251- workflowContext );
287+ });
288+ if (timeout .isPresent ()) {
289+ completable =
290+ completable
291+ .orTimeout (
292+ timeout
293+ .map (t -> t .apply (workflowContext , taskContext , input ))
294+ .orElseThrow ()
295+ .toMillis (),
296+ TimeUnit .MILLISECONDS )
297+ .exceptionallyCompose (
298+ e ->
299+ CompletableFuture .failedFuture (
300+ new WorkflowException (WorkflowError .timeout ().build (), e )));
301+ }
302+ return executeNext (completable , workflowContext );
252303 } else {
253304 taskContext .transition (getSkipTransition ());
254305 return executeNext (completable , workflowContext );
0 commit comments