Skip to content

Commit 1be0cee

Browse files
Add support for async update (#1766)
Add support for async update to test server and client
1 parent 16dc271 commit 1be0cee

File tree

22 files changed

+1159
-245
lines changed

22 files changed

+1159
-245
lines changed

docker/buildkite/dynamicconfig/development.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,7 @@ system.enableEagerWorkflowStart:
88
constraints: {}
99
frontend.enableUpdateWorkflowExecution:
1010
- value: true
11-
constraints: {}
11+
constraints: {}
12+
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
13+
- value: true
14+
constraints: { }

temporal-sdk/src/main/java/io/temporal/client/UpdateHandleImpl.java renamed to temporal-sdk/src/main/java/io/temporal/client/CompletedUpdateHandleImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,19 @@
2323
import io.temporal.api.common.v1.WorkflowExecution;
2424
import io.temporal.common.Experimental;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.TimeUnit;
2627

2728
@Experimental
28-
final class UpdateHandleImpl<T> implements UpdateHandle<T> {
29+
final class CompletedUpdateHandleImpl<T> implements UpdateHandle<T> {
2930

3031
private final String id;
3132
private final WorkflowExecution execution;
32-
private final CompletableFuture<T> future;
33+
private final T result;
3334

34-
UpdateHandleImpl(String id, WorkflowExecution execution, CompletableFuture<T> future) {
35+
CompletedUpdateHandleImpl(String id, WorkflowExecution execution, T result) {
3536
this.id = id;
3637
this.execution = execution;
37-
this.future = future;
38+
this.result = result;
3839
}
3940

4041
@Override
@@ -49,6 +50,11 @@ public String getId() {
4950

5051
@Override
5152
public CompletableFuture<T> getResultAsync() {
52-
return future;
53+
return CompletableFuture.completedFuture(result);
54+
}
55+
56+
@Override
57+
public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
58+
return CompletableFuture.completedFuture(result);
5359
}
5460
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client;
22+
23+
import io.grpc.Status;
24+
import io.grpc.StatusRuntimeException;
25+
import io.temporal.api.common.v1.WorkflowExecution;
26+
import io.temporal.common.Experimental;
27+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
28+
import io.temporal.serviceclient.CheckedExceptionWrapper;
29+
import java.lang.reflect.Type;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CompletionException;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.TimeoutException;
34+
35+
@Experimental
36+
final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {
37+
38+
private final WorkflowClientCallsInterceptor workflowClientInvoker;
39+
private final String workflowType;
40+
private final String updateName;
41+
private final String id;
42+
private final WorkflowExecution execution;
43+
private final Class<T> resultClass;
44+
private final Type resultType;
45+
46+
LazyUpdateHandleImpl(
47+
WorkflowClientCallsInterceptor workflowClientInvoker,
48+
String workflowType,
49+
String updateName,
50+
String id,
51+
WorkflowExecution execution,
52+
Class<T> resultClass,
53+
Type resultType) {
54+
this.workflowClientInvoker = workflowClientInvoker;
55+
this.workflowType = workflowType;
56+
this.updateName = updateName;
57+
this.id = id;
58+
this.execution = execution;
59+
this.resultClass = resultClass;
60+
this.resultType = resultType;
61+
}
62+
63+
@Override
64+
public WorkflowExecution getExecution() {
65+
return execution;
66+
}
67+
68+
@Override
69+
public String getId() {
70+
return id;
71+
}
72+
73+
@Override
74+
public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
75+
WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput output =
76+
workflowClientInvoker.pollWorkflowUpdate(
77+
new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>(
78+
execution, updateName, id, resultClass, resultType, timeout, unit));
79+
80+
return output
81+
.getResult()
82+
.exceptionally(
83+
failure -> {
84+
if (failure instanceof CompletionException) {
85+
// unwrap the CompletionException
86+
failure = ((Throwable) failure).getCause();
87+
}
88+
failure = CheckedExceptionWrapper.unwrap((Throwable) failure);
89+
if (failure instanceof Error) {
90+
throw (Error) failure;
91+
}
92+
if (failure instanceof StatusRuntimeException) {
93+
StatusRuntimeException sre = (StatusRuntimeException) failure;
94+
if (Status.Code.NOT_FOUND.equals(sre.getStatus().getCode())) {
95+
// Currently no way to tell if the NOT_FOUND was because the workflow ID
96+
// does not exist or because the update ID does not exist.
97+
throw sre;
98+
}
99+
} else if (failure instanceof WorkflowException) {
100+
throw (WorkflowException) failure;
101+
} else if (failure instanceof TimeoutException) {
102+
throw new CompletionException((TimeoutException) failure);
103+
}
104+
throw new WorkflowServiceException(execution, workflowType, (Throwable) failure);
105+
});
106+
}
107+
108+
@Override
109+
public CompletableFuture<T> getResultAsync() {
110+
return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
111+
}
112+
}

temporal-sdk/src/main/java/io/temporal/client/UpdateHandle.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.api.common.v1.WorkflowExecution;
2424
import io.temporal.common.Experimental;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.TimeUnit;
2627

2728
/**
2829
* UpdateHandle is a handle to an update workflow execution request that can be used to get the
@@ -45,10 +46,20 @@ public interface UpdateHandle<T> {
4546
String getId();
4647

4748
/**
48-
* Returns a {@link CompletableFuture} with the update workflow execution request result
49+
* Returns a {@link CompletableFuture} with the update workflow execution request result,
4950
* potentially waiting for the update to complete.
5051
*
5152
* @return future completed with the result of the update or an exception
5253
*/
5354
CompletableFuture<T> getResultAsync();
55+
56+
/**
57+
* Returns a {@link CompletableFuture} with the update workflow execution request result,
58+
* potentially waiting for the update to complete.
59+
*
60+
* @param timeout maximum time to wait and perform the background long polling
61+
* @param unit unit of timeout
62+
* @return future completed with the result of the update or an exception
63+
*/
64+
CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit);
5465
}

0 commit comments

Comments
 (0)