Skip to content

Commit 5b28096

Browse files
vancexumeiliang86
authored andcommitted
Support get search attributes inside workflow (#409)
1 parent d05514c commit 5b28096

16 files changed

+313
-56
lines changed

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@
1818
package com.uber.cadence.internal.common;
1919

2020
import com.google.common.base.Defaults;
21+
import com.uber.cadence.SearchAttributes;
2122
import com.uber.cadence.TaskList;
2223
import com.uber.cadence.TaskListKind;
24+
import com.uber.cadence.converter.DataConverter;
25+
import com.uber.cadence.converter.JsonDataConverter;
2326
import com.uber.cadence.internal.worker.Shutdownable;
2427
import com.uber.cadence.workflow.WorkflowMethod;
2528
import java.lang.reflect.Method;
29+
import java.nio.ByteBuffer;
30+
import java.util.HashMap;
31+
import java.util.Map;
2632
import java.util.concurrent.ExecutorService;
2733
import java.util.concurrent.TimeUnit;
2834

@@ -126,6 +132,17 @@ public static Object getValueOrDefault(Object value, Class<?> valueClass) {
126132
return Defaults.defaultValue(valueClass);
127133
}
128134

135+
public static SearchAttributes convertMapToSearchAttributes(
136+
Map<String, Object> searchAttributes) {
137+
DataConverter converter = JsonDataConverter.getInstance();
138+
Map<String, ByteBuffer> mapOfByteBuffer = new HashMap<>();
139+
searchAttributes.forEach(
140+
(key, value) -> {
141+
mapOfByteBuffer.put(key, ByteBuffer.wrap(converter.toData(value)));
142+
});
143+
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
144+
}
145+
129146
/** Prohibit instantiation */
130147
private InternalUtils() {}
131148
}

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,6 @@ void upsertSearchAttributes(SearchAttributes searchAttributes) {
222222
decisions.upsertSearchAttributes(searchAttributes);
223223
}
224224

