Skip to content

Commit d98d33f

Browse files
authored
Add async signal to untypedstub (#527)
1 parent 7c0e2ab commit d98d33f

File tree

10 files changed

+297
-17
lines changed

10 files changed

+297
-17
lines changed

src/main/java/com/uber/cadence/client/WorkflowStub.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ static <T> WorkflowStub fromTyped(T typed) {
6565

6666
void signal(String signalName, Object... args);
6767

68+
CompletableFuture<Void> signalAsync(String signalName, Object... args);
69+
70+
CompletableFuture<Void> signalAsyncWithTimeout(
71+
long timeout, TimeUnit unit, String signalName, Object... args);
72+
6873
WorkflowExecution start(Object... args);
6974

7075
CompletableFuture<WorkflowExecution> startAsync(Object... args);

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ CompletableFuture<WorkflowExecution> startWorkflowAsync(
4141

4242
void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters);
4343

44+
CompletableFuture<Void> signalWorkflowExecutionAsync(
45+
SignalExternalWorkflowParameters signalParameters);
46+
47+
CompletableFuture<Void> signalWorkflowExecutionAsync(
48+
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis);
49+
4450
WorkflowExecution signalWithStartWorkflowExecution(
4551
SignalWithStartWorkflowExecutionParameters parameters);
4652

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -269,15 +269,8 @@ private RetryPolicy toRetryPolicy(RetryParameters retryParameters) {
269269

270270
@Override
271271
public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters) {
272-
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
273-
request.setDomain(domain);
272+
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
274273

275-
request.setInput(signalParameters.getInput());
276-
request.setSignalName(signalParameters.getSignalName());
277-
WorkflowExecution execution = new WorkflowExecution();
278-
execution.setRunId(signalParameters.getRunId());
279-
execution.setWorkflowId(signalParameters.getWorkflowId());
280-
request.setWorkflowExecution(execution);
281274
try {
282275
Retryer.retry(
283276
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
@@ -287,6 +280,55 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam
287280
}
288281
}
289282

283+
@Override
284+
public CompletableFuture<Void> signalWorkflowExecutionAsync(
285+
SignalExternalWorkflowParameters signalParameters) {
286+
return signalWorkflowExecutionAsync(signalParameters, Long.MAX_VALUE);
287+
}
288+
289+
@Override
290+
public CompletableFuture<Void> signalWorkflowExecutionAsync(
291+
SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) {
292+
SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
293+
return Retryer.retryWithResultAsync(
294+
getRetryOptionsWithExpiration(
295+
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis),
296+
() -> {
297+
CompletableFuture<Void> result = new CompletableFuture<>();
298+
try {
299+
service.SignalWorkflowExecution(
300+
request,
301+
new AsyncMethodCallback() {
302+
@Override
303+
public void onComplete(Object response) {
304+
result.complete(null);
305+
}
306+
307+
@Override
308+
public void onError(Exception exception) {
309+
result.completeExceptionally(exception);
310+
}
311+
});
312+
} catch (TException e) {
313+
result.completeExceptionally(e);
314+
}
315+
return result;
316+
});
317+
}
318+
319+
private SignalWorkflowExecutionRequest getSignalRequest(
320+
SignalExternalWorkflowParameters signalParameters) {
321+
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
322+
request.setDomain(domain);
323+
request.setInput(signalParameters.getInput());
324+
request.setSignalName(signalParameters.getSignalName());
325+
WorkflowExecution execution = new WorkflowExecution();
326+
execution.setRunId(signalParameters.getRunId());
327+
execution.setWorkflowId(signalParameters.getWorkflowId());
328+
request.setWorkflowExecution(execution);
329+
return request;
330+
}
331+
290332
@Override
291333
public WorkflowExecution signalWithStartWorkflowExecution(
292334
SignalWithStartWorkflowExecutionParameters parameters) {

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,15 @@ public void SignalWorkflowExecution(
699699
impl.SignalWorkflowExecution(signalRequest, resultHandler);
700700
}
701701

702+
@Override
703+
public void SignalWorkflowExecutionWithTimeout(
704+
SignalWorkflowExecutionRequest signalRequest,
705+
AsyncMethodCallback resultHandler,
706+
Long timeoutInMillis)
707+
throws TException {
708+
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
709+
}
710+
702711
@Override
703712
public void SignalWithStartWorkflowExecution(
704713
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,15 @@ public void SignalWorkflowExecution(
550550
impl.SignalWorkflowExecution(signalRequest, resultHandler);
551551
}
552552

553+
@Override
554+
public void SignalWorkflowExecutionWithTimeout(
555+
SignalWorkflowExecutionRequest signalRequest,
556+
AsyncMethodCallback resultHandler,
557+
Long timeoutInMillis)
558+
throws TException {
559+
impl.SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, timeoutInMillis);
560+
}
561+
553562
@Override
554563
public void SignalWithStartWorkflowExecution(
555564
SignalWithStartWorkflowExecutionRequest signalWithStartRequest,
@@ -806,6 +815,17 @@ public void signal(String signalName, Object... args) {
806815
next.signal(signalName, args);
807816
}
808817

818+
@Override
819+
public CompletableFuture<Void> signalAsync(String signalName, Object... args) {
820+
return next.signalAsync(signalName, args);
821+
}
822+
823+
@Override
824+
public CompletableFuture<Void> signalAsyncWithTimeout(
825+
long timeout, TimeUnit unit, String signalName, Object... args) {
826+
return next.signalAsyncWithTimeout(timeout, unit, signalName, args);
827+
}
828+
809829
@Override
810830
public WorkflowExecution start(Object... args) {
811831
return next.start(args);

src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,28 @@ class WorkflowStubImpl implements WorkflowStub {
9797

9898
@Override
9999
public void signal(String signalName, Object... input) {
100+
SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input);
101+
try {
102+
genericClient.signalWorkflowExecution(p);
103+
} catch (Exception e) {
104+
throw new WorkflowServiceException(execution.get(), workflowType, e);
105+
}
106+
}
107+
108+
@Override
109+
public CompletableFuture<Void> signalAsync(String signalName, Object... input) {
110+
return signalAsyncWithTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS, signalName, input);
111+
}
112+
113+
@Override
114+
public CompletableFuture<Void> signalAsyncWithTimeout(
115+
long timeout, TimeUnit unit, String signalName, Object... input) {
116+
SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input);
117+
return genericClient.signalWorkflowExecutionAsync(p, unit.toMillis(timeout));
118+
}
119+
120+
private SignalExternalWorkflowParameters getSignalExternalWorkflowParameters(
121+
String signalName, Object... input) {
100122
checkStarted();
101123
SignalExternalWorkflowParameters p = new SignalExternalWorkflowParameters();
102124
p.setInput(dataConverter.toData(input));
@@ -105,11 +127,7 @@ public void signal(String signalName, Object... input) {
105127
// TODO: Deal with signaling started workflow only, when requested
106128
// Commented out to support signaling workflows that called continue as new.
107129
// p.setRunId(execution.getRunId());
108-
try {
109-
genericClient.signalWorkflowExecution(p);
110-
} catch (Exception e) {
111-
throw new WorkflowServiceException(execution.get(), workflowType, e);
112-
}
130+
return p;
113131
}
114132

115133
private WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -936,7 +936,24 @@ public void RequestCancelWorkflowExecution(
936936
public void SignalWorkflowExecution(
937937
SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler)
938938
throws TException {
939-
throw new UnsupportedOperationException("not implemented");
939+
SignalWorkflowExecutionWithTimeout(signalRequest, resultHandler, null);
940+
}
941+
942+
@Override
943+
public void SignalWorkflowExecutionWithTimeout(
944+
SignalWorkflowExecutionRequest signalRequest,
945+
AsyncMethodCallback resultHandler,
946+
Long timeoutInMillis)
947+
throws TException {
948+
forkJoinPool.execute(
949+
() -> {
950+
try {
951+
SignalWorkflowExecution(signalRequest);
952+
resultHandler.onComplete(null);
953+
} catch (TException e) {
954+
resultHandler.onError(e);
955+
}
956+
});
940957
}
941958

942959
@Override

src/main/java/com/uber/cadence/serviceclient/IWorkflowService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
2121
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
22+
import com.uber.cadence.SignalWorkflowExecutionRequest;
2223
import com.uber.cadence.StartWorkflowExecutionRequest;
2324
import com.uber.cadence.WorkflowService.AsyncIface;
2425
import com.uber.cadence.WorkflowService.Iface;
@@ -69,4 +70,18 @@ void GetWorkflowExecutionHistoryWithTimeout(
6970
AsyncMethodCallback resultHandler,
7071
Long timeoutInMillis)
7172
throws TException;
73+
/**
74+
* SignalWorkflowExecutionWithTimeout signal workflow same as SignalWorkflowExecution but with
75+
* timeout
76+
*
77+
* @param signalRequest
78+
* @param resultHandler
79+
* @param timeoutInMillis
80+
* @throws TException
81+
*/
82+
void SignalWorkflowExecutionWithTimeout(
83+
SignalWorkflowExecutionRequest signalRequest,
84+
AsyncMethodCallback resultHandler,
85+
Long timeoutInMillis)
86+
throws TException;
7287
}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2551,9 +2551,82 @@ public void RequestCancelWorkflowExecution(
25512551

25522552
@Override
25532553
public void SignalWorkflowExecution(
2554-
SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler)
2555-
throws TException {
2556-
throw new UnsupportedOperationException("not implemented");
2554+
SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler) {
2555+
signalWorkflowExecution(signalRequest, resultHandler, null);
2556+
}
2557+
2558+
@Override
2559+
public void SignalWorkflowExecutionWithTimeout(
2560+
SignalWorkflowExecutionRequest signalRequest,
2561+
AsyncMethodCallback resultHandler,
2562+
Long timeoutInMillis) {
2563+
signalWorkflowExecution(signalRequest, resultHandler, timeoutInMillis);
2564+
}
2565+
2566+
private void signalWorkflowExecution(
2567+
SignalWorkflowExecutionRequest signalRequest,
2568+
AsyncMethodCallback resultHandler,
2569+
Long timeoutInMillis) {
2570+
2571+
timeoutInMillis = validateAndUpdateTimeout(timeoutInMillis, options.getRpcTimeoutMillis());
2572+
ThriftRequest<WorkflowService.SignalWorkflowExecution_args> request =
2573+
buildThriftRequest(
2574+
"SignalWorkflowExecution",
2575+
new WorkflowService.SignalWorkflowExecution_args(signalRequest),
2576+
timeoutInMillis);
2577+
CompletableFuture<ThriftResponse<WorkflowService.SignalWorkflowExecution_result>> response =
2578+
doRemoteCallAsync(request);
2579+
response
2580+
.whenComplete(
2581+
(r, e) -> {
2582+
try {
2583+
if (e != null) {
2584+
resultHandler.onError(CheckedExceptionWrapper.wrap(e));
2585+
return;
2586+
}
2587+
WorkflowService.SignalWorkflowExecution_result result =
2588+
r.getBody(WorkflowService.SignalWorkflowExecution_result.class);
2589+
if (r.getResponseCode() == ResponseCode.OK) {
2590+
resultHandler.onComplete(null);
2591+
return;
2592+
}
2593+
if (result.isSetBadRequestError()) {
2594+
resultHandler.onError(result.getBadRequestError());
2595+
return;
2596+
}
2597+
if (result.isSetEntityNotExistError()) {
2598+
resultHandler.onError(result.getEntityNotExistError());
2599+
return;
2600+
}
2601+
if (result.isSetServiceBusyError()) {
2602+
resultHandler.onError(result.getServiceBusyError());
2603+
return;
2604+
}
2605+
if (result.isSetDomainNotActiveError()) {
2606+
resultHandler.onError(result.getDomainNotActiveError());
2607+
return;
2608+
}
2609+
if (result.isSetLimitExceededError()) {
2610+
resultHandler.onError(result.getLimitExceededError());
2611+
return;
2612+
}
2613+
if (result.isSetClientVersionNotSupportedError()) {
2614+
resultHandler.onError(result.getClientVersionNotSupportedError());
2615+
return;
2616+
}
2617+
resultHandler.onError(
2618+
new TException("SignalWorkflowExecution failed with unknown error:" + result));
2619+
} finally {
2620+
if (r != null) {
2621+
r.release();
2622+
}
2623+
}
2624+
})
2625+
.exceptionally(
2626+
(e) -> {
2627+
log.error("Unexpected error in SignalWorkflowExecution", e);
2628+
return null;
2629+
});
25572630
}
25582631

25592632
@Override

0 commit comments

Comments
 (0)