Skip to content

Commit 79d4c44

Browse files
andrewjdawson2016mfateev
authored andcommitted
Support filtering query based on close status (#394)
1 parent d134e2f commit 79d4c44

File tree

10 files changed

+176
-5
lines changed

10 files changed

+176
-5
lines changed

src/main/java/com/uber/cadence/client/WorkflowStub.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package com.uber.cadence.client;
1919

20+
import com.uber.cadence.QueryRejectCondition;
2021
import com.uber.cadence.WorkflowExecution;
22+
import com.uber.cadence.internal.common.QueryResponse;
2123
import java.lang.reflect.Type;
2224
import java.util.Optional;
2325
import java.util.concurrent.CompletableFuture;
@@ -124,6 +126,19 @@ <R> CompletableFuture<R> getResultAsync(
124126

125127
<R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args);
126128

129+
<R> QueryResponse<R> query(
130+
String queryType,
131+
Class<R> resultClass,
132+
QueryRejectCondition queryRejectCondition,
133+
Object... args);
134+
135+
<R> QueryResponse<R> query(
136+
String queryType,
137+
Class<R> resultClass,
138+
Type resultType,
139+
QueryRejectCondition queryRejectCondition,
140+
Object... args);
141+
127142
/** Request cancellation. */
128143
void cancel();
129144

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.common;
19+
20+
import com.uber.cadence.QueryRejected;
21+
22+
public final class QueryResponse<T> {
23+
private final QueryRejected queryRejected;
24+
private final T result;
25+
26+
public QueryResponse(QueryRejected queryRejected, T result) {
27+
this.queryRejected = queryRejected;
28+
this.result = result;
29+
}
30+
31+
/**
32+
* Returns the value of the QueryRejected property for this object.
33+
*
34+
* @return The value of the QueryRejected property for this object.
35+
*/
36+
public QueryRejected getQueryRejected() {
37+
return queryRejected;
38+
}
39+
40+
/**
41+
* Returns the value of the Result property for this object.
42+
*
43+
* @return The value of the Result property for this object.
44+
*/
45+
public T getResult() {
46+
return result;
47+
}
48+
}

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternal.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.external;
1919

20+
import com.uber.cadence.QueryWorkflowResponse;
2021
import com.uber.cadence.WorkflowExecution;
2122
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
2223
import com.uber.cadence.internal.common.SignalWithStartWorkflowExecutionParameters;
@@ -38,7 +39,7 @@ WorkflowExecution signalWithStartWorkflowExecution(
3839

3940
void requestCancelWorkflowExecution(WorkflowExecution execution);
4041

41-
byte[] queryWorkflow(QueryWorkflowParameters queryParameters);
42+
QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParameters);
4243

4344
void terminateWorkflowExecution(TerminateWorkflowExecutionParameters terminateParameters);
4445

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) {
268268
}
269269

