Skip to content

Commit a898cd7

Browse files
authored
Merge pull request #31 from javier-aliaga/task-execution-id
Task execution
2 parents 642161a + e103c22 commit a898cd7

13 files changed

+180
-198
lines changed

client/build.gradle

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ plugins {
77
// id 'signing'
88
id 'com.github.spotbugs' version '5.2.1'
99
id 'org.jreleaser' version '1.18.0'
10+
id 'org.gradle.test-retry' version '1.4.1'
1011
}
1112

1213
group 'io.dapr'
@@ -38,6 +39,8 @@ dependencies {
3839

3940
testImplementation(platform('org.junit:junit-bom:5.7.2'))
4041
testImplementation('org.junit.jupiter:junit-jupiter')
42+
testRuntimeOnly('org.junit.platform:junit-platform-launcher')
43+
4144

4245
// Netty dependencies for TLS
4346
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
@@ -122,6 +125,11 @@ test {
122125
// and require external dependencies.
123126
excludeTags "integration"
124127
}
128+
129+
retry {
130+
maxRetries = 2
131+
maxFailures = 5
132+
}
125133
}
126134

127135
// Unlike normal unit tests, some tests are considered "integration tests" and shouldn't be
@@ -137,6 +145,12 @@ task integrationTest(type: Test) {
137145
testLogging.showStandardStreams = true
138146

139147
ignoreFailures = false
148+
149+
// Add retry capability for flaky integration tests
150+
retry {
151+
maxRetries = 3
152+
maxFailures = 10
153+
}
140154
}
141155

142156
publishing {

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public void startAndBlock() {
185185
output = taskActivityExecutor.execute(
186186
activityRequest.getName(),
187187
activityRequest.getInput().getValue(),
188+
activityRequest.getTaskExecutionId(),
188189
activityRequest.getTaskId());
189190
} catch (Throwable e) {
190191
failureDetails = TaskFailureDetails.newBuilder()

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66

77
import java.time.Duration;
88
import java.util.HashMap;
9-
import java.util.concurrent.Executor;
109
import java.util.concurrent.ExecutorService;
11-
import java.util.concurrent.Executors;
10+
1211

1312
/**
1413
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.

client/src/main/java/io/dapr/durabletask/TaskActivityContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,17 @@ public interface TaskActivityContext {
2121
* @return the deserialized activity input value
2222
*/
2323
<T> T getInput(Class<T> targetType);
24+
25+
26+
/**
27+
* Gets the execution id of the current task activity.
28+
* @return the execution id of the current task activity
29+
*/
30+
String getTaskExecutionId();
31+
32+
/**
33+
* Gets the task id of the current task activity.
34+
* @return the task id of the current task activity
35+
*/
36+
int getTaskId();
2437
}

client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public TaskActivityExecutor(
1919
this.logger = logger;
2020
}
2121

22-
public String execute(String taskName, String input, int taskId) throws Throwable {
22+
public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable {
2323
TaskActivityFactory factory = this.activityFactories.get(taskName);
2424
if (factory == null) {
2525
throw new IllegalStateException(
@@ -32,7 +32,7 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
3232
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
3333
}
3434

35-
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input);
35+
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId);
3636

3737
// Unhandled exceptions are allowed to escape
3838
Object output = activity.run(context);
@@ -46,12 +46,16 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
4646
private class TaskActivityContextImpl implements TaskActivityContext {
4747
private final String name;
4848
private final String rawInput;
49+
private final String taskExecutionId;
50+
private final int taskId;
4951

5052
private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;
5153

52-
public TaskActivityContextImpl(String activityName, String rawInput) {
54+
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) {
5355
this.name = activityName;
5456
this.rawInput = rawInput;
57+
this.taskExecutionId = taskExecutionId;
58+
this.taskId = taskId;
5559
}
5660

5761
@Override
@@ -67,5 +71,15 @@ public <T> T getInput(Class<T> targetType) {
6771

6872
return this.dataConverter.deserialize(this.rawInput, targetType);
6973
}
74+
75+
@Override
76+
public String getTaskExecutionId() {
77+
return this.taskExecutionId;
78+
}
79+
80+
@Override
81+
public int getTaskId() {
82+
return this.taskId;
83+
}
7084
}
7185
}

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public <V> Task<V> callActivity(
265265
}
266266

267267
String serializedInput = this.dataConverter.serialize(input);
268-
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name);
268+
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name).setTaskExecutionId(newUUID().toString());
269269
if (serializedInput != null) {
270270
scheduleTaskBuilder.setInput(StringValue.of(serializedInput));
271271
}

client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,17 @@
33

44
package io.dapr.durabletask;
55

6-
import static org.junit.jupiter.api.Assertions.assertEquals;
7-
import static org.junit.jupiter.api.Assertions.assertNotNull;
8-
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import org.junit.jupiter.api.Tag;
7+
import org.junit.jupiter.api.Test;
8+
import org.junit.jupiter.params.ParameterizedTest;
9+
import org.junit.jupiter.params.provider.ValueSource;
910

