Skip to content

Commit 531d3cb

Browse files
Wrap GRPC::CANCELED and DEADLINE_EXCEEDED in new exception type (#2172)
* Wrap GRPC::CANCELED and DEADLINE_EXCEEDED
1 parent 98b2e78 commit 531d3cb

File tree

5 files changed

+180
-19
lines changed

5 files changed

+180
-19
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client;
22+
23+
import io.temporal.api.common.v1.WorkflowExecution;
24+
25+
/**
26+
* Error that occurs when an update call times out or is cancelled.
27+
*
28+
* <p>Note, this is not related to any general concept of timing out or cancelling a running update,
29+
* this is only related to the client call itself.
30+
*/
31+
public class WorkflowUpdateTimeoutOrCancelledException extends WorkflowServiceException {
32+
public WorkflowUpdateTimeoutOrCancelledException(
33+
WorkflowExecution execution, String updateId, String updateName, Throwable cause) {
34+
super(execution, "", cause);
35+
}
36+
}

temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,13 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
110110
// does not exist or because the update ID does not exist.
111111
throw sre;
112112
}
113+
throw sre;
113114
} else if (failure instanceof WorkflowException) {
114115
throw (WorkflowException) failure;
115116
} else if (failure instanceof TimeoutException) {
116-
throw new CompletionException((TimeoutException) failure);
117+
throw new CompletionException(failure);
117118
}
118-
throw new WorkflowServiceException(execution, workflowType, (Throwable) failure);
119+
throw new WorkflowServiceException(execution, workflowType, failure);
119120
});
120121
}
121122

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,18 @@ public <R> UpdateHandle<R> startUpdate(StartUpdateInput<R> input) {
339339
UpdateWorkflowExecutionLifecycleStage waitForStage = input.getWaitPolicy().getLifecycleStage();
340340
do {
341341
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS);
342-
result = genericClient.update(updateRequest, pollTimeoutDeadline);
343-
} while (result.getStage().getNumber() < waitForStage.getNumber()
342+
try {
343+
result = genericClient.update(updateRequest, pollTimeoutDeadline);
344+
} catch (StatusRuntimeException e) {
345+
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED
346+
|| e.getStatus().getCode() == Status.Code.CANCELLED) {
347+
throw new WorkflowUpdateTimeoutOrCancelledException(
348+
input.getWorkflowExecution(), input.getUpdateName(), input.getUpdateId(), e);
349+
}
350+
throw e;
351+
}
352+
353+
} while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber()
344354
&& result.getStage().getNumber()
345355
< UpdateWorkflowExecutionLifecycleStage
346356
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
@@ -466,17 +476,17 @@ private void pollWorkflowUpdateHelper(
466476
return;
467477
}
468478
if ((e instanceof StatusRuntimeException
469-
&& ((StatusRuntimeException) e).getStatus().getCode()
470-
== Status.Code.DEADLINE_EXCEEDED)
479+
&& (((StatusRuntimeException) e).getStatus().getCode()
480+
== Status.Code.DEADLINE_EXCEEDED
481+
|| ((StatusRuntimeException) e).getStatus().getCode()
482+
== Status.Code.CANCELLED))
471483
|| deadline.isExpired()) {
472484
resultCF.completeExceptionally(
473-
new TimeoutException(
474-
"WorkflowId="
475-
+ request.getUpdateRef().getWorkflowExecution().getWorkflowId()
476-
+ ", runId="
477-
+ request.getUpdateRef().getWorkflowExecution().getRunId()
478-
+ ", updateId="
479-
+ request.getUpdateRef().getUpdateId()));
485+
new WorkflowUpdateTimeoutOrCancelledException(
486+
request.getUpdateRef().getWorkflowExecution(),
487+
request.getUpdateRef().getUpdateId(),
488+
"",
489+
e));
480490
} else if (e != null) {
481491
resultCF.completeExceptionally(e);
482492
} else {

temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,13 @@
2727
import static org.junit.Assert.assertEquals;
2828

2929
import com.google.common.base.Stopwatch;
30-
import io.temporal.client.UpdateHandle;
31-
import io.temporal.client.WorkflowClient;
32-
import io.temporal.client.WorkflowStub;
33-
import io.temporal.client.WorkflowUpdateStage;
30+
import io.temporal.client.*;
3431
import io.temporal.testing.internal.SDKTestOptions;
3532
import io.temporal.testing.internal.SDKTestWorkflowRule;
3633
import io.temporal.workflow.*;
3734
import java.util.concurrent.CompletableFuture;
3835
import java.util.concurrent.ExecutionException;
3936
import java.util.concurrent.TimeUnit;
40-
import java.util.concurrent.TimeoutException;
4137
import org.junit.Rule;
4238
import org.junit.Test;
4339

@@ -111,7 +107,9 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup
111107
// Verify get throws the correct exception in around the right amount of time
112108
Stopwatch stopWatch = Stopwatch.createStarted();
113109
ExecutionException executionException = assertThrows(ExecutionException.class, result::get);
114-
assertThat(executionException.getCause(), is(instanceOf(TimeoutException.class)));
110+
assertThat(
111+
executionException.getCause(),
112+
is(instanceOf(WorkflowUpdateTimeoutOrCancelledException.class)));
115113
stopWatch.stop();
116114
long elapsedSeconds = stopWatch.elapsed(TimeUnit.SECONDS);
117115
assertTrue(
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.updateTest;
22+
23+
import static org.junit.Assert.assertThrows;
24+
25+
import io.grpc.Context;
26+
import io.temporal.api.common.v1.WorkflowExecution;
27+
import io.temporal.client.*;
28+
import io.temporal.testing.internal.SDKTestOptions;
29+
import io.temporal.testing.internal.SDKTestWorkflowRule;
30+
import io.temporal.worker.WorkerOptions;
31+
import io.temporal.workflow.CompletablePromise;
32+
import io.temporal.workflow.Workflow;
33+
import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
import java.util.UUID;
37+
import java.util.concurrent.*;
38+
import java.util.concurrent.atomic.AtomicReference;
39+
import org.junit.Assert;
40+
import org.junit.Rule;
41+
import org.junit.Test;
42+
43+
public class UpdateExceptionWrapped {
44+
45+
private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
46+
47+
@Rule
48+
public SDKTestWorkflowRule testWorkflowRule =
49+
SDKTestWorkflowRule.newBuilder()
50+
.setWorkerOptions(WorkerOptions.newBuilder().build())
51+
.setWorkflowTypes(TestUpdateWorkflowImpl.class)
52+
.build();
53+
54+
@Test
55+
public void testUpdateStart() {
56+
String workflowId = UUID.randomUUID().toString();
57+
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
58+
WorkflowOptions options =
59+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
60+
.setWorkflowId(workflowId)
61+
.build();
62+
WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options);
63+
// To execute workflow client.execute() would do. But we want to start workflow and immediately
64+
// return.
65+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
66+
testWorkflowRule.getTestEnvironment().shutdownNow();
67+
testWorkflowRule.getTestEnvironment().awaitTermination(1000, TimeUnit.MILLISECONDS);
68+
69+
final AtomicReference<WorkflowUpdateTimeoutOrCancelledException> exception =
70+
new AtomicReference<>();
71+
72+
Context.current()
73+
.withDeadlineAfter(500, TimeUnit.MILLISECONDS, scheduledExecutor)
74+
.run(
75+
() ->
76+
exception.set(
77+
assertThrows(
78+
WorkflowUpdateTimeoutOrCancelledException.class,
79+
() -> workflow.update(0, ""))));
80+
Assert.assertEquals(execution.getWorkflowId(), exception.get().getExecution().getWorkflowId());
81+
}
82+
83+
public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate {
84+
String state = "initial";
85+
List<String> updates = new ArrayList<>();
86+
CompletablePromise<Void> promise = Workflow.newPromise();
87+
88+
@Override
89+
public String execute() {
90+
promise.get();
91+
return "";
92+
}
93+
94+
@Override
95+
public String getState() {
96+
return state;
97+
}
98+
99+
@Override
100+
public String update(Integer index, String value) {
101+
Workflow.await(() -> false);
102+
return "";
103+
}
104+
105+
@Override
106+
public void updateValidator(Integer index, String value) {}
107+
108+
@Override
109+
public void complete() {
110+
promise.complete(null);
111+
}
112+
113+
@Override
114+
public void completeValidator() {}
115+
}
116+
}

0 commit comments

Comments
 (0)