Skip to content

Commit fdbf4cf

Browse files
committed
[Fix #933] Adding timeout support
Signed-off-by: fjtirado <[email protected]>
1 parent 239d4ba commit fdbf4cf

File tree

4 files changed

+98
-4
lines changed

4 files changed

+98
-4
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ public static Builder runtime(TaskContext context, Exception ex) {
5656
return runtime(Errors.RUNTIME.status(), context, ex);
5757
}
5858

59+
public static Builder timeout() {
60+
return error(Errors.TIMEOUT.toString(), Errors.TIMEOUT.status());
61+
}
62+
5963
public static class Builder {
6064

6165
private final String type;

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,39 @@
2424
import io.serverlessworkflow.api.types.Input;
2525
import io.serverlessworkflow.api.types.Output;
2626
import io.serverlessworkflow.api.types.TaskBase;
27+
import io.serverlessworkflow.api.types.TaskTimeout;
28+
import io.serverlessworkflow.api.types.Timeout;
2729
import io.serverlessworkflow.api.types.Workflow;
2830
import io.serverlessworkflow.impl.TaskContext;
2931
import io.serverlessworkflow.impl.WorkflowApplication;
3032
import io.serverlessworkflow.impl.WorkflowContext;
3133
import io.serverlessworkflow.impl.WorkflowDefinition;
34+
import io.serverlessworkflow.impl.WorkflowError;
35+
import io.serverlessworkflow.impl.WorkflowException;
3236
import io.serverlessworkflow.impl.WorkflowFilter;
3337
import io.serverlessworkflow.impl.WorkflowModel;
3438
import io.serverlessworkflow.impl.WorkflowMutablePosition;
3539
import io.serverlessworkflow.impl.WorkflowPosition;
3640
import io.serverlessworkflow.impl.WorkflowPredicate;
3741
import io.serverlessworkflow.impl.WorkflowStatus;
42+
import io.serverlessworkflow.impl.WorkflowUtils;
43+
import io.serverlessworkflow.impl.WorkflowValueResolver;
3844
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
3945
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
4046
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
4147
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
4248
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
4349
import io.serverlessworkflow.impl.resources.ResourceLoader;
4450
import io.serverlessworkflow.impl.schema.SchemaValidator;
51+
import java.time.Duration;
4552
import java.time.Instant;
4653
import java.util.Iterator;
4754
import java.util.Map;
4855
import java.util.Optional;
4956
import java.util.concurrent.CancellationException;
5057
import java.util.concurrent.CompletableFuture;
5158
import java.util.concurrent.CompletionException;
59+
import java.util.concurrent.TimeUnit;
5260

5361
public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskExecutor<T> {
5462

@@ -62,6 +70,7 @@ public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskEx
6270
private final Optional<SchemaValidator> outputSchemaValidator;
6371
private final Optional<SchemaValidator> contextSchemaValidator;
6472
private final Optional<WorkflowPredicate> ifFilter;
73+
private final Optional<WorkflowValueResolver<Duration>> timeout;
6574

6675
public abstract static class AbstractTaskExecutorBuilder<
6776
T extends TaskBase, V extends AbstractTaskExecutor<T>>
@@ -80,6 +89,7 @@ public abstract static class AbstractTaskExecutorBuilder<
8089
protected final Workflow workflow;
8190
protected final ResourceLoader resourceLoader;
8291
private final WorkflowDefinition definition;
92+
private Optional<WorkflowValueResolver<Duration>> timeout;
8393

8494
private V instance;
8595

@@ -113,6 +123,30 @@ protected AbstractTaskExecutorBuilder(
113123
getSchemaValidator(application.validatorFactory(), resourceLoader, export.getSchema());
114124
}
115125
this.ifFilter = application.expressionFactory().buildIfFilter(task);
126+
this.timeout = getTaskTimeout();
127+
}
128+
129+
private Optional<WorkflowValueResolver<Duration>> getTaskTimeout() {
130+
TaskTimeout timeout = task.getTimeout();
131+
132+
if (timeout != null) {
133+
Timeout timeoutDef = null;
134+
if (timeout.getTaskTimeoutDefinition() != null) {
135+
timeoutDef = timeout.getTaskTimeoutDefinition();
136+
} else if (timeout.getTaskTimeoutReference() != null) {
137+
timeoutDef =
138+
workflow
139+
.getUse()
140+
.getTimeouts()
141+
.getAdditionalProperties()
142+
.get(timeout.getTaskTimeoutReference());
143+
}
144+
if (timeoutDef == null) {
145+
throw new IllegalStateException("Attemp to define a not existing timeout");
146+
}
147+
return Optional.of(WorkflowUtils.fromTimeoutAfter(application, timeoutDef.getAfter()));
148+
}
149+
return Optional.empty();
116150
}
117151

118152
protected final TransitionInfoBuilder next(
@@ -171,6 +205,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
171205
this.outputSchemaValidator = builder.outputSchemaValidator;
172206
this.contextSchemaValidator = builder.contextSchemaValidator;
173207
this.ifFilter = builder.ifFilter;
208+
this.timeout = builder.timeout;
174209
}
175210

176211
protected final CompletableFuture<TaskContext> executeNext(
@@ -200,7 +235,7 @@ public CompletableFuture<TaskContext> apply(
200235
} else if (taskContext.isCompleted()) {
201236
return executeNext(completable, workflowContext);
202237
} else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
203-
return executeNext(
238+
completable =
204239
completable
205240
.thenCompose(workflowContext.instance()::suspendedCheck)
206241
.thenApply(
@@ -247,8 +282,22 @@ public CompletableFuture<TaskContext> apply(
247282
l.onTaskCompleted(
248283
new TaskCompletedEvent(workflowContext, taskContext)));
249284
return t;
250-
}),
251-
workflowContext);
285+
});
286+
if (timeout.isPresent()) {
287+
completable =
288+
completable
289+
.orTimeout(
290+
timeout
291+
.map(t -> t.apply(workflowContext, taskContext, input))
292+
.orElseThrow()
293+
.toMillis(),
294+
TimeUnit.MILLISECONDS)
295+
.exceptionallyCompose(
296+
e ->
297+
CompletableFuture.failedFuture(
298+
new WorkflowException(WorkflowError.timeout().build(), e)));
299+
}
300+
return executeNext(completable, workflowContext);
252301
} else {
253302
taskContext.transition(getSkipTransition());
254303
return executeNext(completable, workflowContext);

impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTest.java renamed to impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package io.serverlessworkflow.impl.test;
1717

1818
import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
19+
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
import static org.junit.Assert.assertThat;
2022

2123
import com.fasterxml.jackson.databind.JsonNode;
2224
import io.serverlessworkflow.impl.WorkflowApplication;
@@ -38,7 +40,7 @@
3840
import org.junit.jupiter.params.ParameterizedTest;
3941
import org.junit.jupiter.params.provider.ValueSource;
4042

41-
public class RetryTest {
43+
public class RetryTimeoutTest {
4244

4345
private static WorkflowApplication app;
4446
private MockWebServer apiServer;
@@ -106,4 +108,17 @@ void testRetryEnd() throws IOException {
106108
.join())
107109
.hasCauseInstanceOf(WorkflowException.class);
108110
}
111+
112+
@Test
113+
void testTimeout() throws IOException {
114+
Map<String, Object> result =
115+
app.workflowDefinition(
116+
readWorkflowFromClasspath("workflows-samples/listen-to-one-timeout.yaml"))
117+
.instance(Map.of("delay", 0.01f))
118+
.start()
119+
.join()
120+
.asMap()
121+
.orElseThrow();
122+
assertThat(result.get("message")).isEqualTo("Viva er Beti Balompie");
123+
}
109124
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-to-one-timeout
5+
version: '0.1.0'
6+
do:
7+
- tryListen:
8+
try:
9+
- waitingNotForever:
10+
listen:
11+
to:
12+
one:
13+
with:
14+
type: neven-happening-event
15+
timeout:
16+
after: ${"PT\(.delay)S"}
17+
catch:
18+
errors:
19+
with:
20+
type: https://serverlessworkflow.io/spec/1.0.0/errors/timeout
21+
status: 408
22+
do:
23+
- setMessage:
24+
set:
25+
message: Viva er Beti Balompie
26+

0 commit comments

Comments
 (0)