Skip to content

Commit cd76ad6

Browse files
authored
Do not auto-retry gRPC-message-size-too-large errors (#2604)
1 parent f9580aa commit cd76ad6

File tree

9 files changed

+597
-63
lines changed

9 files changed

+597
-63
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 140 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,18 @@
99
import com.uber.m3.tally.Scope;
1010
import com.uber.m3.tally.Stopwatch;
1111
import com.uber.m3.util.ImmutableMap;
12+
import io.grpc.StatusRuntimeException;
1213
import io.temporal.api.common.v1.WorkflowExecution;
14+
import io.temporal.api.enums.v1.QueryResultType;
1315
import io.temporal.api.enums.v1.TaskQueueKind;
1416
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
17+
import io.temporal.api.failure.v1.Failure;
1518
import io.temporal.api.workflowservice.v1.*;
19+
import io.temporal.failure.ApplicationFailure;
1620
import io.temporal.internal.logging.LoggerTag;
21+
import io.temporal.internal.retryer.GrpcMessageTooLargeException;
1722
import io.temporal.internal.retryer.GrpcRetryer;
23+
import io.temporal.payload.context.WorkflowSerializationContext;
1824
import io.temporal.serviceclient.MetricsTag;
1925
import io.temporal.serviceclient.RpcRetryOptions;
2026
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -394,73 +400,125 @@ public void handle(WorkflowTask task) throws Exception {
394400
PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
395401
nextWFTResponse = Optional.empty();
396402
WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
403+
WorkflowTaskFailedCause taskFailedCause = null;
397404
try {
398405
RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
399406
RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
400407
RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();
401408

402-
if (taskCompleted != null) {
403-
RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
404-
taskCompleted.toBuilder();
405-
try (EagerActivitySlotsReservation activitySlotsReservation =
406-
new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
407-
activitySlotsReservation.applyToRequest(requestBuilder);
408-
RespondWorkflowTaskCompletedResponse response =
409-
sendTaskCompleted(
410-
currentTask.getTaskToken(),
411-
requestBuilder,
412-
result.getRequestRetryOptions(),
413-
workflowTypeScope);
414-
// If we were processing a speculative WFT the server may instruct us that the task
415-
// was dropped by resting out event ID.
416-
long resetEventId = response.getResetHistoryEventId();
417-
if (resetEventId != 0) {
418-
result.getResetEventIdHandle().apply(resetEventId);
409+
if (queryCompleted != null) {
410+
try {
411+
sendDirectQueryCompletedResponse(
412+
currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
413+
} catch (StatusRuntimeException e) {
414+
GrpcMessageTooLargeException tooLargeException =
415+
GrpcMessageTooLargeException.tryWrap(e);
416+
if (tooLargeException == null) {
417+
throw e;
419418
}
420-
nextWFTResponse =
421-
response.hasWorkflowTask()
422-
? Optional.of(response.getWorkflowTask())
423-
: Optional.empty();
424-
// TODO we don't have to do this under the runId lock
425-
activitySlotsReservation.handleResponse(response);
419+
Failure failure =
420+
grpcMessageTooLargeFailure(
421+
workflowExecution.getWorkflowId(),
422+
tooLargeException,
423+
"Failed to send query response");
424+
RespondQueryTaskCompletedRequest.Builder queryFailedBuilder =
425+
RespondQueryTaskCompletedRequest.newBuilder()
426+
.setTaskToken(currentTask.getTaskToken())
427+
.setNamespace(namespace)
428+
.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED)
429+
.setErrorMessage(failure.getMessage())
430+
.setFailure(failure);
431+
sendDirectQueryCompletedResponse(
432+
currentTask.getTaskToken(), queryFailedBuilder, workflowTypeScope);
433+
}
434+
} else {
435+
try {
436+
if (taskCompleted != null) {
437+
RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
438+
taskCompleted.toBuilder();
439+
try (EagerActivitySlotsReservation activitySlotsReservation =
440+
new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
441+
activitySlotsReservation.applyToRequest(requestBuilder);
442+
RespondWorkflowTaskCompletedResponse response =
443+
sendTaskCompleted(
444+
currentTask.getTaskToken(),
445+
requestBuilder,
446+
result.getRequestRetryOptions(),
447+
workflowTypeScope);
448+
// If we were processing a speculative WFT the server may instruct us that the
449+
// task was dropped by resting out event ID.
450+
long resetEventId = response.getResetHistoryEventId();
451+
if (resetEventId != 0) {
452+
result.getResetEventIdHandle().apply(resetEventId);
453+
}
454+
nextWFTResponse =
455+
response.hasWorkflowTask()
456+
? Optional.of(response.getWorkflowTask())
457+
: Optional.empty();
458+
// TODO we don't have to do this under the runId lock
459+
activitySlotsReservation.handleResponse(response);
460+
}
461+
} else if (taskFailed != null) {
462+
taskFailedCause = taskFailed.getCause();
463+
sendTaskFailed(
464+
currentTask.getTaskToken(),
465+
taskFailed.toBuilder(),
466+
result.getRequestRetryOptions(),
467+
workflowTypeScope);
468+
}
469+
} catch (GrpcMessageTooLargeException e) {
470+
// Only fail workflow task on the first attempt, subsequent failures of the same
471+
// workflow task should timeout.
472+
if (currentTask.getAttempt() > 1) {
473+
throw e;
474+
}
475+
476+
releaseReason = SlotReleaseReason.error(e);
477+
handleReportingFailure(
478+
e, currentTask, result, workflowExecution, workflowTypeScope);
479+
// setting/replacing failure cause for metrics purposes
480+
taskFailedCause =
481+
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE;
482+
483+
String messagePrefix =
484+
String.format(
485+
"Failed to send workflow task %s",
486+
taskFailed == null ? "completion" : "failure");
487+
RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder =
488+
RespondWorkflowTaskFailedRequest.newBuilder()
489+
.setFailure(
490+
grpcMessageTooLargeFailure(
491+
workflowExecution.getWorkflowId(), e, messagePrefix))
492+
.setCause(
493+
WorkflowTaskFailedCause
494+
.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE);
495+
sendTaskFailed(
496+
currentTask.getTaskToken(),
497+
taskFailedBuilder,
498+
result.getRequestRetryOptions(),
499+
workflowTypeScope);
426500
}
427-
} else if (taskFailed != null) {
428-
sendTaskFailed(
429-
currentTask.getTaskToken(),
430-
taskFailed.toBuilder(),
431-
result.getRequestRetryOptions(),
432-
workflowTypeScope);
433-
} else if (queryCompleted != null) {
434-
sendDirectQueryCompletedResponse(
435-
currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
436501
}
437502
} catch (Exception e) {
438-
logExceptionDuringResultReporting(e, currentTask, result);
439503
releaseReason = SlotReleaseReason.error(e);
440-
// if we failed to report the workflow task completion back to the server,
441-
// our cached version of the workflow may be more advanced than the server is aware of.
442-
// We should discard this execution and perform a clean replay based on what server
443-
// knows next time.
444-
cache.invalidate(
445-
workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
504+
handleReportingFailure(e, currentTask, result, workflowExecution, workflowTypeScope);
446505
throw e;
447506
}
448507

449-
if (result.getTaskFailed() != null) {
450-
Scope workflowTaskFailureScope = workflowTypeScope;
451-
if (result
452-
.getTaskFailed()
453-
.getCause()
454-
.equals(
455-
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) {
456-
workflowTaskFailureScope =
457-
workflowTaskFailureScope.tagged(
458-
ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError"));
459-
} else {
460-
workflowTaskFailureScope =
461-
workflowTaskFailureScope.tagged(
462-
ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError"));
508+
if (taskFailedCause != null) {
509+
String taskFailureType;
510+
switch (taskFailedCause) {
511+
case WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR:
512+
taskFailureType = "NonDeterminismError";
513+
break;
514+
case WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE:
515+
taskFailureType = "GrpcMessageTooLarge";
516+
break;
517+
default:
518+
taskFailureType = "WorkflowError";
463519
}
520+
Scope workflowTaskFailureScope =
521+
workflowTypeScope.tagged(ImmutableMap.of(TASK_FAILURE_TYPE, taskFailureType));
464522
// we don't trigger the counter in case of the legacy query
465523
// (which never has taskFailed set)
466524
workflowTaskFailureScope
@@ -617,5 +675,34 @@ private void logExceptionDuringResultReporting(
617675
e);
618676
}
619677
}
678+
679+
private void handleReportingFailure(
680+
Exception e,
681+
PollWorkflowTaskQueueResponse currentTask,
682+
WorkflowTaskHandler.Result result,
683+
WorkflowExecution workflowExecution,
684+
Scope workflowTypeScope) {
685+
logExceptionDuringResultReporting(e, currentTask, result);
686+
// if we failed to report the workflow task completion back to the server,
687+
// our cached version of the workflow may be more advanced than the server is aware of.
688+
// We should discard this execution and perform a clean replay based on what server
689+
// knows next time.
690+
cache.invalidate(
691+
workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
692+
}
693+
694+
private Failure grpcMessageTooLargeFailure(
695+
String workflowId, GrpcMessageTooLargeException e, String messagePrefix) {
696+
ApplicationFailure applicationFailure =
697+
ApplicationFailure.newBuilder()
698+
.setMessage(messagePrefix + ": " + e.getMessage())
699+
.setType(GrpcMessageTooLargeException.class.getSimpleName())
700+
.build();
701+
applicationFailure.setStackTrace(new StackTraceElement[0]); // don't serialize stack trace
702+
return options
703+
.getDataConverter()
704+
.withContext(new WorkflowSerializationContext(namespace, workflowId))
705+
.exceptionToFailure(applicationFailure);
706+
}
620707
}
621708
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.temporal.testUtils;
2+
3+
import ch.qos.logback.classic.Level;
4+
import ch.qos.logback.classic.Logger;
5+
import java.util.ArrayList;
6+
import java.util.Arrays;
7+
import java.util.List;
8+
import java.util.stream.Collectors;
9+
import org.slf4j.LoggerFactory;
10+
11+
public class LoggerUtils {
12+
public static SilenceLoggers silenceLoggers(Class<?>... classes) {
13+
return new SilenceLoggers(classes);
14+
}
15+
16+
public static class SilenceLoggers implements AutoCloseable {
17+
private final List<Logger> loggers;
18+
List<Level> oldLogLevels;
19+
20+
public SilenceLoggers(Class<?>... classes) {
21+
loggers =
22+
Arrays.stream(classes)
23+
.map(LoggerFactory::getLogger)
24+
.filter(Logger.class::isInstance)
25+
.map(Logger.class::cast)
26+
.collect(Collectors.toList());
27+
oldLogLevels = new ArrayList<>();
28+
for (Logger logger : loggers) {
29+
oldLogLevels.add(logger.getLevel());
30+
logger.setLevel(Level.OFF);
31+
}
32+
}
33+
34+
@Override
35+
public void close() {
36+
for (int i = 0; i < loggers.size(); i++) {
37+
loggers.get(i).setLevel(oldLogLevels.get(i));
38+
}
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)