Skip to content

Commit e066457

Browse files
author
Liang Mei
committed
Revert "Revert "Integration get raw history #443 (#472)""
This reverts commit 7cbfb7b.
1 parent 7cbfb7b commit e066457

File tree

5 files changed

+83
-11
lines changed

5 files changed

+83
-11
lines changed

src/main/idls

Submodule idls updated from 56ca0b5 to 8e3046c

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,30 @@
1818
package com.uber.cadence.internal.common;
1919

2020
import com.google.common.base.Defaults;
21+
import com.uber.cadence.DataBlob;
22+
import com.uber.cadence.History;
23+
import com.uber.cadence.HistoryEvent;
24+
import com.uber.cadence.HistoryEventFilterType;
2125
import com.uber.cadence.SearchAttributes;
2226
import com.uber.cadence.TaskList;
2327
import com.uber.cadence.TaskListKind;
28+
import com.uber.cadence.EntityNotExistsError;
2429
import com.uber.cadence.converter.DataConverter;
2530
import com.uber.cadence.converter.JsonDataConverter;
2631
import com.uber.cadence.internal.worker.Shutdownable;
2732
import com.uber.cadence.workflow.WorkflowMethod;
2833
import java.lang.reflect.Method;
2934
import java.nio.ByteBuffer;
35+
import java.util.Arrays;
36+
import java.util.ArrayList;
37+
import java.util.List;
3038
import java.util.HashMap;
3139
import java.util.Map;
3240
import java.util.concurrent.ExecutorService;
3341
import java.util.concurrent.TimeUnit;
42+
import org.apache.thrift.TDeserializer;
43+
import org.apache.thrift.TException;
44+
import org.apache.thrift.TSerializer;
3445

