Skip to content

Commit 5aa464a

Browse files
authored
Maintain correct runID during reset for random ID generation (#313)
1 parent b3621d5 commit 5aa464a

File tree

12 files changed

+1726
-500
lines changed

12 files changed

+1726
-500
lines changed

docker/buildkite/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ services:
2323
- statsd
2424

2525
cadence:
26-
image: ubercadence/server:0.5.6
26+
image: ubercadence/server:0.5.9
2727
ports:
2828
- "7933:7933"
2929
- "7934:7934"

src/main/java/com/uber/cadence/common/WorkflowExecutionHistory.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.gson.JsonPrimitive;
2828
import com.google.gson.JsonSerializationContext;
2929
import com.google.gson.JsonSerializer;
30+
import com.google.gson.reflect.TypeToken;
3031
import com.uber.cadence.EventType;
3132
import com.uber.cadence.HistoryEvent;
3233
import com.uber.cadence.WorkflowExecution;
@@ -37,20 +38,9 @@
3738

3839
/** Contains workflow execution ids and the history */
3940
public final class WorkflowExecutionHistory {
40-
private final String workflowId;
41-
private final String runId;
4241
private final List<HistoryEvent> events;
4342

44-
public WorkflowExecutionHistory(String workflowId, String runId, List<HistoryEvent> events) {
45-
this.workflowId = workflowId;
46-
this.runId = runId;
47-
checkHistory(events);
48-
this.events = ImmutableList.copyOf(events);
49-
}
50-
51-
public WorkflowExecutionHistory(WorkflowExecution workflowExecution, List<HistoryEvent> events) {
52-
this.workflowId = workflowExecution.getWorkflowId();
53-
this.runId = workflowExecution.getRunId();
43+
public WorkflowExecutionHistory(List<HistoryEvent> events) {
5444
checkHistory(events);
5545
this.events = ImmutableList.copyOf(events);
5646
}
@@ -59,9 +49,10 @@ public static WorkflowExecutionHistory fromJson(String serialized) {
5949
GsonBuilder gsonBuilder = new GsonBuilder();
6050
gsonBuilder.registerTypeAdapter(ByteBuffer.class, new ByteBufferJsonDeserializer());
6151
Gson gson = gsonBuilder.create();
62-
WorkflowExecutionHistory result = gson.fromJson(serialized, WorkflowExecutionHistory.class);
63-
checkHistory(result.getEvents());
64-
return result;
52+
Type eventsType = new TypeToken<List<HistoryEvent>>() {}.getType();
53+
List<HistoryEvent> events = gson.fromJson(serialized, eventsType);
54+
checkHistory(events);
55+
return new WorkflowExecutionHistory(events);
6556
}
6657

6758
private static void checkHistory(List<HistoryEvent> events) {
@@ -84,16 +75,10 @@ public String toJson() {
8475
return gson.toJson(this);
8576
}
8677

87-
public String getWorkflowId() {
88-
return workflowId;
89-
}
90-
91-
public String getRunId() {
92-
return runId;
93-
}
94-
9578
public WorkflowExecution getWorkflowExecution() {
96-
return new WorkflowExecution().setWorkflowId(workflowId).setRunId(runId);
79+
return new WorkflowExecution()
80+
.setWorkflowId("workflow_id_in_replay")
81+
.setRunId("run_id_in_replay");
9782
}
9883

9984
public List<HistoryEvent> getEvents() {

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,13 @@ public void handleMarkerRecorded(HistoryEvent event) {
327327
workflowClock.handleMarkerRecorded(event);
328328
}
329329

330+
public void handleDecisionTaskFailed(HistoryEvent event) {
331+
DecisionTaskFailedEventAttributes attr = event.getDecisionTaskFailedEventAttributes();
332+
if (attr != null && attr.getCause() == DecisionTaskFailedCause.RESET_WORKFLOW) {
333+
workflowContext.setCurrentRunId(attr.getNewRunId());
334+
}
335+
}
336+
330337
boolean startUnstartedLaTasks(Duration maxWaitAllowed) {
331338
return workflowClock.startUnstartedLaTasks(maxWaitAllowed);
332339
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ private void processEvent(HistoryEvent event) {
233233
decisionsHelper.handleCancelTimerFailed(event);
234234
break;
235235
case DecisionTaskFailed:
236+
context.handleDecisionTaskFailed(event);
236237
break;
237238
}
238239
}

src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20-
import com.uber.cadence.ChildPolicy;
21-
import com.uber.cadence.PollForDecisionTaskResponse;
22-
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
20+
import com.uber.cadence.*;
2321

2422
final class WorkflowContext {
2523

@@ -28,6 +26,9 @@ final class WorkflowContext {
2826
private ContinueAsNewWorkflowExecutionParameters continueAsNewOnCompletion;
2927
private WorkflowExecutionStartedEventAttributes startedAttributes;
3028
private final String domain;
29+
// RunId can change when reset happens. This remembers the actual runId that is used
30+
// as in this particular part of the history.
31+
private String currentRunId;
3132

3233
WorkflowContext(
3334
String domain,
@@ -36,13 +37,14 @@ final class WorkflowContext {
3637
this.domain = domain;
3738
this.decisionTask = decisionTask;
3839
this.startedAttributes = startedAttributes;
40+
this.currentRunId = startedAttributes.getOriginalExecutionRunId();
3941
}
4042

41-
com.uber.cadence.WorkflowExecution getWorkflowExecution() {
43+
WorkflowExecution getWorkflowExecution() {
4244
return decisionTask.getWorkflowExecution();
4345
}
4446

45-
com.uber.cadence.WorkflowType getWorkflowType() {
47+
WorkflowType getWorkflowType() {
4648
return decisionTask.getWorkflowType();
4749
}
4850

@@ -121,4 +123,12 @@ private WorkflowExecutionStartedEventAttributes getWorkflowStartedEventAttribute
121123
public ChildPolicy getChildPolicy() {
122124
return startedAttributes.getChildPolicy();
123125
}
126+
127+
void setCurrentRunId(String currentRunId) {
128+
this.currentRunId = currentRunId;
129+
}
130+
131+
String getCurrentRunId() {
132+
return currentRunId;
133+
}
124134
}

src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,7 @@ void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters continue
219219

220220
/** Replay safe UUID */
221221
UUID randomUUID() {
222-
WorkflowExecution workflowExecution = workflowContext.getWorkflowExecution();
223-
String runId = workflowExecution.getRunId();
222+
String runId = workflowContext.getCurrentRunId();
224223
String id = runId + ":" + decisions.getAndIncrementNextId();
225224
byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
226225
return UUID.nameUUIDFromBytes(bytes);

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public byte[] queryWorkflowExecution(WorkflowExecution exec, String queryType, b
122122
WorkflowExecutionUtils.getHistoryPage(null, service, domain, exec);
123123
History history = historyResponse.getHistory();
124124
WorkflowExecutionHistory workflowExecutionHistory =
125-
new WorkflowExecutionHistory(exec.getWorkflowId(), exec.getRunId(), history.getEvents());
125+
new WorkflowExecutionHistory(history.getEvents());
126126
return queryWorkflowExecution(
127127
queryType, args, workflowExecutionHistory, historyResponse.getNextPageToken());
128128
}

src/main/thrift/shared.thrift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ enum DecisionTaskFailedCause {
192192
FAILOVER_CLOSE_DECISION,
193193
BAD_SIGNAL_INPUT_SIZE,
194194
RESET_WORKFLOW,
195+
BAD_BINARY,
195196
}
196197

197198
enum CancelExternalWorkflowExecutionFailedCause {
@@ -439,7 +440,9 @@ struct WorkflowExecutionStartedEventAttributes {
439440
56: optional string continuedFailureReason
440441
57: optional binary continuedFailureDetails
441442
58: optional binary lastCompletionResult
443+
59: optional string originalExecutionRunId // This is the runID when the WorkflowExecutionStarted event is written.
442444
60: optional string identity
445+
61: optional string firstExecutionRunId // This is the very first runID along the chain of ContinueAsNew and Reset.
443446
70: optional RetryPolicy retryPolicy
444447
80: optional i32 attempt
445448
90: optional i64 (js.type = "Long") expirationTimestamp

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public class WorkflowTest {
127127
@Parameters(name = "{1}")
128128
public static Object[] data() {
129129
if (!useDockerService) {
130-
return new Object[][] {{false, "TestService Sticky Off", true}};
130+
return new Object[][] {{false, "TestService Sticky OFF", true}};
131131
} else {
132132
return new Object[][] {
133133
{true, "Docker Sticky " + (stickyOff ? "OFF" : "ON"), stickyOff},
@@ -768,9 +768,9 @@ public void testAsyncActivityRetry() {
768768
@Test
769769
public void testAsyncActivityRetryReplay() throws Exception {
770770
// Avoid executing 4 times
771-
if (!testName.getMethodName().equals("testAsyncActivityRetryReplay[Docker Sticky OFF]")) {
772-
return;
773-
}
771+
Assume.assumeFalse("skipping for docker tests", useExternalService);
772+
Assume.assumeFalse("skipping for sticky off", stickyOff);
773+
774774
WorkflowReplayer.replayWorkflowExecutionFromResource(
775775
"testAsyncActivityRetryHistory.json", TestAsyncActivityRetry.class);
776776
}
@@ -2696,9 +2696,9 @@ public WorkflowStub newUntypedWorkflowStub(
26962696
*/
26972697
@Test
26982698
public void testChildWorkflowRetryReplay() throws Exception {
2699-
if (!testName.getMethodName().equals("testChildWorkflowRetryReplay[Docker Sticky OFF]")) {
2700-
return;
2701-
}
2699+
Assume.assumeFalse("skipping for docker tests", useExternalService);
2700+
Assume.assumeFalse("skipping for sticky off", stickyOff);
2701+
27022702
WorkflowReplayer.replayWorkflowExecutionFromResource(
27032703
"testChildWorkflowRetryHistory.json", TestChildWorkflowRetryWorkflow.class);
27042704
}
@@ -4627,6 +4627,68 @@ public void testSignalOrderingWorkflow() {
46274627
assertEquals(expected, result);
46284628
}
46294629

4630+
public static class TestWorkflowResetReplayWorkflow implements TestWorkflow1 {
4631+
@Override
4632+
public String execute(String taskList) {
4633+
ChildWorkflowOptions workflowOptions =
4634+
new ChildWorkflowOptions.Builder()
4635+
.setTaskList(taskList)
4636+
.setRetryOptions(
4637+
new RetryOptions.Builder()
4638+
.setMaximumAttempts(3)
4639+
.setInitialInterval(Duration.ofSeconds(1))
4640+
.build())
4641+
.build();
4642+
4643+
ActivityOptions options =
4644+
new ActivityOptions.Builder()
4645+
.setTaskList(taskList)
4646+
.setHeartbeatTimeout(Duration.ofSeconds(5))
4647+
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
4648+
.setScheduleToStartTimeout(Duration.ofSeconds(5))
4649+
.setStartToCloseTimeout(Duration.ofSeconds(10))
4650+
.build();
4651+
4652+
for (int i = 0; i < 10; i++) {
4653+
if (Workflow.newRandom().nextDouble() > 0.5) {
4654+
Workflow.getLogger("test").info("Execute child workflow");
4655+
TestMultiargsWorkflowsFunc stubF =
4656+
Workflow.newChildWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions);
4657+
stubF.func();
4658+
} else {
4659+
Workflow.getLogger("test").info("Execute activity");
4660+
TestActivities activities = Workflow.newActivityStub(TestActivities.class, options);
4661+
activities.activity();
4662+
}
4663+
}
4664+
4665+
return "done";
4666+
}
4667+
}
4668+
4669+
@Test
4670+
public void testWorkflowReset() throws Exception {
4671+
// Leave the following code to generate history.
4672+
// startWorkerFor(TestWorkflowResetReplayWorkflow.class, TestMultiargsWorkflowsImpl.class);
4673+
// TestWorkflow1 workflowStub =
4674+
// workflowClient.newWorkflowStub(
4675+
// TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
4676+
// workflowStub.execute(taskList);
4677+
//
4678+
// try {
4679+
// Thread.sleep(60000000);
4680+
// } catch (InterruptedException e) {
4681+
// e.printStackTrace();
4682+
// }
4683+
4684+
// Avoid executing 4 times
4685+
Assume.assumeFalse("skipping for docker tests", useExternalService);
4686+
Assume.assumeFalse("skipping for sticky off", stickyOff);
4687+
4688+
WorkflowReplayer.replayWorkflowExecutionFromResource(
4689+
"resetWorkflowHistory.json", TestWorkflowResetReplayWorkflow.class);
4690+
}
4691+
46304692
private static class FilteredTrace {
46314693

46324694
private final List<String> impl = Collections.synchronizedList(new ArrayList<>());

0 commit comments

Comments
 (0)