Skip to content

Commit 5d22bb5

Browse files
Add getCurrentUpdateInfo (#2158)
Add getCurrentUpdateInfo
1 parent 27a1fc2 commit 5d22bb5

File tree

10 files changed

+286
-30
lines changed

10 files changed

+286
-30
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public interface ReplayWorkflow {
4141
/** Handle an update workflow execution event */
4242
void handleUpdate(
4343
String updateName,
44+
String updateId,
4445
Optional<Payloads> input,
4546
long eventId,
4647
Header header,

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) {
161161
Optional<Payloads> args = Optional.ofNullable(input.getArgs());
162162
this.workflow.handleUpdate(
163163
input.getName(),
164+
protocolMessage.getProtocolInstanceId(),
164165
args,
165166
protocolMessage.getEventId(),
166167
input.getHeader(),

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.temporal.internal.worker.WorkflowExecutionException;
4040
import io.temporal.internal.worker.WorkflowExecutorCache;
4141
import io.temporal.worker.WorkflowImplementationOptions;
42+
import io.temporal.workflow.UpdateInfo;
4243
import java.util.List;
4344
import java.util.Objects;
4445
import java.util.Optional;
@@ -151,38 +152,45 @@ public void handleSignal(
151152
@Override
152153
public void handleUpdate(
153154
String updateName,
155+
String updateId,
154156
Optional<Payloads> input,
155157
long eventId,
156158
Header header,
157159
UpdateProtocolCallback callbacks) {
160+
final UpdateInfo updateInfo = new UpdateInfoImpl(updateName, updateId);
158161
runner.executeInWorkflowThread(
159162
"update " + updateName,
160163
() -> {
161-
// Skip validator on replay
162-
if (!callbacks.isReplaying()) {
164+
try {
165+
workflowContext.setCurrentUpdateInfo(updateInfo);
166+
// Skip validator on replay
167+
if (!callbacks.isReplaying()) {
168+
try {
169+
workflowContext.setReadOnly(true);
170+
workflowProc.handleValidateUpdate(updateName, input, eventId, header);
171+
} catch (ReadOnlyException r) {
172+
// Rethrow instead on rejecting the update to fail the WFT
173+
throw r;
174+
} catch (Exception e) {
175+
callbacks.reject(
176+
workflowContext
177+
.getDataConverterWithCurrentWorkflowContext()
178+
.exceptionToFailure(e));
179+
return;
180+
} finally {
181+
workflowContext.setReadOnly(false);
182+
}
183+
}
184+
callbacks.accept();
163185
try {
164-
workflowContext.setReadOnly(true);
165-
workflowProc.handleValidateUpdate(updateName, input, eventId, header);
166-
} catch (ReadOnlyException r) {
167-
// Rethrow instead on rejecting the update to fail the WFT
168-
throw r;
169-
} catch (Exception e) {
170-
callbacks.reject(
171-
workflowContext
172-
.getDataConverterWithCurrentWorkflowContext()
173-
.exceptionToFailure(e));
174-
return;
175-
} finally {
176-
workflowContext.setReadOnly(false);
186+
Optional<Payloads> result =
187+
workflowProc.handleExecuteUpdate(updateName, input, eventId, header);
188+
callbacks.complete(result, null);
189+
} catch (WorkflowExecutionException e) {
190+
callbacks.complete(Optional.empty(), e.getFailure());
177191
}
178-
}
179-
callbacks.accept();
180-
try {
181-
Optional<Payloads> result =
182-
workflowProc.handleExecuteUpdate(updateName, input, eventId, header);
183-
callbacks.complete(result, null);
184-
} catch (WorkflowExecutionException e) {
185-
callbacks.complete(Optional.empty(), e.getFailure());
192+
} finally {
193+
workflowContext.setCurrentUpdateInfo(null);
186194
}
187195
});
188196
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,8 @@
6868
import io.temporal.payload.context.ActivitySerializationContext;
6969
import io.temporal.payload.context.WorkflowSerializationContext;
7070
import io.temporal.worker.WorkflowImplementationOptions;
71-
import io.temporal.workflow.CancellationScope;
72-
import io.temporal.workflow.ChildWorkflowOptions;
73-
import io.temporal.workflow.CompletablePromise;
74-
import io.temporal.workflow.ContinueAsNewOptions;
75-
import io.temporal.workflow.Functions;
71+
import io.temporal.workflow.*;
7672
import io.temporal.workflow.Functions.Func;
77-
import io.temporal.workflow.Promise;
78-
import io.temporal.workflow.Workflow;
7973
import java.lang.reflect.Type;
8074
import java.time.Duration;
8175
import java.time.Instant;
@@ -124,6 +118,7 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
124118
private LocalActivityOptions defaultLocalActivityOptions = null;
125119
private Map<String, LocalActivityOptions> localActivityOptionsMap;
126120
private boolean readOnly = false;
121+
private final WorkflowThreadLocal<UpdateInfo> currentUpdateInfo = new WorkflowThreadLocal<>();
127122

128123
public SyncWorkflowContext(
129124
@Nonnull String namespace,
@@ -1275,6 +1270,14 @@ public Map<String, Object> getPropagatedContexts() {
12751270
return contextData;
12761271
}
12771272

1273+
public void setCurrentUpdateInfo(UpdateInfo updateInfo) {
1274+
currentUpdateInfo.set(updateInfo);
1275+
}
1276+
1277+
public Optional<UpdateInfo> getCurrentUpdateInfo() {
1278+
return Optional.ofNullable(currentUpdateInfo.get());
1279+
}
1280+
12781281
/** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
12791282
private static class FailureWrapperException extends RuntimeException {
12801283
private final Failure failure;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.sync;
22+
23+
import io.temporal.workflow.UpdateInfo;
24+
25+
public final class UpdateInfoImpl implements UpdateInfo {
26+
final String updateName;
27+
final String updateId;
28+
29+
UpdateInfoImpl(String updateName, String updateId) {
30+
this.updateName = updateName;
31+
this.updateId = updateId;
32+
}
33+
34+
@Override
35+
public String getUpdateName() {
36+
return updateName;
37+
}
38+
39+
@Override
40+
public String getUpdateId() {
41+
return updateId;
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return "UpdateInfoImpl{"
47+
+ "updateName='"
48+
+ updateName
49+
+ '\''
50+
+ ", updateId='"
51+
+ updateId
52+
+ '\''
53+
+ '}';
54+
}
55+
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,10 @@ public static WorkflowInfo getWorkflowInfo() {
623623
return new WorkflowInfoImpl(getRootWorkflowContext().getReplayContext());
624624
}
625625

626+
public static Optional<UpdateInfo> getCurrentUpdateInfo() {
627+
return getRootWorkflowContext().getCurrentUpdateInfo();
628+
}
629+
626630
public static Scope getMetricsScope() {
627631
return getRootWorkflowContext().getMetricsScope();
628632
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow;
22+
23+
/** Provides information about the current workflow Update. */
24+
public interface UpdateInfo {
25+
/**
26+
* @return Update name
27+
*/
28+
String getUpdateName();
29+
30+
/**
31+
* @return Update ID
32+
*/
33+
String getUpdateId();
34+
}

temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,31 @@ public static void continueAsNew(
339339
WorkflowInternal.continueAsNew(workflowType, options, args);
340340
}
341341

342+
/**
343+
* Returns information about current workflow execution.
344+
*
345+
* <p>Note: Can only be called within the context of a workflow. Will throw an error if called
346+
* outside a workflow context.
347+
*
348+
* @return current workflow info.
349+
*/
342350
public static WorkflowInfo getInfo() {
343351
return WorkflowInternal.getWorkflowInfo();
344352
}
345353

354+
/**
355+
* Returns information about current workflow update.
356+
*
357+
* <p>Note: Should only be called within the context of an update handler thread in a workflow.
358+
* Will return an empty Optional if called outside an update handler thread inside a workflow
359+
* context. Will throw an error if called outside a workflow context.
360+
*
361+
* @return current workflow update info.
362+
*/
363+
public static Optional<UpdateInfo> getCurrentUpdateInfo() {
364+
return WorkflowInternal.getCurrentUpdateInfo();
365+
}
366+
346367
/**
347368
* Extract deserialized Memo associated with given key
348369
*

temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ public void handleSignal(
299299
@Override
300300
public void handleUpdate(
301301
String updateName,
302+
String updateId,
302303
Optional<Payloads> input,
303304
long eventId,
304305
Header header,

0 commit comments

Comments
 (0)