Skip to content

Commit ca3a27a

Browse files
Use correct operation token on OPERATION_TOKEN (#2589)
1 parent d310594 commit ca3a27a

9 files changed

+81
-30
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.temporal.common.interceptors.WorkflowClientInterceptor;
1818
import io.temporal.internal.WorkflowThreadMarker;
1919
import io.temporal.internal.client.*;
20+
import io.temporal.internal.client.NexusStartWorkflowResponse;
2021
import io.temporal.internal.client.external.GenericWorkflowClient;
2122
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
2223
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
@@ -695,12 +696,13 @@ public void deregisterWorkerFactory(WorkerFactory workerFactory) {
695696
}
696697

697698
@Override
698-
public WorkflowExecution startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow) {
699+
public NexusStartWorkflowResponse startNexus(
700+
NexusStartWorkflowRequest request, Functions.Proc workflow) {
699701
enforceNonWorkflowThread();
700702
WorkflowInvocationHandler.initAsyncInvocation(InvocationType.START_NEXUS, request);
701703
try {
702704
workflow.apply();
703-
return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowExecution.class);
705+
return WorkflowInvocationHandler.getAsyncInvocationResult(NexusStartWorkflowResponse.class);
704706
} finally {
705707
WorkflowInvocationHandler.closeAsyncInvocation();
706708
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.temporal.internal.client;
2+
3+
import io.temporal.api.common.v1.WorkflowExecution;
4+
5+
public final class NexusStartWorkflowResponse {
6+
private final WorkflowExecution workflowExecution;
7+
private final String operationToken;
8+
9+
public NexusStartWorkflowResponse(WorkflowExecution workflowExecution, String operationToken) {
10+
this.workflowExecution = workflowExecution;
11+
this.operationToken = operationToken;
12+
}
13+
14+
public String getOperationToken() {
15+
return operationToken;
16+
}
17+
18+
public WorkflowExecution getWorkflowExecution() {
19+
return workflowExecution;
20+
}
21+
}

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.temporal.internal.client;
22

3-
import io.temporal.api.common.v1.WorkflowExecution;
43
import io.temporal.client.WorkflowClient;
54
import io.temporal.worker.WorkerFactory;
65
import io.temporal.workflow.Functions;
@@ -18,5 +17,5 @@ public interface WorkflowClientInternal {
1817

1918
void deregisterWorkerFactory(WorkerFactory workerFactory);
2019

21-
WorkflowExecution startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow);
20+
NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow);
2221
}

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.temporal.internal.common;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
34
import com.google.common.base.Defaults;
45
import io.nexusrpc.Header;
6+
import io.nexusrpc.handler.HandlerException;
57
import io.nexusrpc.handler.ServiceImplInstance;
68
import io.temporal.api.common.v1.Callback;
79
import io.temporal.api.common.v1.Link;
@@ -14,6 +16,9 @@
1416
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
1517
import io.temporal.common.metadata.WorkflowMethodType;
1618
import io.temporal.internal.client.NexusStartWorkflowRequest;
19+
import io.temporal.internal.nexus.CurrentNexusOperationContext;
20+
import io.temporal.internal.nexus.InternalNexusOperationContext;
21+
import io.temporal.internal.nexus.OperationTokenUtil;
1722
import java.util.*;
1823
import java.util.stream.Collectors;
1924
import org.slf4j.Logger;
@@ -60,7 +65,7 @@ public static Object getValueOrDefault(Object value, Class<?> valueClass) {
6065
* URL and headers set
6166
*/
6267
@SuppressWarnings("deprecation") // Check the OPERATION_ID header for backwards compatibility
63-
public static WorkflowStub createNexusBoundStub(
68+
public static NexusWorkflowStarter createNexusBoundStub(
6469
WorkflowStub stub, NexusStartWorkflowRequest request) {
6570
if (!stub.getOptions().isPresent()) {
6671
throw new IllegalArgumentException("Options are expected to be set on the stub");
@@ -70,6 +75,19 @@ public static WorkflowStub createNexusBoundStub(
7075
throw new IllegalArgumentException(
7176
"WorkflowId is expected to be set on WorkflowOptions when used with Nexus");
7277
}
78+
InternalNexusOperationContext nexusContext = CurrentNexusOperationContext.get();
79+
// Generate the operation token for the new workflow.
80+
String operationToken;
81+
try {
82+
operationToken =
83+
OperationTokenUtil.generateWorkflowRunOperationToken(
84+
options.getWorkflowId(), nexusContext.getNamespace());
85+
} catch (JsonProcessingException e) {
86+
// Not expected as the link is constructed by the SDK.
87+
throw new HandlerException(
88+
HandlerException.ErrorType.BAD_REQUEST,
89+
new IllegalArgumentException("failed to generate workflow operation token", e));
90+
}
7391
// Add the Nexus operation ID to the headers if it is not already present to support fabricating
7492
// a NexusOperationStarted event if the completion is received before the response to a
7593
// StartOperation request.
@@ -82,10 +100,10 @@ public static WorkflowStub createNexusBoundStub(
82100
(a, b) -> a,
83101
() -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER)));
84102
if (!headers.containsKey(Header.OPERATION_ID)) {
85-
headers.put(Header.OPERATION_ID.toLowerCase(), options.getWorkflowId());
103+
headers.put(Header.OPERATION_ID.toLowerCase(), operationToken);
86104
}
87105
if (!headers.containsKey(Header.OPERATION_TOKEN)) {
88-
headers.put(Header.OPERATION_TOKEN.toLowerCase(), options.getWorkflowId());
106+
headers.put(Header.OPERATION_TOKEN.toLowerCase(), operationToken);
89107
}
90108
List<Link> links =
91109
request.getLinks() == null
@@ -134,7 +152,7 @@ public static WorkflowStub createNexusBoundStub(
134152
.setAttachCompletionCallbacks(true)
135153
.build());
136154

137-
return stub.newInstance(nexusWorkflowOptions.build());
155+
return new NexusWorkflowStarter(stub.newInstance(nexusWorkflowOptions.build()), operationToken);
138156
}
139157

140158
/** Check the method name for reserved prefixes or names. */
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.temporal.internal.common;
2+
3+
import io.temporal.api.common.v1.WorkflowExecution;
4+
import io.temporal.client.WorkflowStub;
5+
import io.temporal.internal.client.NexusStartWorkflowResponse;
6+
7+
public class NexusWorkflowStarter {
8+
private final WorkflowStub workflowStub;
9+
private final String operationToken;
10+
11+
public NexusWorkflowStarter(WorkflowStub workflowStub, String operationToken) {
12+
this.workflowStub = workflowStub;
13+
this.operationToken = operationToken;
14+
}
15+
16+
public NexusStartWorkflowResponse start(Object... args) {
17+
WorkflowExecution workflowExecution = workflowStub.start(args);
18+
return new NexusStartWorkflowResponse(workflowExecution, operationToken);
19+
}
20+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.temporal.nexus;
22

3-
import io.temporal.api.common.v1.WorkflowExecution;
43
import io.temporal.internal.client.NexusStartWorkflowRequest;
4+
import io.temporal.internal.client.NexusStartWorkflowResponse;
55

66
interface WorkflowHandleInvoker {
7-
WorkflowExecution invoke(NexusStartWorkflowRequest request);
7+
NexusStartWorkflowResponse invoke(NexusStartWorkflowRequest request);
88
}

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package io.temporal.nexus;
22

3-
import io.temporal.api.common.v1.WorkflowExecution;
43
import io.temporal.internal.client.NexusStartWorkflowRequest;
4+
import io.temporal.internal.client.NexusStartWorkflowResponse;
55
import io.temporal.internal.client.WorkflowClientInternal;
66
import io.temporal.internal.nexus.CurrentNexusOperationContext;
77
import io.temporal.internal.nexus.InternalNexusOperationContext;
88
import io.temporal.workflow.Functions;
99

1010
class WorkflowMethodMethodInvoker implements WorkflowHandleInvoker {
11-
private Functions.Proc workflow;
11+
private final Functions.Proc workflow;
1212

1313
public WorkflowMethodMethodInvoker(Functions.Proc workflow) {
1414
this.workflow = workflow;
1515
}
1616

1717
@Override
18-
public WorkflowExecution invoke(NexusStartWorkflowRequest request) {
18+
public NexusStartWorkflowResponse invoke(NexusStartWorkflowRequest request) {
1919
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();
2020
return ((WorkflowClientInternal) nexusCtx.getWorkflowClient().getInternal())
2121
.startNexus(request, workflow);

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
44
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;
55

6-
import com.fasterxml.jackson.core.JsonProcessingException;
76
import io.nexusrpc.OperationInfo;
87
import io.nexusrpc.handler.*;
98
import io.nexusrpc.handler.OperationHandler;
@@ -12,6 +11,7 @@
1211
import io.temporal.api.enums.v1.EventType;
1312
import io.temporal.client.WorkflowClient;
1413
import io.temporal.internal.client.NexusStartWorkflowRequest;
14+
import io.temporal.internal.client.NexusStartWorkflowResponse;
1515
import io.temporal.internal.nexus.CurrentNexusOperationContext;
1616
import io.temporal.internal.nexus.InternalNexusOperationContext;
1717
import io.temporal.internal.nexus.OperationTokenUtil;
@@ -39,7 +39,9 @@ public OperationStartResult<R> start(
3939
nexusCtx.getTaskQueue(),
4040
operationStartDetails.getLinks());
4141

42-
WorkflowExecution workflowExec = handle.getInvoker().invoke(nexusRequest);
42+
NexusStartWorkflowResponse nexusStartWorkflowResponse =
43+
handle.getInvoker().invoke(nexusRequest);
44+
WorkflowExecution workflowExec = nexusStartWorkflowResponse.getWorkflowExecution();
4345

4446
// If the start workflow response returned a link use it, otherwise
4547
// create the link information about the new workflow and return to the caller.
@@ -59,20 +61,9 @@ public OperationStartResult<R> start(
5961
.build();
6062
}
6163
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
62-
// Generate the operation token for the new workflow.
63-
String operationToken;
64-
try {
65-
operationToken =
66-
OperationTokenUtil.generateWorkflowRunOperationToken(
67-
workflowExec.getWorkflowId(), nexusCtx.getNamespace());
68-
} catch (JsonProcessingException e) {
69-
// Not expected as the link is constructed by the SDK.
70-
throw new HandlerException(
71-
HandlerException.ErrorType.BAD_REQUEST,
72-
new IllegalArgumentException("failed to generate workflow operation token", e));
73-
}
7464
// Attach the link to the operation result.
75-
OperationStartResult.Builder<R> result = OperationStartResult.newAsyncBuilder(operationToken);
65+
OperationStartResult.Builder<R> result =
66+
OperationStartResult.newAsyncBuilder(nexusStartWorkflowResponse.getOperationToken());
7667
if (nexusLink != null) {
7768
try {
7869
ctx.addLinks(nexusProtoLinkToLink(nexusLink));

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowStubHandleInvoker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import static io.temporal.internal.common.InternalUtils.createNexusBoundStub;
44

5-
import io.temporal.api.common.v1.WorkflowExecution;
65
import io.temporal.client.WorkflowStub;
76
import io.temporal.internal.client.NexusStartWorkflowRequest;
7+
import io.temporal.internal.client.NexusStartWorkflowResponse;
88

99
class WorkflowStubHandleInvoker implements WorkflowHandleInvoker {
1010
final Object[] args;
@@ -16,7 +16,7 @@ class WorkflowStubHandleInvoker implements WorkflowHandleInvoker {
1616
}
1717

1818
@Override
19-
public WorkflowExecution invoke(NexusStartWorkflowRequest request) {
19+
public NexusStartWorkflowResponse invoke(NexusStartWorkflowRequest request) {
2020
return createNexusBoundStub(stub, request).start(args);
2121
}
2222
}

0 commit comments

Comments
 (0)