Skip to content

Commit 1178730

Browse files
authored
Fix busy loop in local activity poller if there is no task (#337)
* Also fix an issue in get history timeout
1 parent b696a14 commit 1178730

File tree

3 files changed

+20
-5
lines changed

3 files changed

+20
-5
lines changed

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public static HistoryEvent getInstanceCloseEvent(
196196
r.setExecution(workflowExecution);
197197
r.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
198198
r.setNextPageToken(pageToken);
199+
r.setWaitForNewEvent(true);
199200
try {
200201
response =
201202
Retryer.retryWithResult(retryParameters, () -> service.GetWorkflowExecutionHistory(r));

src/main/java/com/uber/cadence/internal/worker/LocalActivityPollTask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ final class LocalActivityPollTask
3333

3434
@Override
3535
public LocalActivityWorker.Task poll() throws TException {
36-
return pendingTasks.poll();
36+
try {
37+
return pendingTasks.take();
38+
} catch (InterruptedException e) {
39+
throw new RuntimeException("local activity poll task interrupted", e);
40+
}
3741
}
3842

3943
@Override

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -808,10 +808,20 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
808808
GetWorkflowExecutionHistoryRequest getRequest) throws TException {
809809
ThriftResponse<WorkflowService.GetWorkflowExecutionHistory_result> response = null;
810810
try {
811-
ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args> request =
812-
buildThriftRequest(
813-
"GetWorkflowExecutionHistory",
814-
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest));
811+
ThriftRequest<WorkflowService.GetWorkflowExecutionHistory_args> request;
812+
if (getRequest.isWaitForNewEvent()) {
813+
request =
814+
buildThriftRequest(
815+
"GetWorkflowExecutionHistory",
816+
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest),
817+
options.getRpcLongPollTimeoutMillis());
818+
} else {
819+
request =
820+
buildThriftRequest(
821+
"GetWorkflowExecutionHistory",
822+
new WorkflowService.GetWorkflowExecutionHistory_args(getRequest));
823+
}
824+
815825
response = doRemoteCall(request);
816826
WorkflowService.GetWorkflowExecutionHistory_result result =
817827
response.getBody(WorkflowService.GetWorkflowExecutionHistory_result.class);

0 commit comments

Comments
 (0)