225-
void handleUpsertSearchAttributesEvent(HistoryEvent event) {
226-
// todo: update workflow info
227-
}
228-
229225
void handleMarkerRecorded(HistoryEvent event) {
230226
MarkerRecordedEventAttributes attributes = event.getMarkerRecordedEventAttributes();
231227
String name = attributes.getMarkerName();

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ public interface DecisionContext extends ReplayAware {
7272

7373
ChildPolicy getChildPolicy();
7474

75+
/**
76+
* Used to retrieve search attributes.
77+
*
78+
* @return SearchAttribute object which can be used by {@code
79+
* WorkflowUtils.getValueFromSearchAttributes} to retrieve concrete value.
80+
*/
81+
SearchAttributes getSearchAttributes();
82+
7583
/**
7684
* Used to dynamically schedule an activity for execution
7785
*

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.uber.cadence.PollForDecisionTaskResponse;
2525
import com.uber.cadence.SearchAttributes;
2626
import com.uber.cadence.TimerFiredEventAttributes;
27+
import com.uber.cadence.UpsertWorkflowSearchAttributesEventAttributes;
2728
import com.uber.cadence.WorkflowExecution;
2829
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2930
import com.uber.cadence.WorkflowType;
@@ -166,6 +167,11 @@ public Duration getExecutionStartToCloseTimeout() {
166167
return Duration.ofSeconds(workflowContext.getExecutionStartToCloseTimeoutSeconds());
167168
}
168169

170+
@Override
171+
public SearchAttributes getSearchAttributes() {
172+
return workflowContext.getSearchAttributes();
173+
}
174+
169175
@Override
170176
public Consumer<Exception> scheduleActivityTask(
171177
ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
@@ -373,10 +379,16 @@ void awaitTaskCompletion(Duration duration) throws InterruptedException {
373379
@Override
374380
public void upsertSearchAttributes(SearchAttributes searchAttributes) {
375381
workflowClock.upsertSearchAttributes(searchAttributes);
382+
workflowContext.mergeSearchAttributes(searchAttributes);
376383
}
377384

378385
@Override
379386
public void handleUpsertSearchAttributes(HistoryEvent event) {
380-
workflowClock.handleUpsertSearchAttributesEvent(event);
387+
UpsertWorkflowSearchAttributesEventAttributes attr =
388+
event.getUpsertWorkflowSearchAttributesEventAttributes();
389+
if (attr != null) {
390+
SearchAttributes searchAttributes = attr.getSearchAttributes();
391+
workflowContext.mergeSearchAttributes(searchAttributes);
392+
}
381393
}
382394
}

src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import com.uber.cadence.*;
21+
import java.nio.ByteBuffer;
22+
import java.util.HashMap;
23+
import java.util.Map;
2124

2225
final class WorkflowContext {
2326

@@ -29,6 +32,7 @@ final class WorkflowContext {
2932
// RunId can change when reset happens. This remembers the actual runId that is used
3033
// as in this particular part of the history.
3134
private String currentRunId;
35+
private SearchAttributes searchAttributes;
3236

3337
WorkflowContext(
3438
String domain,
@@ -38,6 +42,7 @@ final class WorkflowContext {
3842
this.decisionTask = decisionTask;
3943
this.startedAttributes = startedAttributes;
4044
this.currentRunId = startedAttributes.getOriginalExecutionRunId();
45+
this.searchAttributes = startedAttributes.getSearchAttributes();
4146
}
4247

4348
WorkflowExecution getWorkflowExecution() {
@@ -131,4 +136,30 @@ void setCurrentRunId(String currentRunId) {
131136
String getCurrentRunId() {
132137
return currentRunId;
133138
}
139+
140+
SearchAttributes getSearchAttributes() {
141+
return searchAttributes;
142+
}
143+
144+
void mergeSearchAttributes(SearchAttributes searchAttributes) {
145+
if (searchAttributes == null) {
146+
return;
147+
}
148+
if (this.searchAttributes == null) {
149+
this.searchAttributes = newSearchAttributes();
150+
}
151+
Map<String, ByteBuffer> current = this.searchAttributes.getIndexedFields();
152+
searchAttributes
153+
.getIndexedFields()
154+
.forEach(
155+
(k, v) -> {
156+
current.put(k, v);
157+
});
158+
}
159+
160+
private SearchAttributes newSearchAttributes() {
161+
SearchAttributes result = new SearchAttributes();
162+
result.setIndexedFields(new HashMap<String, ByteBuffer>());
163+
return result;
164+
}
134165
}

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,11 @@ public ChildPolicy getChildPolicy() {
557557
return ChildPolicy.TERMINATE;
558558
}
559559

560+
@Override
561+
public SearchAttributes getSearchAttributes() {
562+
throw new UnsupportedOperationException("not implemented");
563+
}
564+
560565
@Override
561566
public Consumer<Exception> scheduleActivityTask(
562567
ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import com.uber.cadence.activity.LocalActivityOptions;
2828
import com.uber.cadence.common.RetryOptions;
2929
import com.uber.cadence.converter.DataConverter;
30-
import com.uber.cadence.converter.JsonDataConverter;
30+
import com.uber.cadence.internal.common.InternalUtils;
3131
import com.uber.cadence.internal.common.RetryParameters;
3232
import com.uber.cadence.internal.replay.ActivityTaskFailedException;
3333
import com.uber.cadence.internal.replay.ActivityTaskTimeoutException;
@@ -56,7 +56,6 @@
5656
import com.uber.cadence.workflow.WorkflowInterceptor;
5757
import com.uber.m3.tally.Scope;
5858
import java.lang.reflect.Type;
59-
import java.nio.ByteBuffer;
6059
import java.time.Duration;
6160
import java.util.HashMap;
6261
import java.util.Map;
@@ -703,17 +702,7 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
703702
throw new IllegalArgumentException("Empty search attributes");
704703
}
705704

706-
SearchAttributes attr = convertMapToSearchAttributes(searchAttributes);
705+
SearchAttributes attr = InternalUtils.convertMapToSearchAttributes(searchAttributes);
707706
context.upsertSearchAttributes(attr);
708707
}
709-
710-
protected SearchAttributes convertMapToSearchAttributes(Map<String, Object> searchAttributes) {
711-
DataConverter converter = JsonDataConverter.getInstance();
712-
Map<String, ByteBuffer> mapOfByteBuffer = new HashMap<>();
713-
searchAttributes.forEach(
714-
(key, value) -> {
715-
mapOfByteBuffer.put(key, ByteBuffer.wrap(converter.toData(value)));
716-
});
717-
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
718-
}
719708
}

src/main/java/com/uber/cadence/internal/sync/WorkflowInfoImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.sync;
1919

2020
import com.uber.cadence.ChildPolicy;
21+
import com.uber.cadence.SearchAttributes;
2122
import com.uber.cadence.internal.replay.DecisionContext;
2223
import com.uber.cadence.workflow.WorkflowInfo;
2324
import java.time.Duration;
@@ -64,4 +65,9 @@ public Duration getExecutionStartToCloseTimeout() {
6465
public ChildPolicy getChildPolicy() {
6566
return context.getChildPolicy();
6667
}
68+
69+
@Override
70+
public SearchAttributes getSearchAttributes() {
71+
return context.getSearchAttributes();
72+
}
6773
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ public byte[] queryWorkflowExecution(
149149
private byte[] queryWorkflowExecution(
150150
String queryType, byte[] args, WorkflowExecutionHistory history, byte[] nextPageToken)
151151
throws Exception {
152-
PollForDecisionTaskResponse task;
153-
task = new PollForDecisionTaskResponse();
152+
PollForDecisionTaskResponse task = new PollForDecisionTaskResponse();
154153
task.setWorkflowExecution(history.getWorkflowExecution());
155154
task.setStartedEventId(Long.MAX_VALUE);
156155
task.setPreviousStartedEventId(Long.MAX_VALUE);
@@ -164,7 +163,7 @@ private byte[] queryWorkflowExecution(
164163
startedEvent.getWorkflowExecutionStartedEventAttributes();
165164
if (started == null) {
166165
throw new IllegalStateException(
167-
"First event of the history is not WorkflowExecutionStarted: " + startedEvent);
166+
"First event of the history is not WorkflowExecutionStarted: " + startedEvent);
168167
}
169168
WorkflowType workflowType = started.getWorkflowType();
170169
task.setWorkflowType(workflowType);

src/main/java/com/uber/cadence/workflow/WorkflowInfo.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.workflow;
1919

2020
import com.uber.cadence.ChildPolicy;
21+
import com.uber.cadence.SearchAttributes;
2122
import java.time.Duration;
2223

2324
public interface WorkflowInfo {
@@ -35,4 +36,6 @@ public interface WorkflowInfo {
3536
Duration getExecutionStartToCloseTimeout();
3637

3738
ChildPolicy getChildPolicy();
39+
40+
SearchAttributes getSearchAttributes();
3841
}

0 commit comments

Comments
 (0)