Skip to content

Commit ad27b61

Browse files
Add support for sync update (#1749)
Add support for workflow update
1 parent c1cff1c commit ad27b61

File tree

70 files changed

+3075
-68
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+3075
-68
lines changed

docker/buildkite/dynamicconfig/development.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ system.enableActivityEagerExecution:
66
system.enableEagerWorkflowStart:
77
- value: true
88
constraints: {}
9+
frontend.enableUpdateWorkflowExecution:
10+
- value: true
11+
constraints: {}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client;
22+
23+
import io.temporal.api.common.v1.WorkflowExecution;
24+
import io.temporal.common.Experimental;
25+
import java.util.concurrent.CompletableFuture;
26+
27+
/**
28+
* UpdateHandle is a handle to an update workflow execution request that can be used to get the
29+
* status of that update request.
30+
*/
31+
@Experimental
32+
public interface UpdateHandle<T> {
33+
/**
34+
* Gets the workflow execution this update request was sent to.
35+
*
36+
* @return the workflow execution this update was sent to.
37+
*/
38+
WorkflowExecution getExecution();
39+
40+
/**
41+
* Gets the unique ID of this update.
42+
*
43+
* @return the updates ID.
44+
*/
45+
String getId();
46+
47+
/**
48+
* Returns a {@link CompletableFuture} with the update workflow execution request result
49+
* potentially waiting for the update to complete.
50+
*
51+
* @return future completed with the result of the update or an exception
52+
*/
53+
CompletableFuture<T> getResultAsync();
54+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client;
22+
23+
import io.temporal.api.common.v1.WorkflowExecution;
24+
import io.temporal.common.Experimental;
25+
import java.util.concurrent.CompletableFuture;
26+
27+
@Experimental
28+
final class UpdateHandleImpl<T> implements UpdateHandle<T> {
29+
30+
private final String id;
31+
private final WorkflowExecution execution;
32+
private final CompletableFuture<T> future;
33+
34+
UpdateHandleImpl(String id, WorkflowExecution execution, CompletableFuture<T> future) {
35+
this.id = id;
36+
this.execution = execution;
37+
this.future = future;
38+
}
39+
40+
@Override
41+
public WorkflowExecution getExecution() {
42+
return execution;
43+
}
44+
45+
@Override
46+
public String getId() {
47+
return id;
48+
}
49+
50+
@Override
51+
public CompletableFuture<T> getResultAsync() {
52+
return future;
53+
}
54+
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,11 @@ public void invoke(
270270
} else if (type == WorkflowMethodType.SIGNAL) {
271271
signalWorkflow(methodMetadata, untyped, method, args);
272272
result = null;
273+
} else if (type == WorkflowMethodType.UPDATE) {
274+
result = updateWorkflow(methodMetadata, untyped, method, args);
273275
} else {
274276
throw new IllegalArgumentException(
275-
method + " is not annotated with @WorkflowMethod or @QueryMethod");
277+
method + " is not annotated with @WorkflowMethod, @QueryMethod, @UpdateMethod");
276278
}
277279
}
278280

@@ -306,6 +308,15 @@ private Object queryWorkflow(
306308
return untyped.query(queryType, method.getReturnType(), method.getGenericReturnType(), args);
307309
}
308310

311+
private Object updateWorkflow(
312+
POJOWorkflowMethodMetadata methodMetadata,
313+
WorkflowStub untyped,
314+
Method method,
315+
Object[] args) {
316+
String updateType = methodMetadata.getName();
317+
return untyped.update(updateType, method.getReturnType(), args);
318+
}
319+
309320
@SuppressWarnings("FutureReturnValueIgnored")
310321
private Object startWorkflow(WorkflowStub untyped, Method method, Object[] args) {
311322
WorkflowInvocationHandler.startWorkflow(untyped, args);

temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import io.temporal.api.common.v1.WorkflowExecution;
2424
import io.temporal.api.enums.v1.QueryRejectCondition;
25+
import io.temporal.common.Experimental;
2526
import io.temporal.failure.CanceledFailure;
2627
import io.temporal.failure.TerminatedFailure;
2728
import io.temporal.failure.TimeoutFailure;
@@ -36,8 +37,8 @@
3637

3738
/**
3839
* WorkflowStub is a client side stub to a single workflow instance. It can be used to start,
39-
* signal, query, wait for completion and cancel a workflow execution. Created through {@link
40-
* WorkflowClient#newUntypedWorkflowStub(String, WorkflowOptions)} or {@link
40+
* signal, query, update, wait for completion and cancel a workflow execution. Created through
41+
* {@link WorkflowClient#newUntypedWorkflowStub(String, WorkflowOptions)} or {@link
4142
* WorkflowClient#newUntypedWorkflowStub(WorkflowExecution, Optional)}.
4243
*/
4344
public interface WorkflowStub {
@@ -73,6 +74,93 @@ static <T> WorkflowStub fromTyped(T typed) {
7374
*/
7475
void signal(String signalName, Object... args);
7576

77+
/**
78+
* Synchronously update a workflow execution by invoking its update handler. Usually a update
79+
* handler is a method annotated with {@link io.temporal.workflow.UpdateMethod}.
80+
*
81+
* @param updateName name of the update handler. Usually it is a method name.
82+
* @param resultClass class of the update return value
83+
* @param <R> type of the update return value
84+
* @param args update method arguments
85+
* @return update result
86+
* @throws WorkflowNotFoundException if the workflow execution doesn't exist or completed and
87+
* can't be signalled
88+
* @throws WorkflowServiceException for all other failures including networking and service
89+
* availability issues
90+
*/
91+
@Experimental
92+
<R> R update(String updateName, Class<R> resultClass, Object... args);
93+
94+
/**
95+
* Synchronously update a workflow execution by invoking its update handler. Usually a update
96+
* handler is a method annotated with {@link io.temporal.workflow.UpdateMethod}.
97+
*
98+
* @param updateName name of the update handler. Usually it is a method name.
99+
* @param updateId is an application-layer identifier for the requested update. It must be unique
100+
* within the scope of a workflow execution.
101+
* @param firstExecutionRunId specifies the RunID expected to identify the first run in the
102+
* workflow execution chain. If this expectation does not match then the server will reject
103+
* the update request with an error.
104+
* @param resultClass class of the update return value.
105+
* @param <R> type of the update return value.
106+
* @param resultType type of the workflow return value. Differs from resultClass for generic
107+
* types.
108+
* @param args update method arguments
109+
* @return update result
110+
* @throws WorkflowNotFoundException if the workflow execution doesn't exist or completed and
111+
* can't be sent an update request
112+
* @throws WorkflowServiceException for all other failures including networking and service
113+
* availability issues
114+
*/
115+
@Experimental
116+
<R> R update(
117+
String updateName,
118+
String updateId,
119+
String firstExecutionRunId,
120+
Class<R> resultClass,
121+
Type resultType,
122+
Object... args);
123+
124+
/**
125+
* Asynchronously update a workflow execution by invoking its update handler and returning a
126+
* handle to the update request. Usually a update handler is a method annotated with {@link
127+
* io.temporal.workflow.UpdateMethod}.
128+
*
129+
* @param updateName name of the update handler. Usually it is a method name.
130+
* @param resultClass class of the update return value
131+
* @param <R> type of the update return value
132+
* @param args update method arguments
133+
* @return update reference that can be used to get the result of the update.
134+
*/
135+
@Experimental
136+
<R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass, Object... args);
137+
138+
/**
139+
* Asynchronously update a workflow execution by invoking its update handler and returning a
140+
* handle to the update request.
141+
*
142+
* @param updateName name of the update handler. Usually it is a method name.
143+
* @param updateId is an application-layer identifier for the requested update. It must be unique
144+
* within the scope of a workflow execution.
145+
* @param firstExecutionRunId specifies the RunID expected to identify the first run in the
146+
* workflow execution chain. If this expectation does not match then the server will reject
147+
* the update request with an error.
148+
* @param resultClass class of the update return value.
149+
* @param <R> type of the update return value.
150+
* @param resultType type of the workflow return value. Differs from resultClass for generic
151+
* types.
152+
* @param args update method arguments
153+
* @return update reference that can be used to get the result of the update.
154+
*/
155+
@Experimental
156+
<R> UpdateHandle<R> startUpdate(
157+
String updateName,
158+
String updateId,
159+
String firstExecutionRunId,
160+
Class<R> resultClass,
161+
Type resultType,
162+
Object... args);
163+
76164
WorkflowExecution start(Object... args);
77165

78166
WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);

temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.temporal.api.errordetails.v1.WorkflowNotReadyFailure;
3030
import io.temporal.common.interceptors.Header;
3131
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
32+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.UpdateOutput;
3233
import io.temporal.failure.CanceledFailure;
3334
import io.temporal.serviceclient.CheckedExceptionWrapper;
3435
import io.temporal.serviceclient.StatusUtils;
@@ -292,6 +293,81 @@ public <R> R query(String queryType, Class<R> resultClass, Type resultType, Obje
292293
return result.getResult();
293294
}
294295

296+
@Override
297+
public <R> R update(String updateName, Class<R> resultClass, Object... args) {
298+
return update(updateName, "", "", resultClass, resultClass, args);
299+
}
300+
301+
@Override
302+
public <R> R update(
303+
String updateName,
304+
String updateId,
305+
String firstExecutionRunId,
306+
Class<R> resultClass,
307+
Type resultType,
308+
Object... args) {
309+
checkStarted();
310+
UpdateOutput<R> result;
311+
WorkflowExecution targetExecution = execution.get();
312+
try {
313+
result =
314+
workflowClientInvoker.update(
315+
new WorkflowClientCallsInterceptor.UpdateInput<>(
316+
targetExecution,
317+
updateName,
318+
updateId,
319+
args,
320+
resultClass,
321+
resultType,
322+
firstExecutionRunId));
323+
return result.getResult();
324+
325+
} catch (Exception e) {
326+
Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
327+
throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable);
328+
}
329+
}
330+
331+
@Override
332+
public <R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass, Object... args) {
333+
return startUpdate(updateName, "", "", resultClass, resultClass, args);
334+
}
335+
336+
@Override
337+
public <R> UpdateHandle<R> startUpdate(
338+
String updateName,
339+
String updateId,
340+
String firstExecutionRunId,
341+
Class<R> resultClass,
342+
Type resultType,
343+
Object... args) {
344+
checkStarted();
345+
WorkflowExecution targetExecution = execution.get();
346+
347+
WorkflowClientCallsInterceptor.UpdateAsyncOutput<R> result =
348+
workflowClientInvoker.updateAsync(
349+
new WorkflowClientCallsInterceptor.UpdateInput<>(
350+
targetExecution,
351+
updateName,
352+
updateId,
353+
args,
354+
resultClass,
355+
resultType,
356+
firstExecutionRunId));
357+
358+
return new UpdateHandleImpl<>(
359+
updateId,
360+
targetExecution,
361+
result
362+
.getResult()
363+
.exceptionally(
364+
e -> {
365+
Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
366+
throw new WorkflowServiceException(
367+
targetExecution, workflowType.orElse(null), throwable);
368+
}));
369+
}
370+
295371
@Override
296372
public void cancel() {
297373
checkStarted();

0 commit comments

Comments
 (0)