Skip to content

Commit d05514c

Browse files
authored
Add Upsert Search Attributes (#406)
Support upsert search attributes inside workflow.
1 parent af394dc commit d05514c

20 files changed

+301
-8
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ target
77
.gradle
88
/build
99
/out
10+
dummy

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -972,7 +972,8 @@ public static boolean isDecisionEvent(HistoryEvent event) {
972972
|| eventType == EventType.CancelTimerFailed
973973
|| eventType == EventType.RequestCancelExternalWorkflowExecutionInitiated
974974
|| eventType == EventType.MarkerRecorded
975-
|| eventType == EventType.SignalExternalWorkflowExecutionInitiated));
975+
|| eventType == EventType.SignalExternalWorkflowExecutionInitiated
976+
|| eventType == EventType.UpsertWorkflowSearchAttributes));
976977
return result;
977978
}
978979

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.uber.cadence.ActivityType;
2222
import com.uber.cadence.HistoryEvent;
2323
import com.uber.cadence.MarkerRecordedEventAttributes;
24+
import com.uber.cadence.SearchAttributes;
2425
import com.uber.cadence.StartTimerDecisionAttributes;
2526
import com.uber.cadence.TimerCanceledEventAttributes;
2627
import com.uber.cadence.TimerFiredEventAttributes;
@@ -217,6 +218,14 @@ Optional<byte[]> mutableSideEffect(
217218
return mutableSideEffectHandler.handle(id, converter, func);
218219
}
219220

221+
void upsertSearchAttributes(SearchAttributes searchAttributes) {
222+
decisions.upsertSearchAttributes(searchAttributes);
223+
}
224+
225+
void handleUpsertSearchAttributesEvent(HistoryEvent event) {
226+
// todo: update workflow info
227+
}
228+
220229
void handleMarkerRecorded(HistoryEvent event) {
221230
MarkerRecordedEventAttributes attributes = event.getMarkerRecordedEventAttributes();
222231
String name = attributes.getMarkerName();

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import com.uber.cadence.ChildPolicy;
21+
import com.uber.cadence.SearchAttributes;
2122
import com.uber.cadence.WorkflowExecution;
2223
import com.uber.cadence.WorkflowType;
2324
import com.uber.cadence.converter.DataConverter;
@@ -179,4 +180,6 @@ Optional<byte[]> mutableSideEffect(
179180

180181
/** @return replay safe UUID */
181182
UUID randomUUID();
183+
184+
void upsertSearchAttributes(SearchAttributes searchAttributes);
182185
}

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.DecisionTaskFailedEventAttributes;
2323
import com.uber.cadence.HistoryEvent;
2424
import com.uber.cadence.PollForDecisionTaskResponse;
25+
import com.uber.cadence.SearchAttributes;
2526
import com.uber.cadence.TimerFiredEventAttributes;
2627
import com.uber.cadence.WorkflowExecution;
2728
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
@@ -368,4 +369,14 @@ int numPendingLaTasks() {
368369
void awaitTaskCompletion(Duration duration) throws InterruptedException {
369370
workflowClock.awaitTaskCompletion(duration);
370371
}
372+
373+
@Override
374+
public void upsertSearchAttributes(SearchAttributes searchAttributes) {
375+
workflowClock.upsertSearchAttributes(searchAttributes);
376+
}
377+
378+
@Override
379+
public void handleUpsertSearchAttributes(HistoryEvent event) {
380+
workflowClock.handleUpsertSearchAttributesEvent(event);
381+
}
371382
}

src/main/java/com/uber/cadence/internal/replay/DecisionTarget.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ enum DecisionTarget {
2424
SIGNAL_EXTERNAL_WORKFLOW,
2525
TIMER,
2626
MARKER,
27+
UPSERT_SEARCH_ATTRIBUTES,
2728
SELF
2829
}

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.uber.cadence.RequestCancelExternalWorkflowExecutionDecisionAttributes;
4545
import com.uber.cadence.RequestCancelExternalWorkflowExecutionFailedEventAttributes;
4646
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
47+
import com.uber.cadence.SearchAttributes;
4748
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
4849
import com.uber.cadence.StartChildWorkflowExecutionDecisionAttributes;
4950
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
@@ -52,6 +53,7 @@
5253
import com.uber.cadence.TaskList;
5354
import com.uber.cadence.TimerCanceledEventAttributes;
5455
import com.uber.cadence.TimerFiredEventAttributes;
56+
import com.uber.cadence.UpsertWorkflowSearchAttributesDecisionAttributes;
5557
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
5658
import com.uber.cadence.WorkflowType;
5759
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
@@ -545,6 +547,20 @@ void recordMarker(String markerName, Header header, byte[] details) {
545547
addDecision(decisionId, new MarkerDecisionStateMachine(decisionId, decision));
546548
}
547549

550+
void upsertSearchAttributes(SearchAttributes searchAttributes) {
551+
UpsertWorkflowSearchAttributesDecisionAttributes decisionAttr =
552+
new UpsertWorkflowSearchAttributesDecisionAttributes()
553+
.setSearchAttributes(searchAttributes);
554+
Decision decision =
555+
new Decision()
556+
.setDecisionType(DecisionType.UpsertWorkflowSearchAttributes)
557+
.setUpsertWorkflowSearchAttributesDecisionAttributes(decisionAttr);
558+
long nextDecisionEventId = getNextDecisionEventId();
559+
DecisionId decisionId =
560+
new DecisionId(DecisionTarget.UPSERT_SEARCH_ATTRIBUTES, nextDecisionEventId);
561+
addDecision(decisionId, new UpsertSearchAttributesDecisionStateMachine(decisionId, decision));
562+
}
563+
548564
List<Decision> getDecisions() {
549565
List<Decision> result = new ArrayList<>(MAXIMUM_DECISIONS_PER_COMPLETION + 1);
550566
for (DecisionStateMachine decisionStateMachine : decisions.values()) {

src/main/java/com/uber/cadence/internal/replay/HistoryEventHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,6 @@ interface HistoryEventHandler {
5353
void handleExternalWorkflowExecutionSignaled(HistoryEvent event);
5454

5555
void handleMarkerRecorded(HistoryEvent event);
56+
57+
void handleUpsertSearchAttributes(HistoryEvent event);
5658
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ private void processEvent(HistoryEvent event) {
229229
context.handleDecisionTaskFailed(event);
230230
break;
231231
case UpsertWorkflowSearchAttributes:
232-
// TODO: https://github.com/uber/cadence-java-client/issues/360
232+
context.handleUpsertSearchAttributes(event);
233233
break;
234234
}
235235
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.replay;
19+
20+
import com.uber.cadence.Decision;
21+
22+
public class UpsertSearchAttributesDecisionStateMachine extends DecisionStateMachineBase {
23+
24+
private final Decision decision;
25+
26+
UpsertSearchAttributesDecisionStateMachine(DecisionId id, Decision decision) {
27+
super(id);
28+
this.decision = decision;
29+
}
30+
31+
@Override
32+
public Decision getDecision() {
33+
if (state == DecisionState.CREATED) {
34+
return decision;
35+
}
36+
return null;
37+
}
38+
}

0 commit comments

Comments
 (0)