Skip to content

Commit dc4e15a

Browse files
authored
Merge branch 'master' into namespace_worker_start
2 parents 8e70da4 + ff939d7 commit dc4e15a

File tree

17 files changed

+804
-99
lines changed

17 files changed

+804
-99
lines changed

temporal-sdk/src/main/java/io/temporal/common/converter/DataConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import com.google.common.base.Defaults;
55
import com.google.common.base.Preconditions;
6+
import com.google.common.reflect.TypeToken;
67
import io.temporal.api.common.v1.Payload;
78
import io.temporal.api.common.v1.Payloads;
89
import io.temporal.api.failure.v1.Failure;
@@ -132,7 +133,7 @@ default Object[] fromPayloads(
132133
if (!content.isPresent()) {
133134
// Return defaults for all the parameters
134135
for (int i = 0; i < parameterTypes.length; i++) {
135-
result[i] = Defaults.defaultValue((Class<?>) genericParameterTypes[i]);
136+
result[i] = Defaults.defaultValue(TypeToken.of(genericParameterTypes[i]).getRawType());
136137
}
137138
return result;
138139
}
@@ -142,7 +143,7 @@ default Object[] fromPayloads(
142143
Class<?> pt = parameterTypes[i];
143144
Type gt = genericParameterTypes[i];
144145
if (i >= count) {
145-
result[i] = Defaults.defaultValue((Class<?>) gt);
146+
result[i] = Defaults.defaultValue(TypeToken.of(gt).getRawType());
146147
} else {
147148
result[i] = this.fromPayload(payloads.getPayloads(i), pt, gt);
148149
}

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.google.common.base.Defaults;
5+
import com.google.common.base.Strings;
56
import io.nexusrpc.Header;
67
import io.nexusrpc.handler.HandlerException;
78
import io.nexusrpc.handler.ServiceImplInstance;
@@ -88,23 +89,6 @@ public static NexusWorkflowStarter createNexusBoundStub(
8889
HandlerException.ErrorType.BAD_REQUEST,
8990
new IllegalArgumentException("failed to generate workflow operation token", e));
9091
}
91-
// Add the Nexus operation ID to the headers if it is not already present to support fabricating
92-
// a NexusOperationStarted event if the completion is received before the response to a
93-
// StartOperation request.
94-
Map<String, String> headers =
95-
request.getCallbackHeaders().entrySet().stream()
96-
.collect(
97-
Collectors.toMap(
98-
(k) -> k.getKey().toLowerCase(),
99-
Map.Entry::getValue,
100-
(a, b) -> a,
101-
() -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER)));
102-
if (!headers.containsKey(Header.OPERATION_ID)) {
103-
headers.put(Header.OPERATION_ID.toLowerCase(), operationToken);
104-
}
105-
if (!headers.containsKey(Header.OPERATION_TOKEN)) {
106-
headers.put(Header.OPERATION_TOKEN.toLowerCase(), operationToken);
107-
}
10892
List<Link> links =
10993
request.getLinks() == null
11094
? null
@@ -127,21 +111,42 @@ public static NexusWorkflowStarter createNexusBoundStub(
127111
})
128112
.filter(Objects::nonNull)
129113
.collect(Collectors.toList());
130-
Callback.Builder cbBuilder =
131-
Callback.newBuilder()
132-
.setNexus(
133-
Callback.Nexus.newBuilder()
134-
.setUrl(request.getCallbackUrl())
135-
.putAllHeader(headers)
136-
.build());
137-
if (links != null) {
138-
cbBuilder.addAllLinks(links);
139-
}
140114
WorkflowOptions.Builder nexusWorkflowOptions =
141-
WorkflowOptions.newBuilder(options)
142-
.setRequestId(request.getRequestId())
143-
.setCompletionCallbacks(Collections.singletonList(cbBuilder.build()))
144-
.setLinks(links);
115+
WorkflowOptions.newBuilder(options).setRequestId(request.getRequestId()).setLinks(links);
116+
117+
// If a callback URL is provided, pass it as a completion callback.
118+
if (!Strings.isNullOrEmpty(request.getCallbackUrl())) {
119+
// Add the Nexus operation ID to the headers if it is not already present to support
120+
// fabricating
121+
// a NexusOperationStarted event if the completion is received before the response to a
122+
// StartOperation request.
123+
Map<String, String> headers =
124+
request.getCallbackHeaders().entrySet().stream()
125+
.collect(
126+
Collectors.toMap(
127+
(k) -> k.getKey().toLowerCase(),
128+
Map.Entry::getValue,
129+
(a, b) -> a,
130+
() -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER)));
131+
if (!headers.containsKey(Header.OPERATION_ID)) {
132+
headers.put(Header.OPERATION_ID.toLowerCase(), operationToken);
133+
}
134+
if (!headers.containsKey(Header.OPERATION_TOKEN)) {
135+
headers.put(Header.OPERATION_TOKEN.toLowerCase(), operationToken);
136+
}
137+
Callback.Builder cbBuilder =
138+
Callback.newBuilder()
139+
.setNexus(
140+
Callback.Nexus.newBuilder()
141+
.setUrl(request.getCallbackUrl())
142+
.putAllHeader(headers)
143+
.build());
144+
if (links != null) {
145+
cbBuilder.addAllLinks(links);
146+
}
147+
nexusWorkflowOptions.setCompletionCallbacks(Collections.singletonList(cbBuilder.build()));
148+
}
149+
145150
if (options.getTaskQueue() == null) {
146151
nexusWorkflowOptions.setTaskQueue(request.getTaskQueue());
147152
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;
55

66
import com.uber.m3.tally.Scope;
7+
import io.grpc.StatusRuntimeException;
78
import io.nexusrpc.Header;
89
import io.nexusrpc.OperationException;
910
import io.nexusrpc.handler.*;
@@ -12,6 +13,7 @@
1213
import io.temporal.api.nexus.v1.*;
1314
import io.temporal.client.WorkflowClient;
1415
import io.temporal.client.WorkflowException;
16+
import io.temporal.client.WorkflowNotFoundException;
1517
import io.temporal.common.converter.DataConverter;
1618
import io.temporal.common.interceptors.WorkerInterceptor;
1719
import io.temporal.failure.ApplicationFailure;
@@ -203,6 +205,9 @@ private CancelOperationResponse handleCancelledOperation(
203205
private void convertKnownFailures(Throwable e) {
204206
Throwable failure = CheckedExceptionWrapper.unwrap(e);
205207
if (failure instanceof WorkflowException) {
208+
if (failure instanceof WorkflowNotFoundException) {
209+
throw new HandlerException(HandlerException.ErrorType.NOT_FOUND, failure);
210+
}
206211
throw new HandlerException(HandlerException.ErrorType.BAD_REQUEST, failure);
207212
}
208213
if (failure instanceof ApplicationFailure) {
@@ -213,6 +218,10 @@ private void convertKnownFailures(Throwable e) {
213218
HandlerException.RetryBehavior.NON_RETRYABLE);
214219
}
215220
}
221+
if (failure instanceof StatusRuntimeException) {
222+
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) failure;
223+
throw convertStatusRuntimeExceptionToHandlerException(statusRuntimeException);
224+
}
216225
if (failure instanceof Error) {
217226
throw (Error) failure;
218227
}
@@ -221,6 +230,45 @@ private void convertKnownFailures(Throwable e) {
221230
: new RuntimeException(failure);
222231
}
223232

233+
private HandlerException convertStatusRuntimeExceptionToHandlerException(
234+
StatusRuntimeException sre) {
235+
switch (sre.getStatus().getCode()) {
236+
case INVALID_ARGUMENT:
237+
return new HandlerException(HandlerException.ErrorType.BAD_REQUEST, sre);
238+
case ALREADY_EXISTS:
239+
case FAILED_PRECONDITION:
240+
case OUT_OF_RANGE:
241+
return new HandlerException(
242+
HandlerException.ErrorType.INTERNAL, sre, HandlerException.RetryBehavior.NON_RETRYABLE);
243+
case ABORTED:
244+
case UNAVAILABLE:
245+
return new HandlerException(HandlerException.ErrorType.UNAVAILABLE, sre);
246+
case CANCELLED:
247+
case DATA_LOSS:
248+
case INTERNAL:
249+
case UNKNOWN:
250+
case UNAUTHENTICATED:
251+
case PERMISSION_DENIED:
252+
// Note that codes.Unauthenticated, codes.PermissionDenied have Nexus error types but we
253+
// convert to internal
254+
// because this is not a client auth error and happens when the handler fails to auth with
255+
// Temporal and should
256+
// be considered retryable.
257+
return new HandlerException(HandlerException.ErrorType.INTERNAL, sre);
258+
case NOT_FOUND:
259+
return new HandlerException(HandlerException.ErrorType.NOT_FOUND, sre);
260+
case RESOURCE_EXHAUSTED:
261+
return new HandlerException(HandlerException.ErrorType.RESOURCE_EXHAUSTED, sre);
262+
case UNIMPLEMENTED:
263+
return new HandlerException(HandlerException.ErrorType.NOT_IMPLEMENTED, sre);
264+
case DEADLINE_EXCEEDED:
265+
return new HandlerException(HandlerException.ErrorType.UPSTREAM_TIMEOUT, sre);
266+
default:
267+
// If the status code is not recognized, we treat it as an internal error
268+
return new HandlerException(HandlerException.ErrorType.INTERNAL, sre);
269+
}
270+
}
271+
224272
private OperationStartResult<HandlerResultContent> startOperation(
225273
OperationContext context, OperationStartDetails details, HandlerInputContent input)
226274
throws OperationException {

0 commit comments

Comments
 (0)