Skip to content

Commit bf06a8d

Browse files
authored
Add initial support for StartWorkflowExecutionAsync (#884)
Add WorkflowStub#enqueueStart which uses the new StartWorkflowExecutionAsync to submit a workflow to be started at some point in the future. Currently nearly all testing of the Cadence Java client is implemented as integration tests in WorkflowTests, running against either a locally running Cadence service or using the test environment. While this approach would be technically possible to configure (adding zookeeper and kafka to the docker-compose files), this would require reimplementing the integration tests already present on the Cadence server for all new features added. This approach doesn't scale well for adding new functionality to each client. Instead, we add tests for this functionality via targeted unit tests at the different layers of the client. Add WorkflowClientInternalTest to use an in-memory fake server with stubbed responses to assert that we make exactly the correct request. This tests the entire Tchannel networking stack of the client end to end. Add WorkflowStubImplTest to test the specific semantics and state of the WorkflowStubImpl. State is particularly important with WorkflowStubs as they're intended to represent a single execution of a Workflow, so they normally capture the Workflow runId when starting a workflow. For StartWorkflowExecutionAsync we capture only the workflowId so that a stub can be used to enqueueStart(), then later qury or signal that workflow. Additionally add tests for the thrift2proto mappers, including the MapperTestUtil assertions to ensure that all fields are present. This ensures that any update to the IDL files requires a corresponding change to either the test or mapper to account for the new field.
1 parent ec10e2b commit bf06a8d

23 files changed

+1311
-85
lines changed

src/main/java/com/uber/cadence/client/WorkflowStub.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ CompletableFuture<Void> signalAsyncWithTimeout(
7676
CompletableFuture<WorkflowExecution> startAsyncWithTimeout(
7777
long timeout, TimeUnit unit, Object... args);
7878

79+
WorkflowExecution enqueueStart(Object... args);
80+
81+
CompletableFuture<WorkflowExecution> enqueueStartAsync(Object... args);
82+
83+
CompletableFuture<WorkflowExecution> enqueueStartAsyncWithTimeout(
84+
long timeout, TimeUnit unit, Object... args);
85+
7986
WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);
8087

8188
Optional<String> getWorkflowType();

src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,15 +221,25 @@ public StartWorkflowExecutionAsyncResponse StartWorkflowExecutionAsync(
221221
throws BadRequestError, WorkflowExecutionAlreadyStartedError, ServiceBusyError,
222222
DomainNotActiveError, LimitExceededError, EntityNotExistsError,
223223
ClientVersionNotSupportedError, TException {
224-
throw new IllegalArgumentException("unimplemented");
224+
initializeStartWorkflowExecutionRequest(startRequest.getRequest());
225+
try {
226+
com.uber.cadence.api.v1.StartWorkflowExecutionAsyncResponse response =
227+
grpcServiceStubs
228+
.workflowBlockingStub()
229+
.startWorkflowExecutionAsync(
230+
RequestMapper.startWorkflowExecutionAsyncRequest(startRequest));
231+
return ResponseMapper.startWorkflowExecutionAsyncResponse(response);
232+
} catch (StatusRuntimeException e) {
233+
throw ErrorMapper.Error(e);
234+
}
225235
}
226236

227237
private StartWorkflowExecutionResponse startWorkflowExecution(
228238
StartWorkflowExecutionRequest startRequest)
229239
throws BadRequestError, WorkflowExecutionAlreadyStartedError, ServiceBusyError,
230240
DomainNotActiveError, LimitExceededError, EntityNotExistsError,
231241
ClientVersionNotSupportedError, TException {
232-
startRequest.setRequestId(UUID.randomUUID().toString());
242+
initializeStartWorkflowExecutionRequest(startRequest);
233243
try {
234244
com.uber.cadence.api.v1.StartWorkflowExecutionResponse response =
235245
grpcServiceStubs
@@ -241,6 +251,10 @@ private StartWorkflowExecutionResponse startWorkflowExecution(
241251
}
242252
}
243253

254+
private void initializeStartWorkflowExecutionRequest(StartWorkflowExecutionRequest request) {
255+
request.setRequestId(UUID.randomUUID().toString());
256+
}
257+
244258
@Override
245259
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory(
246260
GetWorkflowExecutionHistoryRequest getRequest)
@@ -485,7 +499,7 @@ public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(
485499
LimitExceededError, WorkflowExecutionAlreadyStartedError, ClientVersionNotSupportedError,
486500
TException {
487501
try {
488-
signalWithStartRequest.setRequestId(UUID.randomUUID().toString());
502+
initializeSignalWithStartWorkflowExecution(signalWithStartRequest);
489503
com.uber.cadence.api.v1.SignalWithStartWorkflowExecutionResponse response =
490504
grpcServiceStubs
491505
.workflowBlockingStub()
@@ -503,7 +517,23 @@ public SignalWithStartWorkflowExecutionAsyncResponse SignalWithStartWorkflowExec
503517
throws BadRequestError, WorkflowExecutionAlreadyStartedError, ServiceBusyError,
504518
DomainNotActiveError, LimitExceededError, EntityNotExistsError,
505519
ClientVersionNotSupportedError, TException {
506-
throw new IllegalArgumentException("unimplemented");
520+
try {
521+
initializeSignalWithStartWorkflowExecution(signalWithStartRequest.getRequest());
522+
com.uber.cadence.api.v1.SignalWithStartWorkflowExecutionAsyncResponse response =
523+
grpcServiceStubs
524+
.workflowBlockingStub()
525+
.signalWithStartWorkflowExecutionAsync(
526+
RequestMapper.signalWithStartWorkflowExecutionAsyncRequest(
527+
signalWithStartRequest));
528+
return ResponseMapper.signalWithStartWorkflowExecutionAsyncResponse(response);
529+
} catch (StatusRuntimeException e) {
530+
throw ErrorMapper.Error(e);
531+
}
532+
}
533+
534+
private void initializeSignalWithStartWorkflowExecution(
535+
SignalWithStartWorkflowExecutionRequest request) {
536+
request.setRequestId(UUID.randomUUID().toString());
507537
}
508538

509539
@Override
@@ -825,7 +855,28 @@ public void StartWorkflowExecution(
825855
public void StartWorkflowExecutionAsync(
826856
StartWorkflowExecutionAsyncRequest startRequest, AsyncMethodCallback resultHandler)
827857
throws TException {
828-
throw new IllegalArgumentException("unimplemented");
858+
try {
859+
initializeStartWorkflowExecutionRequest(startRequest.getRequest());
860+
ListenableFuture<com.uber.cadence.api.v1.StartWorkflowExecutionAsyncResponse> resultFuture =
861+
grpcServiceStubs
862+
.workflowFutureStub()
863+
.startWorkflowExecutionAsync(
864+
RequestMapper.startWorkflowExecutionAsyncRequest(startRequest));
865+
resultFuture.addListener(
866+
() -> {
867+
try {
868+
com.uber.cadence.api.v1.StartWorkflowExecutionAsyncResponse response =
869+
resultFuture.get();
870+
resultHandler.onComplete(
871+
ResponseMapper.startWorkflowExecutionAsyncResponse(response));
872+
} catch (Exception e) {
873+
resultHandler.onError(e);
874+
}
875+
},
876+
ForkJoinPool.commonPool());
877+
} catch (StatusRuntimeException e) {
878+
throw ErrorMapper.Error(e);
879+
}
829880
}
830881

831882
@Override
@@ -1124,7 +1175,7 @@ public void StartWorkflowExecutionWithTimeout(
11241175
Long timeoutInMillis)
11251176
throws TException {
11261177
try {
1127-
startRequest.setRequestId(UUID.randomUUID().toString());
1178+
initializeStartWorkflowExecutionRequest(startRequest);
11281179
ListenableFuture<com.uber.cadence.api.v1.StartWorkflowExecutionResponse> resultFuture =
11291180
grpcServiceStubs
11301181
.workflowFutureStub()
@@ -1145,6 +1196,37 @@ public void StartWorkflowExecutionWithTimeout(
11451196
}
11461197
}
11471198

1199+
@Override
1200+
public void StartWorkflowExecutionAsyncWithTimeout(
1201+
StartWorkflowExecutionAsyncRequest startAsyncRequest,
1202+
AsyncMethodCallback resultHandler,
1203+
Long timeoutInMillis)
1204+
throws TException {
1205+
try {
1206+
initializeStartWorkflowExecutionRequest(startAsyncRequest.getRequest());
1207+
ListenableFuture<com.uber.cadence.api.v1.StartWorkflowExecutionAsyncResponse> resultFuture =
1208+
grpcServiceStubs
1209+
.workflowFutureStub()
1210+
.withDeadline(Deadline.after(timeoutInMillis, TimeUnit.MILLISECONDS))
1211+
.startWorkflowExecutionAsync(
1212+
RequestMapper.startWorkflowExecutionAsyncRequest(startAsyncRequest));
1213+
resultFuture.addListener(
1214+
() -> {
1215+
try {
1216+
com.uber.cadence.api.v1.StartWorkflowExecutionAsyncResponse response =
1217+
resultFuture.get();
1218+
resultHandler.onComplete(
1219+
ResponseMapper.startWorkflowExecutionAsyncResponse(response));
1220+
} catch (Exception e) {
1221+
resultHandler.onError(e);
1222+
}
1223+
},
1224+
ForkJoinPool.commonPool());
1225+
} catch (StatusRuntimeException e) {
1226+
throw ErrorMapper.Error(e);
1227+
}
1228+
}
1229+
11481230
@Override
11491231
public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeout(
11501232
GetWorkflowExecutionHistoryRequest getRequest, Long timeoutInMillis) throws TException {

src/main/java/com/uber/cadence/internal/compatibility/proto/RequestMapper.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@
8484
import com.uber.cadence.api.v1.RespondDecisionTaskFailedRequest;
8585
import com.uber.cadence.api.v1.RespondQueryTaskCompletedRequest;
8686
import com.uber.cadence.api.v1.ScanWorkflowExecutionsRequest;
87+
import com.uber.cadence.api.v1.SignalWithStartWorkflowExecutionAsyncRequest;
8788
import com.uber.cadence.api.v1.SignalWithStartWorkflowExecutionRequest;
8889
import com.uber.cadence.api.v1.SignalWorkflowExecutionRequest;
90+
import com.uber.cadence.api.v1.StartWorkflowExecutionAsyncRequest;
8991
import com.uber.cadence.api.v1.StartWorkflowExecutionRequest;
9092
import com.uber.cadence.api.v1.TerminateWorkflowExecutionRequest;
9193
import com.uber.cadence.api.v1.UpdateDomainRequest;
@@ -449,7 +451,7 @@ public static SignalWithStartWorkflowExecutionRequest signalWithStartWorkflowExe
449451
if (t.getDelayStartSeconds() > 0) {
450452
builder.setDelayStart(secondsToDuration(t.getDelayStartSeconds()));
451453
}
452-
;
454+
453455
if (t.getIdentity() != null) {
454456
builder.setIdentity(t.getIdentity());
455457
}
@@ -464,6 +466,20 @@ public static SignalWithStartWorkflowExecutionRequest signalWithStartWorkflowExe
464466
return sb.build();
465467
}
466468

469+
public static SignalWithStartWorkflowExecutionAsyncRequest
470+
signalWithStartWorkflowExecutionAsyncRequest(
471+
com.uber.cadence.SignalWithStartWorkflowExecutionAsyncRequest t) {
472+
if (t == null) {
473+
return null;
474+
}
475+
SignalWithStartWorkflowExecutionAsyncRequest.Builder builder =
476+
SignalWithStartWorkflowExecutionAsyncRequest.newBuilder();
477+
if (t.getRequest() != null) {
478+
builder.setRequest(signalWithStartWorkflowExecutionRequest(t.getRequest()));
479+
}
480+
return builder.build();
481+
}
482+
467483
public static SignalWorkflowExecutionRequest signalWorkflowExecutionRequest(
468484
com.uber.cadence.SignalWorkflowExecutionRequest t) {
469485
if (t == null) {
@@ -518,6 +534,19 @@ public static StartWorkflowExecutionRequest startWorkflowExecutionRequest(
518534
return request.build();
519535
}
520536

537+
public static StartWorkflowExecutionAsyncRequest startWorkflowExecutionAsyncRequest(
538+
com.uber.cadence.StartWorkflowExecutionAsyncRequest t) {
539+
if (t == null) {
540+
return null;
541+
}
542+
StartWorkflowExecutionAsyncRequest.Builder builder =
543+
StartWorkflowExecutionAsyncRequest.newBuilder();
544+
if (t.getRequest() != null) {
545+
builder.setRequest(startWorkflowExecutionRequest(t.getRequest()));
546+
}
547+
return builder.build();
548+
}
549+
521550
public static TerminateWorkflowExecutionRequest terminateWorkflowExecutionRequest(
522551
com.uber.cadence.TerminateWorkflowExecutionRequest t) {
523552
if (t == null) {

src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -193,55 +193,58 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
193193
span.finish();
194194
}
195195

196+
@SuppressWarnings("unchecked")
196197
@Override
197198
public void sendMessage(ReqT message) {
198199
if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecution")
199200
&& message instanceof StartWorkflowExecutionRequest) {
200201
StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
201-
Map<String, byte[]> headers = new HashMap<>();
202-
tracingPropagator.inject(headers);
203-
Header.Builder headerBuilder = request.getHeader().toBuilder();
204-
headers.forEach(
205-
(k, v) -> {
206-
headerBuilder.putFields(
207-
k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
208-
});
202+
Header newHeader = addTracingHeaders(request.getHeader());
203+
204+
// cast should not throw error as we are using the builder
205+
message = (ReqT) request.toBuilder().setHeader(newHeader).build();
206+
} else if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecutionAsync")
207+
&& message instanceof StartWorkflowExecutionAsyncRequest) {
208+
StartWorkflowExecutionAsyncRequest request =
209+
(StartWorkflowExecutionAsyncRequest) message;
210+
Header newHeader = addTracingHeaders(request.getRequest().getHeader());
209211

210212
// cast should not throw error as we are using the builder
211213
message =
212214
(ReqT)
213-
((StartWorkflowExecutionRequest) message)
215+
request
214216
.toBuilder()
215-
.setHeader(headerBuilder.build())
217+
.setRequest(request.getRequest().toBuilder().setHeader(newHeader))
216218
.build();
217-
}
218-
if (Objects.equals(method.getBareMethodName(), "SignalWithStartWorkflowExecution")
219+
} else if (Objects.equals(
220+
method.getBareMethodName(), "SignalWithStartWorkflowExecution")
219221
&& message instanceof SignalWithStartWorkflowExecutionRequest) {
220222
SignalWithStartWorkflowExecutionRequest request =
221223
(SignalWithStartWorkflowExecutionRequest) message;
222-
Map<String, byte[]> headers = new HashMap<>();
223-
tracingPropagator.inject(headers);
224-
Header.Builder headerBuilder = request.getStartRequest().getHeader().toBuilder();
225-
headers.forEach(
226-
(k, v) -> {
227-
headerBuilder.putFields(
228-
k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
229-
});
224+
Header newHeader = addTracingHeaders(request.getStartRequest().getHeader());
230225

231226
// cast should not throw error as we are using the builder
232227
message =
233228
(ReqT)
234-
((SignalWithStartWorkflowExecutionRequest) message)
229+
request
235230
.toBuilder()
236231
.setStartRequest(
237-
request
238-
.getStartRequest()
239-
.toBuilder()
240-
.setHeader(headerBuilder.build()))
232+
request.getStartRequest().toBuilder().setHeader(newHeader))
241233
.build();
242234
}
243235
super.sendMessage(message);
244236
}
237+
238+
private Header addTracingHeaders(Header header) {
239+
Map<String, byte[]> headers = new HashMap<>();
240+
tracingPropagator.inject(headers);
241+
Header.Builder headerBuilder = header.toBuilder();
242+
headers.forEach(
243+
(k, v) ->
244+
headerBuilder.putFields(
245+
k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build()));
246+
return headerBuilder.build();
247+
}
245248
};
246249
}
247250
};

src/main/java/com/uber/cadence/internal/compatibility/thrift/ResponseMapper.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@
7171
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
7272
import com.uber.cadence.ResetWorkflowExecutionResponse;
7373
import com.uber.cadence.RespondDecisionTaskCompletedResponse;
74+
import com.uber.cadence.SignalWithStartWorkflowExecutionAsyncResponse;
75+
import com.uber.cadence.StartWorkflowExecutionAsyncResponse;
7476
import com.uber.cadence.StartWorkflowExecutionResponse;
7577
import com.uber.cadence.UpdateDomainResponse;
7678
import com.uber.cadence.api.v1.WorkflowQuery;
@@ -88,6 +90,11 @@ public static StartWorkflowExecutionResponse startWorkflowExecutionResponse(
8890
return startWorkflowExecutionResponse;
8991
}
9092

93+
public static StartWorkflowExecutionAsyncResponse startWorkflowExecutionAsyncResponse(
94+
com.uber.cadence.api.v1.StartWorkflowExecutionAsyncResponse t) {
95+
return t == null ? null : new StartWorkflowExecutionAsyncResponse();
96+
}
97+
9198
public static DescribeTaskListResponse describeTaskListResponse(
9299
com.uber.cadence.api.v1.DescribeTaskListResponse t) {
93100
if (t == null) {
@@ -399,6 +406,12 @@ public static StartWorkflowExecutionResponse signalWithStartWorkflowExecutionRes
399406
return startWorkflowExecutionResponse;
400407
}
401408

409+
public static SignalWithStartWorkflowExecutionAsyncResponse
410+
signalWithStartWorkflowExecutionAsyncResponse(
411+
com.uber.cadence.api.v1.SignalWithStartWorkflowExecutionAsyncResponse t) {
412+
return t == null ? null : new SignalWithStartWorkflowExecutionAsyncResponse();
413+
}
414+
402415
public static UpdateDomainResponse updateDomainResponse(
403416
com.uber.cadence.api.v1.UpdateDomainResponse t) {
404417
if (t == null) {

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ CompletableFuture<WorkflowExecution> startWorkflowAsync(
3939
CompletableFuture<WorkflowExecution> startWorkflowAsync(
4040
StartWorkflowExecutionParameters startParameters, Long timeoutInMillis);
4141

42+
void enqueueStartWorkflow(StartWorkflowExecutionParameters startParameters)
43+
throws WorkflowExecutionAlreadyStartedError;
44+
45+
CompletableFuture<Void> enqueueStartWorkflowAsync(
46+
StartWorkflowExecutionParameters startParameters, Long timeoutInMillis);
47+
4248
void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters);
4349

4450
CompletableFuture<Void> signalWorkflowExecutionAsync(

0 commit comments

Comments
 (0)