Skip to content

Commit 22742bb

Browse files
committed
chore: Propagate taskExecutionId from runtime to activityContext
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 642161a commit 22742bb

File tree

6 files changed

+76
-9
lines changed

6 files changed

+76
-9
lines changed

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public void startAndBlock() {
185185
output = taskActivityExecutor.execute(
186186
activityRequest.getName(),
187187
activityRequest.getInput().getValue(),
188-
activityRequest.getTaskId());
188+
activityRequest.getTaskExecutionId());
189189
} catch (Throwable e) {
190190
failureDetails = TaskFailureDetails.newBuilder()
191191
.setErrorType(e.getClass().getName())

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66

77
import java.time.Duration;
88
import java.util.HashMap;
9-
import java.util.concurrent.Executor;
109
import java.util.concurrent.ExecutorService;
11-
import java.util.concurrent.Executors;
10+
1211

1312
/**
1413
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.

client/src/main/java/io/dapr/durabletask/TaskActivityContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,11 @@ public interface TaskActivityContext {
2121
* @return the deserialized activity input value
2222
*/
2323
<T> T getInput(Class<T> targetType);
24+
25+
26+
/**
27+
* Gets the execution id of the current task activity.
28+
* @return the execution id of the current task activity
29+
*/
30+
String getTaskExecutionId();
2431
}

client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public TaskActivityExecutor(
1919
this.logger = logger;
2020
}
2121

22-
public String execute(String taskName, String input, int taskId) throws Throwable {
22+
public String execute(String taskName, String input, String taskExecutionId) throws Throwable {
2323
TaskActivityFactory factory = this.activityFactories.get(taskName);
2424
if (factory == null) {
2525
throw new IllegalStateException(
@@ -32,7 +32,7 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
3232
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
3333
}
3434

35-
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input);
35+
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId);
3636

3737
// Unhandled exceptions are allowed to escape
3838
Object output = activity.run(context);
@@ -46,12 +46,14 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
4646
private class TaskActivityContextImpl implements TaskActivityContext {
4747
private final String name;
4848
private final String rawInput;
49+
private final String taskExecutionId;
4950

5051
private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;
5152

52-
public TaskActivityContextImpl(String activityName, String rawInput) {
53+
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId) {
5354
this.name = activityName;
5455
this.rawInput = rawInput;
56+
this.taskExecutionId = taskExecutionId;
5557
}
5658

5759
@Override
@@ -67,5 +69,10 @@ public <T> T getInput(Class<T> targetType) {
6769

6870
return this.dataConverter.deserialize(this.rawInput, targetType);
6971
}
72+
73+
@Override
74+
public String getTaskExecutionId() {
75+
return this.taskExecutionId;
76+
}
7077
}
7178
}

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public <V> Task<V> callActivity(
265265
}
266266

267267
String serializedInput = this.dataConverter.serialize(input);
268-
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name);
268+
Builder scheduleTaskBuilder = ScheduleTaskAction.newBuilder().setName(name).setTaskExecutionId(newUUID().toString());
269269
if (serializedInput != null) {
270270
scheduleTaskBuilder.setInput(StringValue.of(serializedInput));
271271
}

client/src/test/java/io/dapr/durabletask/IntegrationTests.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,7 +1566,10 @@ public void newUUIDTest() {
15661566
if (currentUUID1.equals(currentUUID2)) ctx.complete(false);
15671567
else ctx.complete(true);
15681568
})
1569-
.addActivity(echoActivityName, ctx -> ctx.getInput(UUID.class))
1569+
.addActivity(echoActivityName, ctx -> {
1570+
System.out.println("##### echoActivityName: " + ctx.getInput(UUID.class ));
1571+
return ctx.getInput(UUID.class);
1572+
})
15701573
.buildAndStart();
15711574
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
15721575

@@ -1581,4 +1584,55 @@ public void newUUIDTest() {
15811584
}
15821585

15831586
}
1584-
}
1587+
1588+
1589+
@Test
1590+
public void taskExecutionIdTest() {
1591+
var orchestratorName = "test-task-execution-id";
1592+
var retryActivityName = "RetryN";
1593+
final RetryPolicy retryPolicy = new RetryPolicy(4, Duration.ofSeconds(3));
1594+
final TaskOptions taskOptions = new TaskOptions(retryPolicy);
1595+
1596+
var execMap = new HashMap<String, Integer>();
1597+
1598+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
1599+
.addOrchestrator(orchestratorName, ctx -> {
1600+
ctx.callActivity(retryActivityName,null,taskOptions).await();
1601+
ctx.callActivity(retryActivityName,null,taskOptions).await();
1602+
ctx.complete(true);
1603+
})
1604+
.addActivity(retryActivityName, ctx -> {
1605+
System.out.println("##### RetryN[executionId]: " + ctx.getTaskExecutionId());
1606+
var c = execMap.get(ctx.getTaskExecutionId());
1607+
if (c == null) {
1608+
c = 0;
1609+
} else {
1610+
c++;
1611+
}
1612+
1613+
execMap.put(ctx.getTaskExecutionId(), c);
1614+
if (c < 2) {
1615+
throw new RuntimeException("test retry");
1616+
}
1617+
return null;
1618+
})
1619+
.buildAndStart();
1620+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
1621+
1622+
try(worker; client) {
1623+
client.createTaskHub(true);
1624+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
1625+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
1626+
assertNotNull(instance);
1627+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
1628+
assertEquals(2, execMap.size());
1629+
assertTrue(instance.readOutputAs(boolean.class));
1630+
} catch (TimeoutException e) {
1631+
throw new RuntimeException(e);
1632+
}
1633+
1634+
}
1635+
1636+
}
1637+
1638+

0 commit comments

Comments
 (0)