Skip to content

Commit ff939d7

Browse files
Nexus - Only pass a completion callback if a completion URL is provided (#2615)
1 parent b5057e8 commit ff939d7

File tree

2 files changed

+37
-32
lines changed

2 files changed

+37
-32
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.google.common.base.Defaults;
5+
import com.google.common.base.Strings;
56
import io.nexusrpc.Header;
67
import io.nexusrpc.handler.HandlerException;
78
import io.nexusrpc.handler.ServiceImplInstance;
@@ -88,23 +89,6 @@ public static NexusWorkflowStarter createNexusBoundStub(
8889
HandlerException.ErrorType.BAD_REQUEST,
8990
new IllegalArgumentException("failed to generate workflow operation token", e));
9091
}
91-
// Add the Nexus operation ID to the headers if it is not already present to support fabricating
92-
// a NexusOperationStarted event if the completion is received before the response to a
93-
// StartOperation request.
94-
Map<String, String> headers =
95-
request.getCallbackHeaders().entrySet().stream()
96-
.collect(
97-
Collectors.toMap(
98-
(k) -> k.getKey().toLowerCase(),
99-
Map.Entry::getValue,
100-
(a, b) -> a,
101-
() -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER)));
102-
if (!headers.containsKey(Header.OPERATION_ID)) {
103-
headers.put(Header.OPERATION_ID.toLowerCase(), operationToken);
104-
}
105-
if (!headers.containsKey(Header.OPERATION_TOKEN)) {
106-
headers.put(Header.OPERATION_TOKEN.toLowerCase(), operationToken);
107-
}
10892
List<Link> links =
10993
request.getLinks() == null
11094
? null
@@ -127,21 +111,42 @@ public static NexusWorkflowStarter createNexusBoundStub(
127111
})
128112
.filter(Objects::nonNull)
129113
.collect(Collectors.toList());
130-
Callback.Builder cbBuilder =
131-
Callback.newBuilder()
132-
.setNexus(
133-
Callback.Nexus.newBuilder()
134-
.setUrl(request.getCallbackUrl())
135-
.putAllHeader(headers)
136-
.build());
137-
if (links != null) {
138-
cbBuilder.addAllLinks(links);
139-
}
140114
WorkflowOptions.Builder nexusWorkflowOptions =
141-
WorkflowOptions.newBuilder(options)
142-
.setRequestId(request.getRequestId())
143-
.setCompletionCallbacks(Collections.singletonList(cbBuilder.build()))
144-
.setLinks(links);
115+
WorkflowOptions.newBuilder(options).setRequestId(request.getRequestId()).setLinks(links);
116+
117+
// If a callback URL is provided, pass it as a completion callback.
118+
if (!Strings.isNullOrEmpty(request.getCallbackUrl())) {
119+
// Add the Nexus operation ID to the headers if it is not already present to support
120+
// fabricating
121+
// a NexusOperationStarted event if the completion is received before the response to a
122+
// StartOperation request.
123+
Map<String, String> headers =
124+
request.getCallbackHeaders().entrySet().stream()
125+
.collect(
126+
Collectors.toMap(
127+
(k) -> k.getKey().toLowerCase(),
128+
Map.Entry::getValue,
129+
(a, b) -> a,
130+
() -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER)));
131+
if (!headers.containsKey(Header.OPERATION_ID)) {
132+
headers.put(Header.OPERATION_ID.toLowerCase(), operationToken);
133+
}
134+
if (!headers.containsKey(Header.OPERATION_TOKEN)) {
135+
headers.put(Header.OPERATION_TOKEN.toLowerCase(), operationToken);
136+
}
137+
Callback.Builder cbBuilder =
138+
Callback.newBuilder()
139+
.setNexus(
140+
Callback.Nexus.newBuilder()
141+
.setUrl(request.getCallbackUrl())
142+
.putAllHeader(headers)
143+
.build());
144+
if (links != null) {
145+
cbBuilder.addAllLinks(links);
146+
}
147+
nexusWorkflowOptions.setCompletionCallbacks(Collections.singletonList(cbBuilder.build()));
148+
}
149+
145150
if (options.getTaskQueue() == null) {
146151
nexusWorkflowOptions.setTaskQueue(request.getTaskQueue());
147152
}

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public OperationStartResult<R> start(
2929
OperationContext ctx, OperationStartDetails operationStartDetails, T input) {
3030
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();
3131

32-
WorkflowHandle handle = handleFactory.apply(ctx, operationStartDetails, input);
32+
WorkflowHandle<R> handle = handleFactory.apply(ctx, operationStartDetails, input);
3333

3434
NexusStartWorkflowRequest nexusRequest =
3535
new NexusStartWorkflowRequest(

0 commit comments

Comments
 (0)