Skip to content

Commit 3308b2b

Browse files
committed
chore: Propagate taskExecutionId from runtime to activityContext
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 152b89f commit 3308b2b

File tree

6 files changed

+77
-11
lines changed

6 files changed

+77
-11
lines changed

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public void startAndBlock() {
163163
output = taskActivityExecutor.execute(
164164
activityRequest.getName(),
165165
activityRequest.getInput().getValue(),
166-
activityRequest.getTaskId());
166+
activityRequest.getTaskExecutionId());
167167
} catch (Throwable e) {
168168
failureDetails = TaskFailureDetails.newBuilder()
169169
.setErrorType(e.getClass().getName())

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66

77
import java.time.Duration;
88
import java.util.HashMap;
9-
import java.util.concurrent.Executor;
109
import java.util.concurrent.ExecutorService;
11-
import java.util.concurrent.Executors;
10+
1211

1312
/**
1413
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.

client/src/main/java/io/dapr/durabletask/TaskActivityContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,11 @@ public interface TaskActivityContext {
2121
* @return the deserialized activity input value
2222
*/
2323
<T> T getInput(Class<T> targetType);
24+
25+
26+
/**
27+
* Gets the execution id of the current task activity.
28+
* @return the execution id of the current task activity
29+
*/
30+
String getTaskExecutionId();
2431
}

client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public TaskActivityExecutor(
1919
this.logger = logger;
2020
}
2121

22-
public String execute(String taskName, String input, int taskId) throws Throwable {
22+
public String execute(String taskName, String input, String taskExecutionId) throws Throwable {
2323
TaskActivityFactory factory = this.activityFactories.get(taskName);
2424
if (factory == null) {
2525
throw new IllegalStateException(
@@ -32,7 +32,7 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
3232
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
3333
}
3434

35-
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input);
35+
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId);
3636

3737
// Unhandled exceptions are allowed to escape
3838
Object output = activity.run(context);
@@ -46,12 +46,14 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
4646
private class TaskActivityContextImpl implements TaskActivityContext {
4747
private final String name;
4848
private final String rawInput;
49+
private final String taskExecutionId;
4950

5051
private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;
5152

52-
public TaskActivityContextImpl(String activityName, String rawInput) {
53+
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId) {
5354
this.name = activityName;
5455
this.rawInput = rawInput;
56+
this.taskExecutionId = taskExecutionId;
5557
}
5658

5759
@Override
@@ -67,5 +69,10 @@ public <T> T getInput(Class<T> targetType) {
6769

6870
return this.dataConverter.deserialize(this.rawInput, targetType);
6971
}
72+
73+
@Override
74+
public String getTaskExecutionId() {
75+
return this.taskExecutionId;
76+
}
7077
}
7178
}

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public <V> Task<V> callActivity(
265265
}
266266

267267
String serializedInput = this.dataConverter.serialize(input);
268-
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name);
268+
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name).setTaskExecutionId(newUUID().toString());
269269
if (serializedInput != null) {
270270
scheduleTaskBuilder.setInput(StringValue.of(serializedInput));
271271
}

client/src/test/java/io/dapr/durabletask/IntegrationTests.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.junit.jupiter.api.Tag;
2020
import org.junit.jupiter.api.Test;
2121
import org.junit.jupiter.api.extension.ExtendWith;
22-
import org.junit.jupiter.params.ParameterizedTest;
2322
import org.junit.jupiter.params.provider.ValueSource;
2423

2524
import static org.junit.jupiter.api.Assertions.*;
@@ -43,7 +42,7 @@ public class IntegrationTests extends IntegrationTestBase {
4342
@BeforeEach
4443
private void startUp() {
4544
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
46-
client.deleteTaskHub();
45+
client.createTaskHub(true);
4746
}
4847

4948
@AfterEach
@@ -1547,19 +1546,73 @@ public void newUUIDTest() {
15471546
if (currentUUID1.equals(currentUUID2)) ctx.complete(false);
15481547
else ctx.complete(true);
15491548
})
1550-
.addActivity(echoActivityName, ctx -> ctx.getInput(UUID.class))
1549+
.addActivity(echoActivityName, ctx -> {
1550+
System.out.println("##### echoActivityName: " + ctx.getInput(UUID.class ));
1551+
return ctx.getInput(UUID.class);
1552+
})
1553+
.buildAndStart();
1554+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
1555+
1556+
try(worker; client) {
1557+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
1558+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
1559+
assertNotNull(instance);
1560+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
1561+
assertTrue(instance.readOutputAs(boolean.class));
1562+
} catch (TimeoutException e) {
1563+
throw new RuntimeException(e);
1564+
}
1565+
1566+
}
1567+
1568+
1569+
@Test
1570+
public void taskExecutionIdTest() {
1571+
var orchestratorName = "test-task-execution-id";
1572+
var retryActivityName = "RetryN";
1573+
final RetryPolicy retryPolicy = new RetryPolicy(4, Duration.ofSeconds(3));
1574+
final TaskOptions taskOptions = new TaskOptions(retryPolicy);
1575+
1576+
var execMap = new HashMap<String, Integer>();
1577+
1578+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
1579+
.addOrchestrator(orchestratorName, ctx -> {
1580+
ctx.callActivity(retryActivityName,null,taskOptions).await();
1581+
ctx.callActivity(retryActivityName,null,taskOptions).await();
1582+
ctx.complete(true);
1583+
})
1584+
.addActivity(retryActivityName, ctx -> {
1585+
System.out.println("##### RetryN[executionId]: " + ctx.getTaskExecutionId());
1586+
var c = execMap.get(ctx.getTaskExecutionId());
1587+
if (c == null) {
1588+
c = 0;
1589+
} else {
1590+
c++;
1591+
}
1592+
1593+
execMap.put(ctx.getTaskExecutionId(), c);
1594+
if (c < 2) {
1595+
throw new RuntimeException("test retry");
1596+
}
1597+
return null;
1598+
})
15511599
.buildAndStart();
15521600
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
15531601

15541602
try(worker; client) {
1603+
client.createTaskHub(true);
15551604
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
15561605
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
15571606
assertNotNull(instance);
15581607
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
1608+
assertEquals(2, execMap.size());
15591609
assertTrue(instance.readOutputAs(boolean.class));
15601610
} catch (TimeoutException e) {
15611611
throw new RuntimeException(e);
15621612
}
15631613

15641614
}
1565-
}
1615+
1616+
}
1617+
1618+

0 commit comments

Comments
 (0)