Skip to content

Commit 402392c

Browse files
Use link from start workflow request for Nexus operations (#2547)
1 parent 35386da commit 402392c

File tree

5 files changed

+58
-9
lines changed

5 files changed

+58
-9
lines changed

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
2424
import io.temporal.internal.client.external.GenericWorkflowClient;
2525
import io.temporal.internal.common.HeaderUtils;
26+
import io.temporal.internal.nexus.CurrentNexusOperationContext;
2627
import io.temporal.payload.context.WorkflowSerializationContext;
2728
import io.temporal.serviceclient.StatusUtils;
2829
import io.temporal.worker.WorkflowTaskDispatchHandle;
@@ -95,6 +96,9 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
9596
e);
9697
}
9798
}
99+
if (CurrentNexusOperationContext.isNexusContext()) {
100+
CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink());
101+
}
98102
return new WorkflowStartOutput(execution);
99103
}
100104
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
public final class CurrentNexusOperationContext {
88
private static final ThreadLocal<InternalNexusOperationContext> CURRENT = new ThreadLocal<>();
99

10+
public static boolean isNexusContext() {
11+
return CURRENT.get() != null;
12+
}
13+
1014
public static InternalNexusOperationContext get() {
1115
InternalNexusOperationContext result = CURRENT.get();
1216
if (result == null) {

temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.temporal.internal.nexus;
22

33
import com.uber.m3.tally.Scope;
4+
import io.temporal.api.common.v1.Link;
45
import io.temporal.client.WorkflowClient;
56
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
67
import io.temporal.nexus.NexusOperationContext;
@@ -11,6 +12,7 @@ public class InternalNexusOperationContext {
1112
private final Scope metricScope;
1213
private final WorkflowClient client;
1314
NexusOperationOutboundCallsInterceptor outboundCalls;
15+
Link startWorkflowResponseLink;
1416

1517
public InternalNexusOperationContext(
1618
String namespace, String taskQueue, Scope metricScope, WorkflowClient client) {
@@ -47,6 +49,14 @@ public NexusOperationContext getUserFacingContext() {
4749
return new NexusOperationContextImpl();
4850
}
4951

52+
public void setStartWorkflowResponseLink(Link link) {
53+
this.startWorkflowResponseLink = link;
54+
}
55+
56+
public Link getStartWorkflowResponseLink() {
57+
return startWorkflowResponseLink;
58+
}
59+
5060
private class NexusOperationContextImpl implements NexusOperationContext {
5161
@Override
5262
public Scope getMetricsScope() {

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,23 @@ public OperationStartResult<R> start(
4141

4242
WorkflowExecution workflowExec = handle.getInvoker().invoke(nexusRequest);
4343

44-
// Create the link information about the new workflow and return to the caller.
44+
// If the start workflow response returned a link use it, otherwise
45+
// create the link information about the new workflow and return to the caller.
4546
Link.WorkflowEvent workflowEventLink =
46-
Link.WorkflowEvent.newBuilder()
47-
.setNamespace(nexusCtx.getNamespace())
48-
.setWorkflowId(workflowExec.getWorkflowId())
49-
.setRunId(workflowExec.getRunId())
50-
.setEventRef(
51-
Link.WorkflowEvent.EventReference.newBuilder()
52-
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
53-
.build();
47+
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
48+
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
49+
: null;
50+
if (workflowEventLink == null) {
51+
workflowEventLink =
52+
Link.WorkflowEvent.newBuilder()
53+
.setNamespace(nexusCtx.getNamespace())
54+
.setWorkflowId(workflowExec.getWorkflowId())
55+
.setRunId(workflowExec.getRunId())
56+
.setEventRef(
57+
Link.WorkflowEvent.EventReference.newBuilder()
58+
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
59+
.build();
60+
}
5461
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
5562
// Generate the operation token for the new workflow.
5663
String operationToken;

temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleUseExistingOnConflictTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import io.nexusrpc.handler.OperationHandler;
44
import io.nexusrpc.handler.OperationImpl;
55
import io.nexusrpc.handler.ServiceImpl;
6+
import io.temporal.api.common.v1.Link;
7+
import io.temporal.api.enums.v1.EventType;
68
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
79
import io.temporal.client.WorkflowOptions;
10+
import io.temporal.client.WorkflowStub;
811
import io.temporal.nexus.Nexus;
912
import io.temporal.nexus.WorkflowRunOperation;
1013
import io.temporal.testing.internal.SDKTestWorkflowRule;
@@ -14,6 +17,7 @@
1417
import java.util.ArrayList;
1518
import java.util.List;
1619
import java.util.UUID;
20+
import java.util.concurrent.atomic.AtomicInteger;
1721
import org.junit.*;
1822

1923
public class WorkflowHandleUseExistingOnConflictTest {
@@ -31,6 +35,26 @@ public void testOnConflictUseExisting() {
3135
String workflowId = UUID.randomUUID().toString();
3236
String result = workflowStub.execute(workflowId);
3337
Assert.assertEquals("Hello from operation workflow " + workflowId, result);
38+
39+
AtomicInteger eventRefLinkCount = new AtomicInteger();
40+
AtomicInteger requestIdLinkCount = new AtomicInteger();
41+
testWorkflowRule
42+
.getHistoryEvents(
43+
WorkflowStub.fromTyped(workflowStub).getExecution().getWorkflowId(),
44+
EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED)
45+
.forEach(
46+
event -> {
47+
List<Link> links = event.getLinksList();
48+
Assert.assertEquals(1, links.size());
49+
Link link = links.get(0);
50+
if (link.getWorkflowEvent().hasEventRef()) {
51+
eventRefLinkCount.getAndIncrement();
52+
} else if (link.getWorkflowEvent().hasRequestIdRef()) {
53+
requestIdLinkCount.getAndIncrement();
54+
}
55+
});
56+
Assert.assertEquals(1, eventRefLinkCount.get());
57+
Assert.assertEquals(4, requestIdLinkCount.get());
3458
}
3559

3660
public static class TestNexus implements TestWorkflows.TestWorkflow1 {

0 commit comments

Comments
 (0)