Skip to content

Commit 393d19f

Browse files
authored
Change markers to use headers to serialize internal fields (#324)
* Change local activity marker to use headers * Change mutable side effects and version marker to use header * Update pretty print object method for test framework * Add backward compatibility test
1 parent 1d7c227 commit 393d19f

13 files changed

+432
-129
lines changed

src/main/java/com/uber/cadence/internal/common/LocalActivityMarkerData.java

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,25 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20+
import com.google.common.base.Strings;
2021
import com.uber.cadence.ActivityType;
22+
import com.uber.cadence.Header;
23+
import com.uber.cadence.MarkerRecordedEventAttributes;
2124
import com.uber.cadence.RespondActivityTaskCanceledRequest;
2225
import com.uber.cadence.RespondActivityTaskFailedRequest;
26+
import com.uber.cadence.converter.DataConverter;
27+
import com.uber.m3.util.ImmutableMap;
28+
import java.nio.ByteBuffer;
2329
import java.nio.charset.StandardCharsets;
2430
import java.time.Duration;
2531

2632
public final class LocalActivityMarkerData {
33+
private static final String LOCAL_ACTIVITY_HEADER_KEY = "LocalActivityHeader";
34+
2735
public static final class Builder {
2836
private String activityId;
2937
private String activityType;
3038
private String errReason;
31-
private byte[] errJson;
3239
private byte[] result;
3340
private long replayTimeMillis;
3441
private int attempt;
@@ -47,13 +54,13 @@ public Builder setActivityType(ActivityType activityType) {
4754

4855
public Builder setTaskFailedRequest(RespondActivityTaskFailedRequest request) {
4956
this.errReason = request.getReason();
50-
this.errJson = request.getDetails();
57+
this.result = request.getDetails();
5158
return this;
5259
}
5360

5461
public Builder setTaskCancelledRequest(RespondActivityTaskCanceledRequest request) {
5562
this.errReason = new String(request.getDetails(), StandardCharsets.UTF_8);
56-
this.errJson = request.getDetails();
63+
this.result = request.getDetails();
5764
this.isCancelled = true;
5865
return this;
5966
}
@@ -85,77 +92,114 @@ public LocalActivityMarkerData build() {
8592
replayTimeMillis,
8693
result,
8794
errReason,
88-
errJson,
8995
attempt,
9096
backoff,
9197
isCancelled);
9298
}
9399
}
94100

95-
private final String activityId;
96-
private final String activityType;
97-
private final String errReason;
98-
private final byte[] errJson;
101+
private static class LocalActivityMarkerHeader {
102+
private final String activityId;
103+
private final String activityType;
104+
private final String errReason;
105+
private final long replayTimeMillis;
106+
private final int attempt;
107+
private final Duration backoff;
108+
private final boolean isCancelled;
109+
110+
LocalActivityMarkerHeader(
111+
String activityId,
112+
String activityType,
113+
long replayTimeMillis,
114+
String errReason,
115+
int attempt,
116+
Duration backoff,
117+
boolean isCancelled) {
118+
this.activityId = activityId;
119+
this.activityType = activityType;
120+
this.replayTimeMillis = replayTimeMillis;
121+
this.errReason = errReason;
122+
this.attempt = attempt;
123+
this.backoff = backoff;
124+
this.isCancelled = isCancelled;
125+
}
126+
}
127+
128+
private final LocalActivityMarkerHeader headers;
99129
private final byte[] result;
100-
private final long replayTimeMillis;
101-
private final int attempt;
102-
private final Duration backoff;
103-
private final boolean isCancelled;
104130

105131
private LocalActivityMarkerData(
106132
String activityId,
107133
String activityType,
108134
long replayTimeMillis,
109135
byte[] result,
110136
String errReason,
111-
byte[] errJson,
112137
int attempt,
113138
Duration backoff,
114139
boolean isCancelled) {
115-
this.activityId = activityId;
116-
this.activityType = activityType;
117-
this.replayTimeMillis = replayTimeMillis;
140+
this.headers =
141+
new LocalActivityMarkerHeader(
142+
activityId, activityType, replayTimeMillis, errReason, attempt, backoff, isCancelled);
118143
this.result = result;
119-
this.errReason = errReason;
120-
this.errJson = errJson;
121-
this.attempt = attempt;
122-
this.backoff = backoff;
123-
this.isCancelled = isCancelled;
144+
}
145+
146+
private LocalActivityMarkerData(LocalActivityMarkerHeader headers, byte[] result) {
147+
this.headers = headers;
148+
this.result = result;
149+
if (headers == null) {
150+
System.out.println("test");
151+
}
124152
}
125153

126154
public String getActivityId() {
127-
return activityId;
155+
return headers.activityId;
128156
}
129157

130158
public String getActivityType() {
131-
return activityType;
159+
return headers.activityType;
132160
}
133161

134162
public String getErrReason() {
135-
return errReason;
163+
return headers.errReason;
136164
}
137165

138166
public byte[] getErrJson() {
139-
return errJson;
167+
return Strings.isNullOrEmpty(headers.errReason) ? null : result;
140168
}
141169

142170
public byte[] getResult() {
143171
return result;
144172
}
145173

146174
public long getReplayTimeMillis() {
147-
return replayTimeMillis;
175+
return headers.replayTimeMillis;
148176
}
149177

150178
public int getAttempt() {
151-
return attempt;
179+
return headers.attempt;
152180
}
153181

154182
public Duration getBackoff() {
155-
return backoff;
183+
return headers.backoff;
156184
}
157185

158186
public boolean getIsCancelled() {
159-
return isCancelled;
187+
return headers.isCancelled;
188+
}
189+
190+
public Header getHeader(DataConverter converter) {
191+
byte[] headerData = converter.toData(headers);
192+
Header header = new Header();
193+
header.setFields(ImmutableMap.of(LOCAL_ACTIVITY_HEADER_KEY, ByteBuffer.wrap(headerData)));
194+
return header;
195+
}
196+
197+
public static LocalActivityMarkerData fromEventAttributes(
198+
MarkerRecordedEventAttributes attributes, DataConverter converter) {
199+
ByteBuffer byteBuffer = attributes.getHeader().getFields().get(LOCAL_ACTIVITY_HEADER_KEY);
200+
byte[] bytes = org.apache.thrift.TBaseHelper.byteBufferToByteArray(byteBuffer);
201+
LocalActivityMarkerHeader header =
202+
converter.fromData(bytes, LocalActivityMarkerHeader.class, LocalActivityMarkerHeader.class);
203+
return new LocalActivityMarkerData(header, attributes.getDetails());
160204
}
161205
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import com.uber.cadence.History;
4040
import com.uber.cadence.HistoryEvent;
4141
import com.uber.cadence.HistoryEventFilterType;
42-
import com.uber.cadence.MarkerRecordedEventAttributes;
4342
import com.uber.cadence.StartWorkflowExecutionRequest;
4443
import com.uber.cadence.TaskList;
4544
import com.uber.cadence.WorkflowExecution;
@@ -61,7 +60,7 @@
6160
import java.lang.reflect.InvocationTargetException;
6261
import java.lang.reflect.Method;
6362
import java.lang.reflect.Modifier;
64-
import java.nio.charset.Charset;
63+
import java.nio.ByteBuffer;
6564
import java.nio.file.Files;
6665
import java.time.Duration;
6766
import java.util.Collection;
@@ -713,27 +712,9 @@ private static String prettyPrintHistoryEvent(HistoryEvent event, long firstTime
713712
result.append(String.format(" [%s ms]", timestamp));
714713
}
715714
result.append(" ");
716-
717-
if (event.getEventType() == EventType.MarkerRecorded) {
718-
MarkerRecordedEventAttributes markerAttributes = event.getMarkerRecordedEventAttributes();
719-
result
720-
.append("{\n")
721-
.append(" MarkerName = ")
722-
.append(markerAttributes.getMarkerName())
723-
.append(";\n");
724-
result
725-
.append(" DecisionTaskCompletedEventId = ")
726-
.append(markerAttributes.getDecisionTaskCompletedEventId())
727-
.append(";\n");
728-
result
729-
.append(" Details = ")
730-
.append(new String(markerAttributes.getDetails(), Charset.defaultCharset()))
731-
.append(";\n }");
732-
} else {
733-
result.append(
734-
prettyPrintObject(
735-
getEventAttributes(event), "getFieldValue", true, INDENTATION, false, false));
736-
}
715+
result.append(
716+
prettyPrintObject(
717+
getEventAttributes(event), "getFieldValue", true, INDENTATION, false, false));
737718

738719
return result.toString();
739720
}
@@ -784,7 +765,9 @@ private static String prettyPrintObject(
784765
if (clz.equals(byte[].class)) {
785766
return new String((byte[]) object, UTF_8);
786767
}
787-
768+
if (ByteBuffer.class.isAssignableFrom(clz)) {
769+
return new String(((ByteBuffer) object).array(), UTF_8);
770+
}
788771
if (clz.equals(Date.class)) {
789772
return String.valueOf(object);
790773
}
@@ -797,9 +780,40 @@ private static String prettyPrintObject(
797780
if (clz.equals(WorkflowType.class)) {
798781
return String.valueOf(((WorkflowType) object).getName());
799782
}
800-
783+
if (Map.Entry.class.isAssignableFrom(clz)) {
784+
result.append(
785+
prettyPrintObject(
786+
((Map.Entry) object).getKey(),
787+
methodToSkip,
788+
skipNullsAndEmptyCollections,
789+
"",
790+
skipLevel,
791+
printTypeName));
792+
result.append("=");
793+
result.append(
794+
prettyPrintObject(
795+
((Map.Entry) object).getValue(),
796+
methodToSkip,
797+
skipNullsAndEmptyCollections,
798+
"",
799+
skipLevel,
800+
printTypeName));
801+
return result.toString();
802+
}
801803
if (Map.class.isAssignableFrom(clz)) {
802-
return String.valueOf(object);
804+
result.append("{ ");
805+
806+
String prefix = "";
807+
for (Object entry : ((Map) object).entrySet()) {
808+
result.append(prefix);
809+
prefix = ", ";
810+
result.append(
811+
prettyPrintObject(
812+
entry, methodToSkip, skipNullsAndEmptyCollections, "", skipLevel, printTypeName));
813+
}
814+
815+
result.append(" }");
816+
return result.toString();
803817
}
804818
if (Collection.class.isAssignableFrom(clz)) {
805819
return String.valueOf(object);

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
import com.uber.cadence.TimerCanceledEventAttributes;
2626
import com.uber.cadence.TimerFiredEventAttributes;
2727
import com.uber.cadence.converter.DataConverter;
28-
import com.uber.cadence.converter.JsonDataConverter;
2928
import com.uber.cadence.internal.common.LocalActivityMarkerData;
30-
import com.uber.cadence.internal.replay.MarkerHandler.MarkerData;
3129
import com.uber.cadence.internal.sync.WorkflowInternal;
3230
import com.uber.cadence.internal.worker.LocalActivityWorker;
3331
import com.uber.cadence.workflow.ActivityFailureException;
@@ -88,20 +86,23 @@ public void accept(Exception reason) {
8886
private final Map<String, OpenRequestInfo<byte[], ActivityType>> pendingLaTasks = new HashMap<>();
8987
private final Map<String, ExecuteLocalActivityParameters> unstartedLaTasks = new HashMap<>();
9088
private final ReplayDecider replayDecider;
89+
private final DataConverter dataConverter;
9190
private final Lock laTaskLock = new ReentrantLock();
9291
private final Condition taskCondition = laTaskLock.newCondition();
9392
private boolean taskCompleted = false;
9493

9594
ClockDecisionContext(
9695
DecisionsHelper decisions,
9796
BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller,
98-
ReplayDecider replayDecider) {
97+
ReplayDecider replayDecider,
98+
DataConverter dataConverter) {
9999
this.decisions = decisions;
100100
mutableSideEffectHandler =
101101
new MarkerHandler(decisions, MUTABLE_SIDE_EFFECT_MARKER_NAME, () -> replaying);
102102
versionHandler = new MarkerHandler(decisions, VERSION_MARKER_NAME, () -> replaying);
103103
this.laTaskPoller = laTaskPoller;
104104
this.replayDecider = replayDecider;
105+
this.dataConverter = dataConverter;
105106
}
106107

107108
public long currentTimeMillis() {
@@ -192,7 +193,7 @@ byte[] sideEffect(Func<byte[]> func) {
192193
throw new Error("sideEffect function failed", e);
193194
}
194195
}
195-
decisions.recordMarker(SIDE_EFFECT_MARKER_NAME, result);
196+
decisions.recordMarker(SIDE_EFFECT_MARKER_NAME, null, result);
196197
return result;
197198
}
198199

@@ -222,16 +223,13 @@ void handleMarkerRecorded(HistoryEvent event) {
222223

223224
private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes) {
224225
LocalActivityMarkerData marker =
225-
JsonDataConverter.getInstance()
226-
.fromData(
227-
attributes.getDetails(),
228-
LocalActivityMarkerData.class,
229-
LocalActivityMarkerData.class);
226+
LocalActivityMarkerData.fromEventAttributes(attributes, dataConverter);
230227

231228
if (pendingLaTasks.containsKey(marker.getActivityId())) {
232229
log.debug("Handle LocalActivityMarker for activity " + marker.getActivityId());
233230

234-
decisions.recordMarker(LOCAL_ACTIVITY_MARKER_NAME, attributes.getDetails());
231+
decisions.recordMarker(
232+
LOCAL_ACTIVITY_MARKER_NAME, marker.getHeader(dataConverter), attributes.getDetails());
235233

236234
OpenRequestInfo<byte[], ActivityType> scheduled =
237235
pendingLaTasks.remove(marker.getActivityId());
@@ -242,8 +240,7 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
242240
failure = new CancellationException(marker.getErrReason());
243241
} else if (marker.getErrJson() != null) {
244242
Throwable cause =
245-
JsonDataConverter.getInstance()
246-
.fromData(marker.getErrJson(), Throwable.class, Throwable.class);
243+
dataConverter.fromData(marker.getErrJson(), Throwable.class, Throwable.class);
247244
ActivityType activityType = new ActivityType();
248245
activityType.setName(marker.getActivityType());
249246
failure =
@@ -271,10 +268,10 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
271268
}
272269

273270
int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) {
274-
Predicate<byte[]> changeIdEquals =
275-
(bytesInEvent) -> {
276-
MarkerData markerData =
277-
converter.fromData(bytesInEvent, MarkerData.class, MarkerData.class);
271+
Predicate<MarkerRecordedEventAttributes> changeIdEquals =
272+
(attributes) -> {
273+
MarkerHandler.MarkerInterface markerData =
274+
MarkerHandler.MarkerInterface.fromEventAttributes(attributes, converter);
278275
return markerData.getId().equals(changeId);
279276
};
280277
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));

0 commit comments

Comments
 (0)