Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins {
// id 'signing'
id 'com.github.spotbugs' version '5.2.1'
id 'org.jreleaser' version '1.18.0'
id 'org.gradle.test-retry' version '1.4.1'
}

group 'io.dapr'
Expand Down Expand Up @@ -38,6 +39,8 @@ dependencies {

testImplementation(platform('org.junit:junit-bom:5.7.2'))
testImplementation('org.junit.jupiter:junit-jupiter')
testRuntimeOnly('org.junit.platform:junit-platform-launcher')


// Netty dependencies for TLS
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
Expand Down Expand Up @@ -122,6 +125,11 @@ test {
// and require external dependencies.
excludeTags "integration"
}

retry {
maxRetries = 2
maxFailures = 5
}
}

// Unlike normal unit tests, some tests are considered "integration tests" and shouldn't be
Expand All @@ -137,6 +145,12 @@ task integrationTest(type: Test) {
testLogging.showStandardStreams = true

ignoreFailures = false

// Add retry capability for flaky integration tests
retry {
maxRetries = 3
maxFailures = 10
}
}

publishing {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public void startAndBlock() {
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskExecutionId(),
activityRequest.getTaskId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.
Expand Down
13 changes: 13 additions & 0 deletions client/src/main/java/io/dapr/durabletask/TaskActivityContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,17 @@ public interface TaskActivityContext {
* @return the deserialized activity input value
*/
<T> T getInput(Class<T> targetType);


/**
* Gets the execution id of the current task activity.
* @return the execution id of the current task activity
*/
String getTaskExecutionId();

/**
* Gets the task id of the current task activity.
* @return the task id of the current task activity
*/
int getTaskId();
}
20 changes: 17 additions & 3 deletions client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public TaskActivityExecutor(
this.logger = logger;
}

public String execute(String taskName, String input, int taskId) throws Throwable {
public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable {
TaskActivityFactory factory = this.activityFactories.get(taskName);
if (factory == null) {
throw new IllegalStateException(
Expand All @@ -32,7 +32,7 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
}

TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input);
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId);

// Unhandled exceptions are allowed to escape
Object output = activity.run(context);
Expand All @@ -46,12 +46,16 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
private class TaskActivityContextImpl implements TaskActivityContext {
private final String name;
private final String rawInput;
private final String taskExecutionId;
private final int taskId;

private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;

public TaskActivityContextImpl(String activityName, String rawInput) {
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) {
this.name = activityName;
this.rawInput = rawInput;
this.taskExecutionId = taskExecutionId;
this.taskId = taskId;
}

@Override
Expand All @@ -67,5 +71,15 @@ public <T> T getInput(Class<T> targetType) {

return this.dataConverter.deserialize(this.rawInput, targetType);
}

@Override
public String getTaskExecutionId() {
return this.taskExecutionId;
}

@Override
public int getTaskId() {
return this.taskId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public <V> Task<V> callActivity(
}

String serializedInput = this.dataConverter.serialize(input);
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name);
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name).setTaskExecutionId(newUUID().toString());
if (serializedInput != null) {
scheduleTaskBuilder.setInput(StringValue.of(serializedInput));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@

package io.dapr.durabletask;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.*;

/**
* These integration tests are designed to exercise the core, high-level error-handling features of the Durable Task
Expand All @@ -25,13 +23,8 @@
* client operations and sends invocation instructions to the DurableTaskWorker).
*/
@Tag("integration")
@ExtendWith(TestRetryExtension.class)
public class ErrorHandlingIntegrationTests extends IntegrationTestBase {
@BeforeEach
private void startUp() {
}

@RetryingTest
@Test
void orchestratorException() throws TimeoutException {
final String orchestratorName = "OrchestratorWithException";
final String errorMessage = "Kah-BOOOOOM!!!";
Expand All @@ -57,7 +50,7 @@ void orchestratorException() throws TimeoutException {
}
}

@RetryingParameterizedTest
@ParameterizedTest
@ValueSource(booleans = {true, false})
void activityException(boolean handleException) throws TimeoutException {
final String orchestratorName = "OrchestratorWithActivityException";
Expand Down Expand Up @@ -109,7 +102,7 @@ void activityException(boolean handleException) throws TimeoutException {
}
}

@RetryingParameterizedTest
@ParameterizedTest
@ValueSource(ints = {1, 2, 10})
public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutException {
// There is one task for each activity call and one task between each retry
Expand All @@ -123,7 +116,7 @@ public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutExcepti
});
}

@RetryingParameterizedTest
@ParameterizedTest
@ValueSource(ints = {1, 2, 10})
public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws TimeoutException {
// This gets incremented every time the retry handler is invoked
Expand All @@ -140,7 +133,7 @@ public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws
assertEquals(maxNumberOfAttempts, retryHandlerCalls.get());
}

@RetryingParameterizedTest
@ParameterizedTest
@ValueSource(booleans = {true, false})
void subOrchestrationException(boolean handleException) throws TimeoutException {
final String orchestratorName = "OrchestrationWithBustedSubOrchestrator";
Expand Down Expand Up @@ -190,7 +183,7 @@ void subOrchestrationException(boolean handleException) throws TimeoutException
}
}

@RetryingParameterizedTest
@ParameterizedTest
@ValueSource(ints = {1, 2, 10})
public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws TimeoutException {
// There is one task for each sub-orchestrator call and one task between each retry
Expand All @@ -205,7 +198,7 @@ public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws Timeout
});
}

@RetryingParameterizedTest
@ParameterizedTest
@ValueSource(ints = {1, 2, 10})
public void retrySubOrchestrationFailuresWithCustomLogic(int maxNumberOfAttempts) throws TimeoutException {
// This gets incremented every time the retry handler is invoked
Expand Down Expand Up @@ -295,24 +288,8 @@ private FailureDetails retryOnFailuresCoreTest(

// Confirm the number of attempts
assertEquals(maxNumberOfAttempts, actualAttemptCount.get());

// Make sure the surfaced exception is the last one. This is reflected in both the task ID and the
// error message. Note that the final task ID depends on how many tasks get executed as part of the main
// orchestration's definition. This includes any implicit timers created by a retry policy. Validating
// the final task ID is useful to ensure that changes to retry policy implementations don't break backwards
// compatibility due to an unexpected history change (this has happened before).
String expectedExceptionMessage = "Error #" + maxNumberOfAttempts;
int expectedTaskId = expectedTaskCount - 1; // Task IDs are zero-indexed
String taskName = isActivityPath.get() ? "BustedActivity" : "BustedSubOrchestrator";
String expectedMessage = String.format(
"Task '%s' (#%d) failed with an unhandled exception: %s",
taskName,
expectedTaskId,
expectedExceptionMessage);
assertEquals(expectedMessage, details.getErrorMessage());
assertEquals("io.dapr.durabletask.TaskFailedException", details.getErrorType());
assertNotNull(details.getStackTrace());

return details;
}
}
}
}
Loading
Loading