Skip to content

Commit 722d8cf

Browse files
authored
Retry getting result from passive in case of replication delay (#473)
* Pull in latest idls * Retry getting result in passive cluster in case of replication lag * Review comments
1 parent e066457 commit 722d8cf

File tree

5 files changed

+54
-31
lines changed

5 files changed

+54
-31
lines changed

src/main/idls

Submodule idls updated from 8e3046c to 085f956

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,16 @@
2525
import com.uber.cadence.SearchAttributes;
2626
import com.uber.cadence.TaskList;
2727
import com.uber.cadence.TaskListKind;
28-
import com.uber.cadence.EntityNotExistsError;
2928
import com.uber.cadence.converter.DataConverter;
3029
import com.uber.cadence.converter.JsonDataConverter;
3130
import com.uber.cadence.internal.worker.Shutdownable;
3231
import com.uber.cadence.workflow.WorkflowMethod;
3332
import java.lang.reflect.Method;
3433
import java.nio.ByteBuffer;
35-
import java.util.Arrays;
3634
import java.util.ArrayList;
37-
import java.util.List;
35+
import java.util.Arrays;
3836
import java.util.HashMap;
37+
import java.util.List;
3938
import java.util.Map;
4039
import java.util.concurrent.ExecutorService;
4140
import java.util.concurrent.TimeUnit;
@@ -183,17 +182,15 @@ public static History DeserializeFromBlobToHistoryEvents(
183182
return new History().setEvents(events);
184183
}
185184

186-
// This method deserialize the history event data to blob data
187-
public static List<DataBlob> DeserializeFromHistoryEventToBlobData(List<HistoryEvent> events)
188-
throws EntityNotExistsError {
189-
190-
List<DataBlob> blobs = new ArrayList<DataBlob>();
185+
// This method serializes history event to blob data
186+
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {
187+
List<DataBlob> blobs = new ArrayList<>(events.size());
191188
for (HistoryEvent event : events) {
192189
DataBlob blob = new DataBlob();
193190
try {
194191
blob.setData(serializer.serialize(event));
195192
} catch (org.apache.thrift.TException err) {
196-
throw new EntityNotExistsError("Deserialize blob data to history event failed with unknown error");
193+
throw new RuntimeException("Serialize to blob data failed", err);
197194
}
198195
blobs.add(blob);
199196
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ public class WorkflowExecutionUtils {
100100
.setDoNotRetry(BadRequestError.class, EntityNotExistsError.class)
101101
.build();
102102

103+
// Wait period for passive cluster to retry getting workflow result in case of replication delay.
104+
private static final long ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS = 500;
105+
103106
/**
104107
* Returns result of a workflow instance execution or throws an exception if workflow did not
105108
* complete successfully.
@@ -178,7 +181,7 @@ private static byte[] getResultFromCloseEvent(
178181
}
179182

180183
/** Returns an instance closing event, potentially waiting for workflow to complete. */
181-
public static HistoryEvent getInstanceCloseEvent(
184+
private static HistoryEvent getInstanceCloseEvent(
182185
IWorkflowService service,
183186
String domain,
184187
WorkflowExecution workflowExecution,
@@ -191,31 +194,53 @@ public static HistoryEvent getInstanceCloseEvent(
191194
long start = System.currentTimeMillis();
192195
HistoryEvent event;
193196
do {
197+
if (timeout != 0 && System.currentTimeMillis() - start > unit.toMillis(timeout)) {
198+
throw new TimeoutException(
199+
"WorkflowId="
200+
+ workflowExecution.getWorkflowId()
201+
+ ", runId="
202+
+ workflowExecution.getRunId()
203+
+ ", timeout="
204+
+ timeout
205+
+ ", unit="
206+
+ unit);
207+
}
208+
194209
GetWorkflowExecutionHistoryRequest r = new GetWorkflowExecutionHistoryRequest();
195210
r.setDomain(domain);
196211
r.setExecution(workflowExecution);
197212
r.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
198213
r.setNextPageToken(pageToken);
199214
r.setWaitForNewEvent(true);
215+
r.setSkipArchival(true);
200216
try {
201217
response =
202218
Retryer.retryWithResult(retryParameters, () -> service.GetWorkflowExecutionHistory(r));
203219
} catch (EntityNotExistsError e) {
220+
if (e.activeCluster != null
221+
&& e.currentCluster != null
222+
&& !e.activeCluster.equals(e.currentCluster)) {
223+
// Current cluster is passive cluster. Execution might not exist because of replication
224+
// lag. If we are still within timeout, wait for a little bit and retry.
225+
if (timeout != 0
226+
&& System.currentTimeMillis() + ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS - start
227+
> unit.toMillis(timeout)) {
228+
throw e;
229+
}
230+
231+
try {
232+
Thread.sleep(ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS);
233+
} catch (InterruptedException ie) {
234+
// Throw entity not exist here.
235+
throw e;
236+
}
237+
continue;
238+
}
204239
throw e;
205240
} catch (TException e) {
206241
throw CheckedExceptionWrapper.wrap(e);
207242
}
208-
if (timeout != 0 && System.currentTimeMillis() - start > unit.toMillis(timeout)) {
209-
throw new TimeoutException(
210-
"WorkflowId="
211-
+ workflowExecution.getWorkflowId()
212-
+ ", runId="
213-
+ workflowExecution.getRunId()
214-
+ ", timeout="
215-
+ timeout
216-
+ ", unit="
217-
+ unit);
218-
}
243+
219244
pageToken = response.getNextPageToken();
220245
History history = response.getHistory();
221246
if (history != null && history.getEvents().size() > 0) {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.List;
3232
import java.util.Objects;
3333
import java.util.Optional;
34-
import org.apache.thrift.TException;
3534

3635
interface TestWorkflowStore {
3736

@@ -156,7 +155,8 @@ void sendQueryTask(ExecutionId executionId, TaskListId taskList, PollForDecision
156155
throws EntityNotExistsError;
157156

158157
GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
159-
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest) throws EntityNotExistsError;
158+
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest)
159+
throws EntityNotExistsError;
160160

161161
void getDiagnostics(StringBuilder result);
162162

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
package com.uber.cadence.internal.testservice;
1919

2020
import com.uber.cadence.BadRequestError;
21+
import com.uber.cadence.DataBlob;
2122
import com.uber.cadence.EntityNotExistsError;
2223
import com.uber.cadence.EventType;
23-
import com.uber.cadence.DataBlob;
24-
import com.uber.cadence.internal.common.InternalUtils;
2524
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
2625
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
2726
import com.uber.cadence.History;
@@ -35,6 +34,7 @@
3534
import com.uber.cadence.StickyExecutionAttributes;
3635
import com.uber.cadence.WorkflowExecution;
3736
import com.uber.cadence.WorkflowExecutionInfo;
37+
import com.uber.cadence.internal.common.InternalUtils;
3838
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3939
import com.uber.cadence.internal.testservice.RequestContext.Timer;
4040
import java.time.Duration;
@@ -49,7 +49,6 @@
4949
import java.util.concurrent.locks.Condition;
5050
import java.util.concurrent.locks.Lock;
5151
import java.util.concurrent.locks.ReentrantLock;
52-
import org.apache.thrift.TException;
5352

5453
class TestWorkflowStoreImpl implements TestWorkflowStore {
5554

@@ -338,7 +337,8 @@ public void sendQueryTask(
338337

339338
@Override
340339
public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
341-
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest) throws EntityNotExistsError {
340+
ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest)
341+
throws EntityNotExistsError {
342342
HistoryStore history;
343343
// Used to eliminate the race condition on waitForNewEvents
344344
long expectedNextEventId;
@@ -348,19 +348,20 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
348348
if (!getRequest.isWaitForNewEvent()
349349
&& getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) {
350350
List<HistoryEvent> events = history.getEventsLocked();
351-
List<DataBlob> blobs = InternalUtils.DeserializeFromHistoryEventToBlobData(events);
351+
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
352352
// Copy the list as it is mutable. Individual events assumed immutable.
353353
ArrayList<HistoryEvent> eventsCopy = new ArrayList<>(events);
354354
return new GetWorkflowExecutionHistoryResponse()
355-
.setHistory(new History().setEvents(eventsCopy)).setRawHistory(blobs);
355+
.setHistory(new History().setEvents(eventsCopy))
356+
.setRawHistory(blobs);
356357
}
357358
expectedNextEventId = history.getNextEventIdLocked();
358359
} finally {
359360
lock.unlock();
360361
}
361362
List<HistoryEvent> events =
362363
history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType());
363-
List<DataBlob> blobs = InternalUtils.DeserializeFromHistoryEventToBlobData(events);
364+
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
364365
GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse();
365366
if (events != null) {
366367
result.setHistory(new History().setEvents(events));

0 commit comments

Comments
 (0)