Skip to content

Commit 3b2b572

Browse files
authored
Child workflow server side retry (#231)
1 parent 2a65ac4 commit 3b2b572

21 files changed

+1189
-112
lines changed

src/main/java/com/uber/cadence/client/WorkflowClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@
103103
public interface WorkflowClient {
104104

105105
/** Use this constant as a query type to get a workflow stack trace. */
106-
String QUERY_TYPE_STACK_TRCE = "__stack_trace";
106+
String QUERY_TYPE_STACK_TRACE = "__stack_trace";
107+
/** Replays workflow to the current state and returns empty result or error if replay failed. */
108+
String QUERY_TYPE_REPLAY_ONLY = "__replay_only";
107109

108110
/**
109111
* Creates worker that connects to the local instance of the Cadence Service that listens on a
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.common;
19+
20+
import com.google.common.collect.ImmutableList;
21+
import com.google.gson.Gson;
22+
import com.google.gson.GsonBuilder;
23+
import com.google.gson.JsonDeserializationContext;
24+
import com.google.gson.JsonDeserializer;
25+
import com.google.gson.JsonElement;
26+
import com.google.gson.JsonParseException;
27+
import com.google.gson.JsonPrimitive;
28+
import com.google.gson.JsonSerializationContext;
29+
import com.google.gson.JsonSerializer;
30+
import com.uber.cadence.EventType;
31+
import com.uber.cadence.HistoryEvent;
32+
import com.uber.cadence.WorkflowExecution;
33+
import java.lang.reflect.Type;
34+
import java.nio.ByteBuffer;
35+
import java.util.Base64;
36+
import java.util.List;
37+
38+
/** Contains workflow execution ids and the history */
39+
public final class WorkflowExecutionHistory {
40+
private final String workflowId;
41+
private final String runId;
42+
private final List<HistoryEvent> events;
43+
44+
public WorkflowExecutionHistory(String workflowId, String runId, List<HistoryEvent> events) {
45+
this.workflowId = workflowId;
46+
this.runId = runId;
47+
checkHistory(events);
48+
this.events = ImmutableList.copyOf(events);
49+
}
50+
51+
public WorkflowExecutionHistory(WorkflowExecution workflowExecution, List<HistoryEvent> events) {
52+
this.workflowId = workflowExecution.getWorkflowId();
53+
this.runId = workflowExecution.getRunId();
54+
checkHistory(events);
55+
this.events = ImmutableList.copyOf(events);
56+
}
57+
58+
public static WorkflowExecutionHistory fromJson(String serialized) {
59+
GsonBuilder gsonBuilder = new GsonBuilder();
60+
gsonBuilder.registerTypeAdapter(ByteBuffer.class, new ByteBufferJsonDeserializer());
61+
Gson gson = gsonBuilder.create();
62+
WorkflowExecutionHistory result = gson.fromJson(serialized, WorkflowExecutionHistory.class);
63+
checkHistory(result.getEvents());
64+
return result;
65+
}
66+
67+
private static void checkHistory(List<HistoryEvent> events) {
68+
if (events == null || events.size() == 0) {
69+
throw new IllegalArgumentException("Empty history");
70+
}
71+
HistoryEvent startedEvent = events.get(0);
72+
if (startedEvent.getEventType() != EventType.WorkflowExecutionStarted) {
73+
throw new IllegalArgumentException(
74+
"First event is not WorkflowExecutionStarted but " + startedEvent);
75+
}
76+
if (startedEvent.getWorkflowExecutionStartedEventAttributes() == null) {
77+
throw new IllegalArgumentException("First event is corrupted");
78+
}
79+
}
80+
81+
public String toJson() {
82+
GsonBuilder gsonBuilder = new GsonBuilder();
83+
Gson gson = gsonBuilder.create();
84+
return gson.toJson(this);
85+
}
86+
87+
public String getWorkflowId() {
88+
return workflowId;
89+
}
90+
91+
public String getRunId() {
92+
return runId;
93+
}
94+
95+
public WorkflowExecution getWorkflowExecution() {
96+
return new WorkflowExecution().setWorkflowId(workflowId).setRunId(runId);
97+
}
98+
99+
public List<HistoryEvent> getEvents() {
100+
return events;
101+
}
102+
103+
private static final class ByteBufferJsonDeserializer
104+
implements JsonDeserializer<ByteBuffer>, JsonSerializer<ByteBuffer> {
105+
106+
@Override
107+
public JsonElement serialize(ByteBuffer value, Type type, JsonSerializationContext ctx) {
108+
if (value.arrayOffset() > 0) {
109+
throw new IllegalArgumentException("non zero value array offset: " + value.arrayOffset());
110+
}
111+
return new JsonPrimitive(Base64.getEncoder().encodeToString(value.array()));
112+
}
113+
114+
@Override
115+
public ByteBuffer deserialize(JsonElement e, Type type, JsonDeserializationContext ctx)
116+
throws JsonParseException {
117+
return ByteBuffer.wrap(Base64.getDecoder().decode(e.getAsString()));
118+
}
119+
}
120+
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20+
import static java.nio.charset.StandardCharsets.UTF_8;
21+
22+
import com.google.common.io.CharStreams;
2023
import com.google.gson.Gson;
2124
import com.google.gson.GsonBuilder;
2225
import com.google.gson.JsonElement;
@@ -49,11 +52,15 @@
4952
import com.uber.cadence.client.WorkflowTerminatedException;
5053
import com.uber.cadence.client.WorkflowTimedOutException;
5154
import com.uber.cadence.common.RetryOptions;
55+
import com.uber.cadence.common.WorkflowExecutionHistory;
5256
import com.uber.cadence.serviceclient.IWorkflowService;
57+
import java.io.File;
58+
import java.io.IOException;
59+
import java.io.Reader;
5360
import java.lang.reflect.InvocationTargetException;
5461
import java.lang.reflect.Method;
5562
import java.lang.reflect.Modifier;
56-
import java.nio.charset.StandardCharsets;
63+
import java.nio.file.Files;
5764
import java.time.Duration;
5865
import java.util.Collection;
5966
import java.util.Date;
@@ -141,7 +148,7 @@ private static byte[] getResultFromCloseEvent(
141148
return closeEvent.getWorkflowExecutionCompletedEventAttributes().getResult();
142149
case WorkflowExecutionCanceled:
143150
byte[] details = closeEvent.getWorkflowExecutionCanceledEventAttributes().getDetails();
144-
String message = details != null ? new String(details, StandardCharsets.UTF_8) : null;
151+
String message = details != null ? new String(details, UTF_8) : null;
145152
throw new CancellationException(message);
146153
case WorkflowExecutionFailed:
147154
WorkflowExecutionFailedEventAttributes failed =
@@ -754,7 +761,7 @@ private static String prettyPrintObject(
754761
return (String) object;
755762
}
756763
if (clz.equals(byte[].class)) {
757-
return new String((byte[]) object, StandardCharsets.UTF_8);
764+
return new String((byte[]) object, UTF_8);
758765
}
759766

760767
if (clz.equals(Date.class)) {
@@ -833,7 +840,7 @@ private static String prettyPrintObject(
833840
result.append(" = ");
834841
// Pretty print JSON serialized exceptions.
835842
if (name.equals("getDetails") && value instanceof byte[]) {
836-
String details = new String((byte[]) value, StandardCharsets.UTF_8);
843+
String details = new String((byte[]) value, UTF_8);
837844
details = prettyPrintJson(details, INDENTATION + INDENTATION);
838845
// GSON pretty prints, but doesn't let to set an initial indentation.
839846
// Thus indenting the pretty printed JSON through regexp :(.
@@ -961,4 +968,19 @@ public static EventType getEventTypeForDecision(DecisionType decisionType) {
961968
}
962969
throw new IllegalArgumentException("Unknown decisionType");
963970
}
971+
972+
public static WorkflowExecutionHistory readHistoryFromResource(String resourceFileName)
973+
throws IOException {
974+
ClassLoader classLoader = WorkflowExecutionUtils.class.getClassLoader();
975+
String historyUrl = classLoader.getResource(resourceFileName).getFile();
976+
File historyFile = new File(historyUrl);
977+
return readHistory(historyFile);
978+
}
979+
980+
public static WorkflowExecutionHistory readHistory(File historyFile) throws IOException {
981+
try (Reader reader = Files.newBufferedReader(historyFile.toPath(), UTF_8)) {
982+
String jsonHistory = CharStreams.toString(reader);
983+
return WorkflowExecutionHistory.fromJson(jsonHistory);
984+
}
985+
}
964986
}

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ Consumer<Exception> startChildWorkflow(
9595
Consumer<WorkflowExecution> executionCallback,
9696
BiConsumer<byte[], Exception> callback);
9797

98+
/**
99+
* Is the next event in the history is child workflow initiated event and it has attached retry
100+
* policy. Used for backwards compatibility with the code that used local workflow retry when
101+
* RetryOptions were specified.
102+
*/
103+
boolean isServerSideChildWorkflowRetry();
104+
98105
Consumer<Exception> signalWorkflowExecution(
99106
SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback);
100107

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ public Consumer<Exception> startChildWorkflow(
167167
return workflowClient.startChildWorkflow(parameters, executionCallback, callback);
168168
}
169169

170+
@Override
171+
public boolean isServerSideChildWorkflowRetry() {
172+
return workflowClient.isChildWorkflowExecutionStartedWithRetryOptions();
173+
}
174+
170175
@Override
171176
public Consumer<Exception> signalWorkflowExecution(
172177
SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback) {

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
4545
import com.uber.cadence.StartChildWorkflowExecutionDecisionAttributes;
4646
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
47+
import com.uber.cadence.StartChildWorkflowExecutionInitiatedEventAttributes;
4748
import com.uber.cadence.StartTimerDecisionAttributes;
4849
import com.uber.cadence.TaskList;
4950
import com.uber.cadence.TimerCanceledEventAttributes;
@@ -197,6 +198,27 @@ long startChildWorkflowExecution(StartChildWorkflowExecutionDecisionAttributes c
197198
return nextDecisionEventId;
198199
}
199200

201+
/**
202+
* @return true if it is not replay or retryOptions are present in the
203+
* StartChildWorkflowExecutionInitiated event.
204+
*/
205+
boolean isChildWorkflowExecutionStartedWithRetryOptions() {
206+
Optional<HistoryEvent> optionalEvent = getOptionalDecisionEvent(nextDecisionEventId);
207+
if (!optionalEvent.isPresent()) {
208+
return true;
209+
}
210+
HistoryEvent event = optionalEvent.get();
211+
if (event.getEventType() != EventType.StartChildWorkflowExecutionInitiated) {
212+
return false;
213+
}
214+
StartChildWorkflowExecutionInitiatedEventAttributes attr =
215+
event.getStartChildWorkflowExecutionInitiatedEventAttributes();
216+
if (attr == null) {
217+
throw new Error("Corrupted event: " + event);
218+
}
219+
return attr.getRetryPolicy() != null;
220+
}
221+
200222
void handleStartChildWorkflowExecutionInitiated(HistoryEvent event) {
201223
DecisionStateMachine decision =
202224
getDecision(new DecisionId(DecisionTarget.CHILD_WORKFLOW, event.getEventId()));

src/main/java/com/uber/cadence/internal/replay/StartChildWorkflowExecutionParameters.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import com.uber.cadence.ChildPolicy;
2121
import com.uber.cadence.WorkflowIdReusePolicy;
2222
import com.uber.cadence.WorkflowType;
23+
import com.uber.cadence.internal.common.RetryParameters;
2324
import java.util.Arrays;
25+
import java.util.Objects;
2426

2527
public final class StartChildWorkflowExecutionParameters {
2628

@@ -46,6 +48,8 @@ public static final class Builder {
4648

4749
private WorkflowIdReusePolicy workflowIdReusePolicy;
4850

51+
private RetryParameters retryParameters;
52+
4953
public Builder setDomain(String domain) {
5054
this.domain = domain;
5155
return this;
@@ -97,6 +101,11 @@ public Builder setWorkflowIdReusePolicy(WorkflowIdReusePolicy workflowIdReusePol
97101
return this;
98102
}
99103

104+
public Builder setRetryParameters(RetryParameters retryParameters) {
105+
this.retryParameters = retryParameters;
106+
return this;
107+
}
108+
100109
public StartChildWorkflowExecutionParameters build() {
101110
return new StartChildWorkflowExecutionParameters(
102111
domain,
@@ -108,7 +117,8 @@ public StartChildWorkflowExecutionParameters build() {
108117
workflowId,
109118
workflowType,
110119
childPolicy,
111-
workflowIdReusePolicy);
120+
workflowIdReusePolicy,
121+
retryParameters);
112122
}
113123
}
114124

@@ -132,6 +142,8 @@ public StartChildWorkflowExecutionParameters build() {
132142

133143
private final WorkflowIdReusePolicy workflowIdReusePolicy;
134144

145+
private final RetryParameters retryParameters;
146+
135147
private StartChildWorkflowExecutionParameters(
136148
String domain,
137149
byte[] input,
@@ -142,7 +154,8 @@ private StartChildWorkflowExecutionParameters(
142154
String workflowId,
143155
WorkflowType workflowType,
144156
ChildPolicy childPolicy,
145-
WorkflowIdReusePolicy workflowIdReusePolicy) {
157+
WorkflowIdReusePolicy workflowIdReusePolicy,
158+
RetryParameters retryParameters) {
146159
this.domain = domain;
147160
this.input = input;
148161
this.control = control;
@@ -153,6 +166,7 @@ private StartChildWorkflowExecutionParameters(
153166
this.workflowType = workflowType;
154167
this.childPolicy = childPolicy;
155168
this.workflowIdReusePolicy = workflowIdReusePolicy;
169+
this.retryParameters = retryParameters;
156170
}
157171

158172
public String getDomain() {
@@ -195,6 +209,46 @@ public WorkflowIdReusePolicy getWorkflowIdReusePolicy() {
195209
return workflowIdReusePolicy;
196210
}
197211

212+
public RetryParameters getRetryParameters() {
213+
return retryParameters;
214+
}
215+
216+
@Override
217+
public boolean equals(Object o) {
218+
if (this == o) return true;
219+
if (o == null || getClass() != o.getClass()) return false;
220+
StartChildWorkflowExecutionParameters that = (StartChildWorkflowExecutionParameters) o;
221+
return executionStartToCloseTimeoutSeconds == that.executionStartToCloseTimeoutSeconds
222+
&& taskStartToCloseTimeoutSeconds == that.taskStartToCloseTimeoutSeconds
223+
&& Objects.equals(domain, that.domain)
224+
&& Objects.equals(control, that.control)
225+
&& Arrays.equals(input, that.input)
226+
&& Objects.equals(taskList, that.taskList)
227+
&& Objects.equals(workflowId, that.workflowId)
228+
&& Objects.equals(workflowType, that.workflowType)
229+
&& childPolicy == that.childPolicy
230+
&& workflowIdReusePolicy == that.workflowIdReusePolicy
231+
&& Objects.equals(retryParameters, that.retryParameters);
232+
}
233+
234+
@Override
235+
public int hashCode() {
236+
int result =
237+
Objects.hash(
238+
domain,
239+
control,
240+
executionStartToCloseTimeoutSeconds,
241+
taskList,
242+
taskStartToCloseTimeoutSeconds,
243+
workflowId,
244+
workflowType,
245+
childPolicy,
246+
workflowIdReusePolicy,
247+
retryParameters);
248+
result = 31 * result + Arrays.hashCode(input);
249+
return result;
250+
}
251+
198252
@Override
199253
public String toString() {
200254
return "StartChildWorkflowExecutionParameters{"
@@ -222,6 +276,8 @@ public String toString() {
222276
+ childPolicy
223277
+ ", workflowIdReusePolicy="
224278
+ workflowIdReusePolicy
279+
+ ", retryParameters="
280+
+ retryParameters
225281
+ '}';
226282
}
227283
}

0 commit comments

Comments
 (0)