Skip to content

Commit f92b53c

Browse files
💥 Nexus error rehydration (#2365)
Change how unknown errors are handled
1 parent 436d080 commit f92b53c

File tree

37 files changed

+916
-694
lines changed

37 files changed

+916
-694
lines changed

‎build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ext {
3131
// Platforms
3232
grpcVersion = '1.58.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
3333
jacksonVersion = '2.14.2' // [2.9.0,)
34-
nexusVersion = '0.3.0-alpha'
34+
nexusVersion = '0.4.0-alpha'
3535
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
3636
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)
3737

‎temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
package io.temporal.opentracing.internal;
2222

23-
import io.nexusrpc.OperationUnsuccessfulException;
23+
import io.nexusrpc.OperationException;
2424
import io.opentracing.Scope;
2525
import io.opentracing.Span;
2626
import io.opentracing.SpanContext;
@@ -49,8 +49,7 @@ public OpenTracingNexusOperationInboundCallsInterceptor(
4949
}
5050

5151
@Override
52-
public StartOperationOutput startOperation(StartOperationInput input)
53-
throws OperationUnsuccessfulException {
52+
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
5453
SpanContext rootSpanContext =
5554
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);
5655

‎temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.temporal.api.failure.v1.Failure;
3030
import io.temporal.api.failure.v1.ResetWorkflowFailureInfo;
3131
import io.temporal.api.failure.v1.TimeoutFailureInfo;
32-
import io.temporal.failure.TemporalFailure;
3332
import io.temporal.payload.codec.ChainCodec;
3433
import io.temporal.payload.codec.PayloadCodec;
3534
import io.temporal.payload.context.SerializationContext;
@@ -199,7 +198,7 @@ public Failure exceptionToFailure(@Nonnull Throwable throwable) {
199198

200199
@Override
201200
@Nonnull
202-
public TemporalFailure failureToException(@Nonnull Failure failure) {
201+
public RuntimeException failureToException(@Nonnull Failure failure) {
203202
Preconditions.checkNotNull(failure, "failure");
204203
return ConverterUtils.withContext(dataConverter, serializationContext)
205204
.failureToException(this.decodeFailure(failure.toBuilder()).build());

‎temporal-sdk/src/main/java/io/temporal/common/converter/DataConverter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.temporal.api.failure.v1.Failure;
2929
import io.temporal.common.Experimental;
3030
import io.temporal.failure.DefaultFailureConverter;
31-
import io.temporal.failure.TemporalFailure;
3231
import io.temporal.payload.codec.PayloadCodec;
3332
import io.temporal.payload.context.SerializationContext;
3433
import java.lang.reflect.Type;
@@ -176,7 +175,7 @@ default Object[] fromPayloads(
176175
* @throws NullPointerException if failure is null
177176
*/
178177
@Nonnull
179-
default TemporalFailure failureToException(@Nonnull Failure failure) {
178+
default RuntimeException failureToException(@Nonnull Failure failure) {
180179
Preconditions.checkNotNull(failure, "failure");
181180
return new DefaultFailureConverter().failureToException(failure, this);
182181
}

‎temporal-sdk/src/main/java/io/temporal/common/converter/FailureConverter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import io.temporal.api.failure.v1.Failure;
2424
import io.temporal.failure.DefaultFailureConverter;
25-
import io.temporal.failure.TemporalFailure;
2625
import io.temporal.payload.context.SerializationContext;
2726
import javax.annotation.Nonnull;
2827

@@ -49,7 +48,7 @@ public interface FailureConverter {
4948
* @throws NullPointerException if either failure or dataConverter is null
5049
*/
5150
@Nonnull
52-
TemporalFailure failureToException(
51+
RuntimeException failureToException(
5352
@Nonnull Failure failure, @Nonnull DataConverter dataConverter);
5453

5554
/**

‎temporal-sdk/src/main/java/io/temporal/common/converter/PayloadAndFailureDataConverter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.temporal.api.common.v1.Payloads;
2929
import io.temporal.api.failure.v1.Failure;
3030
import io.temporal.failure.DefaultFailureConverter;
31-
import io.temporal.failure.TemporalFailure;
3231
import io.temporal.payload.context.SerializationContext;
3332
import java.lang.reflect.Type;
3433
import java.util.*;
@@ -135,7 +134,7 @@ public <T> T fromPayloads(
135134

136135
@Override
137136
@Nonnull
138-
public TemporalFailure failureToException(@Nonnull Failure failure) {
137+
public RuntimeException failureToException(@Nonnull Failure failure) {
139138
Preconditions.checkNotNull(failure, "failure");
140139
return (serializationContext != null
141140
? failureConverter.withContext(serializationContext)

‎temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
package io.temporal.common.interceptors;
2222

23-
import io.nexusrpc.OperationUnsuccessfulException;
23+
import io.nexusrpc.OperationException;
2424
import io.nexusrpc.handler.*;
2525
import io.temporal.common.Experimental;
2626

@@ -103,10 +103,9 @@ final class CancelOperationOutput {}
103103
*
104104
* @param input input to the operation start.
105105
* @return result of the operation start.
106-
* @throws OperationUnsuccessfulException if the operation start failed.
106+
* @throws io.nexusrpc.OperationException if the operation start failed.
107107
*/
108-
StartOperationOutput startOperation(StartOperationInput input)
109-
throws OperationUnsuccessfulException;
108+
StartOperationOutput startOperation(StartOperationInput input) throws OperationException;
110109

111110
/**
112111
* Intercepts a call to cancel a Nexus operation.

‎temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
package io.temporal.common.interceptors;
2222

23-
import io.nexusrpc.OperationUnsuccessfulException;
23+
import io.nexusrpc.OperationException;
2424
import io.temporal.common.Experimental;
2525

2626
/** Convenience base class for {@link NexusOperationInboundCallsInterceptor} implementations. */
@@ -39,8 +39,7 @@ public void init(NexusOperationOutboundCallsInterceptor outboundCalls) {
3939
}
4040

4141
@Override
42-
public StartOperationOutput startOperation(StartOperationInput input)
43-
throws OperationUnsuccessfulException {
42+
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
4443
return next.startOperation(input);
4544
}
4645

‎temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import com.google.common.base.Preconditions;
2424
import com.google.common.base.Strings;
2525
import com.google.common.collect.ImmutableSet;
26+
import io.nexusrpc.handler.HandlerException;
2627
import io.temporal.api.common.v1.ActivityType;
2728
import io.temporal.api.common.v1.Payloads;
2829
import io.temporal.api.common.v1.WorkflowType;
30+
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
2931
import io.temporal.api.failure.v1.*;
3032
import io.temporal.client.ActivityCanceledException;
3133
import io.temporal.common.converter.DataConverter;
@@ -72,21 +74,23 @@ public final class DefaultFailureConverter implements FailureConverter {
7274

7375
@Override
7476
@Nonnull
75-
public TemporalFailure failureToException(
77+
public RuntimeException failureToException(
7678
@Nonnull Failure failure, @Nonnull DataConverter dataConverter) {
7779
Preconditions.checkNotNull(failure, "failure");
7880
Preconditions.checkNotNull(dataConverter, "dataConverter");
79-
TemporalFailure result = failureToExceptionImpl(failure, dataConverter);
80-
result.setFailure(failure);
81+
RuntimeException result = failureToExceptionImpl(failure, dataConverter);
82+
if (result instanceof TemporalFailure) {
83+
((TemporalFailure) result).setFailure(failure);
84+
}
8185
if (failure.getSource().equals(JAVA_SDK) && !failure.getStackTrace().isEmpty()) {
8286
StackTraceElement[] stackTrace = parseStackTrace(failure.getStackTrace());
8387
result.setStackTrace(stackTrace);
8488
}
8589
return result;
8690
}
8791

88-
private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter dataConverter) {
89-
TemporalFailure cause =
92+
private RuntimeException failureToExceptionImpl(Failure failure, DataConverter dataConverter) {
93+
Exception cause =
9094
failure.hasCause() ? failureToException(failure.getCause(), dataConverter) : null;
9195
switch (failure.getFailureInfoCase()) {
9296
case APPLICATION_FAILURE_INFO:
@@ -184,9 +188,23 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da
184188
info.getEndpoint(),
185189
info.getService(),
186190
info.getOperation(),
187-
info.getOperationId(),
191+
info.getOperationToken().isEmpty() ? info.getOperationId() : info.getOperationToken(),
188192
cause);
189193
}
194+
case NEXUS_HANDLER_FAILURE_INFO:
195+
{
196+
NexusHandlerFailureInfo info = failure.getNexusHandlerFailureInfo();
197+
HandlerException.RetryBehavior retryBehavior = HandlerException.RetryBehavior.UNSPECIFIED;
198+
switch (info.getRetryBehavior()) {
199+
case NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE:
200+
retryBehavior = HandlerException.RetryBehavior.RETRYABLE;
201+
break;
202+
case NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE:
203+
retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE;
204+
break;
205+
}
206+
return new HandlerException(info.getType(), cause, retryBehavior);
207+
}
190208
case FAILUREINFO_NOT_SET:
191209
default:
192210
// All unknown types are considered to be retryable ApplicationError.
@@ -302,14 +320,34 @@ private Failure exceptionToFailure(Throwable throwable) {
302320
failure.setCanceledFailureInfo(info);
303321
} else if (throwable instanceof NexusOperationFailure) {
304322
NexusOperationFailure no = (NexusOperationFailure) throwable;
305-
NexusOperationFailureInfo.Builder info =
323+
NexusOperationFailureInfo.Builder op =
306324
NexusOperationFailureInfo.newBuilder()
307325
.setScheduledEventId(no.getScheduledEventId())
308326
.setEndpoint(no.getEndpoint())
309327
.setService(no.getService())
310328
.setOperation(no.getOperation())
311-
.setOperationId(no.getOperationId());
312-
failure.setNexusOperationExecutionFailureInfo(info);
329+
.setOperationId(no.getOperationToken())
330+
.setOperationToken(no.getOperationToken());
331+
failure.setNexusOperationExecutionFailureInfo(op);
332+
} else if (throwable instanceof HandlerException) {
333+
HandlerException he = (HandlerException) throwable;
334+
NexusHandlerErrorRetryBehavior retryBehavior =
335+
NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
336+
switch (he.getRetryBehavior()) {
337+
case RETRYABLE:
338+
retryBehavior =
339+
NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
340+
break;
341+
case NON_RETRYABLE:
342+
retryBehavior =
343+
NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
344+
break;
345+
}
346+
NexusHandlerFailureInfo.Builder info =
347+
NexusHandlerFailureInfo.newBuilder()
348+
.setType(he.getRawErrorType())
349+
.setRetryBehavior(retryBehavior);
350+
failure.setNexusHandlerFailureInfo(info);
313351
} else {
314352
ApplicationFailureInfo.Builder info =
315353
ApplicationFailureInfo.newBuilder()

‎temporal-sdk/src/main/java/io/temporal/failure/NexusOperationFailure.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,25 @@ public final class NexusOperationFailure extends TemporalFailure {
3535
private final String endpoint;
3636
private final String service;
3737
private final String operation;
38-
private final String operationId;
38+
private final String operationToken;
3939

4040
public NexusOperationFailure(
4141
String message,
4242
long scheduledEventId,
4343
String endpoint,
4444
String service,
4545
String operation,
46-
String operationId,
46+
String operationToken,
4747
Throwable cause) {
4848
super(
49-
getMessage(message, scheduledEventId, endpoint, service, operation, operationId),
49+
getMessage(message, scheduledEventId, endpoint, service, operation, operationToken),
5050
message,
5151
cause);
5252
this.scheduledEventId = scheduledEventId;
5353
this.endpoint = endpoint;
5454
this.service = service;
5555
this.operation = operation;
56-
this.operationId = operationId;
56+
this.operationToken = operationToken;
5757
}
5858

5959
public static String getMessage(
@@ -62,7 +62,7 @@ public static String getMessage(
6262
String endpoint,
6363
String service,
6464
String operation,
65-
String operationId) {
65+
String operationToken) {
6666
return "Nexus Operation with operation='"
6767
+ operation
6868
+ "service='"
@@ -74,7 +74,7 @@ public static String getMessage(
7474
+ "'. "
7575
+ "scheduledEventId="
7676
+ scheduledEventId
77-
+ (operationId == null ? "" : ", operationId=" + operationId);
77+
+ (operationToken == null ? "" : ", operationToken=" + operationToken);
7878
}
7979

8080
public long getScheduledEventId() {
@@ -93,7 +93,7 @@ public String getOperation() {
9393
return operation;
9494
}
9595

96-
public String getOperationId() {
97-
return operationId;
96+
public String getOperationToken() {
97+
return operationToken;
9898
}
9999
}

0 commit comments

Comments
 (0)