Skip to content

Commit ac474fa

Browse files
Add new workflow info fields (#1853)
Add new workflow info fields
1 parent 4298dad commit ac474fa

File tree

10 files changed

+373
-4
lines changed

10 files changed

+373
-4
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,18 @@ boolean getVersion(
336336
*/
337337
long getCurrentWorkflowTaskStartedEventId();
338338

339+
/**
340+
* @return size of Workflow history in bytes up until the current moment of execution. This value
341+
* changes during the lifetime of a Workflow Execution.
342+
*/
343+
long getHistorySize();
344+
345+
/**
346+
* @return true if the server is configured to suggest continue as new and it is suggested. This
347+
* value changes during the lifetime of a Workflow Execution.
348+
*/
349+
boolean isContinueAsNewSuggested();
350+
339351
/**
340352
* @return true if cancellation of the workflow is requested.
341353
*/

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,16 @@ public long getCurrentWorkflowTaskStartedEventId() {
353353
return workflowStateMachines.getCurrentStartedEventId();
354354
}
355355

356+
@Override
357+
public long getHistorySize() {
358+
return workflowStateMachines.getHistorySize();
359+
}
360+
361+
@Override
362+
public boolean isContinueAsNewSuggested() {
363+
return workflowStateMachines.isContinueAsNewSuggested();
364+
}
365+
356366
/*
357367
* MUTABLE STATE OPERATIONS
358368
*/

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ enum HandleEventStatus {
8181
/** EventId of the last WorkflowTaskStarted event handled by these state machines. */
8282
private long currentStartedEventId;
8383

84+
private long historySize;
85+
86+
private boolean isContinueAsNewSuggested;
87+
8488
/**
8589
* EventId of the last event seen by these state machines. Events earlier than this one will be
8690
* discarded.
@@ -205,6 +209,14 @@ public long getCurrentStartedEventId() {
205209
return currentStartedEventId;
206210
}
207211

212+
public long getHistorySize() {
213+
return historySize;
214+
}
215+
216+
public boolean isContinueAsNewSuggested() {
217+
return isContinueAsNewSuggested;
218+
}
219+
208220
public void setReplaying(boolean replaying) {
209221
this.replaying = replaying;
210222
}
@@ -1126,7 +1138,11 @@ private void assertMatch(
11261138
private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
11271139
@Override
11281140
public void workflowTaskStarted(
1129-
long startedEventId, long currentTimeMillis, boolean nonProcessedWorkflowTask) {
1141+
long startedEventId,
1142+
long currentTimeMillis,
1143+
boolean nonProcessedWorkflowTask,
1144+
long historySize,
1145+
boolean isContinueAsNewSuggested) {
11301146
setCurrentTimeMillis(currentTimeMillis);
11311147
for (CancellableCommand cancellableCommand : commands) {
11321148
cancellableCommand.handleWorkflowTaskStarted();
@@ -1140,6 +1156,8 @@ public void workflowTaskStarted(
11401156
}
11411157
}
11421158
WorkflowStateMachines.this.currentStartedEventId = startedEventId;
1159+
WorkflowStateMachines.this.historySize = historySize;
1160+
WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;
11431161

11441162
eventLoop();
11451163
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,15 @@ public interface Listener {
4444
* workflow execution this is the last event in the history. During replay (due to query for
4545
* example) the last workflow task still can return false if it is followed by other events
4646
* like WorkflowExecutionCompleted.
47+
* @param historySize current size, in bytes, of the workflow history.
48+
* @param isContinueAsNewSuggested true if the server suggests this workflow to continue as new.
4749
*/
4850
void workflowTaskStarted(
49-
long startEventId, long currentTimeMillis, boolean nonProcessedWorkflowTask);
51+
long startEventId,
52+
long currentTimeMillis,
53+
boolean nonProcessedWorkflowTask,
54+
long historySize,
55+
boolean isContinueAsNewSuggested);
5056

5157
void updateRunId(String currentRunId);
5258
}
@@ -58,6 +64,8 @@ void workflowTaskStarted(
5864
// TODO write a comment describing the difference between workflowTaskStartedEventId and
5965
// startedEventId
6066
private long startedEventId;
67+
private long historySize;
68+
private boolean isContinueAsNewSuggested;
6169

6270
public static WorkflowTaskStateMachine newInstance(
6371
long workflowTaskStartedEventId, Listener listener) {
@@ -112,6 +120,10 @@ enum State {
112120
private void handleStarted() {
113121
eventTimeOfTheLastWorkflowStartTask = Timestamps.toMillis(currentEvent.getEventTime());
114122
startedEventId = currentEvent.getEventId();
123+
historySize = currentEvent.getWorkflowTaskStartedEventAttributes().getHistorySizeBytes();
124+
isContinueAsNewSuggested =
125+
currentEvent.getWorkflowTaskStartedEventAttributes().getSuggestContinueAsNew();
126+
115127
// The last started event in the history. So no completed is expected.
116128
if (currentEvent.getEventId() >= workflowTaskStartedEventId && !hasNextEvent) {
117129
handleCompleted();
@@ -125,7 +137,11 @@ private void handleCompleted() {
125137
// If the workflow task has FAILED or other unsuccessful finish event, we don't replay such
126138
// workflow tasks
127139
listener.workflowTaskStarted(
128-
startedEventId, eventTimeOfTheLastWorkflowStartTask, lastTaskInHistory);
140+
startedEventId,
141+
eventTimeOfTheLastWorkflowStartTask,
142+
lastTaskInHistory,
143+
historySize,
144+
isContinueAsNewSuggested);
129145
}
130146

131147
private void handleFailed() {

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,16 @@ public long getHistoryLength() {
133133
return context.getCurrentWorkflowTaskStartedEventId();
134134
}
135135

136+
@Override
137+
public long getHistorySize() {
138+
return context.getHistorySize();
139+
}
140+
141+
@Override
142+
public boolean isContinueAsNewSuggested() {
143+
return context.isContinueAsNewSuggested();
144+
}
145+
136146
@Override
137147
public String toString() {
138148
return "WorkflowInfo{"

temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,16 @@ public interface WorkflowInfo {
148148
* call {@link Workflow#continueAsNew(Object...)}.
149149
*/
150150
long getHistoryLength();
151+
152+
/**
153+
* @return size of Workflow history in bytes up until the current moment of execution. This value
154+
* changes during the lifetime of a Workflow Execution.
155+
*/
156+
long getHistorySize();
157+
158+
/**
159+
* @return true if the server is configured to suggest continue as new and it is suggested. This
160+
* value changes during the lifetime of a Workflow Execution.
161+
*/
162+
boolean isContinueAsNewSuggested();
151163
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.activity.LocalActivityOptions;
26+
import io.temporal.testing.WorkflowReplayer;
27+
import io.temporal.testing.internal.SDKTestWorkflowRule;
28+
import io.temporal.workflow.shared.TestActivities;
29+
import io.temporal.workflow.shared.TestWorkflows;
30+
import java.time.Duration;
31+
import java.util.concurrent.TimeUnit;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
35+
public class GetHistorySizeTest {
36+
private static final TestActivities.VariousTestActivities activitiesImpl =
37+
new TestActivities.TestActivitiesImpl();
38+
39+
@Rule
40+
public SDKTestWorkflowRule testWorkflowRule =
41+
SDKTestWorkflowRule.newBuilder()
42+
.setWorkflowTypes(TestWorkflowImpl.class)
43+
.setActivityImplementations(activitiesImpl)
44+
.build();
45+
46+
@Test
47+
public void replay() throws Exception {
48+
WorkflowReplayer.replayWorkflowExecutionFromResource(
49+
"testGetHistorySize.json", TestWorkflowImpl.class);
50+
}
51+
52+
public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
53+
54+
@Override
55+
public String execute() {
56+
LocalActivityOptions options =
57+
LocalActivityOptions.newBuilder()
58+
.setScheduleToCloseTimeout(Duration.ofSeconds(30))
59+
.build();
60+
61+
TestActivities.VariousTestActivities activities =
62+
Workflow.newLocalActivityStub(TestActivities.VariousTestActivities.class, options);
63+
64+
assertEquals(408, Workflow.getInfo().getHistorySize());
65+
assertEquals(false, Workflow.getInfo().isContinueAsNewSuggested());
66+
67+
// Force WFT heartbeat
68+
activities.sleepActivity(TimeUnit.SECONDS.toMillis(10), 1);
69+
70+
assertEquals(897, Workflow.getInfo().getHistorySize());
71+
assertEquals(true, Workflow.getInfo().isContinueAsNewSuggested());
72+
73+
return "done";
74+
}
75+
}
76+
}

temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/TypedSearchAttributesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
/** Typed attribute translation of {@link SearchAttributesTest} */
5656
public class TypedSearchAttributesTest {
5757
private static final SearchAttributeKey<List<String>> TEST_NEW_KEY =
58-
SearchAttributeKey.forKeywordList("NewKey");
58+
SearchAttributeKey.forKeywordList("NewKeyList");
5959
private static final List<String> TEST_NEW_VALUE = Arrays.asList("foo", "bar");
6060
private static final SearchAttributeKey<String> TEST_UNKNOWN_KEY =
6161
SearchAttributeKey.forText("UnknownKey");

0 commit comments

Comments
 (0)