Skip to content

Commit 1efbbb3

Browse files
authored
Retry getHistory request from passive cluster in case of replication lag (#952)
* Retry getHistory request from passive cluster in case of replication lag * Fix unit test
1 parent 04ccee6 commit 1efbbb3

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

internal/internal_workflow_client.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -487,11 +487,16 @@ func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID stri
487487
}
488488

489489
// GetWorkflowHistory returns an iterator over the history events of a given workflow
490-
func (wc *workflowClient) GetWorkflowHistory(ctx context.Context, workflowID string, runID string,
491-
isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator {
490+
func (wc *workflowClient) GetWorkflowHistory(
491+
ctx context.Context,
492+
workflowID string,
493+
runID string,
494+
isLongPoll bool,
495+
filterType s.HistoryEventFilterType,
496+
) HistoryEventIterator {
492497

493498
domain := wc.domain
494-
paginate := func(nexttoken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) {
499+
paginate := func(nextToken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) {
495500
request := &s.GetWorkflowExecutionHistoryRequest{
496501
Domain: common.StringPtr(domain),
497502
Execution: &s.WorkflowExecution{
@@ -500,7 +505,8 @@ func (wc *workflowClient) GetWorkflowHistory(ctx context.Context, workflowID str
500505
},
501506
WaitForNewEvent: common.BoolPtr(isLongPoll),
502507
HistoryEventFilterType: &filterType,
503-
NextPageToken: nexttoken,
508+
NextPageToken: nextToken,
509+
SkipArchival: common.BoolPtr(isLongPoll),
504510
}
505511

506512
var response *s.GetWorkflowExecutionHistoryResponse
@@ -530,7 +536,12 @@ func (wc *workflowClient) GetWorkflowHistory(ctx context.Context, workflowID str
530536
response.History = history
531537
}
532538
return err1
533-
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
539+
},
540+
createDynamicServiceRetryPolicy(ctx),
541+
func(err error) bool {
542+
return isServiceTransientError(err) || isEntityNonExistFromPassive(err)
543+
},
544+
)
534545

535546
if err != nil {
536547
return nil, err
@@ -549,6 +560,16 @@ func (wc *workflowClient) GetWorkflowHistory(ctx context.Context, workflowID str
549560
}
550561
}
551562

563+
func isEntityNonExistFromPassive(err error) bool {
564+
if nonExistError, ok := err.(*s.EntityNotExistsError); ok {
565+
return nonExistError.GetActiveCluster() != "" &&
566+
nonExistError.GetCurrentCluster() != "" &&
567+
nonExistError.GetActiveCluster() != nonExistError.GetCurrentCluster()
568+
}
569+
570+
return false
571+
}
572+
552573
// CompleteActivity reports activity completed. activity Execute method can return activity.ErrResultPending to
553574
// indicate the activity is not completed when its Execute method returns. In that case, this CompleteActivity() method
554575
// should be called when that activity is completed with the actual result and error. If err is nil, activity task

internal/internal_workflow_client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,7 @@ func getGetWorkflowExecutionHistoryRequest(filterType shared.HistoryEventFilterT
934934
},
935935
WaitForNewEvent: common.BoolPtr(isLongPoll),
936936
HistoryEventFilterType: &filterType,
937+
SkipArchival: common.BoolPtr(true),
937938
}
938939

939940
return request

0 commit comments

Comments
 (0)