3546
/** Utility functions shared by the implementation code. */
3647
public final class InternalUtils {
@@ -143,6 +154,56 @@ public static SearchAttributes convertMapToSearchAttributes(
143154
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
144155
}
145156

157+
// This method deserialize the DataBlob data to the HistoriyEvent data
158+
public static History DeserializeFromBlobToHistoryEvents(
159+
List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {
160+
161+
List<HistoryEvent> events = new ArrayList<HistoryEvent>();
162+
for (DataBlob data : blobData) {
163+
History history = new History();
164+
try {
165+
byte[] dataByte = data.getData();
166+
dataByte = Arrays.copyOfRange(dataByte, 1, dataByte.length);
167+
deSerializer.deserialize(history, dataByte);
168+
169+
if (history == null || history.getEvents() == null || history.getEvents().size() == 0) {
170+
return null;
171+
}
172+
} catch (org.apache.thrift.TException err) {
173+
throw new TException("Deserialize blob data to history event failed with unknown error");
174+
}
175+
176+
events.addAll(history.getEvents());
177+
}
178+
179+
if (events.size() > 0 && historyEventFilterType == HistoryEventFilterType.CLOSE_EVENT) {
180+
events = events.subList(events.size() - 1, events.size());
181+
}
182+
183+
return new History().setEvents(events);
184+
}
185+
186+
// This method deserialize the history event data to blob data
187+
public static List<DataBlob> DeserializeFromHistoryEventToBlobData(List<HistoryEvent> events)
188+
throws EntityNotExistsError {
189+
190+
List<DataBlob> blobs = new ArrayList<DataBlob>();
191+
for (HistoryEvent event : events) {
192+
DataBlob blob = new DataBlob();
193+
try {
194+
blob.setData(serializer.serialize(event));
195+
} catch (org.apache.thrift.TException err) {
196+
throw new EntityNotExistsError("Deserialize blob data to history event failed with unknown error");
197+
}
198+
blobs.add(blob);
199+
}
200+
201+
return blobs;
202+
}
203+
204+
private static final TDeserializer deSerializer = new TDeserializer();
205+
private static final TSerializer serializer = new TSerializer();
206+
146207
/** Prohibit instantiation */
147208
private InternalUtils() {}
148209
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Objects;
3333
import java.util.Optional;
34+
import org.apache.thrift.TException;
3435

3536
interface TestWorkflowStore {
3637

@@ -155,8 +156,7 @@ void sendQueryTask(ExecutionId executionId, TaskListId taskList, PollForDecision
155156
throws EntityNotExistsError;
156157

157158
GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
158-
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest)
159-
throws EntityNotExistsError;
159+
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest) throws EntityNotExistsError;
160160

161161
void getDiagnostics(StringBuilder result);
162162

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.uber.cadence.BadRequestError;
2121
import com.uber.cadence.EntityNotExistsError;
2222
import com.uber.cadence.EventType;
23+
import com.uber.cadence.DataBlob;
24+
import com.uber.cadence.internal.common.InternalUtils;
2325
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
2426
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
2527
import com.uber.cadence.History;
@@ -47,6 +49,7 @@
4749
import java.util.concurrent.locks.Condition;
4850
import java.util.concurrent.locks.Lock;
4951
import java.util.concurrent.locks.ReentrantLock;
52+
import org.apache.thrift.TException;
5053

5154
class TestWorkflowStoreImpl implements TestWorkflowStore {
5255

@@ -335,8 +338,7 @@ public void sendQueryTask(
335338

336339
@Override
337340
public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
338-
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest)
339-
throws EntityNotExistsError {
341+
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest) throws EntityNotExistsError {
340342
HistoryStore history;
341343
// Used to eliminate the race condition on waitForNewEvents
342344
long expectedNextEventId;
@@ -346,20 +348,23 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
346348
if (!getRequest.isWaitForNewEvent()
347349
&& getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) {
348350
List<HistoryEvent> events = history.getEventsLocked();
351+
List<DataBlob> blobs = InternalUtils.DeserializeFromHistoryEventToBlobData(events);
349352
// Copy the list as it is mutable. Individual events assumed immutable.
350353
ArrayList<HistoryEvent> eventsCopy = new ArrayList<>(events);
351354
return new GetWorkflowExecutionHistoryResponse()
352-
.setHistory(new History().setEvents(eventsCopy));
355+
.setHistory(new History().setEvents(eventsCopy)).setRawHistory(blobs);
353356
}
354357
expectedNextEventId = history.getNextEventIdLocked();
355358
} finally {
356359
lock.unlock();
357360
}
358361
List<HistoryEvent> events =
359362
history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType());
363+
List<DataBlob> blobs = InternalUtils.DeserializeFromHistoryEventToBlobData(events);
360364
GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse();
361365
if (events != null) {
362366
result.setHistory(new History().setEvents(events));
367+
result.setRawHistory(blobs);
363368
}
364369
return result;
365370
}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.uber.cadence.GetSearchAttributesResponse;
3838
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
3939
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
40+
import com.uber.cadence.History;
4041
import com.uber.cadence.InternalServiceError;
4142
import com.uber.cadence.LimitExceededError;
4243
import com.uber.cadence.ListArchivedWorkflowExecutionsRequest;
@@ -90,6 +91,7 @@
9091
import com.uber.cadence.WorkflowService.GetWorkflowExecutionHistory_result;
9192
import com.uber.cadence.internal.Version;
9293
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
94+
import com.uber.cadence.internal.common.InternalUtils;
9395
import com.uber.cadence.internal.metrics.MetricsType;
9496
import com.uber.cadence.internal.metrics.NoopScope;
9597
import com.uber.cadence.internal.metrics.ServiceMethod;
@@ -106,10 +108,7 @@
106108
import java.net.InetAddress;
107109
import java.net.InetSocketAddress;
108110
import java.net.UnknownHostException;
109-
import java.util.ArrayList;
110-
import java.util.HashMap;
111-
import java.util.Map;
112-
import java.util.UUID;
111+
import java.util.*;
113112
import java.util.concurrent.CompletableFuture;
114113
import java.util.concurrent.ExecutionException;
115114
import org.apache.thrift.TException;
@@ -853,7 +852,14 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
853852
WorkflowService.GetWorkflowExecutionHistory_result result =
854853
response.getBody(WorkflowService.GetWorkflowExecutionHistory_result.class);
855854
if (response.getResponseCode() == ResponseCode.OK) {
856-
return result.getSuccess();
855+
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
856+
if (res.getRawHistory() != null) {
857+
History history =
858+
InternalUtils.DeserializeFromBlobToHistoryEvents(
859+
res.getRawHistory(), getRequest.getHistoryEventFilterType());
860+
res.setHistory(history);
861+
}
862+
return res;
857863
}
858864
if (result.isSetBadRequestError()) {
859865
throw result.getBadRequestError();

0 commit comments

Comments
 (0)