Skip to content

Commit d09fb23

Browse files
authored
Set activity execution context while running local activities (#319)
1 parent b37c7bb commit d09fb23

File tree

4 files changed

+100
-4
lines changed

4 files changed

+100
-4
lines changed

src/main/java/com/uber/cadence/internal/sync/CurrentActivityExecutionContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@
2525
*/
2626
final class CurrentActivityExecutionContext {
2727

28-
private static final ThreadLocal<ActivityExecutionContext> CURRENT =
29-
new ThreadLocal<ActivityExecutionContext>();
28+
private static final ThreadLocal<ActivityExecutionContext> CURRENT = new ThreadLocal<>();
3029

3130
/**
3231
* This is used by activity implementation to get access to the current ActivityExecutionContext
@@ -39,7 +38,7 @@ public static ActivityExecutionContext get() {
3938
+ "implementation methods and in the same thread that invoked an activity.");
4039
}
4140
return result;
42-
};
41+
}
4342

4443
public static boolean isSet() {
4544
return CURRENT.get() != null;
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.sync;
19+
20+
import com.uber.cadence.WorkflowExecution;
21+
import com.uber.cadence.activity.ActivityTask;
22+
import com.uber.cadence.client.ActivityCompletionException;
23+
import com.uber.cadence.serviceclient.IWorkflowService;
24+
import java.lang.reflect.Type;
25+
import java.util.Optional;
26+
27+
class LocalActivityExecutionContextImpl implements ActivityExecutionContext {
28+
private final IWorkflowService service;
29+
private final String domain;
30+
private final ActivityTask task;
31+
32+
LocalActivityExecutionContextImpl(IWorkflowService service, String domain, ActivityTask task) {
33+
this.domain = domain;
34+
this.service = service;
35+
this.task = task;
36+
}
37+
38+
@Override
39+
public byte[] getTaskToken() {
40+
throw new UnsupportedOperationException("getTaskToken is not supported for local activities");
41+
}
42+
43+
@Override
44+
public WorkflowExecution getWorkflowExecution() {
45+
return task.getWorkflowExecution();
46+
}
47+
48+
@Override
49+
public ActivityTask getTask() {
50+
return task;
51+
}
52+
53+
@Override
54+
public <V> void recordActivityHeartbeat(V details) throws ActivityCompletionException {
55+
throw new UnsupportedOperationException(
56+
"recordActivityHeartbeat is not supported for local activities");
57+
}
58+
59+
@Override
60+
public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsType) {
61+
throw new UnsupportedOperationException(
62+
"getHeartbeatDetails is not supported for local activities");
63+
}
64+
65+
@Override
66+
public void doNotCompleteOnReturn() {
67+
throw new UnsupportedOperationException(
68+
"doNotCompleteOnReturn is not supported for local activities");
69+
}
70+
71+
@Override
72+
public boolean isDoNotCompleteOnReturn() {
73+
throw new UnsupportedOperationException(
74+
"isDoNotCompleteOnReturn is not supported for local activities");
75+
}
76+
77+
@Override
78+
public IWorkflowService getService() {
79+
return service;
80+
}
81+
82+
@Override
83+
public String getDomain() {
84+
return domain;
85+
}
86+
}

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,9 @@ private class POJOLocalActivityImplementation implements ActivityTaskExecutor {
235235

236236
@Override
237237
public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsScope) {
238-
238+
ActivityExecutionContext context =
239+
new LocalActivityExecutionContextImpl(service, domain, task);
240+
CurrentActivityExecutionContext.set(context);
239241
byte[] input = task.getInput();
240242
Object[] args = dataConverter.fromDataArray(input, method.getGenericParameterTypes());
241243
try {
@@ -249,6 +251,8 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
249251
return mapToActivityFailure(task.getActivityType(), e, metricsScope);
250252
} catch (InvocationTargetException e) {
251253
return mapToActivityFailure(task.getActivityType(), e.getTargetException(), metricsScope);
254+
} finally {
255+
CurrentActivityExecutionContext.unset();
252256
}
253257
}
254258
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,7 @@ public void testLocalActivityRetry() {
542542
assertTrue(e.getCause().getCause() instanceof IOException);
543543
}
544544
assertEquals(activitiesImpl.toString(), 5, activitiesImpl.invocations.size());
545+
assertEquals("last attempt", 5, activitiesImpl.getLastAttempt());
545546
}
546547

547548
public static class TestActivityRetryOnTimeout implements TestWorkflow1 {
@@ -3292,6 +3293,7 @@ private static class TestActivitiesImpl implements TestActivities {
32923293
final AtomicInteger heartbeatCounter = new AtomicInteger();
32933294
private final ThreadPoolExecutor executor =
32943295
new ThreadPoolExecutor(0, 100, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
3296+
int lastAttempt;
32953297

32963298
private TestActivitiesImpl(ActivityCompletionClient completionClient) {
32973299
this.completionClient = completionClient;
@@ -3459,6 +3461,7 @@ public void heartbeatAndThrowIO() {
34593461

34603462
@Override
34613463
public void throwIO() {
3464+
lastAttempt = Activity.getTask().getAttempt();
34623465
invocations.add("throwIO");
34633466
try {
34643467
throw new IOException("simulated IO problem");
@@ -3487,6 +3490,10 @@ public void throwIOAnnotated() {
34873490
public List<UUID> activityUUIDList(List<UUID> arg) {
34883491
return arg;
34893492
}
3493+
3494+
public int getLastAttempt() {
3495+
return lastAttempt;
3496+
}
34903497
}
34913498

34923499
public interface ProcInvocationQueryable {

0 commit comments

Comments
 (0)