270270
@Override
271-
public byte[] queryWorkflow(QueryWorkflowParameters queryParameters) {
271+
public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParameters) {
272272
QueryWorkflowRequest request = new QueryWorkflowRequest();
273273
request.setDomain(domain);
274274
WorkflowExecution execution = new WorkflowExecution();
@@ -278,12 +278,13 @@ public byte[] queryWorkflow(QueryWorkflowParameters queryParameters) {
278278
query.setQueryArgs(queryParameters.getInput());
279279
query.setQueryType(queryParameters.getQueryType());
280280
request.setQuery(query);
281+
request.setQueryRejectCondition(queryParameters.getQueryRejectCondition());
281282
try {
282283
QueryWorkflowResponse response =
283284
Retryer.retryWithResult(
284285
Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS,
285286
() -> service.QueryWorkflow(request));
286-
return response.getQueryResult();
287+
return response;
287288
} catch (TException e) {
288289
throw CheckedExceptionWrapper.wrap(e);
289290
}

src/main/java/com/uber/cadence/internal/replay/QueryWorkflowParameters.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.QueryRejectCondition;
2021
import java.nio.charset.StandardCharsets;
2122

2223
public class QueryWorkflowParameters implements Cloneable {
@@ -29,6 +30,8 @@ public class QueryWorkflowParameters implements Cloneable {
2930

3031
private String workflowId;
3132

33+
private QueryRejectCondition queryRejectCondition;
34+
3235
public QueryWorkflowParameters() {}
3336

3437
public byte[] getInput() {
@@ -83,6 +86,20 @@ public QueryWorkflowParameters withWorkflowId(String workflowId) {
8386
return this;
8487
}
8588

89+
public QueryRejectCondition getQueryRejectCondition() {
90+
return queryRejectCondition;
91+
}
92+
93+
public void setQueryRejectCondition(QueryRejectCondition queryRejectCondition) {
94+
this.queryRejectCondition = queryRejectCondition;
95+
}
96+
97+
public QueryWorkflowParameters withQueryRejectCondition(
98+
QueryRejectCondition queryRejectCondition) {
99+
this.queryRejectCondition = queryRejectCondition;
100+
return this;
101+
}
102+
86103
@Override
87104
public String toString() {
88105
StringBuilder sb = new StringBuilder();
@@ -91,6 +108,7 @@ public String toString() {
91108
sb.append("Input: " + new String(input, 0, 512, StandardCharsets.UTF_8) + ", ");
92109
sb.append("WorkflowId: " + workflowId + ", ");
93110
sb.append("RunId: " + runId + ", ");
111+
sb.append("QueryRejectCondition: " + queryRejectCondition + ", ");
94112
sb.append("}");
95113
return sb.toString();
96114
}
@@ -101,6 +119,7 @@ public QueryWorkflowParameters copy() {
101119
result.setRunId(runId);
102120
result.setQueryType(queryType);
103121
result.setWorkflowId(workflowId);
122+
result.setQueryRejectCondition(queryRejectCondition);
104123
return result;
105124
}
106125
}

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import com.uber.cadence.PollForDecisionTaskRequest;
5151
import com.uber.cadence.PollForDecisionTaskResponse;
5252
import com.uber.cadence.QueryFailedError;
53+
import com.uber.cadence.QueryRejectCondition;
5354
import com.uber.cadence.QueryWorkflowRequest;
5455
import com.uber.cadence.QueryWorkflowResponse;
5556
import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest;
@@ -87,6 +88,7 @@
8788
import com.uber.cadence.client.WorkflowClientOptions;
8889
import com.uber.cadence.client.WorkflowOptions;
8990
import com.uber.cadence.client.WorkflowStub;
91+
import com.uber.cadence.internal.common.QueryResponse;
9092
import com.uber.cadence.internal.testservice.TestWorkflowService;
9193
import com.uber.cadence.serviceclient.IWorkflowService;
9294
import com.uber.cadence.testing.TestEnvironmentOptions;
@@ -901,6 +903,25 @@ public <R> R query(String queryType, Class<R> resultClass, Type resultType, Obje
901903
return next.query(queryType, resultClass, resultType, args);
902904
}
903905

906+
@Override
907+
public <R> QueryResponse<R> query(
908+
String queryType,
909+
Class<R> resultClass,
910+
QueryRejectCondition queryRejectCondition,
911+
Object... args) {
912+
return next.query(queryType, resultClass, queryRejectCondition, args);
913+
}
914+
915+
@Override
916+
public <R> QueryResponse<R> query(
917+
String queryType,
918+
Class<R> resultClass,
919+
Type resultType,
920+
QueryRejectCondition queryRejectCondition,
921+
Object... args) {
922+
return next.query(queryType, resultClass, resultType, queryRejectCondition, args);
923+
}
924+
904925
@Override
905926
public void cancel() {
906927
next.cancel();

src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.uber.cadence.EntityNotExistsError;
2121
import com.uber.cadence.InternalServiceError;
2222
import com.uber.cadence.QueryFailedError;
23+
import com.uber.cadence.QueryRejectCondition;
24+
import com.uber.cadence.QueryWorkflowResponse;
2325
import com.uber.cadence.WorkflowExecution;
2426
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
2527
import com.uber.cadence.WorkflowType;
@@ -35,6 +37,7 @@
3537
import com.uber.cadence.converter.DataConverterException;
3638
import com.uber.cadence.converter.JsonDataConverter;
3739
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
40+
import com.uber.cadence.internal.common.QueryResponse;
3841
import com.uber.cadence.internal.common.SignalWithStartWorkflowExecutionParameters;
3942
import com.uber.cadence.internal.common.StartWorkflowExecutionParameters;
4043
import com.uber.cadence.internal.common.WorkflowExecutionFailedException;
@@ -343,14 +346,40 @@ public <R> R query(String queryType, Class<R> resultClass, Object... args) {
343346

344347
@Override
345348
public <R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args) {
349+
return query(queryType, resultClass, resultType, null, args).getResult();
350+
}
351+
352+
@Override
353+
public <R> QueryResponse<R> query(
354+
String queryType,
355+
Class<R> resultClass,
356+
QueryRejectCondition queryRejectCondition,
357+
Object... args) {
358+
return query(queryType, resultClass, resultClass, queryRejectCondition, args);
359+
}
360+
361+
@Override
362+
public <R> QueryResponse<R> query(
363+
String queryType,
364+
Class<R> resultClass,
365+
Type resultType,
366+
QueryRejectCondition queryRejectCondition,
367+
Object... args) {
346368
checkStarted();
347369
QueryWorkflowParameters p = new QueryWorkflowParameters();
348370
p.setInput(dataConverter.toData(args));
349371
p.setQueryType(queryType);
350372
p.setWorkflowId(execution.get().getWorkflowId());
373+
p.setQueryRejectCondition(queryRejectCondition);
351374
try {
352-
byte[] result = genericClient.queryWorkflow(p);
353-
return dataConverter.fromData(result, resultClass, resultType);
375+
QueryWorkflowResponse result = genericClient.queryWorkflow(p);
376+
if (result.queryRejected == null) {
377+
return new QueryResponse<>(
378+
null, dataConverter.fromData(result.getQueryResult(), resultClass, resultType));
379+
} else {
380+
return new QueryResponse<>(result.getQueryRejected(), null);
381+
}
382+
354383
} catch (RuntimeException e) {
355384
Exception unwrapped = CheckedExceptionWrapper.unwrap(e);
356385
if (unwrapped instanceof EntityNotExistsError) {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,20 @@ public void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest
13911391
public QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TException {
13921392
QueryId queryId = new QueryId(executionId);
13931393

1394+
Optional<WorkflowExecutionCloseStatus> optCloseStatus = getCloseStatus();
1395+
if (optCloseStatus.isPresent() && queryRequest.getQueryRejectCondition() != null) {
1396+
WorkflowExecutionCloseStatus closeStatus = optCloseStatus.get();
1397+
boolean rejectNotOpen =
1398+
queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_OPEN;
1399+
boolean rejectNotCompletedCleanly =
1400+
queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_COMPLETED_CLEANLY
1401+
&& closeStatus != WorkflowExecutionCloseStatus.COMPLETED;
1402+
if (rejectNotOpen || rejectNotCompletedCleanly) {
1403+
return new QueryWorkflowResponse()
1404+
.setQueryRejected(new QueryRejected().setCloseStatus(closeStatus));
1405+
}
1406+
}
1407+
13941408
PollForDecisionTaskResponse task =
13951409
new PollForDecisionTaskResponse()
13961410
.setTaskToken(queryId.toBytes())

src/main/thrift/shared.thrift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,13 @@ enum EncodingType {
288288
ThriftRW,
289289
}
290290

291+
enum QueryRejectCondition {
292+
// NOT_OPEN indicates that query should be rejected if workflow is not open
293+
NOT_OPEN
294+
// NOT_COMPLETED_CLEANLY indicates that query should be rejected if workflow did not complete cleanly
295+
NOT_COMPLETED_CLEANLY
296+
}
297+
291298
struct DataBlob {
292299
10: optional EncodingType EncodingType
293300
20: optional binary Data
@@ -1305,10 +1312,17 @@ struct QueryWorkflowRequest {
13051312
10: optional string domain
13061313
20: optional WorkflowExecution execution
13071314
30: optional WorkflowQuery query
1315+
// QueryRejectCondition can used to reject the query if workflow state does not satisify condition
1316+
40: optional QueryRejectCondition queryRejectCondition
1317+
}
1318+
1319+
struct QueryRejected {
1320+
10: optional WorkflowExecutionCloseStatus closeStatus
13081321
}
13091322

13101323
struct QueryWorkflowResponse {
13111324
10: optional binary queryResult
1325+
20: optional QueryRejected queryRejected
13121326
}
13131327

13141328
struct WorkflowQuery {

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
3131
import com.uber.cadence.HistoryEvent;
3232
import com.uber.cadence.Memo;
33+
import com.uber.cadence.QueryRejectCondition;
3334
import com.uber.cadence.SearchAttributes;
3435
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
3536
import com.uber.cadence.TimeoutType;
3637
import com.uber.cadence.WorkflowExecution;
38+
import com.uber.cadence.WorkflowExecutionCloseStatus;
3739
import com.uber.cadence.WorkflowIdReusePolicy;
3840
import com.uber.cadence.activity.Activity;
3941
import com.uber.cadence.activity.ActivityMethod;
@@ -57,6 +59,7 @@
5759
import com.uber.cadence.common.MethodRetry;
5860
import com.uber.cadence.common.RetryOptions;
5961
import com.uber.cadence.converter.JsonDataConverter;
62+
import com.uber.cadence.internal.common.QueryResponse;
6063
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
6164
import com.uber.cadence.internal.sync.DeterministicRunnerTest;
6265
import com.uber.cadence.internal.worker.PollerOptions;
@@ -2453,6 +2456,12 @@ public void testSignalUntyped() {
24532456
});
24542457
execution.set(client.start());
24552458
assertEquals("Hello World!", client.getResult(String.class));
2459+
assertEquals("World!", client.query("QueryableWorkflow::getState", String.class));
2460+
QueryResponse<String> queryResponse =
2461+
client.query("QueryableWorkflow::getState", String.class, QueryRejectCondition.NOT_OPEN);
2462+
assertNull(queryResponse.getResult());
2463+
assertEquals(
2464+
WorkflowExecutionCloseStatus.COMPLETED, queryResponse.getQueryRejected().closeStatus);
24562465
}
24572466

24582467
static final AtomicInteger decisionCount = new AtomicInteger();

0 commit comments

Comments
 (0)