Skip to content

Commit b44435d

Browse files
authored
Add some more fields in activity info (#306)
1 parent 4b1b213 commit b44435d

File tree

7 files changed

+47
-6
lines changed

7 files changed

+47
-6
lines changed

src/main/java/com/uber/cadence/activity/ActivityTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.activity;
1919

2020
import com.uber.cadence.WorkflowExecution;
21+
import com.uber.cadence.WorkflowType;
2122
import java.time.Duration;
2223

2324
/**
@@ -58,4 +59,12 @@ public interface ActivityTask {
5859
Duration getHeartbeatTimeout();
5960

6061
byte[] getHeartbeatDetails();
62+
63+
WorkflowType getWorkflowType();
64+
65+
String getWorkflowDomain();
66+
67+
String getTaskList();
68+
69+
int getAttempt();
6170
}

src/main/java/com/uber/cadence/internal/sync/ActivityTaskImpl.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@
1919

2020
import com.uber.cadence.PollForActivityTaskResponse;
2121
import com.uber.cadence.WorkflowExecution;
22+
import com.uber.cadence.WorkflowType;
2223
import com.uber.cadence.activity.ActivityTask;
2324
import java.time.Duration;
2425
import java.util.concurrent.TimeUnit;
2526

2627
final class ActivityTaskImpl implements ActivityTask {
2728
private final PollForActivityTaskResponse response;
29+
private final String taskList;
2830

29-
public ActivityTaskImpl(PollForActivityTaskResponse response) {
31+
ActivityTaskImpl(PollForActivityTaskResponse response, String taskList) {
3032
this.response = response;
33+
this.taskList = taskList;
3134
}
3235

3336
@Override
@@ -76,6 +79,26 @@ public byte[] getHeartbeatDetails() {
7679
return response.getHeartbeatDetails();
7780
}
7881

82+
@Override
83+
public WorkflowType getWorkflowType() {
84+
return response.getWorkflowType();
85+
}
86+
87+
@Override
88+
public String getWorkflowDomain() {
89+
return response.getWorkflowDomain();
90+
}
91+
92+
@Override
93+
public String getTaskList() {
94+
return taskList;
95+
}
96+
97+
@Override
98+
public int getAttempt() {
99+
return response.getAttempt();
100+
}
101+
79102
public byte[] getInput() {
80103
return response.getInput();
81104
}

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public DataConverter getDataConverter() {
5959
return dataConverter;
6060
}
6161

62-
public void addActivityImplementation(Object activity) {
62+
void addActivityImplementation(Object activity) {
6363
if (activity instanceof Class) {
6464
throw new IllegalArgumentException("Activity object instance expected, not the class");
6565
}
@@ -143,7 +143,7 @@ public boolean isAnyTypeSupported() {
143143
return !activities.isEmpty();
144144
}
145145

146-
public void setActivitiesImplementation(Object[] activitiesImplementation) {
146+
void setActivitiesImplementation(Object[] activitiesImplementation) {
147147
activities.clear();
148148
for (Object activity : activitiesImplementation) {
149149
addActivityImplementation(activity);
@@ -154,10 +154,11 @@ public void setActivitiesImplementation(Object[] activitiesImplementation) {
154154
public Result handle(
155155
IWorkflowService service,
156156
String domain,
157+
String taskList,
157158
PollForActivityTaskResponse pollResponse,
158159
Scope metricsScope) {
159160
String activityType = pollResponse.getActivityType().getName();
160-
ActivityTaskImpl activityTask = new ActivityTaskImpl(pollResponse);
161+
ActivityTaskImpl activityTask = new ActivityTaskImpl(pollResponse, taskList);
161162
POJOActivityImplementation activity = activities.get(activityType);
162163
if (activity == null) {
163164
String knownTypes = Joiner.on(", ").join(activities.keySet());

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,11 @@ public <T> Promise<T> executeActivity(
208208
IWorkflowService service = new WorkflowServiceWrapper(workflowService);
209209
Result taskResult =
210210
activityTaskHandler.handle(
211-
service, testEnvironmentOptions.getDomain(), task, NoopScope.getInstance());
211+
service,
212+
testEnvironmentOptions.getDomain(),
213+
options.getTaskList(),
214+
task,
215+
NoopScope.getInstance());
212216
return Workflow.newPromise(getReply(task, taskResult, resultClass, resultType));
213217
}
214218

src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public RetryOptions getRequestRetryOptions() {
8383
Result handle(
8484
IWorkflowService service,
8585
String domain,
86+
String taskList,
8687
PollForActivityTaskResponse activityTask,
8788
Scope metricsScope);
8889

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void handle(MeasurableActivityTask task) throws Exception {
180180
try {
181181
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
182182
ActivityTaskHandler.Result response =
183-
handler.handle(service, domain, task.task, metricsScope);
183+
handler.handle(service, domain, taskList, task.task, metricsScope);
184184
sw.stop();
185185

186186
sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.uber.cadence.activity.Activity;
3232
import com.uber.cadence.activity.ActivityMethod;
3333
import com.uber.cadence.activity.ActivityOptions;
34+
import com.uber.cadence.activity.ActivityTask;
3435
import com.uber.cadence.client.ActivityCancelledException;
3536
import com.uber.cadence.client.ActivityCompletionClient;
3637
import com.uber.cadence.client.ActivityNotExistsException;
@@ -3306,6 +3307,8 @@ public void proc6(String a1, int a2, int a3, int a4, int a5, int a6) {
33063307

33073308
@Override
33083309
public void heartbeatAndThrowIO() {
3310+
ActivityTask task = Activity.getTask();
3311+
assertEquals(task.getAttempt(), heartbeatCounter.get());
33093312
invocations.add("throwIO");
33103313
Optional<Integer> heartbeatDetails = Activity.getHeartbeatDetails(int.class);
33113314
assertEquals(heartbeatCounter.get(), (int) heartbeatDetails.orElse(0));

0 commit comments

Comments
 (0)