Skip to content

Commit 8e0e3e2

Browse files
authored
Add support for SignalWithStartWorkflowExecutionAsync (#887)
Add support to schedule a SignalWithStartWorkflowExecution operation to be performed at a future date via SignalWithStartWorkflowExecutionAsync. This is exposed in the java client in two ways: - WorkflowStub#enqueueSignalWithStart, which presents an untyped interface. - WorkflowClient#enqueueSignalWithStart, with presents a statically typed interface. The existing mechanisms for SignalWithStart don't currently have an async equivalent so we don't introduce one here. Additionally add full test coverage for tracing information being injected into headers both in GRPC and TChannel.
1 parent eae647c commit 8e0e3e2

16 files changed

+604
-19
lines changed

src/main/java/com/uber/cadence/client/WorkflowClient.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ WorkflowStub newUntypedWorkflowStub(
227227
* running. The batch before invocation must contain exactly two operations. One annotated
228228
* with @WorkflowMethod and another with @SignalMethod.
229229
*
230-
* @return batch request used to call {@link #signalWithStart(BatchRequest)}
230+
* @return batch request used to call {@link #signalWithStart(BatchRequest)} or {@link
231+
* #enqueueSignalWithStart(BatchRequest)}
231232
*/
232233
BatchRequest newSignalWithStartRequest();
233234

@@ -239,6 +240,20 @@ WorkflowStub newUntypedWorkflowStub(
239240
*/
240241
WorkflowExecution signalWithStart(BatchRequest signalWithStartBatch);
241242

243+
/**
244+
* Schedules a SignalWithStart operation to be performed at a future date via
245+
* SignalWithStartWorkflowExecutionAsync. This requires that async execution has been enabled for
246+
* this domain.
247+
*
248+
* <p>Note that the returned WorkflowExecution will <b>NOT</b> contain a {@code runId}, only a
249+
* {@code workflowId}. This is because the {@code runId} is only determined at the time the
250+
* workflow actually starts.
251+
*
252+
* @param signalWithStartBatch Must be created with {@link #newSignalWithStartRequest()}
253+
* @return WorkflowExecution containing only the workflowId
254+
*/
255+
WorkflowExecution enqueueSignalWithStart(BatchRequest signalWithStartBatch);
256+
242257
/**
243258
* Refreshes all the tasks of a given workflow.
244259
*

src/main/java/com/uber/cadence/client/WorkflowStub.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ CompletableFuture<WorkflowExecution> enqueueStartAsyncWithTimeout(
8585

8686
WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);
8787

88+
WorkflowExecution enqueueSignalWithStart(
89+
String signalName, Object[] signalArgs, Object[] startArgs);
90+
8891
Optional<String> getWorkflowType();
8992

9093
WorkflowExecution getExecution();

src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,30 @@ public void sendMessage(ReqT message) {
231231
.setStartRequest(
232232
request.getStartRequest().toBuilder().setHeader(newHeader))
233233
.build();
234+
} else if (Objects.equals(
235+
method.getBareMethodName(), "SignalWithStartWorkflowExecutionAsync")
236+
&& message instanceof SignalWithStartWorkflowExecutionAsyncRequest) {
237+
SignalWithStartWorkflowExecutionAsyncRequest request =
238+
(SignalWithStartWorkflowExecutionAsyncRequest) message;
239+
Header newHeader =
240+
addTracingHeaders(request.getRequest().getStartRequest().getHeader());
241+
242+
// cast should not throw error as we are using the builder
243+
message =
244+
(ReqT)
245+
request
246+
.toBuilder()
247+
.setRequest(
248+
request
249+
.getRequest()
250+
.toBuilder()
251+
.setStartRequest(
252+
request
253+
.getRequest()
254+
.getStartRequest()
255+
.toBuilder()
256+
.setHeader(newHeader)))
257+
.build();
234258
}
235259
super.sendMessage(message);
236260
}

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ CompletableFuture<Void> signalWorkflowExecutionAsync(
5656
WorkflowExecution signalWithStartWorkflowExecution(
5757
SignalWithStartWorkflowExecutionParameters parameters);
5858

59+
WorkflowExecution enqueueSignalWithStartWorkflowExecution(
60+
SignalWithStartWorkflowExecutionParameters parameters);
61+
5962
void requestCancelWorkflowExecution(WorkflowExecution execution);
6063

6164
QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParameters);

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
2626
import com.uber.cadence.RetryPolicy;
2727
import com.uber.cadence.SearchAttributes;
28+
import com.uber.cadence.SignalWithStartWorkflowExecutionAsyncRequest;
2829
import com.uber.cadence.SignalWithStartWorkflowExecutionRequest;
2930
import com.uber.cadence.SignalWorkflowExecutionRequest;
3031
import com.uber.cadence.StartWorkflowExecutionAsyncRequest;
@@ -411,8 +412,60 @@ public WorkflowExecution signalWithStartWorkflowExecution(
411412
}
412413
}
413414

415+
@Override
416+
public WorkflowExecution enqueueSignalWithStartWorkflowExecution(
417+
SignalWithStartWorkflowExecutionParameters parameters) {
418+
try {
419+
return enqueueSignalWithStartWorkflowInternal(parameters);
420+
} finally {
421+
Map<String, String> tags =
422+
new ImmutableMap.Builder<String, String>(3)
423+
.put(
424+
MetricsTag.WORKFLOW_TYPE,
425+
parameters.getStartParameters().getWorkflowType().getName())
426+
.put(MetricsTag.TASK_LIST, parameters.getStartParameters().getTaskList())
427+
.put(MetricsTag.DOMAIN, domain)
428+
.build();
429+
metricsScope
430+
.tagged(tags)
431+
.counter(MetricsType.WORKFLOW_SIGNAL_WITH_START_ASYNC_COUNTER)
432+
.inc(1);
433+
}
434+
}
435+
436+
private WorkflowExecution enqueueSignalWithStartWorkflowInternal(
437+
SignalWithStartWorkflowExecutionParameters parameters) {
438+
SignalWithStartWorkflowExecutionAsyncRequest request =
439+
new SignalWithStartWorkflowExecutionAsyncRequest()
440+
.setRequest(createSignalWithStartRequest(parameters));
441+
try {
442+
RpcRetryer.retryWithResult(
443+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
444+
() -> service.SignalWithStartWorkflowExecutionAsync(request));
445+
return new WorkflowExecution().setWorkflowId(request.getRequest().getWorkflowId());
446+
} catch (TException e) {
447+
throw CheckedExceptionWrapper.wrap(e);
448+
}
449+
}
450+
414451
private WorkflowExecution signalWithStartWorkflowInternal(
415452
SignalWithStartWorkflowExecutionParameters parameters) {
453+
SignalWithStartWorkflowExecutionRequest request = createSignalWithStartRequest(parameters);
454+
try {
455+
StartWorkflowExecutionResponse result =
456+
RpcRetryer.retryWithResult(
457+
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
458+
() -> service.SignalWithStartWorkflowExecution(request));
459+
return new WorkflowExecution()
460+
.setRunId(result.getRunId())
461+
.setWorkflowId(request.getWorkflowId());
462+
} catch (TException e) {
463+
throw CheckedExceptionWrapper.wrap(e);
464+
}
465+
}
466+
467+
private SignalWithStartWorkflowExecutionRequest createSignalWithStartRequest(
468+
SignalWithStartWorkflowExecutionParameters parameters) {
416469
SignalWithStartWorkflowExecutionRequest request = new SignalWithStartWorkflowExecutionRequest();
417470
request.setDomain(domain);
418471
StartWorkflowExecutionParameters startParameters = parameters.getStartParameters();
@@ -451,19 +504,7 @@ private WorkflowExecution signalWithStartWorkflowInternal(
451504
if (startParameters.getDelayStart() != null) {
452505
request.setDelayStartSeconds((int) startParameters.getDelayStart().getSeconds());
453506
}
454-
StartWorkflowExecutionResponse result;
455-
try {
456-
result =
457-
RpcRetryer.retryWithResult(
458-
RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
459-
() -> service.SignalWithStartWorkflowExecution(request));
460-
} catch (TException e) {
461-
throw CheckedExceptionWrapper.wrap(e);
462-
}
463-
WorkflowExecution execution = new WorkflowExecution();
464-
execution.setRunId(result.getRunId());
465-
execution.setWorkflowId(request.getWorkflowId());
466-
return execution;
507+
return request;
467508
}
468509

469510
@Override

src/main/java/com/uber/cadence/internal/metrics/MetricsType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public class MetricsType {
4343
CADENCE_METRICS_PREFIX + "workflow-get-history-latency";
4444
public static final String WORKFLOW_SIGNAL_WITH_START_COUNTER =
4545
CADENCE_METRICS_PREFIX + "workflow-signal-with-start";
46+
public static final String WORKFLOW_SIGNAL_WITH_START_ASYNC_COUNTER =
47+
CADENCE_METRICS_PREFIX + "workflow-signal-with-start-async";
4648

4749
public static final String DECISION_POLL_COUNTER = CADENCE_METRICS_PREFIX + "decision-poll-total";
4850
public static final String DECISION_POLL_FAILED_COUNTER =

src/main/java/com/uber/cadence/internal/sync/SignalWithStartBatchRequest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.List;
2626
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.function.Supplier;
2728

2829
final class SignalWithStartBatchRequest implements BatchRequest {
2930

@@ -33,9 +34,17 @@ final class SignalWithStartBatchRequest implements BatchRequest {
3334
private String signalName;
3435
private Object[] signalArgs;
3536
private Object[] startArgs;
36-
private AtomicBoolean invoked = new AtomicBoolean();
37+
private final AtomicBoolean invoked = new AtomicBoolean();
3738

38-
WorkflowExecution invoke() {
39+
WorkflowExecution enqueue() {
40+
return invoke(this::enqueueSignalWithStart);
41+
}
42+
43+
WorkflowExecution execute() {
44+
return invoke(this::signalWithStart);
45+
}
46+
47+
private WorkflowExecution invoke(Supplier<WorkflowExecution> commitFn) {
3948
if (!invoked.compareAndSet(false, true)) {
4049
throw new IllegalStateException(
4150
"A batch instance can be used only for a single signalWithStart call");
@@ -46,7 +55,7 @@ WorkflowExecution invoke() {
4655
for (Functions.Proc request : requests) {
4756
request.apply();
4857
}
49-
return signalWithStart();
58+
return commitFn.get();
5059
} finally {
5160
WorkflowInvocationHandler.closeAsyncInvocation();
5261
}
@@ -56,6 +65,10 @@ private WorkflowExecution signalWithStart() {
5665
return stub.signalWithStart(signalName, signalArgs, startArgs);
5766
}
5867

68+
private WorkflowExecution enqueueSignalWithStart() {
69+
return stub.enqueueSignalWithStart(signalName, signalArgs, startArgs);
70+
}
71+
5972
void signal(WorkflowStub stub, String signalName, Object[] args) {
6073
setStub(stub);
6174
this.signalName = signalName;

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,12 @@ public WorkflowExecution signalWithStart(
10121012
return next.signalWithStart(signalName, signalArgs, startArgs);
10131013
}
10141014

1015+
@Override
1016+
public WorkflowExecution enqueueSignalWithStart(
1017+
String signalName, Object[] signalArgs, Object[] startArgs) {
1018+
return next.enqueueSignalWithStart(signalName, signalArgs, startArgs);
1019+
}
1020+
10151021
@Override
10161022
public Optional<String> getWorkflowType() {
10171023
return next.getWorkflowType();

src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,12 @@ public BatchRequest newSignalWithStartRequest() {
210210

211211
@Override
212212
public WorkflowExecution signalWithStart(BatchRequest signalWithStartBatch) {
213-
return ((SignalWithStartBatchRequest) signalWithStartBatch).invoke();
213+
return ((SignalWithStartBatchRequest) signalWithStartBatch).execute();
214+
}
215+
216+
@Override
217+
public WorkflowExecution enqueueSignalWithStart(BatchRequest signalWithStartBatch) {
218+
return ((SignalWithStartBatchRequest) signalWithStartBatch).enqueue();
214219
}
215220

216221
@Override

src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,31 @@ public WorkflowExecution signalWithStart(
323323
WorkflowOptions.merge(null, null, null, options.get()), signalName, signalArgs, startArgs);
324324
}
325325

326+
@Override
327+
public WorkflowExecution enqueueSignalWithStart(
328+
String signalName, Object[] signalArgs, Object[] startArgs) {
329+
if (!options.isPresent()) {
330+
throw new IllegalStateException("Required parameter WorkflowOptions is missing");
331+
}
332+
return enqueueSignalWithStart(
333+
WorkflowOptions.merge(null, null, null, options.get()), signalName, signalArgs, startArgs);
334+
}
335+
336+
private WorkflowExecution enqueueSignalWithStart(
337+
WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) {
338+
StartWorkflowExecutionParameters sp = getStartWorkflowExecutionParameters(options, startArgs);
339+
340+
byte[] signalInput = dataConverter.toData(signalArgs);
341+
SignalWithStartWorkflowExecutionParameters p =
342+
new SignalWithStartWorkflowExecutionParameters(sp, signalName, signalInput);
343+
try {
344+
execution.set(genericClient.enqueueSignalWithStartWorkflowExecution(p));
345+
} catch (Exception e) {
346+
throw new WorkflowServiceException(execution.get(), workflowType, e);
347+
}
348+
return execution.get();
349+
}
350+
326351
@Override
327352
public Optional<String> getWorkflowType() {
328353
return workflowType;

0 commit comments

Comments
 (0)