Skip to content

Commit 53c5286

Browse files
authored
Support sticky queries (#205)
1 parent 951cef0 commit 53c5286

File tree

4 files changed

+81
-53
lines changed

4 files changed

+81
-53
lines changed

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,17 @@
3939
import com.uber.cadence.workflow.Functions;
4040
import com.uber.m3.tally.Scope;
4141
import com.uber.m3.tally.Stopwatch;
42+
import org.apache.thrift.TException;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
4246
import java.time.Duration;
4347
import java.util.Iterator;
4448
import java.util.Objects;
4549
import java.util.concurrent.CancellationException;
4650
import java.util.concurrent.TimeUnit;
4751
import java.util.concurrent.atomic.AtomicReference;
4852
import java.util.function.Consumer;
49-
import org.apache.thrift.TException;
50-
import org.slf4j.Logger;
51-
import org.slf4j.LoggerFactory;
5253

5354
/**
5455
* Implements decider that relies on replay of a worklfow code. An instance of this class is created
@@ -381,7 +382,8 @@ private void decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc
381382
!= historyHelper.getPreviousStartedEventId()
382383
+ 2) // getNextDecisionEventId() skips over completed.
383384
&& (decisionsHelper.getNextDecisionEventId() != 0
384-
&& historyHelper.getPreviousStartedEventId() != 0)) {
385+
&& historyHelper.getPreviousStartedEventId() != 0)
386+
&& (decisionTask.getHistory().getEventsSize() > 0)) {
385387
throw new IllegalStateException(
386388
String.format(
387389
"ReplayDecider expects next event id at %d. History's previous started event id is %d",

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,49 @@ public DecisionTaskHandler.Result handleDecisionTask(PollForDecisionTaskResponse
106106

107107
private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask) throws Throwable {
108108

109+
if (decisionTask.isSetQuery()) {
110+
return processQuery(decisionTask);
111+
} else {
112+
113+
return processDecision(decisionTask);
114+
}
115+
}
116+
117+
private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
109118
Decider decider =
110119
stickyTaskListName == null
111120
? createDecider(decisionTask)
112121
: cache.getOrCreate(decisionTask, this::createDecider);
113122
try {
114-
if (decisionTask.isSetQuery()) {
115-
return processQuery(decisionTask, (ReplayDecider) decider);
116-
} else {
117-
return processDecision(decisionTask, (ReplayDecider) decider);
123+
decider.decide(decisionTask);
124+
DecisionsHelper decisionsHelper = ((ReplayDecider) decider).getDecisionsHelper();
125+
List<Decision> decisions = decisionsHelper.getDecisions();
126+
127+
if (log.isTraceEnabled()) {
128+
WorkflowExecution execution = decisionTask.getWorkflowExecution();
129+
log.trace(
130+
"WorkflowTask startedEventId="
131+
+ decisionTask.getStartedEventId()
132+
+ ", WorkflowID="
133+
+ execution.getWorkflowId()
134+
+ ", RunID="
135+
+ execution.getRunId()
136+
+ " completed with "
137+
+ WorkflowExecutionUtils.prettyPrintDecisions(decisions));
138+
} else if (log.isDebugEnabled()) {
139+
WorkflowExecution execution = decisionTask.getWorkflowExecution();
140+
log.debug(
141+
"WorkflowTask startedEventId="
142+
+ decisionTask.getStartedEventId()
143+
+ ", WorkflowID="
144+
+ execution.getWorkflowId()
145+
+ ", RunID="
146+
+ execution.getRunId()
147+
+ " completed with "
148+
+ decisions.size()
149+
+ " new decisions");
118150
}
151+
return createCompletedRequest(decisionTask, decisionsHelper, decisions);
119152
} catch (Exception e) {
120153
if (stickyTaskListName != null) {
121154
cache.invalidate(decisionTask);
@@ -128,44 +161,14 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask)
128161
}
129162
}
130163

131-
private Result processDecision(PollForDecisionTaskResponse decisionTask, ReplayDecider decider)
132-
throws Throwable {
133-
decider.decide(decisionTask);
134-
DecisionsHelper decisionsHelper = decider.getDecisionsHelper();
135-
List<Decision> decisions = decisionsHelper.getDecisions();
136-
137-
if (log.isTraceEnabled()) {
138-
WorkflowExecution execution = decisionTask.getWorkflowExecution();
139-
log.trace(
140-
"WorkflowTask startedEventId="
141-
+ decisionTask.getStartedEventId()
142-
+ ", WorkflowID="
143-
+ execution.getWorkflowId()
144-
+ ", RunID="
145-
+ execution.getRunId()
146-
+ " completed with "
147-
+ WorkflowExecutionUtils.prettyPrintDecisions(decisions));
148-
} else if (log.isDebugEnabled()) {
149-
WorkflowExecution execution = decisionTask.getWorkflowExecution();
150-
log.debug(
151-
"WorkflowTask startedEventId="
152-
+ decisionTask.getStartedEventId()
153-
+ ", WorkflowID="
154-
+ execution.getWorkflowId()
155-
+ ", RunID="
156-
+ execution.getRunId()
157-
+ " completed with "
158-
+ decisions.size()
159-
+ " new decisions");
160-
}
161-
162-
return createCompletedRequest(decisionTask, decisionsHelper, decisions);
163-
}
164-
165-
private Result processQuery(PollForDecisionTaskResponse decisionTask, ReplayDecider decider) {
164+
private Result processQuery(PollForDecisionTaskResponse decisionTask) {
166165
RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest();
167166
queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
168167
try {
168+
Decider decider =
169+
stickyTaskListName == null
170+
? createDecider(decisionTask)
171+
: cache.getOrCreate(decisionTask, this::createDecider);
169172
byte[] queryResult = decider.query(decisionTask, decisionTask.getQuery());
170173
queryCompletedRequest.setQueryResult(queryResult);
171174
queryCompletedRequest.setCompletedType(QueryTaskCompletedType.COMPLETED);

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@
8282
import com.uber.cadence.internal.testservice.StateMachines.TimerData;
8383
import com.uber.cadence.internal.testservice.StateMachines.WorkflowData;
8484
import com.uber.cadence.internal.testservice.TestWorkflowStore.TaskListId;
85+
import org.apache.thrift.TException;
86+
import org.slf4j.Logger;
87+
import org.slf4j.LoggerFactory;
88+
8589
import java.io.ByteArrayInputStream;
8690
import java.io.ByteArrayOutputStream;
8791
import java.io.DataInputStream;
@@ -102,9 +106,6 @@
102106
import java.util.concurrent.locks.Lock;
103107
import java.util.concurrent.locks.ReentrantLock;
104108
import java.util.function.LongSupplier;
105-
import org.apache.thrift.TException;
106-
import org.slf4j.Logger;
107-
import org.slf4j.LoggerFactory;
108109

109110
class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
110111

@@ -138,6 +139,7 @@ void apply(RequestContext ctx)
138139
private long lastNonFailedDecisionStartEventId;
139140
private final Map<String, CompletableFuture<QueryWorkflowResponse>> queries =
140141
new ConcurrentHashMap<>();
142+
private final Map<String, PollForDecisionTaskResponse> queryRequests = new ConcurrentHashMap<>();
141143
public StickyExecutionAttributes stickyExecutionAttributes;
142144

143145
/** @param parentChildInitiatedEventId id of the child initiated event in the parent history */
@@ -1183,15 +1185,22 @@ public void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest
11831185
@Override
11841186
public QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TException {
11851187
QueryId queryId = new QueryId(executionId);
1188+
11861189
PollForDecisionTaskResponse task =
11871190
new PollForDecisionTaskResponse()
11881191
.setTaskToken(queryId.toBytes())
11891192
.setWorkflowExecution(executionId.getExecution())
11901193
.setWorkflowType(startRequest.getWorkflowType())
1191-
.setQuery(queryRequest.getQuery());
1194+
.setQuery(queryRequest.getQuery())
1195+
.setWorkflowExecutionTaskList(startRequest.getTaskList());
11921196
TaskListId taskListId =
1193-
new TaskListId(queryRequest.getDomain(), startRequest.getTaskList().getName());
1197+
new TaskListId(
1198+
queryRequest.getDomain(),
1199+
stickyExecutionAttributes == null
1200+
? startRequest.getTaskList().getName()
1201+
: stickyExecutionAttributes.getWorkerTaskList().getName());
11941202
CompletableFuture<QueryWorkflowResponse> result = new CompletableFuture<>();
1203+
queryRequests.put(queryId.getQueryId(), task);
11951204
queries.put(queryId.getQueryId(), result);
11961205
store.sendQueryTask(executionId, taskListId, task);
11971206
try {
@@ -1218,6 +1227,13 @@ public void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest comp
12181227
QueryWorkflowResponse response =
12191228
new QueryWorkflowResponse().setQueryResult(completeRequest.getQueryResult());
12201229
result.complete(response);
1230+
} else if (stickyExecutionAttributes != null) {
1231+
stickyExecutionAttributes = null;
1232+
PollForDecisionTaskResponse task = queryRequests.remove(queryId.getQueryId());
1233+
1234+
TaskListId taskListId =
1235+
new TaskListId(startRequest.getDomain(), startRequest.getTaskList().getName());
1236+
store.sendQueryTask(executionId, taskListId, task);
12211237
} else {
12221238
QueryFailedError error = new QueryFailedError().setMessage(completeRequest.getErrorMessage());
12231239
result.completeExceptionally(error);

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import com.uber.cadence.WorkflowExecutionInfo;
3535
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3636
import com.uber.cadence.internal.testservice.RequestContext.Timer;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
3740
import java.time.Duration;
3841
import java.util.ArrayList;
3942
import java.util.HashMap;
@@ -46,8 +49,6 @@
4649
import java.util.concurrent.locks.Condition;
4750
import java.util.concurrent.locks.Lock;
4851
import java.util.concurrent.locks.ReentrantLock;
49-
import org.slf4j.Logger;
50-
import org.slf4j.LoggerFactory;
5152

5253
class TestWorkflowStoreImpl implements TestWorkflowStore {
5354

@@ -316,9 +317,15 @@ public void sendQueryTask(
316317
throws EntityNotExistsError {
317318
lock.lock();
318319
try {
319-
HistoryStore history = getHistoryStore(executionId);
320-
List<HistoryEvent> events = new ArrayList<>(history.getEventsLocked());
321-
task.setHistory(new History().setEvents(events));
320+
HistoryStore historyStore = getHistoryStore(executionId);
321+
List<HistoryEvent> events = new ArrayList<>(historyStore.getEventsLocked());
322+
History history = new History();
323+
if (taskList.getTaskListName().equals(task.getWorkflowExecutionTaskList().getName())) {
324+
history.setEvents(events);
325+
} else {
326+
history.setEvents(new ArrayList<>());
327+
}
328+
task.setHistory(history);
322329
} finally {
323330
lock.unlock();
324331
}

0 commit comments

Comments
 (0)