Skip to content

Commit 4cd383a

Browse files
authored
Added interceptor for WorkflowClient calls. (#129)
* Implemented newWorkflowStub interception
1 parent 2569b4d commit 4cd383a

12 files changed

+213
-37
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.client;
19+
20+
import com.uber.cadence.WorkflowExecution;
21+
import java.util.Optional;
22+
23+
public interface WorkflowClientInterceptor {
24+
25+
UntypedWorkflowStub newUntypedWorkflowStub(
26+
String workflowType, WorkflowOptions options, UntypedWorkflowStub next);
27+
28+
UntypedWorkflowStub newUntypedWorkflowStub(
29+
WorkflowExecution execution, Optional<String> workflowType, UntypedWorkflowStub result);
30+
31+
ActivityCompletionClient newActivityCompletionClient(ActivityCompletionClient next);
32+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.client;
19+
20+
import com.uber.cadence.WorkflowExecution;
21+
import java.util.Optional;
22+
23+
public class WorkflowClientInterceptorBase implements WorkflowClientInterceptor {
24+
25+
@Override
26+
public UntypedWorkflowStub newUntypedWorkflowStub(
27+
String workflowType, WorkflowOptions options, UntypedWorkflowStub next) {
28+
return next;
29+
}
30+
31+
@Override
32+
public UntypedWorkflowStub newUntypedWorkflowStub(
33+
WorkflowExecution execution, Optional<String> workflowType, UntypedWorkflowStub next) {
34+
return next;
35+
}
36+
37+
@Override
38+
public ActivityCompletionClient newActivityCompletionClient(ActivityCompletionClient next) {
39+
return next;
40+
}
41+
}

src/main/java/com/uber/cadence/client/WorkflowClientOptions.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.cadence.converter.DataConverter;
2121
import com.uber.cadence.converter.JsonDataConverter;
22+
import java.util.Objects;
2223

2324
/** Options for WorkflowClient configuration. */
2425
public final class WorkflowClientOptions {
@@ -27,31 +28,51 @@ public static final class Builder {
2728

2829
private DataConverter dataConverter = JsonDataConverter.getInstance();
2930

31+
private WorkflowClientInterceptor[] interceptors = EMPTY_INTERCEPTOR_ARRAY;
32+
3033
/**
3134
* Used to override default (JSON) data converter implementation.
3235
*
3336
* @param dataConverter data converter to serialize and deserialize arguments and return values.
37+
* Not null.
3438
*/
3539
public Builder setDataConverter(DataConverter dataConverter) {
36-
if (dataConverter == null) {
37-
throw new IllegalArgumentException("null");
38-
}
39-
this.dataConverter = dataConverter;
40+
this.dataConverter = Objects.requireNonNull(dataConverter);
41+
return this;
42+
}
43+
44+
/**
45+
* Interceptor used to intercept workflow client calls.
46+
*
47+
* @param interceptors not null
48+
*/
49+
public Builder setInterceptors(WorkflowClientInterceptor... interceptors) {
50+
this.interceptors = Objects.requireNonNull(interceptors);
4051
return this;
4152
}
4253

4354
public WorkflowClientOptions build() {
44-
return new WorkflowClientOptions(dataConverter);
55+
return new WorkflowClientOptions(dataConverter, interceptors);
4556
}
4657
}
4758

59+
private static final WorkflowClientInterceptor[] EMPTY_INTERCEPTOR_ARRAY =
60+
new WorkflowClientInterceptor[0];
4861
private final DataConverter dataConverter;
4962

50-
private WorkflowClientOptions(DataConverter dataConverter) {
63+
private final WorkflowClientInterceptor[] interceptors;
64+
65+
private WorkflowClientOptions(
66+
DataConverter dataConverter, WorkflowClientInterceptor[] interceptors) {
5167
this.dataConverter = dataConverter;
68+
this.interceptors = interceptors;
5269
}
5370

5471
public DataConverter getDataConverter() {
5572
return dataConverter;
5673
}
74+
75+
public WorkflowClientInterceptor[] getInterceptors() {
76+
return interceptors;
77+
}
5778
}

src/main/java/com/uber/cadence/client/WorkflowFailureException.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,24 @@ public WorkflowFailureException(
3333
Optional<String> workflowType,
3434
long decisionTaskCompletedEventId,
3535
Throwable failure) {
36-
super(
37-
"WorkflowType=\""
38-
+ workflowType
39-
+ "\", WorkflowID=\""
40-
+ execution.getWorkflowId()
41-
+ "\", RunID=\""
42-
+ execution.getRunId(),
43-
execution,
44-
workflowType,
45-
failure);
36+
super(getMessage(execution, workflowType), execution, workflowType, failure);
4637
this.decisionTaskCompletedEventId = decisionTaskCompletedEventId;
4738
}
4839

40+
private static String getMessage(WorkflowExecution execution, Optional<String> workflowType) {
41+
StringBuilder result = new StringBuilder();
42+
if (workflowType.isPresent()) {
43+
result.append("WorkflowType=\"");
44+
result.append(workflowType.get());
45+
result.append("\", ");
46+
}
47+
result.append("WorkflowID=\"");
48+
result.append(execution.getWorkflowId());
49+
result.append("\", RunID=\"");
50+
result.append(execution.getRunId());
51+
return result.toString();
52+
}
53+
4954
public long getDecisionTaskCompletedEventId() {
5055
return decisionTaskCompletedEventId;
5156
}

src/main/java/com/uber/cadence/client/WorkflowServiceException.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,22 @@
2222

2323
public final class WorkflowServiceException extends WorkflowException {
2424

25-
public WorkflowServiceException(WorkflowExecution execution, String message) {
26-
super(message, execution, Optional.empty(), null);
25+
public WorkflowServiceException(
26+
WorkflowExecution execution, Optional<String> workflowType, Throwable failure) {
27+
super(getMessage(execution, workflowType), execution, workflowType, failure);
28+
}
29+
30+
private static String getMessage(WorkflowExecution execution, Optional<String> workflowType) {
31+
StringBuilder result = new StringBuilder();
32+
if (workflowType.isPresent()) {
33+
result.append("WorkflowType=\"");
34+
result.append(workflowType.get());
35+
result.append("\", ");
36+
}
37+
result.append("WorkflowID=\"");
38+
result.append(execution.getWorkflowId());
39+
result.append("\", RunID=\"");
40+
result.append(execution.getRunId());
41+
return result.toString();
2742
}
2843
}

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ public WorkflowClient newWorkflowClient() {
117117
return WorkflowClientInternal.newInstance(service, testEnvironmentOptions.getDomain(), options);
118118
}
119119

120+
@Override
121+
public WorkflowClient newWorkflowClient(WorkflowClientOptions options) {
122+
return WorkflowClientInternal.newInstance(service, testEnvironmentOptions.getDomain(), options);
123+
}
124+
120125
@Override
121126
public long currentTimeMillis() {
122127
return service.currentTimeMillis();

src/main/java/com/uber/cadence/internal/sync/UntypedWorkflowStubImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ class UntypedWorkflowStubImpl implements UntypedWorkflowStub {
6464
this.genericClient = genericClient;
6565
this.dataConverter = dataConverter;
6666
this.workflowType = workflowType;
67+
if (execution == null
68+
|| execution.getWorkflowId() == null
69+
|| execution.getWorkflowId().isEmpty()) {
70+
throw new IllegalArgumentException("null or empty workflowId");
71+
}
6772
this.execution.set(execution);
6873
this.options = Optional.empty();
6974
}
@@ -167,6 +172,8 @@ public <R> R getResult(long timeout, TimeUnit unit, Class<R> returnType) throws
167172
return null;
168173
}
169174
return dataConverter.fromData(resultValue, returnType);
175+
} catch (TimeoutException e) {
176+
throw e;
170177
} catch (Exception e) {
171178
return mapToWorkflowFailureException(e, returnType);
172179
}
@@ -207,6 +214,7 @@ public <R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Clas
207214
}
208215

209216
private <R> R mapToWorkflowFailureException(Exception failure, Class<R> returnType) {
217+
failure = CheckedExceptionWrapper.unwrap(failure);
210218
Class<Throwable> detailsClass;
211219
if (failure instanceof WorkflowExecutionFailedException) {
212220
WorkflowExecutionFailedException executionFailed = (WorkflowExecutionFailedException) failure;
@@ -233,7 +241,7 @@ private <R> R mapToWorkflowFailureException(Exception failure, Class<R> returnTy
233241
} else if (failure instanceof WorkflowException) {
234242
throw (WorkflowException) failure;
235243
} else {
236-
throw new WorkflowFailureException(execution.get(), workflowType, 0, failure);
244+
throw new WorkflowServiceException(execution.get(), workflowType, failure);
237245
}
238246
}
239247

@@ -256,7 +264,7 @@ public <R> R query(String queryType, Class<R> returnType, Object... args) {
256264
throw new WorkflowQueryException(execution.get(), unwrapped.getMessage());
257265
}
258266
if (unwrapped instanceof InternalServiceError) {
259-
throw new WorkflowServiceException(execution.get(), unwrapped.getMessage());
267+
throw new WorkflowServiceException(execution.get(), workflowType, unwrapped);
260268
}
261269
throw e;
262270
}

src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.uber.cadence.client.ActivityCompletionClient;
2424
import com.uber.cadence.client.UntypedWorkflowStub;
2525
import com.uber.cadence.client.WorkflowClient;
26+
import com.uber.cadence.client.WorkflowClientInterceptor;
2627
import com.uber.cadence.client.WorkflowClientOptions;
2728
import com.uber.cadence.client.WorkflowOptions;
2829
import com.uber.cadence.converter.DataConverter;
@@ -47,6 +48,7 @@ public final class WorkflowClientInternal implements WorkflowClient {
4748
private final GenericWorkflowClientExternalImpl genericClient;
4849
private final ManualActivityCompletionClientFactory manualActivityCompletionClientFactory;
4950
private final DataConverter dataConverter;
51+
private final WorkflowClientInterceptor[] interceptors;
5052

5153
/**
5254
* Creates worker that connects to the local instance of the Cadence Service that listens on a
@@ -129,6 +131,7 @@ private WorkflowClientInternal(
129131
options = new WorkflowClientOptions.Builder().build();
130132
}
131133
this.dataConverter = options.getDataConverter();
134+
this.interceptors = options.getInterceptors();
132135
this.manualActivityCompletionClientFactory =
133136
new ManualActivityCompletionClientFactoryImpl(service, domain, dataConverter);
134137
}
@@ -147,12 +150,14 @@ public String getDomain() {
147150
@SuppressWarnings("unchecked")
148151
public <T> T newWorkflowStub(Class<T> workflowInterface, WorkflowOptions options) {
149152
checkAnnotation(workflowInterface, WorkflowMethod.class);
153+
WorkflowExternalInvocationHandler invocationHandler =
154+
new WorkflowExternalInvocationHandler(
155+
workflowInterface, genericClient, options, dataConverter, interceptors);
150156
return (T)
151157
Proxy.newProxyInstance(
152158
WorkflowInternal.class.getClassLoader(),
153159
new Class<?>[] {workflowInterface},
154-
new WorkflowExternalInvocationHandler(
155-
workflowInterface, genericClient, options, dataConverter));
160+
invocationHandler);
156161
}
157162

158163
@SafeVarargs
@@ -196,20 +201,27 @@ public <T> T newWorkflowStub(
196201
if (runId.isPresent()) {
197202
execution.setRunId(runId.get());
198203
}
204+
WorkflowExternalInvocationHandler invocationHandler =
205+
new WorkflowExternalInvocationHandler(
206+
workflowInterface, genericClient, execution, dataConverter, interceptors);
199207
@SuppressWarnings("unchecked")
200208
T result =
201209
(T)
202210
Proxy.newProxyInstance(
203211
WorkflowInternal.class.getClassLoader(),
204212
new Class<?>[] {workflowInterface},
205-
new WorkflowExternalInvocationHandler(
206-
workflowInterface, genericClient, execution, dataConverter));
213+
invocationHandler);
207214
return result;
208215
}
209216

210217
@Override
211218
public UntypedWorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions options) {
212-
return new UntypedWorkflowStubImpl(genericClient, dataConverter, workflowType, options);
219+
UntypedWorkflowStub result =
220+
new UntypedWorkflowStubImpl(genericClient, dataConverter, workflowType, options);
221+
for (WorkflowClientInterceptor i : interceptors) {
222+
result = i.newUntypedWorkflowStub(workflowType, options, result);
223+
}
224+
return result;
213225
}
214226

215227
@Override
@@ -230,7 +242,12 @@ public UntypedWorkflowStub newUntypedWorkflowStub(
230242

231243
@Override
232244
public ActivityCompletionClient newActivityCompletionClient() {
233-
return new ActivityCompletionClientImpl(manualActivityCompletionClientFactory);
245+
ActivityCompletionClient result =
246+
new ActivityCompletionClientImpl(manualActivityCompletionClientFactory);
247+
for (WorkflowClientInterceptor i : interceptors) {
248+
result = i.newActivityCompletionClient(result);
249+
}
250+
return result;
234251
}
235252

236253
public static WorkflowExecution start(Functions.Proc workflow) {

0 commit comments

Comments
 (0)