Skip to content

Commit b205f20

Browse files
authored
Only call RespondDecisionTaskFailure on the first attempt to avoid spinning (#115)
1 parent d75bba3 commit b205f20

File tree

7 files changed

+54
-8
lines changed

7 files changed

+54
-8
lines changed

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,19 @@ public ReplayDecisionTaskHandler(String domain, ReplayWorkflowFactory asyncWorkf
4848
}
4949

5050
@Override
51-
public DecisionTaskHandler.Result handleDecisionTask(DecisionTaskWithHistoryIterator decisionTaskIterator) {
51+
public DecisionTaskHandler.Result handleDecisionTask(DecisionTaskWithHistoryIterator decisionTaskIterator) throws Exception {
5252
try {
5353
return handleDecisionTaskImpl(decisionTaskIterator);
5454
} catch (Throwable e) {
5555
PollForDecisionTaskResponse decisionTask = decisionTaskIterator.getDecisionTask();
56+
// Only fail the decision on the first attempt; subsequent failures of the same decision task will time out.
57+
// This avoids spinning on the failed decision task.
58+
if (decisionTask.getAttempt() > 0) {
59+
if (e instanceof Error) {
60+
throw (Error) e;
61+
}
62+
throw (Exception) e;
63+
}
5664
if (log.isErrorEnabled()) {
5765
WorkflowExecution execution = decisionTask.getWorkflowExecution();
5866
log.error("Workflow task failure. startedEventId=" + decisionTask.getStartedEventId()

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void resumePolling() {
8484
worker.resumePolling();
8585
}
8686

87-
public <R> R queryWorkflowExecution(WorkflowExecution execution, String queryType, Class<R> returnType, Object[] args) {
87+
public <R> R queryWorkflowExecution(WorkflowExecution execution, String queryType, Class<R> returnType, Object[] args) throws Exception {
8888
DataConverter dataConverter = options.getDataConverter();
8989
byte[] serializedArgs = dataConverter.toData(args);
9090
byte[] result = worker.queryWorkflowExecution(execution, queryType, serializedArgs);

src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public String toString() {
8585
* @return One of the possible decision task replies: RespondDecisionTaskCompletedRequest,
8686
* RespondQueryTaskCompletedRequest, RespondDecisionTaskFailedRequest
8787
*/
88-
Result handleDecisionTask(DecisionTaskWithHistoryIterator decisionTaskIterator);
88+
Result handleDecisionTask(DecisionTaskWithHistoryIterator decisionTaskIterator) throws Exception;
8989

9090
/**
9191
* True if this handler handles at least one workflow type.

src/main/java/com/uber/cadence/internal/worker/PollerOptions.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,21 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
2023
import java.time.Duration;
2124

2225
/**
2326
* Options for component that polls Cadence task lists for tasks.
2427
*/
2528
public final class PollerOptions {
2629

30+
private static final Logger log = LoggerFactory.getLogger(PollerOptions.class);
31+
2732
public final static class Builder {
2833

34+
2935
private int maximumPollRateIntervalMilliseconds = 1000;
3036

3137
private double maximumPollRatePerSecond;
@@ -126,7 +132,7 @@ public Builder setPollThreadNamePrefix(String pollThreadNamePrefix) {
126132

127133
public PollerOptions build() {
128134
if (uncaughtExceptionHandler == null) {
129-
uncaughtExceptionHandler = (t, e) -> e.printStackTrace();
135+
uncaughtExceptionHandler = (t, e) -> log.error("uncaught exception", e);
130136
}
131137
return new PollerOptions(maximumPollRateIntervalMilliseconds, maximumPollRatePerSecond,
132138
pollBackoffCoefficient, pollBackoffInitialInterval, pollBackoffMaximumInterval,

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void start() {
8585
}
8686
}
8787

88-
public byte[] queryWorkflowExecution(WorkflowExecution execution, String queryType, byte[] args) {
88+
public byte[] queryWorkflowExecution(WorkflowExecution execution, String queryType, byte[] args) throws Exception {
8989
Iterator<HistoryEvent> history = WorkflowExecutionUtils.getHistory(service, domain, execution);
9090
DecisionTaskWithHistoryIterator historyIterator = new ReplayDecisionTaskWithHistoryIterator(execution, history);
9191
WorkflowQuery query = new WorkflowQuery();

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,10 @@ public String toString() {
231231
* @param args query arguments
232232
* @param <R> type of the query result
233233
* @return query result
234+
* @throws Exception if replay failed for any reason
234235
*/
235236
public <R> R queryWorkflowExecution(WorkflowExecution execution, String queryType, Class<R> returnType,
236-
Object... args) {
237+
Object... args) throws Exception {
237238
if (workflowWorker == null) {
238239
throw new IllegalStateException("disableWorkflowWorker is set in worker options");
239240
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -790,10 +790,12 @@ public void testSignalUntyped() {
790790
WorkflowExecution execution = client.start();
791791
assertEquals("initial", client.query("QueryableWorkflow::getState", String.class));
792792
client.signal("testSignal", "Hello ");
793-
while(!"Hello ".equals(client.query("QueryableWorkflow::getState", String.class))) {}
793+
while (!"Hello ".equals(client.query("QueryableWorkflow::getState", String.class))) {
794+
}
794795
assertEquals("Hello ", client.query("QueryableWorkflow::getState", String.class));
795796
client.signal("testSignal", "World!");
796-
while(!"World!".equals(client.query("QueryableWorkflow::getState", String.class))) {}
797+
while (!"World!".equals(client.query("QueryableWorkflow::getState", String.class))) {
798+
}
797799
assertEquals("World!", client.query("QueryableWorkflow::getState", String.class));
798800
assertEquals("Hello World!", workflowClient.newUntypedWorkflowStub(execution).getResult(String.class));
799801
}
@@ -1144,6 +1146,35 @@ public void testChildWorkflowAsyncRetry() {
11441146
assertEquals(3, AngryChild.invocationCount);
11451147
}
11461148

1149+
private static int testDecisionFailureBackoffReplayCount;
1150+
1151+
public static class TestDecisionFailureBackoff implements TestWorkflow1 {
1152+
1153+
@Override
1154+
public String execute() {
1155+
if (testDecisionFailureBackoffReplayCount++ < 2) {
1156+
throw new Error("simulated decision failure");
1157+
}
1158+
return "result1";
1159+
}
1160+
}
1161+
1162+
@Test
1163+
public void testDecisionFailureBackoff() {
1164+
startWorkerFor(TestDecisionFailureBackoff.class);
1165+
WorkflowOptions o = new WorkflowOptions.Builder()
1166+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(10))
1167+
.setTaskStartToCloseTimeout(Duration.ofSeconds(1))
1168+
.setTaskList(taskList).build();
1169+
1170+
TestWorkflow1 workflowStub = workflowClient.newWorkflowStub(TestWorkflow1.class, o);
1171+
long start = System.currentTimeMillis();
1172+
String result = workflowStub.execute();
1173+
long elapsed = System.currentTimeMillis() - start;
1174+
assertTrue("spinned on fail decision", elapsed > 1000);
1175+
assertEquals("result1", result);
1176+
}
1177+
11471178
public interface TestActivities {
11481179

11491180
String activityWithDelay(long milliseconds, boolean heartbeatMoreThanOnce);

0 commit comments

Comments
 (0)