Skip to content

Commit a885812

Browse files
Ad support for updating schedule search attributes (#2168)
Support update SA on a schedule
1 parent e2d2608 commit a885812

File tree

4 files changed

+125
-5
lines changed

4 files changed

+125
-5
lines changed

temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleUpdate.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,33 @@
2020

2121
package io.temporal.client.schedules;
2222

23+
import io.temporal.common.SearchAttributes;
24+
2325
/** An update returned from a schedule updater. */
2426
public final class ScheduleUpdate {
2527
private final Schedule schedule;
28+
private final SearchAttributes typedSearchAttributes;
2629

30+
/**
31+
* Create a new ScheduleUpdate.
32+
*
33+
* @param schedule schedule to replace the existing schedule with
34+
*/
2735
public ScheduleUpdate(Schedule schedule) {
2836
this.schedule = schedule;
37+
this.typedSearchAttributes = null;
38+
}
39+
40+
/**
41+
* Create a new ScheduleUpdate.
42+
*
43+
* @param schedule schedule to replace the existing schedule with
44+
* @param typedSearchAttributes search attributes to replace the existing search attributes with.
45+
* Returning null will not update the search attributes.
46+
*/
47+
public ScheduleUpdate(Schedule schedule, SearchAttributes typedSearchAttributes) {
48+
this.schedule = schedule;
49+
this.typedSearchAttributes = typedSearchAttributes;
2950
}
3051

3152
/**
@@ -36,4 +57,13 @@ public ScheduleUpdate(Schedule schedule) {
3657
public Schedule getSchedule() {
3758
return schedule;
3859
}
60+
61+
/**
62+
* Get the search attributes to update.
63+
*
64+
* @return search attributes to update
65+
*/
66+
public SearchAttributes getTypedSearchAttributes() {
67+
return typedSearchAttributes;
68+
}
3969
}

temporal-sdk/src/main/java/io/temporal/internal/client/RootScheduleClientInvoker.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.grpc.Status;
2727
import io.grpc.StatusRuntimeException;
2828
import io.temporal.api.common.v1.Memo;
29+
import io.temporal.api.common.v1.SearchAttributes;
2930
import io.temporal.api.schedule.v1.*;
3031
import io.temporal.api.workflowservice.v1.*;
3132
import io.temporal.client.ListScheduleListDescriptionIterator;
@@ -277,16 +278,23 @@ public void updateSchedule(UpdateScheduleInput input) {
277278
return;
278279
}
279280

280-
UpdateScheduleRequest request =
281+
UpdateScheduleRequest.Builder request =
281282
UpdateScheduleRequest.newBuilder()
282283
.setNamespace(clientOptions.getNamespace())
283284
.setIdentity(clientOptions.getIdentity())
284285
.setScheduleId(input.getDescription().getId())
285286
.setRequestId(UUID.randomUUID().toString())
286-
.setSchedule(scheduleRequestHeader.scheduleToProto(schedule.getSchedule()))
287-
.build();
287+
.setSchedule(scheduleRequestHeader.scheduleToProto(schedule.getSchedule()));
288+
if (schedule.getTypedSearchAttributes() != null) {
289+
SearchAttributes encodedSa =
290+
SearchAttributesUtil.encodeTyped(schedule.getTypedSearchAttributes());
291+
if (encodedSa == null) {
292+
encodedSa = SearchAttributes.getDefaultInstance();
293+
}
294+
request.setSearchAttributes(encodedSa);
295+
}
288296
try {
289-
genericClient.updateSchedule(request);
297+
genericClient.updateSchedule(request.build());
290298
} catch (Exception e) {
291299
throw new ScheduleException(e);
292300
}

temporal-sdk/src/test/java/io/temporal/client/schedules/ScheduleTest.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.temporal.common.SearchAttributes;
3030
import io.temporal.common.converter.EncodedValues;
3131
import io.temporal.common.interceptors.ScheduleClientInterceptor;
32+
import io.temporal.testUtils.Eventually;
3233
import io.temporal.testing.internal.SDKTestWorkflowRule;
3334
import io.temporal.workflow.shared.TestWorkflows;
3435
import java.time.Duration;
@@ -415,9 +416,12 @@ public void describeSchedules() {
415416
public void updateSchedules() {
416417
ScheduleClient client = createScheduleClient();
417418
// Create the schedule
419+
String keywordSAValue = "keyword";
418420
ScheduleOptions options =
419421
ScheduleOptions.newBuilder()
420422
.setMemo(Collections.singletonMap("memokey2", "memoval2"))
423+
.setTypedSearchAttributes(
424+
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD_SA, keywordSAValue).build())
421425
.build();
422426
String scheduleId = UUID.randomUUID().toString();
423427
Schedule schedule = createTestSchedule().build();
@@ -469,7 +473,7 @@ public void updateSchedules() {
469473
.setAction(input.getDescription().getSchedule().getAction())
470474
.setSpec(ScheduleSpec.newBuilder().build());
471475
builder.setState(ScheduleState.newBuilder().setPaused(true).build());
472-
return new ScheduleUpdate(builder.build());
476+
return new ScheduleUpdate(builder.build(), null);
473477
});
474478
description = handle.describe();
475479
//
@@ -481,6 +485,32 @@ public void updateSchedules() {
481485
//
482486
Assert.assertNotEquals(expectedUpdateTime, description.getInfo().getLastUpdatedAt());
483487
Assert.assertEquals(true, description.getSchedule().getState().isPaused());
488+
Assert.assertEquals(1, description.getTypedSearchAttributes().size());
489+
Assert.assertEquals(
490+
keywordSAValue, description.getTypedSearchAttributes().get(CUSTOM_KEYWORD_SA));
491+
// Update the schedule search attribute by clearing them
492+
handle.update(
493+
(ScheduleUpdateInput input) ->
494+
new ScheduleUpdate(input.getDescription().getSchedule(), SearchAttributes.EMPTY));
495+
Eventually.assertEventually(
496+
Duration.ofSeconds(1),
497+
() -> {
498+
ScheduleDescription desc = handle.describe();
499+
Assert.assertEquals(0, desc.getTypedSearchAttributes().size());
500+
});
501+
// Update the schedule search attribute by adding a new search attribute
502+
handle.update(
503+
(ScheduleUpdateInput input) ->
504+
new ScheduleUpdate(
505+
input.getDescription().getSchedule(),
506+
SearchAttributes.newBuilder().set(CUSTOM_KEYWORD_SA, "newkeyword").build()));
507+
Eventually.assertEventually(
508+
Duration.ofSeconds(1),
509+
() -> {
510+
ScheduleDescription desc = handle.describe();
511+
Assert.assertEquals(1, desc.getTypedSearchAttributes().size());
512+
Assert.assertEquals("newkeyword", desc.getTypedSearchAttributes().get(CUSTOM_KEYWORD_SA));
513+
});
484514
// Cleanup schedule
485515
handle.delete();
486516
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.testUtils;
22+
23+
import java.time.Duration;
24+
import java.time.Instant;
25+
26+
public class Eventually {
27+
public static void assertEventually(Duration timeout, Runnable command) {
28+
final Instant start = Instant.now();
29+
final Instant deadline = start.plus(timeout);
30+
31+
boolean failed;
32+
do {
33+
try {
34+
command.run();
35+
failed = false;
36+
} catch (Throwable t) {
37+
failed = true;
38+
if (Instant.now().isBefore(deadline)) {
39+
// Try again after a short nap
40+
try {
41+
Thread.sleep(100);
42+
} catch (InterruptedException e) {
43+
Thread.currentThread().interrupt();
44+
throw new RuntimeException(e);
45+
}
46+
} else {
47+
throw t;
48+
}
49+
}
50+
} while (failed);
51+
}
52+
}

0 commit comments

Comments
 (0)