Skip to content

Commit 6fd74c6

Browse files
Add consistent query support (#530)
1 parent 93424e2 commit 6fd74c6

File tree

14 files changed

+223
-23
lines changed

14 files changed

+223
-23
lines changed

docker/buildkite/docker-compose.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ services:
2525
- "8126:8126"
2626

2727
cadence:
28-
image: ubercadence/server:latestRelease-auto-setup
28+
image: ubercadence/server:master-auto-setup
2929
deploy:
3030
resources:
3131
limits:
@@ -122,4 +122,4 @@ services:
122122
- COVERALLS_REPO_TOKEN
123123
volumes:
124124
- "../../:/cadence-java-client"
125-
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent
125+
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent

src/main/idls

Submodule idls updated from b1a9b9e to 7d2d225

src/main/java/com/uber/cadence/client/WorkflowStub.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.client;
1919

20+
import com.uber.cadence.QueryConsistencyLevel;
2021
import com.uber.cadence.QueryRejectCondition;
2122
import com.uber.cadence.WorkflowExecution;
2223
import java.lang.reflect.InvocationHandler;
@@ -170,6 +171,14 @@ <R> R query(
170171
QueryRejectCondition queryRejectCondition,
171172
Object... args);
172173

174+
<R> R query(
175+
String queryType,
176+
Class<R> resultClass,
177+
Type resultType,
178+
QueryRejectCondition queryRejectCondition,
179+
QueryConsistencyLevel queryConsistencyLevel,
180+
Object... args);
181+
173182
/** Request cancellation. */
174183
void cancel();
175184

src/main/java/com/uber/cadence/internal/Version.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class Version {
4343
* support. This can be used for client capibility check, on Cadence server, for backward
4444
* compatibility Format: MAJOR.MINOR.PATCH
4545
*/
46-
public static final String FEATURE_VERSION = "1.4.0";
46+
public static final String FEATURE_VERSION = "1.5.0";
4747

4848
static {
4949
// Load version from version.properties generated by Gradle into build/resources/main directory.

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete
418418
query.setQueryType(queryParameters.getQueryType());
419419
request.setQuery(query);
420420
request.setQueryRejectCondition(queryParameters.getQueryRejectCondition());
421+
request.setQueryConsistencyLevel(queryParameters.getQueryConsistencyLevel());
421422
try {
422423
QueryWorkflowResponse response =
423424
RpcRetryer.retryWithResult(

src/main/java/com/uber/cadence/internal/replay/Decider.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import com.uber.cadence.Decision;
2121
import com.uber.cadence.PollForDecisionTaskResponse;
2222
import com.uber.cadence.WorkflowQuery;
23+
import com.uber.cadence.WorkflowQueryResult;
2324
import java.util.List;
25+
import java.util.Map;
2426

2527
public interface Decider {
2628

@@ -32,17 +34,26 @@ public interface Decider {
3234

3335
class DecisionResult {
3436
private final List<Decision> decisions;
37+
private final Map<String, WorkflowQueryResult> queryResults;
3538
private final boolean forceCreateNewDecisionTask;
3639

37-
public DecisionResult(List<Decision> decisions, boolean forceCreateNewDecisionTask) {
40+
public DecisionResult(
41+
List<Decision> decisions,
42+
Map<String, WorkflowQueryResult> queryResults,
43+
boolean forceCreateNewDecisionTask) {
3844
this.decisions = decisions;
45+
this.queryResults = queryResults;
3946
this.forceCreateNewDecisionTask = forceCreateNewDecisionTask;
4047
}
4148

4249
public List<Decision> getDecisions() {
4350
return decisions;
4451
}
4552

53+
public Map<String, WorkflowQueryResult> getQueryResults() {
54+
return queryResults;
55+
}
56+
4657
public boolean getForceCreateNewDecisionTask() {
4758
return forceCreateNewDecisionTask;
4859
}

src/main/java/com/uber/cadence/internal/replay/QueryWorkflowParameters.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.QueryConsistencyLevel;
2021
import com.uber.cadence.QueryRejectCondition;
2122
import java.nio.charset.StandardCharsets;
2223

@@ -32,6 +33,8 @@ public class QueryWorkflowParameters implements Cloneable {
3233

3334
private QueryRejectCondition queryRejectCondition;
3435

36+
private QueryConsistencyLevel queryConsistencyLevel;
37+
3538
public QueryWorkflowParameters() {}
3639

3740
public byte[] getInput() {
@@ -100,6 +103,20 @@ public QueryWorkflowParameters withQueryRejectCondition(
100103
return this;
101104
}
102105

106+
public QueryConsistencyLevel getQueryConsistencyLevel() {
107+
return queryConsistencyLevel;
108+
}
109+
110+
public void setQueryConsistencyLevel(QueryConsistencyLevel queryConsistencyLevel) {
111+
this.queryConsistencyLevel = queryConsistencyLevel;
112+
}
113+
114+
public QueryWorkflowParameters withQueryConsistencyLevel(
115+
QueryConsistencyLevel queryConsistencyLevel) {
116+
this.queryConsistencyLevel = queryConsistencyLevel;
117+
return this;
118+
}
119+
103120
@Override
104121
public String toString() {
105122
StringBuilder sb = new StringBuilder();
@@ -109,6 +126,7 @@ public String toString() {
109126
sb.append("WorkflowId: " + workflowId + ", ");
110127
sb.append("RunId: " + runId + ", ");
111128
sb.append("QueryRejectCondition: " + queryRejectCondition + ", ");
129+
sb.append("queryConsistencyLevel: " + queryConsistencyLevel + ", ");
112130
sb.append("}");
113131
return sb.toString();
114132
}
@@ -120,6 +138,7 @@ public QueryWorkflowParameters copy() {
120138
result.setQueryType(queryType);
121139
result.setWorkflowId(workflowId);
122140
result.setQueryRejectCondition(queryRejectCondition);
141+
result.setQueryConsistencyLevel(queryConsistencyLevel);
123142
return result;
124143
}
125144
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
import com.uber.cadence.History;
2626
import com.uber.cadence.HistoryEvent;
2727
import com.uber.cadence.PollForDecisionTaskResponse;
28+
import com.uber.cadence.QueryResultType;
2829
import com.uber.cadence.TimerFiredEventAttributes;
2930
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
3031
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
3132
import com.uber.cadence.WorkflowQuery;
33+
import com.uber.cadence.WorkflowQueryResult;
3234
import com.uber.cadence.WorkflowType;
3335
import com.uber.cadence.common.RetryOptions;
3436
import com.uber.cadence.internal.common.OptionsUtils;
@@ -51,6 +53,7 @@
5153
import java.util.Arrays;
5254
import java.util.Iterator;
5355
import java.util.List;
56+
import java.util.Map;
5457
import java.util.Objects;
5558
import java.util.concurrent.CancellationException;
5659
import java.util.concurrent.TimeUnit;
@@ -59,6 +62,7 @@
5962
import java.util.concurrent.locks.ReentrantLock;
6063
import java.util.function.BiFunction;
6164
import java.util.function.Consumer;
65+
import java.util.stream.Collectors;
6266
import org.apache.thrift.TException;
6367
import org.slf4j.Logger;
6468
import org.slf4j.LoggerFactory;
@@ -387,13 +391,40 @@ private void handleWorkflowExecutionSignaled(HistoryEvent event) {
387391
public DecisionResult decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
388392
lock.lock();
389393
try {
390-
boolean forceCreateNewDecisionTask = decideImpl(decisionTask, null);
391-
return new DecisionResult(decisionsHelper.getDecisions(), forceCreateNewDecisionTask);
394+
AtomicReference<Map<String, WorkflowQueryResult>> queryResults = new AtomicReference<>();
395+
boolean forceCreateNewDecisionTask =
396+
decideImpl(
397+
decisionTask, () -> queryResults.set(getQueryResults(decisionTask.getQueries())));
398+
return new DecisionResult(
399+
decisionsHelper.getDecisions(), queryResults.get(), forceCreateNewDecisionTask);
392400
} finally {
393401
lock.unlock();
394402
}
395403
}
396404

405+
private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQuery> queries) {
406+
if (queries == null) {
407+
return null;
408+
}
409+
410+
return queries
411+
.entrySet()
412+
.stream()
413+
.collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
414+
}
415+
416+
private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
417+
try {
418+
return new WorkflowQueryResult()
419+
.setResultType(QueryResultType.ANSWERED)
420+
.setAnswer(workflow.query(query));
421+
} catch (Throwable e) {
422+
return new WorkflowQueryResult()
423+
.setResultType(QueryResultType.FAILED)
424+
.setErrorMessage(e.getMessage());
425+
}
426+
}
427+
397428
// Returns boolean to indicate whether we need to force create new decision task for local
398429
// activity heartbeating.
399430
private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query)

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ private Result createCompletedRequest(
245245
new RespondDecisionTaskCompletedRequest();
246246
completedRequest.setTaskToken(decisionTask.getTaskToken());
247247
completedRequest.setDecisions(result.getDecisions());
248+
completedRequest.setQueryResults(result.getQueryResults());
248249
completedRequest.setForceCreateNewDecisionTask(result.getForceCreateNewDecisionTask());
249250

250251
if (stickyTaskListName != null) {

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import com.uber.cadence.PollForActivityTaskResponse;
5151
import com.uber.cadence.PollForDecisionTaskRequest;
5252
import com.uber.cadence.PollForDecisionTaskResponse;
53+
import com.uber.cadence.QueryConsistencyLevel;
5354
import com.uber.cadence.QueryFailedError;
5455
import com.uber.cadence.QueryRejectCondition;
5556
import com.uber.cadence.QueryWorkflowRequest;
@@ -990,6 +991,18 @@ public <R> R query(
990991
return next.query(queryType, resultClass, resultType, queryRejectCondition, args);
991992
}
992993

994+
@Override
995+
public <R> R query(
996+
String queryType,
997+
Class<R> resultClass,
998+
Type resultType,
999+
QueryRejectCondition queryRejectCondition,
1000+
QueryConsistencyLevel queryConsistencyLevel,
1001+
Object... args) {
1002+
return next.query(
1003+
queryType, resultClass, resultType, queryRejectCondition, queryConsistencyLevel, args);
1004+
}
1005+
9931006
@Override
9941007
public void cancel() {
9951008
next.cancel();

0 commit comments

Comments
 (0)