Skip to content

Commit b471e13

Browse files
Access to workflow/activity instance from context (#2384)
1 parent 3ad0b0e commit b471e13

17 files changed

+220
-10
lines changed

temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,7 @@ public interface ActivityExecutionContext {
147147
* an activity.
148148
*/
149149
WorkflowClient getWorkflowClient();
150+
151+
/** Get the currently running activity instance. */
152+
Object getInstance();
150153
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityExecutionContextBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,9 @@ public Scope getMetricsScope() {
9191
public WorkflowClient getWorkflowClient() {
9292
return next.getWorkflowClient();
9393
}
94+
95+
@Override
96+
public Object getInstance() {
97+
return next.getInstance();
98+
}
9499
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@
2323
import com.uber.m3.tally.Scope;
2424

2525
public interface ActivityExecutionContextFactory {
26-
InternalActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope);
26+
InternalActivityExecutionContext createContext(
27+
ActivityInfoInternal info, Object activity, Scope metricsScope);
2728
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ public ActivityExecutionContextFactoryImpl(
6161

6262
@Override
6363
public InternalActivityExecutionContext createContext(
64-
ActivityInfoInternal info, Scope metricsScope) {
64+
ActivityInfoInternal info, Object activity, Scope metricsScope) {
6565
return new ActivityExecutionContextImpl(
6666
client,
6767
namespace,
68+
activity,
6869
info,
6970
dataConverter,
7071
heartbeatExecutor,

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
4949
private final Lock lock = new ReentrantLock();
5050
private final WorkflowClient client;
51+
private final Object activity;
5152
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
5253
private final Functions.Proc completionHandle;
5354
private final HeartbeatContext heartbeatContext;
@@ -61,6 +62,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
6162
ActivityExecutionContextImpl(
6263
WorkflowClient client,
6364
String namespace,
65+
Object activity,
6466
ActivityInfo info,
6567
DataConverter dataConverter,
6668
ScheduledExecutorService heartbeatExecutor,
@@ -71,6 +73,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
7173
Duration maxHeartbeatThrottleInterval,
7274
Duration defaultHeartbeatThrottleInterval) {
7375
this.client = client;
76+
this.activity = activity;
7477
this.metricsScope = metricsScope;
7578
this.info = info;
7679
this.completionHandle = completionHandle;
@@ -177,4 +180,9 @@ public Object getLastHeartbeatValue() {
177180
public WorkflowClient getWorkflowClient() {
178181
return client;
179182
}
183+
184+
@Override
185+
public Object getInstance() {
186+
return activity;
187+
}
180188
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public BaseActivityTaskExecutor(
7676
@Override
7777
public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) {
7878
InternalActivityExecutionContext context =
79-
executionContextFactory.createContext(info, metricsScope);
79+
executionContextFactory.createContext(info, getActivity(), metricsScope);
8080
ActivityInfo activityInfo = context.getInfo();
8181
ActivitySerializationContext serializationContext =
8282
new ActivitySerializationContext(
@@ -144,6 +144,8 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri
144144

145145
abstract ActivityInboundCallsInterceptor createRootInboundInterceptor();
146146

147+
abstract Object getActivity();
148+
147149
abstract Object[] provideArgs(
148150
Optional<Payloads> input, DataConverter dataConverterWithActivityContext);
149151

@@ -203,6 +205,11 @@ ActivityInboundCallsInterceptor createRootInboundInterceptor() {
203205
activity, method);
204206
}
205207

208+
@Override
209+
Object getActivity() {
210+
return activity;
211+
}
212+
206213
@Override
207214
Object[] provideArgs(Optional<Payloads> input, DataConverter dataConverterWithActivityContext) {
208215
return dataConverterWithActivityContext.fromPayloads(
@@ -241,6 +248,11 @@ ActivityInboundCallsInterceptor createRootInboundInterceptor() {
241248
activity);
242249
}
243250

251+
@Override
252+
Object getActivity() {
253+
return activity;
254+
}
255+
244256
@Override
245257
Object[] provideArgs(Optional<Payloads> input, DataConverter dataConverterWithActivityContext) {
246258
EncodedValues encodedValues = new EncodedValues(input, dataConverterWithActivityContext);

temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public LocalActivityExecutionContextFactoryImpl(WorkflowClient client) {
3232

3333
@Override
3434
public InternalActivityExecutionContext createContext(
35-
ActivityInfoInternal info, Scope metricsScope) {
36-
return new LocalActivityExecutionContextImpl(client, info, metricsScope);
35+
ActivityInfoInternal info, Object activity, Scope metricsScope) {
36+
return new LocalActivityExecutionContextImpl(client, activity, info, metricsScope);
3737
}
3838
}

temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030

3131
class LocalActivityExecutionContextImpl implements InternalActivityExecutionContext {
3232
private final WorkflowClient client;
33+
private final Object activity;
3334
private final ActivityInfo info;
3435
private final Scope metricsScope;
3536

36-
LocalActivityExecutionContextImpl(WorkflowClient client, ActivityInfo info, Scope metricsScope) {
37+
LocalActivityExecutionContextImpl(
38+
WorkflowClient client, Object activity, ActivityInfo info, Scope metricsScope) {
3739
this.client = client;
40+
this.activity = activity;
3841
this.info = info;
3942
this.metricsScope = metricsScope;
4043
}
@@ -100,4 +103,9 @@ public Object getLastHeartbeatValue() {
100103
public WorkflowClient getWorkflowClient() {
101104
return client;
102105
}
106+
107+
@Override
108+
public Object getInstance() {
109+
return activity;
110+
}
103111
}

temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
3131
import io.temporal.workflow.DynamicWorkflow;
3232
import io.temporal.workflow.Functions;
33+
import java.util.Objects;
3334
import java.util.Optional;
35+
import javax.annotation.Nullable;
3436

3537
final class DynamicSyncWorkflowDefinition implements SyncWorkflowDefinition {
3638

3739
private final Functions.Func1<EncodedValues, ? extends DynamicWorkflow> factory;
40+
private RootWorkflowInboundCallsInterceptor rootWorkflowInvoker;
3841
private final WorkerInterceptor[] workerInterceptors;
3942
// don't pass it down to other classes, it's a "cached" instance for internal usage only
4043
private final DataConverter dataConverterWithWorkflowContext;
@@ -52,7 +55,9 @@ public DynamicSyncWorkflowDefinition(
5255
@Override
5356
public void initialize(Optional<Payloads> input) {
5457
SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
55-
workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
58+
RootWorkflowInboundCallsInterceptor rootWorkflowInvoker =
59+
new RootWorkflowInboundCallsInterceptor(workflowContext, input);
60+
workflowInvoker = rootWorkflowInvoker;
5661
for (WorkerInterceptor workerInterceptor : workerInterceptors) {
5762
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
5863
}
@@ -69,6 +74,13 @@ public Optional<Payloads> execute(Header header, Optional<Payloads> input) {
6974
return dataConverterWithWorkflowContext.toPayloads(result.getResult());
7075
}
7176

77+
@Nullable
78+
@Override
79+
public Object getInstance() {
80+
Objects.requireNonNull(rootWorkflowInvoker, "getInstance called before initialize.");
81+
return rootWorkflowInvoker.getInstance();
82+
}
83+
7284
class RootWorkflowInboundCallsInterceptor extends BaseRootWorkflowInboundCallsInterceptor {
7385
private DynamicWorkflow workflow;
7486
private Optional<Payloads> input;
@@ -79,6 +91,10 @@ public RootWorkflowInboundCallsInterceptor(
7991
this.input = input;
8092
}
8193

94+
public DynamicWorkflow getInstance() {
95+
return workflow;
96+
}
97+
8298
@Override
8399
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
84100
super.init(outboundCalls);

temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.Objects;
6161
import java.util.Optional;
6262
import javax.annotation.Nonnull;
63+
import javax.annotation.Nullable;
6364
import org.slf4j.Logger;
6465
import org.slf4j.LoggerFactory;
6566

@@ -315,6 +316,7 @@ private class POJOWorkflowImplementation implements SyncWorkflowDefinition {
315316
private final Class<?> workflowImplementationClass;
316317
private final Method workflowMethod;
317318
private final Constructor<?> ctor;
319+
private RootWorkflowInboundCallsInterceptor rootWorkflowInvoker;
318320
private WorkflowInboundCallsInterceptor workflowInvoker;
319321
// don't pass it down to other classes, it's a "cached" instance for internal usage only
320322
private final DataConverter dataConverterWithWorkflowContext;
@@ -333,7 +335,8 @@ public POJOWorkflowImplementation(
333335
@Override
334336
public void initialize(Optional<Payloads> input) {
335337
SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
336-
workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
338+
rootWorkflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
339+
workflowInvoker = rootWorkflowInvoker;
337340
for (WorkerInterceptor workerInterceptor : workerInterceptors) {
338341
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
339342
}
@@ -357,6 +360,13 @@ public Optional<Payloads> execute(Header header, Optional<Payloads> input)
357360
return dataConverterWithWorkflowContext.toPayloads(result.getResult());
358361
}
359362

363+
@Nullable
364+
@Override
365+
public Object getInstance() {
366+
Objects.requireNonNull(rootWorkflowInvoker, "getInstance called before initialize.");
367+
return rootWorkflowInvoker.getInstance();
368+
}
369+
360370
private class RootWorkflowInboundCallsInterceptor
361371
extends BaseRootWorkflowInboundCallsInterceptor {
362372
private Object workflow;
@@ -368,6 +378,10 @@ public RootWorkflowInboundCallsInterceptor(
368378
this.input = input;
369379
}
370380

381+
public Object getInstance() {
382+
return workflow;
383+
}
384+
371385
@Override
372386
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
373387
super.init(outboundCalls);

0 commit comments

Comments
 (0)