28
28
import com .uber .cadence .internal .worker .WorkflowExecutionException ;
29
29
import com .uber .cadence .testing .SimulatedTimeoutException ;
30
30
import com .uber .cadence .workflow .Functions ;
31
+ import com .uber .cadence .workflow .Functions .Func ;
31
32
import com .uber .cadence .workflow .QueryMethod ;
32
33
import com .uber .cadence .workflow .SignalMethod ;
33
34
import com .uber .cadence .workflow .Workflow ;
@@ -59,9 +60,12 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
59
60
private final Map <String , Functions .Func <SyncWorkflowDefinition >> workflowDefinitions =
60
61
Collections .synchronizedMap (new HashMap <>());
61
62
63
+ private final Map <Class <?>, Functions .Func <?>> workflowImplementationFactories =
64
+ Collections .synchronizedMap (new HashMap <>());
65
+
62
66
private final ExecutorService threadPool ;
63
67
64
- public POJOWorkflowImplementationFactory (
68
+ POJOWorkflowImplementationFactory (
65
69
DataConverter dataConverter ,
66
70
ExecutorService threadPool ,
67
71
Function <WorkflowInterceptor , WorkflowInterceptor > interceptorFactory ) {
@@ -70,14 +74,19 @@ public POJOWorkflowImplementationFactory(
70
74
this .interceptorFactory = Objects .requireNonNull (interceptorFactory );
71
75
}
72
76
73
- public void setWorkflowImplementationTypes (Class <?>[] workflowImplementationTypes ) {
77
+ void setWorkflowImplementationTypes (Class <?>[] workflowImplementationTypes ) {
74
78
workflowDefinitions .clear ();
75
79
for (Class <?> type : workflowImplementationTypes ) {
76
80
addWorkflowImplementationType (type );
77
81
}
78
82
}
79
83
80
- public void addWorkflowImplementationType (Class <?> workflowImplementationClass ) {
84
+ <R > void addWorkflowImplementationFactory (Class <R > clazz , Functions .Func <R > factory ) {
85
+ workflowImplementationFactories .put (clazz , factory );
86
+ addWorkflowImplementationType (clazz );
87
+ }
88
+
89
+ private void addWorkflowImplementationType (Class <?> workflowImplementationClass ) {
81
90
TypeToken <?>.TypeSet interfaces =
82
91
TypeToken .of (workflowImplementationClass ).getTypes ().interfaces ();
83
92
if (interfaces .isEmpty ()) {
@@ -230,17 +239,22 @@ public byte[] execute(byte[] input) throws CancellationException, WorkflowExecut
230
239
231
240
private void newInstance () {
232
241
if (workflow == null ) {
233
- try {
234
- workflow = workflowImplementationClass .getDeclaredConstructor ().newInstance ();
235
- } catch (NoSuchMethodException
236
- | InstantiationException
237
- | IllegalAccessException
238
- | InvocationTargetException e ) {
239
- // Error to fail decision as this can be fixed by a new deployment.
240
- throw new Error (
241
- "Failure instantiating workflow implementation class "
242
- + workflowImplementationClass .getName (),
243
- e );
242
+ Func <?> factory = workflowImplementationFactories .get (workflowImplementationClass );
243
+ if (factory != null ) {
244
+ workflow = factory .apply ();
245
+ } else {
246
+ try {
247
+ workflow = workflowImplementationClass .getDeclaredConstructor ().newInstance ();
248
+ } catch (NoSuchMethodException
249
+ | InstantiationException
250
+ | IllegalAccessException
251
+ | InvocationTargetException e ) {
252
+ // Error to fail decision as this can be fixed by a new deployment.
253
+ throw new Error (
254
+ "Failure instantiating workflow implementation class "
255
+ + workflowImplementationClass .getName (),
256
+ e );
257
+ }
244
258
}
245
259
WorkflowInternal .registerQuery (workflow );
246
260
}
@@ -289,7 +303,7 @@ public void processSignal(String signalName, byte[] input, long eventId) {
289
303
}
290
304
}
291
305
292
- public static WorkflowExecutionException mapToWorkflowExecutionException (
306
+ static WorkflowExecutionException mapToWorkflowExecutionException (
293
307
Exception failure , DataConverter dataConverter ) {
294
308
failure = CheckedExceptionWrapper .unwrap (failure );
295
309
// Only expected during unit tests.
0 commit comments