Skip to content

Commit 50d15bd

Browse files
💥 Carry over search attributes on continue-as-new if none are explicitly set (#2731)
Carry over search attributes on continue-as-new if none are explicitly specified
1 parent 9bc6e15 commit 50d15bd

File tree

5 files changed

+36
-8
lines changed

5 files changed

+36
-8
lines changed

‎temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java‎

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,6 +1365,12 @@ public void continueAsNew(ContinueAsNewInput input) {
13651365
&& options.getTypedSearchAttributes().size() > 0) {
13661366
attributes.setSearchAttributes(
13671367
SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1368+
} else if (options.getTypedSearchAttributes() == null && searchAttributes == null) {
1369+
// Carry over existing search attributes if none are specified.
1370+
SearchAttributes existing = replayContext.getSearchAttributes();
1371+
if (existing != null && !existing.getIndexedFieldsMap().isEmpty()) {
1372+
attributes.setSearchAttributes(existing);
1373+
}
13681374
}
13691375
Map<String, Object> memo = options.getMemo();
13701376
if (memo != null) {
@@ -1379,11 +1385,21 @@ public void continueAsNew(ContinueAsNewInput input) {
13791385
.determineUseCompatibleFlag(
13801386
replayContext.getTaskQueue().equals(options.getTaskQueue())));
13811387
}
1382-
} else if (replayContext.getRetryOptions() != null) {
1383-
// Have to copy retry options as server doesn't copy them.
1388+
}
1389+
1390+
if (options == null && replayContext.getRetryOptions() != null) {
1391+
// Have to copy certain options as server doesn't copy them.
13841392
attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
13851393
}
13861394

1395+
if (options == null && replayContext.getSearchAttributes() != null) {
1396+
// Carry over existing search attributes if none are specified.
1397+
SearchAttributes existing = replayContext.getSearchAttributes();
1398+
if (existing != null && !existing.getIndexedFieldsMap().isEmpty()) {
1399+
attributes.setSearchAttributes(existing);
1400+
}
1401+
}
1402+
13871403
List<ContextPropagator> propagators =
13881404
options != null && options.getContextPropagators() != null
13891405
? options.getContextPropagators()

‎temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import javax.annotation.Nullable;
99

1010
/**
11-
* This class contain overrides for continueAsNew call. Every field can be null and it means that
11+
* This class contain overrides for continueAsNew call. Every field can be null, and it means that
1212
* the value of the option should be taken from the originating workflow run.
1313
*/
1414
public final class ContinueAsNewOptions {

‎temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java‎

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.junit.Test;
1717

1818
public class ContinueAsNewTest {
19+
static final SearchAttributeKey<String> CUSTOM_KEYWORD_SA =
20+
SearchAttributeKey.forKeyword("CustomKeywordField");
1921

2022
public static final int INITIAL_COUNT = 4;
2123

@@ -30,6 +32,8 @@ public void testContinueAsNew() {
3032
options =
3133
WorkflowOptions.newBuilder(options)
3234
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(10).build())
35+
.setTypedSearchAttributes(
36+
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD_SA, "foo0").build())
3337
.build();
3438
TestContinueAsNew client =
3539
testWorkflowRule.getWorkflowClient().newWorkflowStub(TestContinueAsNew.class, options);
@@ -68,8 +72,10 @@ public int execute(int count, String continueAsNewTaskQueue) {
6872
String taskQueue = Workflow.getInfo().getTaskQueue();
6973
if (count >= INITIAL_COUNT - 2) {
7074
assertEquals(10, Workflow.getInfo().getRetryOptions().getMaximumAttempts());
75+
assertEquals("foo0", Workflow.getTypedSearchAttributes().get(CUSTOM_KEYWORD_SA));
7176
} else {
7277
assertEquals(5, Workflow.getInfo().getRetryOptions().getMaximumAttempts());
78+
assertEquals("foo1", Workflow.getTypedSearchAttributes().get(CUSTOM_KEYWORD_SA));
7379
}
7480
if (count == 0) {
7581
assertEquals(continueAsNewTaskQueue, taskQueue);
@@ -78,22 +84,22 @@ public int execute(int count, String continueAsNewTaskQueue) {
7884
Map<String, Object> memo = new HashMap<>();
7985
memo.put("myKey", "MyValue");
8086
RetryOptions retryOptions = null;
87+
SearchAttributes searchAttributes = null;
8188
// don't specify ContinueAsNewOptions on the first continue-as-new to test that RetryOptions
89+
// and SearchAttributes
8290
// are copied from the previous run.
8391
if (count == INITIAL_COUNT) {
8492
TestContinueAsNew next = Workflow.newContinueAsNewStub(TestContinueAsNew.class);
8593
next.execute(count - 1, continueAsNewTaskQueue);
8694
throw new RuntimeException("unreachable");
8795
}
88-
// don't specify RetryOptions on the second continue-as-new to test that they are copied from
96+
// don't specify RetryOptions and SearchAttributes on the second continue-as-new to test that
97+
// they are copied from
8998
// the previous run.
9099
if (count < INITIAL_COUNT - 1) {
91100
retryOptions = RetryOptions.newBuilder().setMaximumAttempts(5).build();
101+
searchAttributes = SearchAttributes.newBuilder().set(CUSTOM_KEYWORD_SA, "foo1").build();
92102
}
93-
SearchAttributes searchAttributes =
94-
SearchAttributes.newBuilder()
95-
.set(SearchAttributeKey.forKeyword("CustomKeywordField"), "foo1")
96-
.build();
97103
ContinueAsNewOptions options =
98104
ContinueAsNewOptions.newBuilder()
99105
.setTaskQueue(continueAsNewTaskQueue)

‎temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,6 +1420,9 @@ private static void continueAsNewWorkflow(
14201420
if (d.hasFailure()) {
14211421
a.setFailure(d.getFailure());
14221422
}
1423+
if (d.hasSearchAttributes()) {
1424+
a.setSearchAttributes(d.getSearchAttributes());
1425+
}
14231426
a.setNewExecutionRunId(UUID.randomUUID().toString());
14241427
HistoryEvent event =
14251428
HistoryEvent.newBuilder()

‎temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1660,6 +1660,9 @@ public String continueAsNew(
16601660
if (ea.hasHeader()) {
16611661
startRequestBuilder.setHeader(ea.getHeader());
16621662
}
1663+
if (ea.hasSearchAttributes()) {
1664+
startRequestBuilder.setSearchAttributes(ea.getSearchAttributes());
1665+
}
16631666
StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
16641667
lock.lock();
16651668
Optional<Failure> lastFail =

0 commit comments

Comments
 (0)