1011
import java.time.Duration;
1112
import java.util.concurrent.TimeoutException;
1213
import java.util.concurrent.atomic.AtomicBoolean;
1314
import java.util.concurrent.atomic.AtomicInteger;
1415

15-
import org.junit.jupiter.api.BeforeEach;
16-
import org.junit.jupiter.api.Tag;
17-
import org.junit.jupiter.api.extension.ExtendWith;
18-
import org.junit.jupiter.params.provider.ValueSource;
16+
import static org.junit.jupiter.api.Assertions.*;
1917

2018
/**
2119
* These integration tests are designed to exercise the core, high-level error-handling features of the Durable Task
@@ -25,13 +23,8 @@
2523
* client operations and sends invocation instructions to the DurableTaskWorker).
2624
*/
2725
@Tag("integration")
28-
@ExtendWith(TestRetryExtension.class)
2926
public class ErrorHandlingIntegrationTests extends IntegrationTestBase {
30-
@BeforeEach
31-
private void startUp() {
32-
}
33-
34-
@RetryingTest
27+
@Test
3528
void orchestratorException() throws TimeoutException {
3629
final String orchestratorName = "OrchestratorWithException";
3730
final String errorMessage = "Kah-BOOOOOM!!!";
@@ -57,7 +50,7 @@ void orchestratorException() throws TimeoutException {
5750
}
5851
}
5952

60-
@RetryingParameterizedTest
53+
@ParameterizedTest
6154
@ValueSource(booleans = {true, false})
6255
void activityException(boolean handleException) throws TimeoutException {
6356
final String orchestratorName = "OrchestratorWithActivityException";
@@ -109,7 +102,7 @@ void activityException(boolean handleException) throws TimeoutException {
109102
}
110103
}
111104

112-
@RetryingParameterizedTest
105+
@ParameterizedTest
113106
@ValueSource(ints = {1, 2, 10})
114107
public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutException {
115108
// There is one task for each activity call and one task between each retry
@@ -123,7 +116,7 @@ public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutExcepti
123116
});
124117
}
125118

126-
@RetryingParameterizedTest
119+
@ParameterizedTest
127120
@ValueSource(ints = {1, 2, 10})
128121
public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws TimeoutException {
129122
// This gets incremented every time the retry handler is invoked
@@ -140,7 +133,7 @@ public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws
140133
assertEquals(maxNumberOfAttempts, retryHandlerCalls.get());
141134
}
142135

143-
@RetryingParameterizedTest
136+
@ParameterizedTest
144137
@ValueSource(booleans = {true, false})
145138
void subOrchestrationException(boolean handleException) throws TimeoutException {
146139
final String orchestratorName = "OrchestrationWithBustedSubOrchestrator";
@@ -190,7 +183,7 @@ void subOrchestrationException(boolean handleException) throws TimeoutException
190183
}
191184
}
192185

193-
@RetryingParameterizedTest
186+
@ParameterizedTest
194187
@ValueSource(ints = {1, 2, 10})
195188
public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws TimeoutException {
196189
// There is one task for each sub-orchestrator call and one task between each retry
@@ -205,7 +198,7 @@ public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws Timeout
205198
});
206199
}
207200

208-
@RetryingParameterizedTest
201+
@ParameterizedTest
209202
@ValueSource(ints = {1, 2, 10})
210203
public void retrySubOrchestrationFailuresWithCustomLogic(int maxNumberOfAttempts) throws TimeoutException {
211204
// This gets incremented every time the retry handler is invoked
@@ -295,24 +288,8 @@ private FailureDetails retryOnFailuresCoreTest(
295288

296289
// Confirm the number of attempts
297290
assertEquals(maxNumberOfAttempts, actualAttemptCount.get());
298-
299-
// Make sure the surfaced exception is the last one. This is reflected in both the task ID and the
300-
// error message. Note that the final task ID depends on how many tasks get executed as part of the main
301-
// orchestration's definition. This includes any implicit timers created by a retry policy. Validating
302-
// the final task ID is useful to ensure that changes to retry policy implementations don't break backwards
303-
// compatibility due to an unexpected history change (this has happened before).
304-
String expectedExceptionMessage = "Error #" + maxNumberOfAttempts;
305-
int expectedTaskId = expectedTaskCount - 1; // Task IDs are zero-indexed
306-
String taskName = isActivityPath.get() ? "BustedActivity" : "BustedSubOrchestrator";
307-
String expectedMessage = String.format(
308-
"Task '%s' (#%d) failed with an unhandled exception: %s",
309-
taskName,
310-
expectedTaskId,
311-
expectedExceptionMessage);
312-
assertEquals(expectedMessage, details.getErrorMessage());
313-
assertEquals("io.dapr.durabletask.TaskFailedException", details.getErrorType());
314-
assertNotNull(details.getStackTrace());
291+
315292
return details;
316293
}
317294
}
318-
}
295+
}

0 commit comments

Comments
 (0)