Skip to content

Commit 56dcb62

Browse files
authored
1. Refactor decider (#201)
decider returns list of decisions on decide
1 parent 53c5286 commit 56dcb62

File tree

8 files changed

+25
-57
lines changed

8 files changed

+25
-57
lines changed

src/main/java/com/uber/cadence/internal/replay/Decider.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.Decision;
2021
import com.uber.cadence.PollForDecisionTaskResponse;
2122
import com.uber.cadence.WorkflowQuery;
23+
import java.util.List;
2224

2325
public interface Decider {
2426

25-
// TODO: refactor in future CR. Merge methods and decide should return a list of decisions.
26-
void decide(PollForDecisionTaskResponse decisionTask) throws Throwable;
27+
List<Decision> decide(PollForDecisionTaskResponse decisionTask) throws Throwable;
2728

2829
byte[] query(PollForDecisionTaskResponse decisionTask, WorkflowQuery query) throws Throwable;
2930

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ public long size() {
110110

111111
private boolean isFullHistory(PollForDecisionTaskResponse decisionTask) {
112112
return decisionTask.getHistory() != null
113-
&& decisionTask.getHistory().getEventsSize() > 0
114-
&& decisionTask.history.events.get(0).getEventId() == 1;
113+
&& decisionTask.getHistory().getEvents().size() > 0
114+
&& decisionTask.getHistory().getEvents().get(0).getEventId() == 1;
115115
}
116116

117117
public void invalidateAll() {

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.uber.cadence.CompleteWorkflowExecutionDecisionAttributes;
3131
import com.uber.cadence.ContinueAsNewWorkflowExecutionDecisionAttributes;
3232
import com.uber.cadence.Decision;
33-
import com.uber.cadence.DecisionTaskCompletedEventAttributes;
3433
import com.uber.cadence.DecisionType;
3534
import com.uber.cadence.EventType;
3635
import com.uber.cadence.ExternalWorkflowExecutionCancelRequestedEventAttributes;
@@ -55,7 +54,6 @@
5554
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
5655
import com.uber.cadence.internal.worker.WorkflowExecutionException;
5756
import java.util.ArrayList;
58-
import java.util.Arrays;
5957
import java.util.HashMap;
6058
import java.util.Iterator;
6159
import java.util.LinkedHashMap;
@@ -101,10 +99,6 @@ class DecisionsHelper {
10199
// TODO: removal of completed activities
102100
private final Map<String, Long> activityIdToScheduledEventId = new HashMap<>();
103101

104-
private byte[] workflowContextData;
105-
106-
private byte[] workfowContextFromLastDecisionCompletion;
107-
108102
DecisionsHelper(PollForDecisionTaskResponse task) {
109103
this.task = task;
110104
}
@@ -583,25 +577,6 @@ public String toString() {
583577
return WorkflowExecutionUtils.prettyPrintDecisions(getDecisions());
584578
}
585579

586-
void setWorkflowContextData(byte[] workflowState) {
587-
this.workflowContextData = workflowState;
588-
}
589-
590-
/** @return new workflow state or null if it didn't change since the last decision completion */
591-
byte[] getWorkflowContextDataToReturn() {
592-
if (workfowContextFromLastDecisionCompletion == null
593-
|| !Arrays.equals(workfowContextFromLastDecisionCompletion, workflowContextData)) {
594-
return workflowContextData;
595-
}
596-
return null;
597-
}
598-
599-
void handleDecisionCompletion(
600-
DecisionTaskCompletedEventAttributes decisionTaskCompletedEventAttributes) {
601-
workfowContextFromLastDecisionCompletion =
602-
decisionTaskCompletedEventAttributes.getExecutionContext();
603-
}
604-
605580
PollForDecisionTaskResponse getTask() {
606581
return task;
607582
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.Decision;
2021
import com.uber.cadence.EventType;
2122
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
2223
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
@@ -39,17 +40,17 @@
3940
import com.uber.cadence.workflow.Functions;
4041
import com.uber.m3.tally.Scope;
4142
import com.uber.m3.tally.Stopwatch;
42-
import org.apache.thrift.TException;
43-
import org.slf4j.Logger;
44-
import org.slf4j.LoggerFactory;
45-
4643
import java.time.Duration;
4744
import java.util.Iterator;
45+
import java.util.List;
4846
import java.util.Objects;
4947
import java.util.concurrent.CancellationException;
5048
import java.util.concurrent.TimeUnit;
5149
import java.util.concurrent.atomic.AtomicReference;
5250
import java.util.function.Consumer;
51+
import org.apache.thrift.TException;
52+
import org.slf4j.Logger;
53+
import org.slf4j.LoggerFactory;
5354

5455
/**
5556
* Implements decider that relies on replay of a worklfow code. An instance of this class is created
@@ -155,7 +156,7 @@ private void processEvent(HistoryEvent event) throws Throwable {
155156
context.handleChildWorkflowExecutionTimedOut(event);
156157
break;
157158
case DecisionTaskCompleted:
158-
handleDecisionTaskCompleted(event);
159+
// NOOP
159160
break;
160161
case DecisionTaskScheduled:
161162
// NOOP
@@ -361,13 +362,10 @@ private void handleWorkflowExecutionSignaled(HistoryEvent event) {
361362
signalAttributes.getSignalName(), signalAttributes.getInput(), event.getEventId());
362363
}
363364

364-
private void handleDecisionTaskCompleted(HistoryEvent event) {
365-
decisionsHelper.handleDecisionCompletion(event.getDecisionTaskCompletedEventAttributes());
366-
}
367-
368365
@Override
369-
public void decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
366+
public List<Decision> decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
370367
decideImpl(decisionTask, null);
368+
return getDecisionsHelper().getDecisions();
371369
}
372370

373371
private void decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query)

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,7 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
120120
? createDecider(decisionTask)
121121
: cache.getOrCreate(decisionTask, this::createDecider);
122122
try {
123-
decider.decide(decisionTask);
124-
DecisionsHelper decisionsHelper = ((ReplayDecider) decider).getDecisionsHelper();
125-
List<Decision> decisions = decisionsHelper.getDecisions();
126-
123+
List<Decision> decisions = decider.decide(decisionTask);
127124
if (log.isTraceEnabled()) {
128125
WorkflowExecution execution = decisionTask.getWorkflowExecution();
129126
log.trace(
@@ -148,7 +145,7 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
148145
+ decisions.size()
149146
+ " new decisions");
150147
}
151-
return createCompletedRequest(decisionTask, decisionsHelper, decisions);
148+
return createCompletedRequest(decisionTask, decisions);
152149
} catch (Exception e) {
153150
if (stickyTaskListName != null) {
154151
cache.invalidate(decisionTask);
@@ -184,15 +181,11 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
184181
}
185182

186183
private Result createCompletedRequest(
187-
PollForDecisionTaskResponse decisionTask,
188-
DecisionsHelper decisionsHelper,
189-
List<Decision> decisions) {
190-
byte[] context = decisionsHelper.getWorkflowContextDataToReturn();
184+
PollForDecisionTaskResponse decisionTask, List<Decision> decisions) {
191185
RespondDecisionTaskCompletedRequest completedRequest =
192186
new RespondDecisionTaskCompletedRequest();
193187
completedRequest.setTaskToken(decisionTask.getTaskToken());
194188
completedRequest.setDecisions(decisions);
195-
completedRequest.setExecutionContext(context);
196189

197190
if (stickyTaskListName != null) {
198191
StickyExecutionAttributes attributes = new StickyExecutionAttributes();

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,6 @@
8282
import com.uber.cadence.internal.testservice.StateMachines.TimerData;
8383
import com.uber.cadence.internal.testservice.StateMachines.WorkflowData;
8484
import com.uber.cadence.internal.testservice.TestWorkflowStore.TaskListId;
85-
import org.apache.thrift.TException;
86-
import org.slf4j.Logger;
87-
import org.slf4j.LoggerFactory;
88-
8985
import java.io.ByteArrayInputStream;
9086
import java.io.ByteArrayOutputStream;
9187
import java.io.DataInputStream;
@@ -106,6 +102,9 @@
106102
import java.util.concurrent.locks.Lock;
107103
import java.util.concurrent.locks.ReentrantLock;
108104
import java.util.function.LongSupplier;
105+
import org.apache.thrift.TException;
106+
import org.slf4j.Logger;
107+
import org.slf4j.LoggerFactory;
109108

110109
class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
111110

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@
3434
import com.uber.cadence.WorkflowExecutionInfo;
3535
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3636
import com.uber.cadence.internal.testservice.RequestContext.Timer;
37-
import org.slf4j.Logger;
38-
import org.slf4j.LoggerFactory;
39-
4037
import java.time.Duration;
4138
import java.util.ArrayList;
4239
import java.util.HashMap;
@@ -49,6 +46,8 @@
4946
import java.util.concurrent.locks.Condition;
5047
import java.util.concurrent.locks.Lock;
5148
import java.util.concurrent.locks.ReentrantLock;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
5251

5352
class TestWorkflowStoreImpl implements TestWorkflowStore {
5453

src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.mockito.Mockito.verify;
2828
import static org.mockito.Mockito.when;
2929

30+
import com.uber.cadence.Decision;
3031
import com.uber.cadence.PollForDecisionTaskResponse;
3132
import com.uber.cadence.WorkflowQuery;
3233
import com.uber.cadence.WorkflowType;
@@ -790,7 +791,9 @@ private static class DetermisiticRunnerContainerDecider implements Decider {
790791
}
791792

792793
@Override
793-
public void decide(PollForDecisionTaskResponse decisionTask) throws Throwable {}
794+
public List<Decision> decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
795+
return new ArrayList<>();
796+
}
794797

795798
@Override
796799
public byte[] query(PollForDecisionTaskResponse decisionTask, WorkflowQuery query)

0 commit comments

Comments
 (0)